In [None]:
# Requirements installation

# Core training stack for the Colab runtime; only timm is version-pinned here.
!pip install --quiet torch  timm==0.9.16 einops accelerate scikit-learn torchaudio torchvision
# Hugging Face libraries, decord, and PyTorchVideo installed straight from its GitHub repository.
!pip install --quiet transformers evaluate decord 'git+https://github.com/facebookresearch/pytorchvideo.git'

import torch, os, math, random, time, json, shutil, glob
from pathlib import Path

# Mount Google Drive at /content/drive (Colab-only); cfg["dataset_root"] points below this mount.
from google.colab import drive
drive.mount('/content/drive')

In [13]:
# Model Configuration
# Replace dataset_root with the path to the dataset already split into train/val/test folders

# Central hyper-parameter / path table read by the data, model and training cells.
cfg = dict(
    dataset_root="/content/drive/MyDrive/CADAR Attack Video Dataset",
    clip_duration=13,       # end time handed to get_clip() for every video
    frames_per_clip=8,      # frame count requested from the temporal subsampler
    batch_size=16,
    num_workers=0,
    epochs=10,
    base_lr=1e-4,
    weight_decay=5e-3,
    num_classes=5,
)

# Note: CUDA must be available to handle training
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [17]:
import glob

def get_class_counts(root, extensions=('mp4', 'avi', 'mov')):
    """Count the video files in each class sub-directory of ``root``.

    Args:
        root: Directory whose immediate sub-directories are the classes.
        extensions: File extensions (without the leading dot) that count
            as videos.

    Returns:
        A list of integer counts, one per class directory, ordered by the
        sorted directory names so the positions are deterministic.
    """
    class_dirs = sorted(
        d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d))
    )  # keep label order deterministic
    counts = []
    for cls in class_dirs:
        # Escape the directory part so characters such as "[" in a folder
        # name are matched literally instead of as glob syntax.
        cls_dir = glob.escape(os.path.join(root, cls))
        counts.append(sum(
            # The pattern must be "*.<ext>"; the previous "* <ext>" pattern
            # (space, no dot) never matched ordinary file names.
            len(glob.glob(os.path.join(cls_dir, f'*.{ext}')))
            for ext in extensions
        ))
    return counts
# Per-class video counts of the "train" split; read later when the class-weight vector is built.
class_counts = get_class_counts(os.path.join(cfg ["dataset_root"], "train"))

In [18]:
import os, glob, random
import torch, numpy as np
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from pytorchvideo.data.encoded_video import EncodedVideo
from pytorchvideo.transforms import UniformTemporalSubsample
from torchvision.transforms import Resize

# Video transformation
def make_video_transform(size=(400, 400)):
    """Build the per-clip preprocessing callable.

    Args:
        size: ``(height, width)`` every frame is resized to. Defaults to
            the 400x400 resolution this notebook trains with.

    Returns:
        A function mapping a 4-D video tensor to a normalised float32
        tensor laid out as ``(T, C, height, width)``.
    """
    out_h, out_w = size
    # Per-channel statistics, shaped to broadcast over (C, T, H, W).
    mean   = torch.tensor([0.45,0.45,0.45]).view(3,1,1,1)
    std    = torch.tensor([0.225,0.225,0.225]).view(3,1,1,1)

    def video_transform(x: torch.Tensor):
        """Subsample, rescale to [0, 1], resize and normalise one clip."""
        # Bring the size-3 axis to the front so the tensor is channel-first.
        # presumably (T, H, W, C) or (T, C, H, W) inputs — verify against the decoder
        if x.ndim==4 and x.shape[-1]==3:
            x = x.permute(3,0,1,2)
        elif x.ndim==4 and x.shape[1]==3 and x.shape[0]!=3:
            x = x.permute(1,0,2,3)

        # cfg is read at call time so later edits to frames_per_clip apply.
        x = UniformTemporalSubsample(cfg["frames_per_clip"])(x)
        # NOTE(review): only the first two entries along axis 1 (time) are
        # kept, discarding the rest of the subsampled frames — confirm that
        # this is what the model input is meant to be.
        x = x[:, :2]

        C,T,H,W = x.shape
        x = x.contiguous().to(torch.float32) / 255.0
        # Fold channels and frames into the batch axis so every plane is
        # resized independently as a single-channel image.
        x = torch.nn.functional.interpolate(
            x.view(C*T,1,H,W),
            size=(out_h,out_w),
            mode="bilinear",
            align_corners=False
        ).view(C,T,out_h,out_w)
        x = (x - mean.to(x.device)) / std.to(x.device)

        # Frames first: (C, T, H, W) -> (T, C, H, W).
        return x.permute(1,0,2,3).contiguous()

    return video_transform

video_transform = make_video_transform()

# Yields one clip per video: the segment from 0.0 to clip_duration (used for every split)
class FirstClipDataset(Dataset):
    """Map-style dataset returning the leading clip of each labelled video.

    Args:
        labeled_videos: Sequence of ``(path, meta)`` pairs where ``meta`` is
            a dict holding an integer ``"label"``.
        clip_duration: End time passed to ``get_clip``; the clip always
            starts at 0.0.
        transform: Callable applied to the decoded ``"video"`` entry.
    """

    def __init__(self, labeled_videos, clip_duration, transform):
        self.data = labeled_videos
        self.clip_duration = clip_duration
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Decode, transform and package the sample at position ``idx``.

        Returns:
            Dict with ``"video"`` (transformed frames), ``"label"`` (0-dim
            long tensor) and ``"video_name"`` (file name without directory).

        Raises:
            RuntimeError: If the decoder returns no video frames.
        """
        path, meta = self.data[idx]
        label = meta["label"]
        ev = EncodedVideo.from_path(path)
        try:
            clip = ev.get_clip(0.0, self.clip_duration)
        finally:
            # Release the decoder as soon as the clip is extracted instead of
            # waiting for garbage collection; one video is opened per sample.
            ev.close()
        frames = clip["video"]
        if frames is None:
            raise RuntimeError(
                f"No video frames decoded from {path!r} in "
                f"[0.0, {self.clip_duration}]"
            )
        frames = self.transform(frames)
        return {
            "video":      frames,
            "label":      torch.tensor(label, dtype=torch.long),
            "video_name": Path(path).name
        }

# Returns final dataloader
def make_loader(split):
    """Build the DataLoader for one dataset split.

    Class indices follow the sorted sub-directory names of
    ``<dataset_root>/<split>``. For ``split == "train"`` the video list is
    oversampled with replacement to twice its size using inverse class
    frequency as sampling weight; the sample is drawn once, when the loader
    is created. Other splits use every video exactly once.

    Args:
        split: Name of the sub-directory of ``cfg["dataset_root"]`` to read.

    Returns:
        A ``DataLoader`` over ``FirstClipDataset``; shuffled only for
        ``"train"``.

    Raises:
        FileNotFoundError: If the train split contains no video files.
    """
    split_dir = os.path.join(cfg["dataset_root"], split)
    class_names = sorted(d for d in os.listdir(split_dir)
                         if os.path.isdir(os.path.join(split_dir, d)))
    class_to_idx = {c:i for i,c in enumerate(class_names)}

    pool = []
    for cls in class_names:
        # Escape glob metacharacters that may appear in directory names.
        cls_dir = glob.escape(os.path.join(split_dir, cls))
        for ext in (".mp4",".avi",".mov"):
            # glob order is filesystem dependent; sort for a stable listing.
            for fp in sorted(glob.glob(os.path.join(cls_dir,f"*{ext}"))):
                pool.append((fp, {"label": class_to_idx[cls]}))

    if split=="train":
        if not pool:
            raise FileNotFoundError(
                f"No .mp4/.avi/.mov videos found under {split_dir!r}"
            )
        lbls = [m["label"] for _,m in pool]
        counts = np.bincount(lbls, minlength=len(class_names))
        # Inverse-frequency weights; the epsilon only guards empty classes,
        # which contribute no samples anyway.
        w_cls  = 1.0/(counts+1e-6)
        w_samp = np.array([w_cls[l] for l in lbls])
        p_samp = w_samp / w_samp.sum()
        N = len(pool)*2
        idxs = np.random.choice(len(pool), size=N, p=p_samp)
        # No extra shuffle needed: the draw is already random and the
        # DataLoader below reshuffles every epoch.
        videos = [pool[i] for i in idxs]
    else:
        videos = pool

    ds = FirstClipDataset(
        labeled_videos = videos,
        clip_duration  = cfg["clip_duration"],
        transform      = video_transform
    )
    return DataLoader(
        ds,
        batch_size  = cfg["batch_size"],
        shuffle     = (split=="train"),
        num_workers = cfg["num_workers"],
        # Pinned host memory only helps (and only avoids a warning) with CUDA.
        pin_memory  = torch.cuda.is_available(),
    )

# One loader per split directory under cfg["dataset_root"]; only "train" is shuffled.
train_loader = make_loader("train")
val_loader   = make_loader("val")
test_loader  = make_loader("test")

In [None]:
# Build ViViT model

from transformers import VivitForVideoClassification
from torch.optim import AdamW
from accelerate import Accelerator
from torch.cuda.amp import GradScaler, autocast

# Load the Kinetics-400 ViViT checkpoint with cfg["num_classes"] output labels.
# ignore_mismatched_sizes=True lets loading continue when checkpoint tensors do not
# fit the requested configuration (presumably the classification head — confirm).
model = VivitForVideoClassification.from_pretrained(
    "google/vivit-b-16x2-kinetics400",
    ignore_mismatched_sizes=True,
    num_labels = cfg["num_classes"],
    torch_dtype = torch.float32
).to(device)

# run_epoch calls acc.backward(...), so the name `acc` must keep pointing at this Accelerator.
acc = Accelerator(mixed_precision="fp16")

# Build finite weight vector
# Inverse class frequency, normalised to sum to 1; eps keeps the division finite for empty classes.
# NOTE(review): `weights` is not passed to FocalLoss and no other visible cell reads it —
# confirm whether class weighting was intended.
eps      = 1e-6
counts   = torch.tensor(class_counts, dtype=torch.float32)
weights  = 1.0 / (counts + eps)
weights  = weights / weights.sum()

# Focal loss method
class FocalLoss(torch.nn.Module):
    """Multi-class focal loss computed from unweighted cross-entropy.

    The per-sample loss is ``(1 - p_t) ** gamma * CE`` with
    ``p_t = exp(-CE)``, i.e. the probability assigned to the target class.

    Args:
        gamma: Focusing exponent applied to ``(1 - p_t)``.
        reduction: ``'mean'``, ``'sum'`` or ``'none'`` (per-sample losses).

    Raises:
        ValueError: If ``reduction`` is not one of the supported values.
    """

    _REDUCTIONS = ('mean', 'sum', 'none')

    def __init__(self, gamma=2.0, reduction='mean'):
        super().__init__()
        # Reject unknown modes up front; previously anything other than
        # 'mean' (including 'none') was silently treated as 'sum'.
        if reduction not in self._REDUCTIONS:
            raise ValueError(
                f"reduction must be one of {self._REDUCTIONS}, got {reduction!r}"
            )
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, logits, targets):
        """Return the focal loss of ``logits`` against class-index ``targets``."""
        ce  = torch.nn.functional.cross_entropy(logits, targets, reduction='none')
        pt  = torch.exp(-ce)
        fl  = (1 - pt) ** self.gamma * ce
        if self.reduction == 'mean':
            return fl.mean()
        if self.reduction == 'sum':
            return fl.sum()
        return fl

# Focal loss with gamma=2.0 and the default 'mean' reduction.
criterion = FocalLoss(gamma=2.0).to(device)

# Build optimizer, scheduler
# AdamW over every model parameter; learning rate and weight decay come from cfg.
optimizer  = AdamW(model.parameters(), lr=cfg["base_lr"],
                   weight_decay=cfg["weight_decay"])

from transformers import get_cosine_schedule_with_warmup
import math

def steps_per_epoch(loader, batch_size):
    """Return how many batches ``loader`` yields in one epoch.

    ``len(loader)`` is used when the loader is sized. Otherwise the sample
    list is looked up on ``loader.dataset`` under ``_labeled_videos`` and
    then ``labeled_videos``, and the batch count is derived from its length.

    Raises:
        RuntimeError: If the loader is unsized and neither attribute exists.
    """
    try:
        return len(loader)
    except TypeError:
        dataset = loader.dataset
        _missing = object()
        # First matching attribute wins; later names are not inspected.
        samples = next(
            (getattr(dataset, name)
             for name in ("_labeled_videos", "labeled_videos")
             if hasattr(dataset, name)),
            _missing,
        )
        if samples is _missing:
            raise RuntimeError("Cannot infer dataset size for warmup calc.")
        return math.ceil(len(samples) / batch_size)

# Scheduler horizon in optimizer steps: run_epoch steps the scheduler once per training batch.
sp_epoch = steps_per_epoch(train_loader, cfg["batch_size"])
total_steps = sp_epoch * cfg["epochs"]
# The first 10% of all steps are warmup.
warmup_steps = int(0.1 * total_steps)

scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps,
)

# Hand model, optimizer and scheduler to Accelerate; the data loaders are not passed here.
model, optimizer, scheduler = acc.prepare(model, optimizer, scheduler)
# NOTE(review): `scaler` is not referenced by any other visible cell (the backward pass
# goes through acc.backward) — confirm whether it is still needed.
scaler = GradScaler()

In [None]:
# Class accuracy computation for main training loop

def compute_per_class_acc(y_true, y_pred, n_classes):
    """Return the per-class accuracy for paired true/predicted labels.

    Args:
        y_true: Iterable of integer class indices (ground truth).
        y_pred: Iterable of predicted class indices, aligned with ``y_true``.
        n_classes: Number of classes; fixes the length of the result.

    Returns:
        A list of length ``n_classes``; entry ``i`` is the fraction of
        samples of class ``i`` predicted correctly, or ``0.0`` when the class
        never appears in ``y_true``.
    """
    hits    = np.zeros(n_classes, dtype=np.int32)
    support = np.zeros(n_classes, dtype=np.int32)

    for target, guess in zip(y_true, y_pred):
        support[target] += 1
        hits[target] += int(target == guess)

    accuracies = []
    for cls in range(n_classes):
        if support[cls] > 0:
            accuracies.append(hits[cls] / support[cls])
        else:
            accuracies.append(0.0)
    return accuracies

In [None]:
from tqdm.auto import tqdm
from math import ceil
import numpy as np
from collections import Counter
from sklearn.metrics import confusion_matrix
import textwrap, torch

def run_epoch(loader, *, train=True, debug=False):
    """Run one pass over ``loader`` and return aggregate metrics.

    Relies on the notebook globals ``model``, ``criterion``, ``optimizer``,
    ``scheduler``, ``acc`` (the Accelerator), ``cfg`` and ``device``.

    Args:
        loader: Iterable of batches; each batch is a dict with a ``"video"``
            tensor and a ``"label"`` tensor.
        train: When True the model is put in training mode and every batch
            performs a backward pass, an optimizer step and a scheduler
            step. When False the model is put in eval mode and no gradients
            are recorded.
        debug: Accepted for backward compatibility; currently unused.

    Returns:
        ``(epoch_loss, overall_acc, class_accs)``: the sample-weighted mean
        loss, the overall accuracy, and a list with one accuracy per class
        (0 for classes that never appeared).

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.train(train)

    n_classes = cfg["num_classes"]
    epoch_loss, correct_tot, seen_tot = 0.0, 0, 0
    per_class_correct = np.zeros(n_classes, dtype=np.int64)
    per_class_seen    = np.zeros(n_classes, dtype=np.int64)

    def class_accuracies():
        # Running per-class accuracy; classes not seen so far report 0.
        return [ per_class_correct[i] / per_class_seen[i]
                 if per_class_seen[i] else 0
                 for i in range(n_classes) ]

    try:
        total_batches = len(loader)
    except TypeError:
        total_batches = ceil(len(loader.dataset._labeled_videos) / loader.batch_size)

    loop = tqdm(loader, desc="Train" if train else "Val",
                total=total_batches, leave=False)

    for batch in loop:
        vids   = batch["video"].to(device)
        labels = batch["label"].to(device)

        # Expand single-channel clips to three channels along axis 2.
        if vids.shape[2] == 1:
            vids = vids.repeat(1, 1, 3, 1, 1)

        if train:
            optimizer.zero_grad(set_to_none=True)

        # Same autocast API as the test-evaluation cell; disabled off-GPU.
        # Gradients are recorded only in training, so evaluation does not
        # depend on the caller wrapping this function in no_grad().
        with torch.set_grad_enabled(train), \
                torch.amp.autocast("cuda", enabled=vids.is_cuda):
            logits = model(vids, interpolate_pos_encoding=True).logits
            loss   = criterion(logits, labels)

        if train:
            acc.backward(loss)
            optimizer.step()
            scheduler.step()

        # Performance metrics computation
        preds = logits.argmax(1)
        bs    = labels.size(0)
        batch_loss = loss.item()

        # One host transfer per tensor instead of two .item() syncs per class.
        labels_np = labels.cpu().numpy()
        hit = preds.cpu().numpy() == labels_np

        epoch_loss   += batch_loss * bs
        correct_tot  += int(hit.sum())
        seen_tot     += bs

        # Slicing keeps the old behaviour of ignoring labels >= n_classes.
        per_class_seen    += np.bincount(labels_np, minlength=n_classes)[:n_classes]
        per_class_correct += np.bincount(labels_np[hit], minlength=n_classes)[:n_classes]

        overall_acc = correct_tot / seen_tot

        post = {"loss": f"{batch_loss:.4f}", "all": f"{overall_acc:.3f}"}
        for i, a in enumerate(class_accuracies()):
            post[f"C{i}"] = f"{a:.3f}"
        loop.set_postfix(post)

    if seen_tot == 0:
        raise ValueError("run_epoch received a loader that yielded no samples")

    # Epoch aggregates
    epoch_loss  /= seen_tot
    overall_acc  = correct_tot / seen_tot
    class_accs   = class_accuracies()

    return epoch_loss, overall_acc, class_accs

In [None]:
# Main training loop
# NOTE(review): `use_amp` is not read by any visible cell — confirm whether it is still needed.
use_amp=True

# One training pass followed by one validation pass per epoch.
for epoch in range(cfg["epochs"]):
    tr_loss, tr_acc, tr_class_acc = run_epoch(train_loader, train=True)

    # Validation runs without gradient tracking.
    with torch.no_grad():
        vl_loss, vl_acc, vl_class_acc = run_epoch(val_loader, train=False)

    # Optional metrics output
    print(f"Epoch {epoch+1}/{cfg['epochs']}")
    print("  Train loss {:.4f}  overall acc {:.3f}".format(tr_loss, tr_acc))
    print("  Val   loss {:.4f}  overall acc {:.3f}".format(vl_loss, vl_acc))
    print("  Per‑class train acc:", ["{:.3f}".format(a) for a in tr_class_acc])
    print("  Per‑class val acc:", " ".join([f"C{i}={a:.3f}" for i, a in enumerate(vl_class_acc)]))

In [None]:
# Test set performance evaluation

from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, classification_report
import pandas as pd
import numpy as np
import torch

model.eval()
all_preds, all_labels = [], []

use_cuda = torch.cuda.is_available()

# Single source of truth for the label set used by every metric below.
n_classes = cfg["num_classes"]
label_ids = np.arange(n_classes)

# Collect predictions for the whole test split without tracking gradients.
with torch.no_grad(), torch.amp.autocast("cuda", enabled=use_cuda):
    for batch in test_loader:
        vids   = batch["video"].to(device)
        labels = batch["label"].to(device)

        logits = model(vids, interpolate_pos_encoding=True).logits

        all_preds.append(logits.argmax(1).cpu())
        all_labels.append(labels.cpu())

y_pred  = torch.cat(all_preds).numpy()
y_true  = torch.cat(all_labels).numpy()

prec, rec, f1, _ = precision_recall_fscore_support(
    y_true, y_pred, labels=label_ids, zero_division=0
)

# One-vs-rest counts per class derived from the confusion matrix
# (rows = true class, columns = predicted class).
cm = confusion_matrix(y_true, y_pred, labels=label_ids)
N  = cm.sum()
tp = np.diag(cm)
fp = cm.sum(axis=0) - tp
fn = cm.sum(axis=1) - tp
tn = N - tp - fp - fn
# Deliberately NOT named `acc`: that name holds the Accelerator used by
# run_epoch, and overwriting it would break any later training run.
per_class_acc = (tp + tn) / N

# NOTE(review): label indices come from the sorted class directory names in
# make_loader; confirm that this display order matches that index order.
class_names = ["Removal", "No Attack", "Visual Modification", "Text Modification", "Addition"]
assert len(class_names) == n_classes, "class_names must list one name per class"

df = pd.DataFrame({
    "Attack Class": class_names,
    "Accuracy":  per_class_acc,
    "Precision": prec,
    "F1 Score":  f1,
})

pd.set_option("display.precision", 5)
print("\nPer‑class metrics (for Table 2):")
print(df.to_string(index=False))

print("\nDetailed classification report:")
# Passing labels keeps the report aligned with target_names even when a
# class is absent from both y_true and y_pred.
print(classification_report(y_true, y_pred, labels=label_ids,
                            target_names=class_names, digits=5,
                            zero_division=0))

print("\nConfusion matrix:")
print(cm)

In [None]:
# Optional model download

OUT_PATH = "" # Drive path

# Fail with an actionable message instead of letting torch.save() choke on
# an empty file name when the placeholder above was not filled in.
if not OUT_PATH:
    raise ValueError("OUT_PATH is empty: set it to the destination file path before saving.")

# Persist only the weights (state dict), not the whole model object.
torch.save(model.state_dict(), OUT_PATH)

# Optional local download
from google.colab import files
files.download(OUT_PATH)