## Setup

In [None]:
!pip install -r "/content/drive/MyDrive/mia_starter_project/requirements.txt"
!apt install imagemagick -q

In [None]:
import random
from pathlib import Path

import imageio
import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import wandb
from IPython.display import Image
from moviepy.editor import VideoFileClip, ImageSequenceClip
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    ShortSideScale
)
from torch.utils.data import Dataset, DataLoader
from torchmetrics import Accuracy
from torchvision.transforms import (
    Compose,
    Lambda,
    RandomCrop,
    RandomHorizontalFlip,
    Resize
)
from torchvision.transforms._transforms_video import CenterCropVideo

ModuleNotFoundError: ignored

In [None]:
# Authenticate this runtime with Weights & Biases; the command prompts for an API key.
!wandb login

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit: 

## Data
The dataset is divided into 10 folders, one for each exercise.
`ActionClassificationDataset` maps each folder to a class and loads video chunks of length `clip_duration = num_frames * sample_rate / vid.fps`, where `num_frames` and `sample_rate` are usually dictated by the pre-trained model being used.

In [None]:
# Project root on the mounted Google Drive.
project_dir = Path("/content/drive/MyDrive/mia_starter_project")
# Dataset root; each split lives in its own sub-directory.
data_dir = project_dir.joinpath("dataset")
train_dir, val_dir = (data_dir.joinpath(split) for split in ("train", "validation_clean"))

In [None]:
class ActionClassificationDataset(Dataset):
    """Video dataset that maps class sub-folders to integer labels.

    The expected layout is ``data_dir/<class_name>/video/*.mp4``. Class names are
    sorted alphabetically and numbered from 0; videos inside each class are also
    sorted so that a given index always refers to the same file.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        num_frames: Number of frames returned for every clip.
        sample_rate: Temporal stride; one frame is kept every ``sample_rate``
            source frames.
        random_sampler: If True the clip starts at a random time in the video,
            otherwise it starts at the beginning.
        transform: Optional callable applied to ``{"video": tensor, "audio": None}``;
            it must return a mapping with a ``"video"`` entry.
    """

    def __init__(
        self,
        data_dir: Path,
        num_frames: int,
        sample_rate: int,
        random_sampler: bool,
        transform=None,
    ):
        self.data_dir = data_dir
        self.labels = sorted(sub.name for sub in data_dir.iterdir() if sub.is_dir())
        self.num_classes = len(self.labels)
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.labels)}
        self.idx_to_class = {idx: cls for idx, cls in enumerate(self.labels)}
        self.clips = []
        self.num_frames = num_frames
        self.sample_rate = sample_rate
        self.random_sampler = random_sampler
        self.transform = transform

        for cls in self.labels:
            class_dir = data_dir / cls
            # glob() yields files in filesystem order; sort for a reproducible index.
            for video_path in sorted(class_dir.glob("video/*.mp4")):
                self.clips.append((video_path, self.class_to_idx[cls]))

    def __len__(self) -> int:
        """Return the number of videos found under ``data_dir``."""
        return len(self.clips)

    def __getitem__(self, idx) -> tuple[torch.Tensor, int]:
        """Load one clip and its label.

        Returns:
            ``(video, label)``. Before ``transform`` runs, ``video`` is a uint8
            tensor with channels first and frames second (the frame axis is
            moved to position 1 and the channel axis to position 0).

        Raises:
            RuntimeError: If no frame could be read from the video.
        """
        video_path, label = self.clips[idx]

        with VideoFileClip(str(video_path)) as vid:
            clip_duration = self.num_frames * self.sample_rate / vid.fps
            subsample_fps = vid.fps / self.sample_rate

            # Latest start that still leaves room for a full clip. Clamped at 0 so
            # videos shorter than the clip do not yield a negative start time.
            latest_start = max(0.0, vid.duration - clip_duration)
            # Validation always reads from the start; training samples a random window.
            start_time = random.uniform(0, latest_start) if self.random_sampler else 0
            # Never request frames past the end of the file.
            end_time = min(start_time + clip_duration, vid.duration)

            video_frames = list(
                vid.subclip(start_time, end_time).iter_frames(fps=subsample_fps)
            )[: self.num_frames]

        if not video_frames:
            raise RuntimeError(f"no frames could be read from {video_path}")
        # Short videos produce fewer frames; repeat the last one so every sample
        # has exactly num_frames frames and batches can be stacked.
        missing = self.num_frames - len(video_frames)
        if missing > 0:
            video_frames.extend([video_frames[-1]] * missing)

        # Stacked frames are frame-first / channel-last; reorder to channel-first.
        video_np = np.moveaxis(np.array(video_frames), [0, -1], [1, 0])
        video_data = {"video": torch.from_numpy(video_np), "audio": None}
        # transform defaults to None, so only apply it when one was supplied.
        if self.transform is not None:
            video_data = self.transform(video_data)
        inputs = video_data["video"]
        return inputs, label


### X3D Params

In [None]:
# X3D variant to fine-tune, plus the per-channel statistics used for normalisation.
model_name = "x3d_xs"
mean = [0.45] * 3
std = [0.225] * 3

# Input geometry and temporal sampling for each supported X3D variant.
model_transform_params = {
    "x3d_xs": dict(side_size=182, crop_size=182, num_frames=4, sampling_rate=12),
    "x3d_s": dict(side_size=182, crop_size=182, num_frames=13, sampling_rate=6),
    "x3d_m": dict(side_size=256, crop_size=256, num_frames=16, sampling_rate=5),
}

# Data/Training Params
flip_p = 0.5      # probability passed to RandomHorizontalFlip
batch_size = 16
epochs = 15       # upper bound on training epochs
lr = 1e-3         # optimizer learning rate

Build the datasets. Standard transformations are used; more training augmentations can be explored later on.

In [None]:
# Merge the selected model preset with the training hyper-parameters. A new dict is
# built so the shared preset inside model_transform_params is left untouched.
config = {
    **model_transform_params[model_name],
    "flip_p": flip_p,
    "batch_size": batch_size,
    "epochs": epochs,
    "lr": lr,
}


def _build_video_transform(train: bool) -> Compose:
    """Build the transform applied to the "video" entry of a sample dict.

    Both splits scale pixel values by 1/255, normalise with ``mean``/``std``,
    resize the short side and centre-crop. The training pipeline additionally
    applies a random horizontal flip.
    """
    steps = [
        Lambda(lambda x: x / 255.0),
        Normalize(mean, std),
        ShortSideScale(size=config["side_size"]),
        CenterCropVideo(crop_size=(config["crop_size"], config["crop_size"])),
    ]
    if train:
        steps.append(RandomHorizontalFlip(p=flip_p))
    return Compose([ApplyTransformToKey(key="video", transform=Compose(steps))])


train_transform = _build_video_transform(train=True)
val_transform = _build_video_transform(train=False)

# Training clips start at a random time; validation clips always start at 0.
train_ds = ActionClassificationDataset(data_dir=train_dir,
                                       num_frames=config["num_frames"],
                                       sample_rate=config["sampling_rate"],
                                       random_sampler=True,
                                       transform=train_transform)

val_ds = ActionClassificationDataset(data_dir=val_dir,
                                     num_frames=config["num_frames"],
                                     sample_rate=config["sampling_rate"],
                                     random_sampler=False,
                                     transform=val_transform)
num_classes = train_ds.num_classes
print(f"num classes: {num_classes}")
print(f"number of training samples: {len(train_ds)}")
print(f"number of validation samples: {len(val_ds)}")

By using an inverse normalization class, we can easily visualize the training videos after all transformations, with scaling and normalization undone.

In [None]:
class InverseNormalize(Normalize):
    """Normalize subclass configured to undo a previous ``Normalize(mean, std)``.

    The parent is initialised with ``mean' = -mean / std`` and ``std' = 1 / std``
    (with a float32 epsilon added to ``std`` before inverting), so applying it
    maps a normalised tensor back to ``x * std + mean``.
    """

    def __init__(self, mean, std):
        eps = torch.finfo(torch.float32).eps
        forward_mean = torch.as_tensor(mean)
        forward_std = torch.as_tensor(std)
        # eps keeps the reciprocal finite if any std entry is zero.
        inverse_std = 1 / (forward_std + eps)
        inverse_mean = -forward_mean * inverse_std
        super().__init__(mean=inverse_mean, std=inverse_std)

    def __call__(self, tensor):
        # Operate on a copy so the caller's tensor is left as it was.
        working_copy = tensor.clone()
        return super().__call__(working_copy)


def inv_transform(sample):
    """Turn a normalised video tensor back into displayable uint8 frames.

    Undoes the ``mean``/``std`` normalisation, rescales to the 0-255 range and
    moves the first axis to the end.

    Args:
        sample: 4-D video tensor. Assumed to be channel-first (C, T, H, W) as
            produced by ActionClassificationDataset -- TODO confirm for other callers.

    Returns:
        A uint8 NumPy array with the axes permuted to (1, 2, 3, 0).
    """
    to_frames = Compose([
        InverseNormalize(mean, std),
        # Clamp while still in floating point, then cast. Casting first would let
        # out-of-range values wrap around and make the clamp a no-op.
        Lambda(lambda x: torch.clip(x * 255.0, min=0, max=255).to(torch.uint8)),
        Lambda(lambda x: x.permute(1, 2, 3, 0)),
    ])
    sample_np = to_frames(sample).detach().cpu().numpy()
    return sample_np

Visualize a training sample. Note that since `random_sampler=True` for the training dataset, each sample starts at a random time in the video on every run.

In [None]:
# Fetch the first training example and unpack it into video tensor and label.
sample = train_ds[0]
sample_video, label = sample
print(label)
# Print both class mappings so they can be compared by eye.
print(val_ds.class_to_idx)
print(train_ds.class_to_idx)

# Undo normalisation so the frames can be rendered, then play them inline.
sample_video = inv_transform(sample_video)
clip = ImageSequenceClip(list(sample_video), fps=5)
clip.ipython_display()

In [None]:
# Shuffle only the training split; validation keeps a fixed order.
train_loader = DataLoader(
    dataset=train_ds,
    batch_size=batch_size,
    shuffle=True,
)
val_loader = DataLoader(
    dataset=val_ds,
    batch_size=batch_size,
    shuffle=False,
)

## Modeling
X3D transfer learning model: a Kinetics400 pre-trained model is loaded from PyTorch Hub, and the projection of its last fully connected layer is replaced by a randomly initialized fully connected layer with 10 neurons for the 10 classes.

In [None]:
class LitX3DTransfer(pl.LightningModule):
    """Transfer-learning module built on a pre-trained X3D model from PyTorch Hub.

    All blocks except the last form a feature extractor that is not updated
    during training (it runs under ``torch.no_grad()`` in ``training_step``).
    The last block is kept as the classification head, with its ``proj`` layer
    replaced by a new linear layer that outputs ``num_classes`` logits.

    Args:
        model_name: Name of the hub entry point to load (e.g. ``"x3d_xs"``).
        num_classes: Number of output classes for the new projection layer.
    """

    def __init__(self, model_name: str, num_classes: int):
        super().__init__()
        self.model_name = model_name
        model = torch.hub.load("facebookresearch/pytorchvideo", model_name, pretrained=True)
        layers = list(model.blocks.children())
        # feature extractor: every block but the last
        backbone = layers[:-1]
        self.feature_extractor = nn.Sequential(*backbone)
        # classifier: reuse the last block, swapping only its projection layer
        self.fc = layers[-1]
        num_filters = self.fc.proj.in_features
        self.num_classes = num_classes
        self.fc.proj = nn.Linear(in_features=num_filters, out_features=num_classes, bias=True)
        # step outputs (not populated by this class)
        self.training_step_outputs = []
        self.validation_step_outputs = []

    def forward(self, x):
        """Return class logits for a batch of videos."""
        representations = self.feature_extractor(x)
        predictions = self.fc(representations)
        return predictions

    def _loss_and_accuracy(self, logits, labels):
        """Return ``(cross-entropy loss, mean accuracy)`` for one batch."""
        return F.cross_entropy(logits, labels), self.accuracy(logits, labels)

    def training_step(self, batch, batch_idx):
        """Run one training step; gradients flow through the head only."""
        x, y = batch
        # Keep the backbone in eval mode while the rest of the module trains.
        self.feature_extractor.eval()
        with torch.no_grad():
            features = self.feature_extractor(x)
        loss, acc = self._loss_and_accuracy(self.fc(features), y)

        self.log("train_acc", acc, prog_bar=True, on_epoch=True)
        self.log("train_loss", loss.item(), prog_bar=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Run one validation step and log ``val_acc`` / ``val_loss``."""
        x, y = batch
        loss, acc = self._loss_and_accuracy(self(x), y)

        self.log("val_acc", acc, prog_bar=True)
        self.log("val_loss", loss.item(), prog_bar=True)
        return loss

    def configure_optimizers(self):
        """Return an Adam optimizer over the classification head.

        Only ``self.fc`` receives gradients in ``training_step``, so the backbone
        parameters are left out. The learning rate is read from the module-level
        ``lr`` variable.
        """
        return torch.optim.Adam(self.fc.parameters(), lr=lr)

    @staticmethod
    def accuracy(predictions, labels):
        """Return the fraction of rows whose arg-max over dim 1 equals the label."""
        classes = torch.argmax(predictions, dim=1)
        mean_acc = torch.mean((classes == labels).float())
        return mean_acc

## Training



In [None]:
# Instantiate the transfer model for the selected X3D variant; the constructor
# loads the pre-trained weights through torch.hub.
x3d = LitX3DTransfer(model_name=model_name, num_classes=num_classes)

In [None]:
# Experiment logger for the "mia-starter-project" W&B project.
wandb_logger = pl.loggers.WandbLogger(project="mia-starter-project")
# Both callbacks watch the validation loss logged by validation_step.
model_ckpt_cb = pl.callbacks.ModelCheckpoint(monitor="val_loss", mode="min")
early_stop_cb = pl.callbacks.EarlyStopping(monitor="val_loss")

# logs_dir = Path("/content/drive/MyDrive/mia_starter_project/logs") / run.name
trainer = pl.Trainer(
    max_epochs=epochs,
    callbacks=[early_stop_cb, model_ckpt_cb],
    logger=wandb_logger,
)
trainer.fit(
    model=x3d,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader,
)

In [None]:
# Close the active Weights & Biases run now that training is done.
wandb.finish()

## Analysis

In [None]:
# Checkpoint written by an earlier training run.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you trust.
ckpt_path = "/content/drive/MyDrive/mia_starter_project/logs/smooth-voice-6/lightning_logs/version_0/checkpoints/epoch=4-step=75.ckpt"
ckpt = torch.load(ckpt_path, map_location=torch.device("cpu"))

# Rebuild the module, restore its weights and switch it to inference mode.
infer_model = LitX3DTransfer(model_name, num_classes)
infer_model.load_state_dict(ckpt["state_dict"])
infer_model.eval()

In [None]:
# Upgrade imageio in the running environment before it is used to write GIFs below.
!pip install --upgrade imageio

In [None]:
from IPython.display import HTML, display
import imageio
from PIL import Image


def show_batch_vid(batch_vid, out_path="check.gif"):
    """Write one normalised video tensor to disk as an animated GIF.

    Despite its name, this function only writes the file; it does not display it.

    Args:
        batch_vid: Video tensor accepted by ``inv_transform``.
        out_path: Destination file; defaults to ``"check.gif"``.
    """
    # Use the argument itself; this previously read a global named `x` by mistake.
    frames = inv_transform(batch_vid)
    # `Image` must be PIL.Image here (imported in this cell), not IPython.display.Image.
    images = [Image.fromarray(frame).convert("RGB") for frame in frames]
    imageio.mimwrite(out_path, images, duration=0.1)



def grid(video_filenames, num_columns=2):
    """Display videos in an HTML table, ``num_columns`` per row.

    Args:
        video_filenames: Iterable of video paths/URLs used as ``<source src=...>``.
        num_columns: Number of videos per table row (default 2).

    Raises:
        ValueError: If ``num_columns`` is smaller than 1.
    """
    import html

    if num_columns < 1:
        raise ValueError("num_columns must be at least 1")

    video_filenames = list(video_filenames)
    rows = []
    for start in range(0, len(video_filenames), num_columns):
        cells = "".join(
            "<td><video width='182' height='182' controls>"
            # Escape the name so quotes or angle brackets cannot break the markup.
            f"<source src='{html.escape(str(name), quote=True)}' type='video/mp4'></video></td>"
            for name in video_filenames[start:start + num_columns]
        )
        rows.append(f"<tr>{cells}</tr>")

    video_grid_html = "<table style='width:100%'>" + "".join(rows) + "</table>"

    # Display the video grid in Colab
    display(HTML(video_grid_html))

In [None]:
# predictions = []
# for x, y in val_ds:
#   show_batch_vid(x)
#   # y_hat = infer_model(x)
#   # pred = torch.argmax(y_hat, dim=1)
#   break

In [None]:
# Render the grid for a single video file.
# NOTE(review): show_batch_vid writes "check.gif" and no active code in this notebook
# writes /content/check.mp4 -- confirm the file exists before running this cell.
grid(["/content/check.mp4"])