In [6]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [7]:
!unzip "/content/drive/MyDrive/data_trimmed_clean.zip" -d /content/

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/data_trimmed/Train/Shooting/Shooting042_x264_1030.png  
 extracting: /content/data_trimmed/Train/Shooting/Shooting020_x264_2680.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting029_x264_1260.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting009_x264_2690.png  
 extracting: /content/data_trimmed/Train/Shooting/Shooting014_x264_2740.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting006_x264_4510.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting006_x264_11010.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting006_x264_9020.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting005_x264_1860.png  
 extracting: /content/data_trimmed/Train/Shooting/Shooting052_x264_4560.png  
  inflating: /content/data_trimmed/Train/Shooting/Shooting009_x264_130.png  
 extracting: /content/data_trimmed/Train/Shooting/Shooting027_x264_140.png  


In [8]:
# ONLY USED FOR TESTING AND DEBUGGING - for final model we will use the whole dataset

import os
import shutil

def create_debug_subset_sequential(source_dir, dest_dir, train_limit=4000, test_limit=1500):
    if os.path.exists(dest_dir):
        shutil.rmtree(dest_dir)
    os.makedirs(dest_dir, exist_ok=True)

    for split, limit in [("Train", train_limit), ("Test", test_limit)]:
        src_split_path = os.path.join(source_dir, split)
        dst_split_path = os.path.join(dest_dir, split)
        os.makedirs(dst_split_path, exist_ok=True)

        for class_name in os.listdir(src_split_path):
            class_src = os.path.join(src_split_path, class_name)
            class_dst = os.path.join(dst_split_path, class_name)
            os.makedirs(class_dst, exist_ok=True)

            valid_images = sorted([f for f in os.listdir(class_src) if f.endswith(".png") and not f.startswith("._")])
            selected_images = valid_images[:limit]

            for img in selected_images:
                shutil.copy(os.path.join(class_src, img), os.path.join(class_dst, img))

create_debug_subset_sequential("/content/data_trimmed", "/content/data_trimmed_debug", train_limit=4000, test_limit=1500)

# paths
train_dir = "/content/data_trimmed_debug/Train"
test_dir = "/content/data_trimmed_debug/Test"

In [9]:
import os
import shutil
from collections import defaultdict

def extract_video_id(filename):
    """
    Extracts the video ID by removing the frame suffix (assumes last underscore + digits is the frame number).
    For example:
        Normal_Videos_003_x264_0.png → Normal_Videos_003_x264
        Assault_001_frame_010.png → Assault_001
    """
    parts = filename.rsplit("_", 1)
    return parts[0] if len(parts) == 2 else filename.split("_frame")[0]

def restructure_dataset(src_dir, dst_dir):
    os.makedirs(dst_dir, exist_ok=True)

    for class_name in os.listdir(src_dir):
        class_path = os.path.join(src_dir, class_name)
        if not os.path.isdir(class_path):
            continue

        print(f"Processing class: {class_name}")
        video_frame_dict = defaultdict(list)

        for fname in os.listdir(class_path):
            if not fname.endswith('.png'):
                continue

            video_id = extract_video_id(fname)
            video_frame_dict[video_id].append(fname)

        for video_id, frames in video_frame_dict.items():
            video_folder_path = os.path.join(dst_dir, class_name, video_id)
            os.makedirs(video_folder_path, exist_ok=True)

            for frame in frames:
                src = os.path.join(class_path, frame)
                dst = os.path.join(video_folder_path, frame)
                shutil.copy2(src, dst)

    print(f"Done restructuring: {src_dir} → {dst_dir}\n")

# Paths
train_dir = "/content/data_trimmed_debug/Train"
test_dir = "/content/data_trimmed_debug/Test"

train_dst = "/content/data_trimmed_debug_restructured/Train"
test_dst = "/content/data_trimmed_debug_restructured/Test"

# Run restructuring
restructure_dataset(train_dir, train_dst)
restructure_dataset(test_dir, test_dst)

Processing class: Arson
Processing class: Arrest
Processing class: Shooting
Processing class: Burglary
Processing class: Explosion
Processing class: NormalVideos
Processing class: Assault
Processing class: Fighting
Done restructuring: /content/data_trimmed_debug/Train → /content/data_trimmed_debug_restructured/Train

Processing class: Arson
Processing class: Arrest
Processing class: Shooting
Processing class: Burglary
Processing class: Explosion
Processing class: NormalVideos
Processing class: Assault
Processing class: Fighting
Done restructuring: /content/data_trimmed_debug/Test → /content/data_trimmed_debug_restructured/Test



In [1]:
IMG_HEIGHT = 64
IMG_WIDTH = 64
SEQUENCE_LENGTH = 16  # Pad/truncate each video to this many frames
BATCH_SIZE = 8
CLASS_NAMES = ['Arrest','Arson','Assault','Burglary','Explosion','Fighting','NormalVideos','Shooting']
NUM_CLASSES = len(CLASS_NAMES)

In [2]:
import tensorflow as tf
import numpy as np
import os
from glob import glob
from tensorflow.keras.utils import to_categorical
from PIL import Image

def load_video_frames(video_dir, sequence_length, img_size):
    # Get sorted list of frame paths
    frame_paths = sorted(glob(os.path.join(video_dir, "*.png")))

    frames = []
    for path in frame_paths[:sequence_length]:
        img = Image.open(path).resize(img_size)
        frame = np.array(img).astype("float32") / 255.0  # Normalize to [0, 1]
        frames.append(frame)

    # Pad with zeros if not enough frames
    while len(frames) < sequence_length:
        frames.append(np.zeros((img_size[1], img_size[0], 3), dtype="float32"))

    return np.stack(frames)

def get_video_paths_and_labels(base_dir, class_names):
    video_paths = []
    labels = []

    for class_index, class_name in enumerate(class_names):
        class_path = os.path.join(base_dir, class_name)
        for video_folder in os.listdir(class_path):
            video_path = os.path.join(class_path, video_folder)
            if os.path.isdir(video_path):
                video_paths.append(video_path)
                labels.append(class_index)

    return video_paths, labels

def build_video_dataset(base_dir, sequence_length, img_size, batch_size, class_names, shuffle=True):
    video_paths, labels = get_video_paths_and_labels(base_dir, class_names)

    def generator():
        for video_path, label in zip(video_paths, labels):
            frames = load_video_frames(video_path, sequence_length, img_size)
            yield frames, to_categorical(label, num_classes=len(class_names))

    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(sequence_length, img_size[1], img_size[0], 3), dtype=tf.float32),
            tf.TensorSpec(shape=(len(class_names),), dtype=tf.float32)
        )
    )

    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(video_paths))

    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [3]:
train_seq_dir = "/content/data_trimmed_debug_restructured/Train"
test_seq_dir = "/content/data_trimmed_debug_restructured/Test"

train_dataset = build_video_dataset(
    base_dir=train_seq_dir,
    sequence_length=SEQUENCE_LENGTH,
    img_size=(IMG_WIDTH, IMG_HEIGHT),
    batch_size=BATCH_SIZE,
    class_names=CLASS_NAMES,
    shuffle=True
)

test_dataset = build_video_dataset(
    base_dir=test_seq_dir,
    sequence_length=SEQUENCE_LENGTH,
    img_size=(IMG_WIDTH, IMG_HEIGHT),
    batch_size=BATCH_SIZE,
    class_names=CLASS_NAMES,
    shuffle=False
)

In [4]:
for x, y in train_dataset.take(1):
    print(x.shape)  # (8, 16, 64, 64, 3)
    print(y.shape)  # (8, 8)

(8, 16, 64, 64, 3)
(8, 8)


## DINOv2 + LSTM (using PyTorch)

In [14]:
## run just once on colab!
!pip install transformers datasets timm

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->timm)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->timm)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->timm)
  Downloa

In [5]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from torchvision.transforms import Compose, Resize, ToTensor
from transformers import AutoImageProcessor, AutoModel
from PIL import Image
from glob import glob
from sklearn.preprocessing import LabelEncoder
import random

# Constants
SEQUENCE_LENGTH = 16
IMG_SIZE = 224
NUM_CLASSES = 8
BATCH_SIZE = 4
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
CLASS_NAMES = ['Arrest','Arson','Assault','Burglary','Explosion','Fighting','NormalVideos','Shooting']

# Label encoder
label_encoder = LabelEncoder()
label_encoder.fit(CLASS_NAMES)

In [7]:
class VideoDataset(Dataset):
    def __init__(self, base_dir, sequence_length=16, image_size=224):
        self.sequence_length = sequence_length
        self.image_size = image_size
        self.samples = []
        self.processor = AutoImageProcessor.from_pretrained('facebook/dinov2-base')

        for class_name in os.listdir(base_dir):
            class_path = os.path.join(base_dir, class_name)
            if not os.path.isdir(class_path):
                continue
            for video_folder in os.listdir(class_path):
                video_path = os.path.join(class_path, video_folder)
                if os.path.isdir(video_path):
                    frame_paths = sorted(glob(os.path.join(video_path, '*.png')))
                    if len(frame_paths) > 0:
                        self.samples.append((frame_paths, label_encoder.transform([class_name])[0]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        frame_paths, label = self.samples[idx]
        num_frames = len(frame_paths)

        # Sample or pad frames
        if num_frames >= self.sequence_length:
            selected = sorted(random.sample(frame_paths, self.sequence_length))
        else:
            selected = sorted(frame_paths)
            while len(selected) < self.sequence_length:
                selected.append(selected[-1])

        images = [Image.open(p).convert("RGB").resize((self.image_size, self.image_size)) for p in selected]
        processed = self.processor(images=images, return_tensors="pt")
        pixel_values = processed['pixel_values'].squeeze(0)  # shape: (T, 3, 224, 224)
        label = torch.tensor(label, dtype=torch.long)
        return pixel_values, label

# Paths
train_dir = "/content/data_trimmed_debug_restructured/Train"
test_dir = "/content/data_trimmed_debug_restructured/Test"

# Datasets & loaders
train_dataset = VideoDataset(train_dir, sequence_length=SEQUENCE_LENGTH, image_size=IMG_SIZE)
test_dataset = VideoDataset(test_dir, sequence_length=SEQUENCE_LENGTH, image_size=IMG_SIZE)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

In [8]:
class DINOv2_LSTM(nn.Module):
    def __init__(self, model_name='facebook/dinov2-base', hidden_dim=128, num_classes=NUM_CLASSES):
        super().__init__()
        self.vit = AutoModel.from_pretrained(model_name)
        self.vit.eval()
        for param in self.vit.parameters():
            param.requires_grad = False

        self.lstm = nn.LSTM(input_size=768, hidden_size=hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):  # x: (B, T, 3, 224, 224)
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)

        with torch.no_grad():
            vit_out = self.vit(pixel_values=x).last_hidden_state[:, 0]  # CLS token

        x_seq = vit_out.view(B, T, -1)  # (B, T, 768)
        x_seq, _ = self.lstm(x_seq)
        x_seq = self.dropout(x_seq[:, -1, :])  # last time step
        return self.fc(x_seq)

# Model
model = DINOv2_LSTM().to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

config.json:   0%|          | 0.00/548 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

In [9]:
from sklearn.metrics import accuracy_score, roc_auc_score
import torch.nn.functional as F

def evaluate(model, loader, criterion, split='Test'):
    model.eval()
    total_loss = 0.0
    total_preds = []
    total_probs = []
    total_labels = []

    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            loss = criterion(logits, y)

            total_loss += loss.item()
            total_probs.append(probs.cpu())
            total_preds.append(torch.argmax(probs, dim=1).cpu())
            total_labels.append(y.cpu())

    y_true = torch.cat(total_labels).numpy()
    y_pred = torch.cat(total_preds).numpy()
    y_probs = torch.cat(total_probs).numpy()

    acc = accuracy_score(y_true, y_pred)

    try:
        auc = roc_auc_score(y_true, y_probs, multi_class='ovr', average='macro')
    except ValueError:
        auc = 0.0

    avg_loss = total_loss / len(loader)
    print(f"{split} Loss: {avg_loss:.4f} | Accuracy: {acc:.4f} | AUC: {auc:.4f}")


for epoch in range(1):
    model.train()
    running_loss = 0.0
    for x, y in train_loader:
        x, y = x.to(DEVICE), y.to(DEVICE)
        optimizer.zero_grad()
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    print(f"\nEpoch {epoch+1}, Train Loss: {avg_train_loss:.4f}")
    evaluate(model, train_loader, criterion, split='Train')
    evaluate(model, test_loader, criterion, split='Test')


Epoch 1, Train Loss: 2.1125
Train Loss: 1.9961 | Accuracy: 0.2667 | AUC: 0.6938
Test Loss: 2.1345 | Accuracy: 0.0909 | AUC: 0.4611


## DINOv2 + Transformer Encoder (using PyTorch)

In [10]:
import torch
import torch.nn as nn
from transformers import AutoModel

class DINOv2_Transformer(nn.Module):
    def __init__(self, model_name='facebook/dinov2-base', hidden_dim=768, num_classes=8, num_heads=4, ff_dim=1024, dropout=0.1):
        super().__init__()

        # DINOv2 backbone
        self.vit = AutoModel.from_pretrained(model_name)
        self.vit.eval()
        for param in self.vit.parameters():
            param.requires_grad = False

        # Transformer Encoder block
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=1)

        # Final classifier
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):  # x: (B, T, 3, 224, 224)
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)

        with torch.no_grad():
            vit_out = self.vit(pixel_values=x).last_hidden_state[:, 0]  # CLS token

        x_seq = vit_out.view(B, T, -1)  # (B, T, 768)
        x_encoded = self.transformer_encoder(x_seq)  # (B, T, 768)
        x_pooled = x_encoded.mean(dim=1)  # average over time
        return self.fc(self.dropout(x_pooled))

model = DINOv2_Transformer().to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [11]:
for epoch in range(8):
    model.train()
    running_loss = 0.0
    for x, y in train_loader:
        x, y = x.to(DEVICE), y.to(DEVICE)
        optimizer.zero_grad()
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    print(f"\nEpoch {epoch+1}, Train Loss: {avg_train_loss:.4f}")
    evaluate(model, train_loader, criterion, split='Train')
    evaluate(model, test_loader, criterion, split='Test')


Epoch 1, Train Loss: 2.1344
Train Loss: 1.2298 | Accuracy: 0.6286 | AUC: 0.9179
Test Loss: 1.9764 | Accuracy: 0.2955 | AUC: 0.6358

Epoch 2, Train Loss: 1.2078
Train Loss: 0.7310 | Accuracy: 0.8190 | AUC: 0.9948
Test Loss: 1.9461 | Accuracy: 0.3409 | AUC: 0.6479

Epoch 3, Train Loss: 0.7406
Train Loss: 0.4261 | Accuracy: 0.9905 | AUC: 0.9995
Test Loss: 1.9916 | Accuracy: 0.2955 | AUC: 0.6613

Epoch 4, Train Loss: 0.4058
Train Loss: 0.2217 | Accuracy: 1.0000 | AUC: 1.0000
Test Loss: 2.0073 | Accuracy: 0.3409 | AUC: 0.6750

Epoch 5, Train Loss: 0.2381
Train Loss: 0.1347 | Accuracy: 1.0000 | AUC: 1.0000
Test Loss: 2.1880 | Accuracy: 0.2727 | AUC: 0.6475

Epoch 6, Train Loss: 0.1242
Train Loss: 0.0752 | Accuracy: 1.0000 | AUC: 1.0000
Test Loss: 2.2460 | Accuracy: 0.3182 | AUC: 0.6649

Epoch 7, Train Loss: 0.0862
Train Loss: 0.0534 | Accuracy: 1.0000 | AUC: 1.0000
Test Loss: 2.3812 | Accuracy: 0.2727 | AUC: 0.6606

Epoch 8, Train Loss: 0.0606
Train Loss: 0.0374 | Accuracy: 1.0000 | AUC: 1.