In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import gdown
import zipfile
#!pip install transformers torch torchvision datasets evaluate torchmetrics

# Google Drive file ID
file_id = "1RhMfnXLlQVPT4O9cyTDKsbWy2LL-hbbY"
url = f"https://drive.google.com/uc?id={file_id}"

# Download file
output = "HMDB_simp_clean.zip"
gdown.download(url, output, quiet=False)

# Unzip the file
with zipfile.ZipFile(output, 'r') as zip_ref:
    zip_ref.extractall(".")

print("Download and extraction complete!")

Downloading...
From (original): https://drive.google.com/uc?id=1RhMfnXLlQVPT4O9cyTDKsbWy2LL-hbbY
From (redirected): https://drive.google.com/uc?id=1RhMfnXLlQVPT4O9cyTDKsbWy2LL-hbbY&confirm=t&uuid=37d0f79f-af75-4b87-ab0f-5268edfe49c2
To: /content/HMDB_simp_clean.zip
100%|██████████| 2.03G/2.03G [00:20<00:00, 101MB/s]


Download and extraction complete!


In [3]:
!pip install transformers torch torchvision datasets evaluate torchmetrics



# Data

## Create dataset

In [4]:
CATEGORY_INDEX = {
    "brush_hair": 0,
    "cartwheel": 1,
    "catch": 2,
    "chew": 3,
    "climb": 4,
    "climb_stairs": 5,
    "draw_sword": 6,
    "eat": 7,
    "fencing": 8,
    "flic_flac": 9,
    "golf": 10,
    "handstand": 11,
    "kiss": 12,
    "pick": 13,
    "pour": 14,
    "pullup": 15,
    "pushup": 16,
    "ride_bike": 17,
    "shoot_bow": 18,
    "shoot_gun": 19,
    "situp": 20,
    "smile": 21,
    "smoke": 22,
    "throw": 23,
    "wave": 24
}

In [5]:
import os
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm.notebook import trange, tqdm
import torchvision.transforms.functional as F

# Transformation: Resize to 224x224 and Convert to Tensor
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

def load_sampled_frames(frame_dir, frame_rate=8):
    """
    Load every [frame_rate]-th frame from a directory and apply transformations.
    """
    frame_files = sorted(os.listdir(frame_dir))  # Ensure frames are in order
    sampled_frames = []
    frame_metadata = []

    for i in range(0, len(frame_files), frame_rate):
        frame_path = os.path.join(frame_dir, frame_files[i])
        if os.path.isfile(frame_path):
          frame = Image.open(frame_path).convert("RGB")  # Convert to RGB
          frame = transform(frame)  # Apply transformations
          sampled_frames.append(frame)
          frame_metadata.append({'index': i, 'used_in_clip': False, 'file_path': frame_path}) #set up
        else:
          print(f"skiping {frame_path}")

    return sampled_frames, frame_metadata  # List of torch tensors

def create_clips(frames, frame_metadata, clip_size=8):
    """
    Given a list of sampled frames, create multiple [clip_size]-frame clips.
    """
    clips = []
    #clip_indices = []
    #updated_metadata = []

    clips = make_clip(frames, random_horizontal_flip, 8, 32)
    return clips


def make_clip(video, augmentation_type, clip_len=8, sample_rate=32):
    """
    Creates a clip from a video list of tensors based on the length conditions.

    Args:
        video (List[Tensor]): List of frame tensors, each of shape (C, H, W).
        augmentation_type (callable): Augmentation function applied to each frame.
        clip_len (int): Number of frames in the final clip.
        sample_rate (int): Interval between sampled frames.

    Returns:
        Tensor: A single clip tensor of shape (clip_len, C, H, W).
    """

    num_frames = len(video)

    if num_frames >= clip_len * sample_rate:
        # Sample every `sample_rate` frames to create an `clip_len`-frame clip
        clip = [video[i] for i in range(0, clip_len * sample_rate, sample_rate)]

    else:
        # Not enough frames → get what you can + repeat with circular sampling
        n = num_frames // sample_rate
        n_clip = [video[i] for i in range(0, n * sample_rate, sample_rate)]
        remaining_frames_needed = clip_len - len(n_clip)

        additional_frames = []
        start_idx = 0
        while len(additional_frames) < remaining_frames_needed:
            idx = (start_idx + sample_rate) % num_frames  # circular index
            additional_frames.append(video[idx])
            start_idx += sample_rate

        clip = n_clip + additional_frames

    # Apply augmentation and stack into a (clip_len, C, H, W) tensor
    clip = [augmentation_type(f) for f in clip]
    clip = torch.stack(clip)


    return clip


def vertical_down_translation(frame, shift=20):
    """
    Apply vertical down translation to an image frame.

    Args:
        frame (Tensor): Image tensor of shape (C, H, W).
        shift (int): Number of pixels to shift the image downward.

    Returns:
        Tensor: Translated image tensor of shape (C, H, W).
    """
    C, H, W = frame.shape  # Get channel, height, width

    # Create a black canvas (zero tensor)
    translated_frame = torch.zeros_like(frame)

    # Shift the original image down, filling the top with black pixels
    if shift < H:  # Ensure shift is within bounds
        translated_frame[:, shift:, :] = frame[:, :-shift, :]

    return translated_frame


def random_horizontal_flip(frame, p=0.8):
    """
    #this highly preserve the content of the image#
    Apply random horizontal flip to an image frame.

    Args:
        frame (Tensor): Image tensor of shape (C, H, W).
        p (float): Probability of applying the flip.

    Returns:
        Tensor: Horizontally flipped image tensor of shape (C, H, W) if flipped, else the original.
    """
    if random.random() < p:  # Flip with probability p
        return F.hflip(frame)
    return frame

In [6]:
DATASET_PATH = "/content/HMDB_simp_clean"

import random

def split_sources(dataset_path, train_ratio=0.8):
    """
    Splits source folders into train and val sets before processing clips.
    Ensures that all clips from a source video stay in the same set.
    """
    train_sources = {}
    val_sources = {}

    for category in os.listdir(dataset_path):  # Iterate over action categories
        category_path = os.path.join(dataset_path, category)
        if not os.path.isdir(category_path):
            continue

        instances = os.listdir(category_path)  # List all source folders (video IDs)
        random.shuffle(instances)  # Shuffle instances before splitting

        split_idx = int(len(instances) * train_ratio)
        train_sources[category] = instances[:split_idx]  # First 80% for training
        val_sources[category] = instances[split_idx:]  # Last 20% for validation

    return train_sources, val_sources


def process_dataset(dataset_path, sources_dict):
    """
    Processes dataset based on a predefined list of sources.
    """
    dataset = []
    all_updated_metadata = []

    for category, instances in tqdm(sources_dict.items()):
        category_path = os.path.join(dataset_path, category)

        for instance in instances:
            instance_path = os.path.join(category_path, instance)
            if not os.path.isdir(instance_path):
                print(f"Skipping non-directory file: {instance_path}")
                continue  # Skip non-directory files

            # Load sampled frames
            frames, frame_metadata = load_sampled_frames(instance_path)

            # Create 8-frame clips
            clips  =  create_clips(frames, frame_metadata, 8)
            dataset.append((clips, CATEGORY_INDEX[category]))

            """for i, clip in enumerate(clips):
                dataset.append((clip, CATEGORY_INDEX[category]))  # Store (clip, label)"""
                #all_updated_metadata.append(updated_metadata[i])
            #print(f"len(dataset) = {len(dataset)}")

    return dataset  # List of (clip, label)

class VideoDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        clip, label = self.dataset[idx]
        return clip, torch.tensor(label, dtype=torch.long)

from torch.utils.data import default_collate

class VideoDataCollator:
    """
    Custom data collator for TimeSFormer.
    Converts (clip, label) tuples into a dictionary format.
    """
    def __call__(self, features):
        clips, labels = zip(*features)  # Unpack (clip, label)
        batch = {
            "pixel_values": torch.stack(clips),  # Stack clips into batch
            "labels": torch.tensor(labels, dtype=torch.long)  # Convert labels to tensor
        }
        return batch


In [7]:
from collections import Counter

def count_classes(dataset):
    """
    Function to print the number of clips of 8 created per class
    """
    class_counts = Counter(label for _, label in dataset)
    sorted_class_counts = dict(sorted(class_counts.items()))

    for class_label, count in sorted_class_counts.items():
        print(f"Class {class_label}: {count} clips of 8")

    return sorted_class_counts

#count_classes(train_dataset);


In [8]:

# Split source folders into train & val
train_sources, val_sources = split_sources(DATASET_PATH)

# Process train and val sets separately
train_dataset  = process_dataset(DATASET_PATH, train_sources)
val_dataset = process_dataset(DATASET_PATH, val_sources)

dataset_size = len(train_dataset) + len(val_dataset)

print(f"Total clips: {dataset_size}, Train: {len(train_dataset)}, Val: {len(val_dataset)}")

  0%|          | 0/25 [00:00<?, ?it/s]

skiping /content/HMDB_simp_clean/wave/C6E07F42/.ipynb_checkpoints


  0%|          | 0/25 [00:00<?, ?it/s]

Total clips: 1250, Train: 1000, Val: 250


In [9]:
arr = [0]*25

for i in range(len(train_dataset)):
  arr[train_dataset[i][1]] += 1

print(arr)

arr2 = [0]*25
for i in range(len(val_dataset)):
  arr2[val_dataset[i][1]] += 1
print(arr2)
print(len(train_sources))
print(f"Total clips: {dataset_size}, Train: {len(train_dataset)}, Val: {len(val_dataset)}")

[40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40]
[10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
25
Total clips: 1250, Train: 1000, Val: 250


# Train

## TimeSFormer

In [10]:
from transformers import AutoFeatureExtractor, AutoModelForVideoClassification


extractor = AutoFeatureExtractor.from_pretrained("facebook/timesformer-base-finetuned-k400")
model = AutoModelForVideoClassification.from_pretrained(
    "facebook/timesformer-base-finetuned-k400",
    num_labels=len(CATEGORY_INDEX),  # Adjust for our dataset
    ignore_mismatched_sizes=True,
)

# Send model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Model on: {device}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of TimesformerForVideoClassification were not initialized from the model checkpoint at facebook/timesformer-base-finetuned-k400 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([400, 768]) in the checkpoint and torch.Size([25, 768]) in the model instantiated
- classifier.bias: found shape torch.Size([400]) in the checkpoint and torch.Size([25]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model on: cuda


In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchmetrics
import evaluate
import numpy as np
from transformers import AutoFeatureExtractor, AutoModelForVideoClassification, TrainingArguments, Trainer

accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")
top5_metric = torchmetrics.classification.Accuracy(top_k=5, task="multiclass", num_classes=len(CATEGORY_INDEX)).to(device)

# Compute Metrics Function
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = torch.tensor(logits).argmax(dim=1)

    # Compute Accuracy
    top1_acc = accuracy_metric.compute(predictions=predictions.numpy(), references=labels)["accuracy"]

    # Compute Top-5 Accuracy
    top5_acc = top5_metric(torch.tensor(logits).to(device), torch.tensor(labels).to(device)).item()

    # Compute F1-score (macro)
    f1 = f1_metric.compute(predictions=predictions.numpy(), references=labels, average="macro")["f1"]

    return {
        "accuracy": top1_acc,
        "top-5 accuracy": top5_acc,
        "f1-score": f1
    }

In [17]:
from transformers import Trainer, TrainingArguments

training_args = TrainingArguments(
    output_dir="./timesformer_output",  # Save checkpoints
    eval_strategy="epoch",  # Evaluate after every epoch
    save_strategy="epoch",  # Save model after each epoch
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=5,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",  # TensorBoard logs
    logging_steps=10,
    save_total_limit=2,  # Keep only last 2 checkpoints
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=False
)

data_collator = VideoDataCollator()
# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=extractor,  # Feature extractor
    compute_metrics=compute_metrics,
    data_collator=data_collator
)

# Train Model
trainer.train()

  trainer = Trainer(
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mlisik-tea[0m ([33mlisik-tea-university-of-surrey[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


Epoch,Training Loss,Validation Loss,Accuracy,Top-5 accuracy,F1-score
1,0.8711,0.755521,0.756,0.964,0.734011
2,0.167,0.564329,0.82,0.984,0.819608
3,0.0063,0.538788,0.844,0.98,0.844732
4,0.0032,0.557536,0.832,0.976,0.83047
5,0.0014,0.570068,0.832,0.976,0.830649


TrainOutput(global_step=1250, training_loss=0.3466434930428863, metrics={'train_runtime': 2775.5022, 'train_samples_per_second': 1.801, 'train_steps_per_second': 0.45, 'total_flos': 4.38137551724544e+18, 'train_loss': 0.3466434930428863, 'epoch': 5.0})