In [None]:
import numpy as np
from torch.nn.utils.rnn import pad_sequence

# Canonical, ordered list of activity class names. A name's position in
# this list is the integer class id used for it throughout the notebook.
all_activities = [
    'closing_bottle',
    'closing_door_inside',
    'closing_door_outside',
    'closing_laptop',
    'drinking',
    'eating',
    'entering_car',
    'exiting_car',
    'fastening_seat_belt',
    'fetching_an_object',
    'interacting_with_phone',
    'looking_or_moving_around (e.g. searching)',
    'opening_backpack',
    'opening_bottle',
    'opening_door_inside',
    'opening_door_outside',
    'opening_laptop',
    'placing_an_object',
    'preparing_food',
    'pressing_automation_button',
    'putting_laptop_into_backpack',
    'putting_on_jacket',
    'putting_on_sunglasses',
    'reading_magazine',
    'reading_newspaper',
    'sitting_still',
    'taking_laptop_from_backpack',
    'taking_off_jacket',
    'taking_off_sunglasses',
    'talking_on_phone',
    'unfastening_seat_belt',
    'using_multimedia_display',
    'working_on_laptop',
    'writing'
]

# Activity name -> integer class id (the name's index in `all_activities`).
all_activity_mapper = {
    activity_name: class_id
    for class_id, activity_name in enumerate(all_activities)
}


class VideoDataset:
    """Index-addressable view over annotated activity segments.

    Each row of ``annotation_df`` becomes one sample describing a labelled
    frame range inside a video.

    Args:
        annotation_df: Table exposing ``iterrows()`` whose rows provide the
            attributes ``activity``, ``frame_index_start``,
            ``frame_index_end`` and ``video_name``.
        poses: Pose information, stored unchanged on ``self.pose_info``.
        max_len: Stored unchanged on ``self.max_len``.

    Raises:
        KeyError: If a row's ``activity`` is not a key of the module-level
            ``all_activity_mapper``.
    """

    def __init__(self, annotation_df, poses, max_len=30) -> None:
        self.pose_info = poses
        self.annotation_df = annotation_df
        self.max_len = max_len
        # Kept for compatibility; nothing in this class populates it.
        self.all_activities = set()
        self.samples = []
        self.activities = []
        self.shuffle()

    def shuffle(self):
        """Rebuild ``self.samples`` and ``self.activities`` from the annotations.

        The lists are rebuilt from scratch, so calling this more than once
        does not duplicate samples. Despite the name, row order is kept.
        """
        rebuilt = []
        for _, annotation in self.annotation_df.iterrows():
            rebuilt.append(dict(
                frame_start=annotation.frame_index_start,
                frame_end=annotation.frame_index_end,
                label=all_activity_mapper[annotation.activity],
                file_name=annotation.video_name,
            ))
        self.samples = rebuilt
        # Parallel list of integer class ids, one entry per sample.
        self.activities = [sample['label'] for sample in rebuilt]

    def __len__(self):
        """Return the number of samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` as a dict.

        Keys: ``idx``, ``activity`` (integer class id), ``frame_start``,
        ``frame_end`` and ``file_name``.

        Raises:
            IndexError: If ``idx`` is out of range.
        """
        sample = self.samples[idx]
        # NOTE(review): pose_2d / pose_3d / valid_len were read from
        # attributes that were never assigned. They are omitted until the
        # pose lookup against ``self.pose_info`` is implemented.
        return dict(
            idx=idx,
            activity=sample['label'],
            frame_start=sample['frame_start'],
            frame_end=sample['frame_end'],
            file_name=sample['file_name'],
        )


In [7]:
import pickle

# Load the pickled training pose records.
# NOTE: pickle.load can execute arbitrary code, so only open trusted files.
with open('output/inner_mirror/train_pose_info.pkl', 'rb') as f:
    train_pose_info_list = pickle.load(f)
# Order the pose records by their 'index' field.
train_pose_info_list = sorted(train_pose_info_list, key=lambda x: x['index'])

# Load the pickled training annotations (same trusted-source caveat applies).
with open('output/inner_mirror/train_annotation.pkl', 'rb') as f:
    train_annotation = pickle.load(f)

In [8]:
# Preview the first rows of the loaded annotation table.
train_annotation.head()

Unnamed: 0,participant_id,video_name,annotation_id,activity,frame_index_start,frame_index_end
0,1,vp1/run1b_2018-05-29-14-02-47.ids_1,1,closing_door_outside,0,25
1,1,vp1/run1b_2018-05-29-14-02-47.ids_1,3,opening_door_outside,25,54
2,1,vp1/run1b_2018-05-29-14-02-47.ids_1,4,entering_car,54,80
3,1,vp1/run1b_2018-05-29-14-02-47.ids_1,5,closing_door_inside,80,97
4,1,vp1/run1b_2018-05-29-14-02-47.ids_1,6,fastening_seat_belt,97,152


In [4]:
from modules.action_recognizer.dataset.video_dataset import (all_activities, all_activity_mapper)
# Build the label <-> id lookup tables from the activity names.
# `class_labels` must be defined here: it was previously commented out,
# so this cell raised NameError on a fresh kernel.
class_labels = sorted(all_activities)
label2id = {label: i for i, label in enumerate(class_labels)}
id2label = {i: label for label, i in label2id.items()}

print(f"Unique classes: {list(label2id.keys())}.")

Unique classes: ['closing_bottle', 'closing_door_inside', 'closing_door_outside', 'closing_laptop', 'drinking', 'eating', 'entering_car', 'exiting_car', 'fastening_seat_belt', 'fetching_an_object', 'interacting_with_phone', 'looking_or_moving_around (e.g. searching)', 'opening_backpack', 'opening_bottle', 'opening_door_inside', 'opening_door_outside', 'opening_laptop', 'placing_an_object', 'preparing_food', 'pressing_automation_button', 'putting_laptop_into_backpack', 'putting_on_jacket', 'putting_on_sunglasses', 'reading_magazine', 'reading_newspaper', 'sitting_still', 'taking_laptop_from_backpack', 'taking_off_jacket', 'taking_off_sunglasses', 'talking_on_phone', 'unfastening_seat_belt', 'using_multimedia_display', 'working_on_laptop', 'writing'].


In [6]:
# from transformers import VivitImageProcessor, VivitForVideoClassification

# model_ckpt = "google/vivit-b-16x2-kinetics400"
# image_processor = VivitImageProcessor.from_pretrained(model_ckpt)

# model = VivitForVideoClassification.from_pretrained(
#     model_ckpt,
#     label2id=label2id,
#     id2label=id2label,
#     ignore_mismatched_sizes=True,  # provide this in case you're planning to fine-tune an already fine-tuned checkpoint
# )



preprocessor_config.json:   0%|          | 0.00/401 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/18.6k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/356M [00:00<?, ?B/s]

  return self.fget.__get__(instance, owner)()
Some weights of VivitForVideoClassification were not initialized from the model checkpoint at google/vivit-b-16x2-kinetics400 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([400, 768]) in the checkpoint and torch.Size([34, 768]) in the model instantiated
- classifier.bias: found shape torch.Size([400]) in the checkpoint and torch.Size([34]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# import pytorchvideo.data

# from pytorchvideo.transforms import (
#     ApplyTransformToKey,
#     Normalize,
#     RandomShortSideScale,
#     RemoveKey,
#     ShortSideScale,
#     UniformTemporalSubsample,
# )

# from torchvision.transforms import (
#     Compose,
#     Lambda,
#     RandomCrop,
#     RandomHorizontalFlip,
#     Resize,
# )

# mean = image_processor.image_mean
# std = image_processor.image_std
# if "shortest_edge" in image_processor.size:
#     height = width = image_processor.size["shortest_edge"]
# else:
#     height = image_processor.size["height"]
#     width = image_processor.size["width"]
# resize_to = (height, width)

# num_frames_to_sample = model.config.num_frames
# sample_rate = 4
# fps = 30
# clip_duration = num_frames_to_sample * sample_rate / fps

# train_transform = Compose(
#     [
#         ApplyTransformToKey(
#             key="video",
#             transform=Compose(
#                 [
#                     UniformTemporalSubsample(num_frames_to_sample),
#                     Lambda(lambda x: x / 255.0),
#                     Normalize(mean, std),
#                     RandomShortSideScale(min_size=256, max_size=320),
#                     RandomCrop(resize_to),
#                     RandomHorizontalFlip(p=0.5),
#                 ]
#             ),
#         ),
#     ]
# )

# train_dataset = pytorchvideo.data.Ucf101(
#     data_path=os.path.join(dataset_root_path, "train"),
#     clip_sampler=pytorchvideo.data.make_clip_sampler("random", clip_duration),
#     decode_audio=False,
#     transform=train_transform,
# )

# val_transform = Compose(
#     [
#         ApplyTransformToKey(
#             key="video",
#             transform=Compose(
#                 [
#                     UniformTemporalSubsample(num_frames_to_sample),
#                     Lambda(lambda x: x / 255.0),
#                     Normalize(mean, std),
#                     Resize(resize_to),
#                 ]
#             ),
#         ),
#     ]
# )

# val_dataset = pytorchvideo.data.Ucf101(
#     data_path=os.path.join(dataset_root_path, "val"),
#     clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
#     decode_audio=False,
#     transform=val_transform,
# )

# test_dataset = pytorchvideo.data.Ucf101(
#     data_path=os.path.join(dataset_root_path, "test"),
#     clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
#     decode_audio=False,
#     transform=val_transform,
# )

In [None]:
# import imageio
# import numpy as np
# from IPython.display import Image

# def unnormalize_img(img):
#     """Un-normalizes the image pixels."""
#     img = (img * std) + mean
#     img = (img * 255).astype("uint8")
#     return img.clip(0, 255)

# def create_gif(video_tensor, filename="sample.gif"):
#     """Prepares a GIF from a video tensor.
    
#     The video tensor is expected to have the following shape:
#     (num_frames, num_channels, height, width).
#     """
#     frames = []
#     for video_frame in video_tensor:
#         frame_unnormalized = unnormalize_img(video_frame.permute(1, 2, 0).numpy())
#         frames.append(frame_unnormalized)
#     kargs = {"duration": 0.25}
#     imageio.mimsave(filename, frames, "GIF", **kargs)
#     return filename

# def display_gif(video_tensor, gif_name="sample.gif"):
#     """Prepares and displays a GIF from a video tensor."""
#     video_tensor = video_tensor.permute(1, 0, 2, 3)
#     gif_filename = create_gif(video_tensor, gif_name)
#     return Image(filename=gif_filename)

# sample_video = next(iter(train_dataset))
# video_tensor = sample_video["video"]
# display_gif(video_tensor)

https://huggingface.co/docs/transformers/en/tasks/video_classification
https://huggingface.co/docs/transformers/main/model_doc/vivit