# Imports

In [None]:
import os
from pathlib import Path

import torch
import numpy as np
import pandas as pd
import torchmetrics
import seaborn as sns
import torch.utils.data
import pytorchvideo.data
import pytorch_lightning
import torch.nn.functional as F
from matplotlib import pyplot as plt
from tqdm import tqdm_notebook as tqdm
from sklearn.metrics import confusion_matrix

from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    RandomShortSideScale,
    ShortSideScale,
    UniformTemporalSubsample,
)
from torchvision.transforms import (
    Compose,
    Lambda,
    RandomCrop,
    CenterCrop,
    RandomHorizontalFlip,
)

# Prepare dataset

In [41]:
def move_videos_to_dir(labels: pd.DataFrame, out_dir: Path):
    """Sort downloaded videos into one sub-directory per class label.

    For every row of ``labels`` the file ``data/videos/<youtube_id>.mp4`` is
    moved to ``out_dir/<label>/<youtube_id>.mp4``. Rows whose video file is
    not present in ``data/videos`` are skipped silently.

    Args:
        labels: Table with (at least) the columns ``label`` and
            ``youtube_id``.
        out_dir: Root directory that receives the per-class sub-directories.
            It is created on demand, including missing parents.
    """
    in_dir = Path("data", "videos")
    for _, label in tqdm(labels.iterrows(), total=len(labels)):
        filename = label["youtube_id"] + ".mp4"
        file = in_dir / filename
        if not file.is_file():
            # Video is missing (or was already moved): nothing to do.
            continue
        class_dir = out_dir / label["label"]
        # parents=True because out_dir itself may not exist yet;
        # exist_ok=True replaces the separate is_dir() check.
        class_dir.mkdir(parents=True, exist_ok=True)
        file.rename(class_dir / filename)

In [None]:
# CSV label file for each dataset split; the key doubles as the name of the
# split's output directory under data/videos.
csv_filenames = dict(train="dancing-train.csv", val="dancing-validate.csv")
for phase, filename in csv_filenames.items():
    labels_df = pd.read_csv(f"data/{filename}")
    move_videos_to_dir(
        labels=labels_df,
        out_dir=Path("data", "videos", phase),
    )

# Train models

## Init data module

In [1]:
class KineticsDataModule(pytorch_lightning.LightningDataModule):
    """Lightning data module that serves video clips for training/validation.

    Clips are read from ``{_DATA_PATH}/train`` and ``{_DATA_PATH}/val`` through
    ``pytorchvideo.data.Kinetics``; audio is not decoded.
    """

    # Dataset configuration
    _DATA_PATH = "./data/videos"
    _CLIP_DURATION = 2  # Duration of sampled clip for each video
    _BATCH_SIZE = 4
    _NUM_WORKERS = 0  # Number of parallel processes fetching data

    def _make_dataloader(self, split, sampler_type, spatial_transforms):
        """Build the DataLoader for one split.

        Args:
            split: Sub-directory of ``_DATA_PATH`` to read ("train" or "val").
            sampler_type: Clip sampler name passed to
                ``pytorchvideo.data.make_clip_sampler``.
            spatial_transforms: Split-specific transforms appended after the
                shared temporal subsampling and normalisation steps.
        """
        video_transform = Compose([
            UniformTemporalSubsample(16),
            Lambda(lambda x: x / 255.0),
            Normalize((0.45, 0.45, 0.45), (0.225, 0.225, 0.225)),
            *spatial_transforms,
        ])
        dataset = pytorchvideo.data.Kinetics(
            data_path=os.path.join(self._DATA_PATH, split),
            clip_sampler=pytorchvideo.data.make_clip_sampler(
                sampler_type, self._CLIP_DURATION
            ),
            transform=Compose([
                ApplyTransformToKey(key="video", transform=video_transform),
            ]),
            decode_audio=False,
        )
        return torch.utils.data.DataLoader(
            dataset,
            batch_size=self._BATCH_SIZE,
            num_workers=self._NUM_WORKERS,
        )

    def train_dataloader(self):
        """
        Create the Kinetics train partition from the list of video labels
        in {self._DATA_PATH}/train. Clips are sampled randomly and augmented
        with random scaling, cropping and horizontal flipping.
        """
        return self._make_dataloader(
            split="train",
            sampler_type="random",
            spatial_transforms=[
                RandomShortSideScale(min_size=256, max_size=320),
                RandomCrop(224),
                RandomHorizontalFlip(p=0.5),
            ],
        )

    def val_dataloader(self):
        """
        Create the Kinetics validation partition from the list of video labels
        in {self._DATA_PATH}/val. Clips are sampled uniformly and only
        deterministic transforms are applied, so that validation metrics are
        reproducible between epochs.
        """
        return self._make_dataloader(
            split="val",
            sampler_type="uniform",
            # No random flip here: augmentation belongs to training only.
            spatial_transforms=[
                ShortSideScale(224),
                CenterCrop(224),
            ],
        )

    def get_classes(self):
        """Return the class names found in the train split, sorted by name.

        Only sub-directories count as classes; stray files are ignored. The
        result is sorted so that position ``i`` lines up with label index
        ``i`` -- pytorchvideo derives label indices from the sorted
        sub-directory names (per its documentation), whereas ``os.listdir``
        order is arbitrary.
        """
        folder = os.path.join(self._DATA_PATH, "train")
        return sorted(
            name
            for name in os.listdir(folder)
            if os.path.isdir(os.path.join(folder, name))
        )

## Init torch lightning trainer

In [7]:
class VideoClassificationLightningModule(pytorch_lightning.LightningModule):
    """Lightning module that trains a video classifier with cross-entropy.

    Besides the loss it tracks accuracy and weighted F1 on the validation
    set and logs a normalised confusion matrix figure after every
    validation epoch.

    Args:
        classes: Class names; position ``i`` must be the name of label
            index ``i``.
        model_fn: Zero-argument callable returning the network to train.
    """

    def __init__(self, classes, model_fn):
        super().__init__()
        self.model = model_fn()
        # num_classes=15 has to match the width of the model's output layer.
        self.accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=15)
        self.f1 = torchmetrics.F1Score(task="multiclass", average="weighted", num_classes=15)
        self.classes = classes

    def forward(self, x):
        """Run the wrapped model on ``x``."""
        return self.model(x)

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy loss of one training batch."""
        labels = batch["label"]
        y_hat = self.model(batch["video"])
        loss = F.cross_entropy(y_hat, labels)
        # `batch` is a dict, so len(batch) would count its keys; the number
        # of samples is the length of the label tensor.
        self.log("train_loss", loss, batch_size=labels.shape[0])
        return loss

    def validation_step(self, batch, batch_idx):
        """Update the metrics and return loss/predictions for one batch."""
        labels = batch["label"]
        preds = self.model(batch["video"])
        loss = F.cross_entropy(preds, labels)
        self.accuracy(preds, labels)
        self.f1(preds, labels)
        self.log("val_loss", loss, batch_size=labels.shape[0])
        # Move the per-batch outputs to the CPU so they do not pin
        # accelerator memory until the end of the epoch.
        return {
            "loss": loss,
            "preds": preds.detach().cpu(),
            "target": labels.detach().cpu(),
        }

    def validation_epoch_end(self, outs):
        """Log the confusion matrix figure and the epoch-level metrics.

        Args:
            outs: List of the dicts returned by ``validation_step``.
        """
        scores = torch.cat([out['preds'] for out in outs]).to('cpu').numpy()
        pred_idx = np.argmax(scores, axis=1)
        target_idx = torch.cat([out['target'] for out in outs]).to('cpu').numpy()

        # Work on integer label indices; predictions outside the known
        # classes are ignored by confusion_matrix instead of raising an
        # IndexError during a name lookup.
        class_ids = list(range(len(self.classes)))
        cf_matrix = confusion_matrix(target_idx, pred_idx, labels=class_ids)
        total = cf_matrix.sum()
        # Guard against an all-zero matrix to avoid dividing by zero.
        normalized = cf_matrix / total if total else cf_matrix.astype(float)

        fig_, ax = plt.subplots(figsize=(10, 7))
        sns.heatmap(
            normalized,
            annot=True,
            fmt='.1%',
            cmap='Blues',
            xticklabels=self.classes,
            yticklabels=self.classes,
            ax=ax,
        )
        ax.set_xlabel("Predicted")
        ax.set_ylabel("True")

        self.logger.experiment.add_figure(
            "Val confusion matrix",
            fig_,
            self.current_epoch,
        )
        # Release the figure once it has been handed to the logger.
        plt.close(fig_)

        self.log("val_acc_epoch", self.accuracy)
        self.log("val_f1_epoch", self.f1)

    def configure_optimizers(self):
        """Use Adam (lr=1e-4) with an exponentially decaying learning rate."""
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)
        outputs = {
            "optimizer": optimizer,
            "lr_scheduler": scheduler,
        }
        return outputs

## Init models

In [2]:
def make_kinetics_slow():
    """Build a Slow R50 network with a fresh 15-class output layer.

    Every block except the last one is frozen, so only the head is trained.
    The pretrained weights are loaded because a frozen, randomly initialised
    backbone could never produce useful features.

    Returns:
        The model loaded from the ``facebookresearch/pytorchvideo`` hub entry
        ``slow_r50`` with ``blocks[-1].proj`` replaced by a new linear layer.
    """
    model = torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
    model.blocks[:-1].requires_grad_(False)
    model.blocks[-1].proj = torch.nn.Linear(
        in_features=model.blocks[-1].proj.in_features,
        out_features=15
    )
    return model

def make_kinetics_x3d_m():
    """Build a pretrained X3D-M network with a fresh 15-class output layer.

    All blocks except the last one have their parameters frozen; the
    projection of the last block is replaced by a new linear layer with the
    same input width.
    """
    net = torch.hub.load('facebookresearch/pytorchvideo', 'x3d_m', pretrained=True)
    # Freeze everything but the final block.
    for frozen_block in net.blocks[:-1]:
        frozen_block.requires_grad_(False)
    last_block = net.blocks[-1]
    last_block.proj = torch.nn.Linear(last_block.proj.in_features, 15)
    return net

def make_kinetics_mvit():
    """Build a pretrained MViT-Base 16x4 network with a 15-class output layer.

    The parameters of all entries of ``model.blocks`` except the last are
    frozen and ``model.head.proj`` is replaced by a new linear layer with the
    same input width.
    """
    net = torch.hub.load('facebookresearch/pytorchvideo', 'mvit_base_16x4', pretrained=True)
    # NOTE(review): the classifier lives in `net.head` here, so `blocks[-1]`
    # is presumably the last backbone block and stays trainable, and modules
    # outside `net.blocks` are not frozen by this call -- confirm intended.
    net.blocks[:-1].requires_grad_(False)
    head = net.head
    head.proj = torch.nn.Linear(
        in_features=head.proj.in_features,
        out_features=15,
    )
    return net

## Train

In [8]:
def train(model_fn, max_epochs=3):
    """Fine-tune the model produced by ``model_fn`` on the video dataset.

    Args:
        model_fn: Zero-argument callable returning the network to train
            (one of the ``make_kinetics_*`` factories).
        max_epochs: Number of training epochs; defaults to 3.
    """
    data_module = KineticsDataModule()
    classification_module = VideoClassificationLightningModule(
        classes=data_module.get_classes(),
        model_fn=model_fn,
    )
    # gpus=-1 asks Lightning to train on every available GPU.
    trainer = pytorch_lightning.Trainer(gpus=-1, max_epochs=max_epochs)
    trainer.fit(classification_module, data_module)

In [None]:
# Fine-tune the MViT variant; pass another make_kinetics_* factory to train
# a different architecture.
train(model_fn=make_kinetics_mvit)