In [None]:
from huggingface_hub import notebook_login

# Authenticate with the Hugging Face Hub from inside the notebook. The training
# arguments further down set `push_to_hub=True`, which needs a logged-in session.
notebook_login()

In [None]:
# Notebook-wide settings; both names are read by later cells.
model_ckpt = "MCG-NJU/videomae-base" # pre-trained checkpoint to fine-tune from
batch_size = 4 # per-device batch size, used for both training and evaluation

In [None]:
!pip install pytorchvideo transformers evaluate -q

In [None]:
# Set git's global `credential.helper` to `store`.
# NOTE(review): presumably so that pushes to the Hub can reuse the saved token — confirm.
!git config --global credential.helper store

In [None]:
from huggingface_hub import hf_hub_download


# Where the UCF101 subset archive lives on the Hugging Face Hub (a dataset repo).
hf_dataset_identifier, filename = "sayakpaul/ucf101-subset", "UCF101_subset.tar.gz"

# Fetch the archive and keep the local path that `hf_hub_download` returns.
file_path = hf_hub_download(hf_dataset_identifier, filename, repo_type="dataset")

In [None]:
!tar xf {file_path}

In [None]:
# Directory the archive is expected to unpack into (relative to the working directory).
dataset_root_path = "UCF101_subset"

# Peek at the first few entries to check the extracted layout.
!find {dataset_root_path} | head -5

In [None]:
import pathlib

# Rebind the root from a plain string to a `pathlib.Path`; later cells call `.glob()` on it.
dataset_root_path = pathlib.Path(dataset_root_path)

In [None]:
# Count the .avi clips of each split; the glob patterns match <split>/<dir>/<clip>.avi.
video_count_train, video_count_val, video_count_test = (
    sum(1 for _ in dataset_root_path.glob(pattern))
    for pattern in ("train/*/*.avi", "val/*/*.avi", "test/*/*.avi")
)
video_total = sum((video_count_train, video_count_val, video_count_test))
print(f"Total videos: {video_total}")

In [None]:
# Gather every clip path, keeping the train -> val -> test order.
all_video_file_paths = [
    video_path
    for pattern in ("train/*/*.avi", "val/*/*.avi", "test/*/*.avi")
    for video_path in dataset_root_path.glob(pattern)
]
all_video_file_paths[:5]

In [None]:
# The paths were globbed as <root>/<split>/<class>/<clip>.avi, so the class name is
# the clip's parent directory. Reading it with `Path.parent.name` (instead of splitting
# the string on "/" and taking index 2) keeps working when the dataset root is nested
# deeper, is absolute, or the platform uses a different path separator.
class_labels = sorted({path.parent.name for path in all_video_file_paths})

# Class ids follow the sorted order of the class names.
label2id = {label: i for i, label in enumerate(class_labels)}
id2label = {i: label for label, i in label2id.items()}

print(f"Unique classes: {list(label2id.keys())}.")

In [None]:
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification


# Preprocessing settings that ship with the checkpoint (later cells read its
# mean, std and size).
image_processor = VideoMAEImageProcessor.from_pretrained(model_ckpt)

# Classification model initialised from the same checkpoint, with this dataset's
# label maps passed along.
model = VideoMAEForVideoClassification.from_pretrained(
    model_ckpt,
    ignore_mismatched_sizes=True,  # provide this in case you're planning to fine-tune an already fine-tuned checkpoint
    id2label=id2label,
    label2id=label2id,
)

In [None]:
import pytorchvideo.data

from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    RandomShortSideScale,
    RemoveKey,
    ShortSideScale,
    UniformTemporalSubsample,
)

from torchvision.transforms import (
    Compose,
    Lambda,
    RandomCrop,
    RandomHorizontalFlip,
    Resize,
)

In [None]:
import os

# Normalisation statistics and target frame size come from the checkpoint's image processor.
mean, std = image_processor.image_mean, image_processor.image_std
if "shortest_edge" not in image_processor.size:
    height, width = image_processor.size["height"], image_processor.size["width"]
else:
    # A single "shortest_edge" value is used for both sides.
    height = width = image_processor.size["shortest_edge"]
resize_to = (height, width)

# Clip length in seconds: `num_frames` frames, spaced `sample_rate` frames apart,
# at an assumed `fps` frames per second.
num_frames_to_sample = model.config.num_frames
sample_rate, fps = 4, 30
clip_duration = num_frames_to_sample * sample_rate / fps


# Operations applied to the "video" entry of every training sample, in order:
# temporal subsampling, division by 255, mean/std normalisation, then random
# short-side scaling, random cropping to the model's input size and a random flip.
_train_video_ops = [
    UniformTemporalSubsample(num_frames_to_sample),
    Lambda(lambda x: x / 255.0),
    Normalize(mean, std),
    RandomShortSideScale(min_size=256, max_size=320),
    RandomCrop(resize_to),
    RandomHorizontalFlip(p=0.5),
]
train_transform = Compose(
    [ApplyTransformToKey(key="video", transform=Compose(_train_video_ops))]
)

# Training split: clips of `clip_duration` seconds drawn by the "random" clip sampler.
train_dataset = pytorchvideo.data.Ucf101(
    transform=train_transform,
    decode_audio=False,
    clip_sampler=pytorchvideo.data.make_clip_sampler("random", clip_duration),
    data_path=os.path.join(dataset_root_path, "train"),
)

# Preprocessing shared by the validation and test splits: same subsampling and
# normalisation as training, but a plain resize instead of random augmentation.
_eval_video_ops = [
    UniformTemporalSubsample(num_frames_to_sample),
    Lambda(lambda x: x / 255.0),
    Normalize(mean, std),
    Resize(resize_to),
]
val_transform = Compose(
    [ApplyTransformToKey(key="video", transform=Compose(_eval_video_ops))]
)


def _make_eval_dataset(split):
    """Build the dataset for `split` using the "uniform" clip sampler and `val_transform`."""
    return pytorchvideo.data.Ucf101(
        data_path=os.path.join(dataset_root_path, split),
        clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
        decode_audio=False,
        transform=val_transform,
    )


# Validation and test datasets differ only in the directory they read from.
val_dataset = _make_eval_dataset("val")
test_dataset = _make_eval_dataset("test")

In [None]:
# `num_videos` is an attribute of each dataset (not an argument); reading it tells us
# how many videos the train, validation and test splits contain.
train_dataset.num_videos, val_dataset.num_videos, test_dataset.num_videos

In [None]:
# Pull the first item the training dataset yields and list the keys it carries.
sample_video = next(iter(train_dataset))
sample_video.keys()

In [None]:
def investigate_video(sample_video):
    """Print every entry of a single video sample, followed by its class name.

    Args:
        sample_video: Mapping for one sample. It must contain a ``"video"``
            entry exposing ``.shape`` and a ``"label"`` entry whose value is a
            key of the module-level ``id2label``.

    The ``"video"`` entry is summarised by its shape; every other value is
    printed as-is.
    """
    for key, value in sample_video.items():
        if key == "video":
            print(key, value.shape)
        else:
            print(key, value)

    # Look the label up explicitly. Reusing the loop variable here only worked
    # when "label" happened to be the last key of the mapping.
    print(f"Video label: {id2label[sample_video['label']]}")


# Print the contents of the training sample fetched earlier.
investigate_video(sample_video)

In [None]:
import imageio
import numpy as np
from IPython.display import Image


def unnormalize_img(img):
    """Undo the mean/std normalisation and convert to displayable 8-bit pixels.

    Args:
        img: NumPy array that broadcasts against the module-level ``mean`` and
            ``std`` (the caller in this notebook passes channels-last frames).

    Returns:
        ``uint8`` array of the same shape with every value in ``[0, 255]``.
    """
    img = (img * std) + mean
    # Clamp BEFORE the cast: converting out-of-range floats to uint8 wraps around
    # (e.g. 306 -> 50), after which clipping to [0, 255] can no longer repair them.
    return np.clip(img * 255, 0, 255).astype("uint8")


def create_gif(video_tensor, filename="sample.gif"):
    """Write the frames of a video tensor to disk as an animated GIF.

    `video_tensor` must be laid out as
    (num_frames, num_channels, height, width). Each frame is moved to
    channels-last, converted to NumPy and un-normalised before being handed to
    `imageio`. Returns the file name the GIF was written to.
    """
    frames = [
        unnormalize_img(frame.permute(1, 2, 0).numpy())
        for frame in video_tensor
    ]
    imageio.mimsave(filename, frames, "GIF", duration=0.25)
    return filename


def display_gif(video_tensor, gif_name="sample.gif"):
    """Build a GIF from a video tensor and return it as an IPython `Image`.

    The first two axes of the tensor are swapped before the frames are written
    with `create_gif`.
    """
    frames_first = video_tensor.permute(1, 0, 2, 3)
    return Image(filename=create_gif(frames_first, gif_name))

In [None]:
# Render the training sample's video as an inline GIF.
video_tensor = sample_video["video"]
display_gif(video_tensor)

In [None]:
!pip install accelerate -U

In [None]:
!pip install transformers[torch]

In [None]:
from transformers import TrainingArguments, Trainer

import inspect

# Output directory / Hub repo name derived from the checkpoint's short name.
model_name = model_ckpt.split("/")[-1]
new_model_name = f"{model_name}-finetuned-ucf101-subset"
num_epochs = 50

# The evaluation-schedule keyword was renamed from `evaluation_strategy` to
# `eval_strategy` in newer transformers releases. The notebook installs an unpinned
# version, so use whichever spelling the installed `TrainingArguments` accepts.
_eval_strategy_kwarg = (
    "eval_strategy"
    if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
    else "evaluation_strategy"
)

args = TrainingArguments(
    new_model_name,
    # The collate function reads the raw "video" and "label" entries of each sample.
    remove_unused_columns=False,
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=True,
    # Training length is given in steps: (videos // batch size) steps per epoch.
    max_steps=(train_dataset.num_videos // batch_size) * num_epochs,
    **{_eval_strategy_kwarg: "epoch"},
)

In [None]:
# train_dataset.num_videos
# batch_size

In [None]:
import evaluate

# Load the "accuracy" metric; `compute_metrics` reads this module-level object.
metric = evaluate.load("accuracy")

In [None]:
# `compute_metrics` receives a named tuple with two NumPy arrays:
# `predictions` (the model's logits) and `label_ids` (the ground-truth labels).
def compute_metrics(eval_pred):
    """Score the arg-max class of each row of logits against the true labels."""
    logits, references = eval_pred.predictions, eval_pred.label_ids
    predicted_ids = np.argmax(logits, axis=1)
    return metric.compute(predictions=predicted_ids, references=references)

In [None]:
import torch


def collate_fn(examples):
    """Batch dataset samples into the dictionary handed to the model by `Trainer`.

    Each example's ``"video"`` tensor has its first two axes swapped, giving
    (num_frames, num_channels, height, width), before the videos are stacked
    along a new leading batch axis. The ``"label"`` entries become a 1-D tensor.
    """
    videos, targets = [], []
    for example in examples:
        videos.append(example["video"].permute(1, 0, 2, 3))
        targets.append(example["label"])
    return {"pixel_values": torch.stack(videos), "labels": torch.tensor(targets)}

In [None]:
import inspect

# Newer transformers releases take the preprocessor through `processing_class` and
# deprecate the older `tokenizer` keyword. The notebook installs an unpinned version,
# so use whichever keyword the installed `Trainer` accepts.
_processor_kwarg = (
    "processing_class"
    if "processing_class" in inspect.signature(Trainer.__init__).parameters
    else "tokenizer"
)

trainer = Trainer(
    model,
    args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
    **{_processor_kwarg: image_processor},
)

In [None]:
# Run fine-tuning; whatever `trainer.train()` returns is kept in `train_results`.
train_results = trainer.train()

In [None]:
# Evaluate on the test split and display the returned metrics.
# NOTE(review): the following cell evaluates the same split again and stores the
# result, so this call looks redundant — confirm whether both are needed.
trainer.evaluate(test_dataset)

In [None]:
# Save the model, then evaluate on the test split and record those metrics
# (logged and saved under the "test" name) together with the trainer state.
trainer.save_model()
test_results = trainer.evaluate(test_dataset)
trainer.log_metrics("test", test_results)
trainer.save_metrics("test", test_results)
trainer.save_state()

In [None]:
# Upload the trainer's output to the Hugging Face Hub (requires the login done at the top).
trainer.push_to_hub()

In [None]:
# Load a fresh model instance from `new_model_name`, the name used as the training
# output location. NOTE(review): presumably resolves to the local output directory
# when it exists, otherwise to a Hub repo of that name — confirm.
trained_model = VideoMAEForVideoClassification.from_pretrained(new_model_name)

In [None]:
import matplotlib.pyplot as plt

# Per-epoch metrics, hard-coded as literals.
# NOTE(review): presumably copied from the log of an earlier 50-epoch run — confirm,
# and re-copy after retraining since nothing here is read from `trainer`.
training_loss = [
    2.317400, 2.029300, 0.853300, 1.067800, 0.485300, 0.365500, 0.184200, 0.019300, 0.148800, 0.006100,
    0.004000, 0.005000, 0.309300, 0.001800, 0.005100, 0.073100, 0.005800, 0.001800, 0.169400, 0.001000,
    0.000900, 0.000800, 0.099200, 0.010800, 0.204700, 0.000700, 0.000700, 0.000600, 0.000600, 0.000600,
    0.000500, 0.000500, 0.000500, 0.000500, 0.000400, 0.000400, 0.000600, 0.000400, 0.000400, 0.000500,
    0.000400, 0.000400, 0.000400, 0.000400, 0.000400, 0.000400, 0.000400, 0.000400, 0.000400, 0.000400
]

validation_loss = [
    2.312683, 1.786833, 0.853605, 0.771807, 0.847871, 0.217931, 0.214317, 0.965183, 0.360681, 0.355820,
    0.154675, 0.263832, 0.132459, 0.097235, 0.099100, 0.089497, 0.001582, 0.154889, 0.001553, 0.065555,
    0.133894, 0.001920, 0.123095, 0.001610, 0.196485, 0.200892, 0.435730, 0.020803, 0.007020, 0.007961,
    0.002760, 0.240054, 0.138528, 0.141033, 0.138613, 0.137332, 0.318122, 0.145141, 0.179960, 0.161636,
    0.154311, 0.143776, 0.171561, 0.147133, 0.165054, 0.160013, 0.158805, 0.157711, 0.153817, 0.148383
]

accuracy = [
    0.142857, 0.442857, 0.700000, 0.757143, 0.700000, 0.914286, 0.957143, 0.757143, 0.900000, 0.928571,
    0.957143, 0.928571, 0.971429, 0.985714, 0.985714, 0.985714, 1.000000, 0.971429, 1.000000, 0.985714,
    0.971429, 1.000000, 0.971429, 1.000000, 0.942857, 0.957143, 0.928571, 0.985714, 1.000000, 1.000000,
    1.000000, 1.000000, 0.957143, 0.985714, 0.985714, 0.985714, 0.985714, 0.985714, 0.942857, 0.985714,
    0.957143, 0.971429, 0.971429, 0.985714, 0.971429, 0.985714, 0.971429, 0.971429, 0.971429, 0.971429
]

# Derive the x-axis from the data so it cannot drift out of sync with the lists,
# and fail loudly if the three series ever end up with different lengths.
epochs = list(range(len(training_loss)))
assert len(validation_loss) == len(accuracy) == len(epochs), (
    "training_loss, validation_loss and accuracy must have one entry per epoch"
)

# All three curves share a single axis, so label it for both quantities.
fig, ax = plt.subplots()
ax.plot(epochs, validation_loss, label='Validation Loss')
ax.plot(epochs, training_loss, label='Training Loss')
ax.plot(epochs, accuracy, label='Accuracy')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss / Accuracy')
ax.set_title('Training Loss, Validation Loss and Accuracy vs. Epoch')
ax.legend()
ax.grid(True)
plt.show()


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Sample data: four epochs of metrics.
epochs = [0, 1, 2, 3]
training_loss = [1.767700, 0.842900, 0.291400, 0.275800]
validation_loss = [1.654982, 0.748549, 0.326110, 0.203674]
accuracy = [0.371429, 0.685714, 0.828571, 0.900000]

# First figure: accuracy per epoch.
fig_acc, ax_acc = plt.subplots()
ax_acc.plot(epochs, accuracy, label='Accuracy', marker='o')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Epoch Accuracy')
ax_acc.legend()
ax_acc.grid(True)
plt.show()

# Collect the same series in a DataFrame, one column per metric.
data = {
    'Epoch': epochs,
    'Training Loss': training_loss,
    'Validation Loss': validation_loss,
    'Accuracy': accuracy,
}
df = pd.DataFrame(data)

# Second figure: both loss curves, read back from the DataFrame.
fig_loss, ax_loss = plt.subplots()
for loss_column in ('Training Loss', 'Validation Loss'):
    ax_loss.plot(df['Epoch'], df[loss_column], label=loss_column, marker='o')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Training and Validation Loss')
ax_loss.legend()
ax_loss.grid(True)
plt.show()


In [None]:
# Pull the first item the test dataset yields and print its contents.
sample_test_video = next(iter(test_dataset))
investigate_video(sample_test_video)

In [None]:
def run_inference(model, video, label=None):
    """Run a single preprocessed video through `model` and return its logits.

    Args:
        model: Classification model. It is moved to the selected device (CUDA
            when available, otherwise CPU) and switched to evaluation mode.
        video: Preprocessed video tensor. Its first two axes are swapped and a
            leading batch axis is added before the forward pass.
        label: Optional integer class id. When given, it is forwarded to the
            model as ``labels``; when omitted, no labels are passed.

    Returns:
        The ``logits`` attribute of the model output for this batch of one.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # (num_frames, num_channels, height, width), plus a leading batch axis
    inputs = {"pixel_values": video.permute(1, 0, 2, 3).unsqueeze(0)}
    # The label must come from the caller: reading it from the global
    # `sample_test_video` attached the wrong label to every other video and made
    # the function unusable when that global did not exist.
    if label is not None:
        inputs["labels"] = torch.tensor([label])
    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    model = model.to(device)
    model.eval()  # inference only: make sure train-time behaviour such as dropout is off

    # forward pass without gradient tracking
    with torch.no_grad():
        return model(**inputs).logits

In [None]:
# Logits that `trained_model` produces for the test sample fetched earlier.
logits = run_inference(trained_model, sample_test_video["video"])

In [None]:
# Show the test sample as an inline GIF.
display_gif(sample_test_video["video"])

In [None]:
# Map the highest-scoring logit back to its class name. The label map is read from
# `trained_model`, the model that produced `logits`, rather than from the separate
# `model` object, so the lookup always matches the weights that made the prediction.
predicted_class_idx = logits.argmax(-1).item()
print("Predicted class:", trained_model.config.id2label[predicted_class_idx])

In [None]:
# prompt: make confusion matrix for this model

from sklearn.metrics import confusion_matrix
import seaborn as sns

# Collect the ground truth and the prediction for each clip in ONE pass over the
# dataset. Two separate passes are not guaranteed to yield clips in the same order
# (pytorchvideo's default video sampler is random), which would pair labels with
# predictions of different clips. A single pass also decodes every video only once.
true_labels, predicted_labels = [], []
for sample in test_dataset:
    true_labels.append(sample["label"])
    # Use a loop-local name so the notebook-level `logits` from the single-sample
    # inference cell is not overwritten.
    sample_logits = run_inference(trained_model, sample["video"])
    predicted_labels.append(sample_logits.argmax(-1).item())

# Pin the class ids so the matrix always has one row and one column per class,
# matching the tick labels even when a class never shows up in this split.
class_ids = list(range(len(class_labels)))
cm = confusion_matrix(true_labels, predicted_labels, labels=class_ids)

# Visualize the confusion matrix
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.show()
