# Import Modules

In [1]:
import os 
import numpy as np 
import pandas as pd
import cv2
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader,Dataset, random_split
from glob import glob
import torchvision.transforms as T
from tqdm import tqdm
from torchmetrics import Accuracy

# Data Preprocessing 

In [2]:
class GaussianNoise:
    """Transform that adds zero-mean Gaussian noise to a tensor.

    Args:
        std: Standard deviation of the added noise. A value of ``0`` turns
            the transform into a no-op.
    """

    def __init__(self, std=0.03):
        self.std = std

    def __call__(self, tensor):
        """Return ``tensor`` plus noise; the input itself is returned when ``std == 0``."""
        if self.std == 0:
            return tensor
        # torch.randn samples from the standard normal distribution (mu=0, std=1).
        # Sampling on the input's device keeps the addition valid for tensors
        # that do not live on the CPU.
        noise = torch.randn(tensor.size(), device=tensor.device)
        return tensor + noise * self.std

    def __repr__(self):
        return f"{self.__class__.__name__}(std={self.std})"
        
# Per-channel (mean, std) used for normalisation. These are the ImageNet
# statistics, chosen because the frames are fed to a Swin backbone that is
# mostly trained on ImageNet-normalised inputs.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _resize_to_tensor_steps():
    """Steps shared by both pipelines: array -> PIL image -> 224x224 -> tensor."""
    return [
        T.ToPILImage(),
        T.Resize((224, 224)),
        T.ToTensor(),
    ]


# Training pipeline: shared preprocessing, then augmentation, normalisation
# and finally additive Gaussian noise.
transforms_train = T.Compose(
    _resize_to_tensor_steps()
    + [
        T.RandomHorizontalFlip(p=.5),
        T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        T.RandomRotation(15),
        T.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
        GaussianNoise(std=.03),
    ]
)

# Validation / test pipeline: shared preprocessing and normalisation only.
transforms_val = T.Compose(
    _resize_to_tensor_steps()
    + [
        T.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

        
def extract_frames(video, frame_count):
    """Uniformly sample ``frame_count`` RGB frames from a video file.

    Frame positions are spread evenly between the first and the last frame
    reported by the container, so the clip covers the whole video instead of
    a single short moment.

    Args:
        video: Path (or any source accepted by ``cv2.VideoCapture``).
        frame_count: Number of frames to return.

    Returns:
        A list of RGB frames. It holds exactly ``frame_count`` entries whenever
        at least one frame could be decoded: positions that map to the same
        source frame (video shorter than ``frame_count``) repeat that frame,
        and a stream that ends early is padded with its last decoded frame.
        An empty list is returned when nothing can be read or when
        ``frame_count`` is not positive.
    """
    vid = cv2.VideoCapture(video)
    try:
        total_frames = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0 or frame_count <= 0:
            return []

        # Evenly spaced (uniform) sampling positions over the whole video.
        frame_indices = torch.linspace(0, total_frames - 1, steps=frame_count).long().tolist()

        # How many times each source frame is requested. Short videos map
        # several sampling positions onto the same frame; counting them keeps
        # the clip length constant instead of silently dropping duplicates.
        wanted = {}
        for index in frame_indices:
            wanted[index] = wanted.get(index, 0) + 1

        frames = []
        # Frames are decoded sequentially; nothing past the last requested
        # index is needed.
        for i in range(frame_indices[-1] + 1):
            ret, frame = vid.read()
            if not ret:
                break
            copies = wanted.get(i, 0)
            if copies:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
                frames.extend([frame] * copies)
    finally:
        # Always free the capture handle, even if decoding raises.
        vid.release()

    # The reported frame count can exceed what is actually decodable; pad with
    # the last good frame so every clip has the same length.
    if frames and len(frames) < frame_count:
        frames.extend([frames[-1]] * (frame_count - len(frames)))
    return frames

In [3]:
# Template that PyTorch expects for a custom Dataset

"""class YourDataset(Dataset):
    def __init__(self, ...):
        # load paths, labels, transforms

    def __len__(self):
        return len(samples)

    def __getitem__(self, idx):
        # return one (sample, label) pair"""

'class YourDataset(Dataset):\n    def __init__(self, ...):\n        # load paths, labels, transforms\n\n    def __len__(self):\n        return len(samples)\n\n    def __getitem__(self, idx):\n        # return one (sample, label) pair'

In [4]:
class VD(Dataset):
    """Video dataset that yields fixed-length clips of transformed frames.

    Args:
        video_paths: Sequence of video file paths.
        labels: Sequence of integer labels aligned with ``video_paths``.
        num_frames: Number of frames sampled from every video.
        train: ``1`` selects ``transforms_train``; any other value selects
            ``transforms_val``.

    NOTE(review): the transform is invoked once per frame, so any random
    augmentation it contains is sampled independently for each frame of a
    clip - confirm this is intended for video input.
    """

    def __init__(self, video_paths, labels, num_frames=16, train=1):
        self.video_paths = video_paths
        self.labels = labels
        self.num_frames = num_frames
        self.train_flag = train

    def __len__(self):
        return len(self.video_paths)

    def _load_clip(self, idx):
        """Decode and transform the video at ``idx``; frames are stacked on a new leading axis.

        Raises:
            ValueError: if no frame could be decoded from the video.
        """
        frames = extract_frames(self.video_paths[idx], self.num_frames)
        if not frames:
            raise ValueError("no frames could be decoded")

        transform = transforms_train if self.train_flag == 1 else transforms_val
        processed_frames = [transform(frame) for frame in frames]

        # Every clip must have the same length or the default collate function
        # cannot stack a batch; repeat the last frame of a short clip.
        missing = self.num_frames - len(processed_frames)
        if missing > 0:
            processed_frames.extend([processed_frames[-1]] * missing)

        return torch.stack(processed_frames)

    def __getitem__(self, idx):
        """Return ``(video_tensor, label)`` for ``idx``.

        When the video at ``idx`` cannot be loaded, the following videos are
        tried in order (wrapping around) and the first usable one is returned
        together with *its* label.

        Raises:
            IndexError: if ``idx`` is out of range.
            RuntimeError: if no video in the dataset can be loaded.
        """
        # Plain indexing first so that out-of-range indices still raise
        # IndexError (Python's fallback iteration protocol relies on it).
        self.video_paths[idx]

        total = len(self)
        start = idx % total
        # Bounded loop instead of recursion: each video is tried at most once,
        # so a dataset of unreadable files fails fast instead of recursing
        # until the interpreter gives up.
        for offset in range(total):
            candidate = (start + offset) % total
            try:
                video_tensor = self._load_clip(candidate)
            except Exception as e:  # Skipping the corrupted videos encountered during training
                print(f"Skipping corrupted video at index {candidate}: {e}")
                continue
            label = self.labels[candidate]
            return video_tensor, torch.tensor(label, dtype=torch.long)

        raise RuntimeError("None of the videos in the dataset could be loaded")

# Prepare Train, Val, Test Sets

In [5]:
def load_split_dataset(root_dir, num_frames=16, train_ratio=0.8, val_ratio=0.195,
                       seed=None, patterns=("*.mp4",)):
    """Collect the labelled videos under ``root_dir`` and split them three ways.

    ``root_dir`` must contain a ``Violence`` folder (label 1) and a
    ``NonViolence`` folder (label 0). Whatever is left after the train and
    validation shares becomes the test set.

    Args:
        root_dir: Dataset root directory.
        num_frames: Frames per clip, forwarded to ``VD``.
        train_ratio: Fraction of the videos used for training.
        val_ratio: Fraction of the videos used for validation.
        seed: Optional seed for a reproducible shuffle. ``None`` keeps the
            previous behaviour and shuffles with the global ``random`` state.
        patterns: Glob patterns of the video files picked up in each class
            folder.

    Returns:
        ``(train_dataset, val_dataset, test_dataset)`` as ``VD`` instances;
        only the training set uses the training transforms.

    Raises:
        ValueError: if the ratios are invalid or no video file is found.
    """
    import random

    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1:
        raise ValueError(
            f"train_ratio and val_ratio must be non-negative and sum to at most 1, "
            f"got {train_ratio} and {val_ratio}"
        )

    class_map = {"Violence": 1, "NonViolence": 0}

    pairs = []
    for label_name, label in class_map.items():
        folder = os.path.join(root_dir, label_name)
        # Sorting removes the file-system dependent order of glob, so the same
        # seed always produces the same split.
        video_files = sorted({
            path
            for pattern in patterns
            for path in glob(os.path.join(folder, pattern))
        })
        pairs.extend((path, label) for path in video_files)

    if not pairs:
        raise ValueError(
            f"No video files matching {tuple(patterns)} found in the "
            f"{sorted(class_map)} folders under {root_dir!r}"
        )

    # Shuffle before splitting so both classes are mixed across the splits.
    shuffler = random if seed is None else random.Random(seed)
    shuffler.shuffle(pairs)
    samples = [path for path, _ in pairs]
    labels = [label for _, label in pairs]

    total = len(samples)
    train_end = int(total * train_ratio)
    val_end = train_end + int(total * val_ratio)

    train_dataset = VD(samples[:train_end], labels[:train_end], num_frames, train=1)
    val_dataset = VD(samples[train_end:val_end], labels[train_end:val_end], num_frames, train=0)
    test_dataset = VD(samples[val_end:], labels[val_end:], num_frames, train=0)

    return train_dataset, val_dataset, test_dataset

In [6]:
# Location of the dataset (Kaggle input mount) and the three-way split.
dataset_path = "/kaggle/input/real-life-violence-situations-dataset/Real Life Violence Dataset"
train_ds, val_ds, test_ds = load_split_dataset(dataset_path, num_frames=16)

# All loaders share batch size and worker count; only the training loader
# reshuffles its samples.
_loader_options = {"batch_size": 4, "num_workers": 4}
train_loader = DataLoader(train_ds, shuffle=True, **_loader_options)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_options)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_options)

In [7]:
# Sanity check: pull a single batch from every loader and show its shape and
# labels. Each message names the split it actually belongs to.
for split_name, loader in (("Train", train_loader), ("Val", val_loader), ("Test", test_loader)):
    for batch_videos, batch_labels in loader:
        print(f"{split_name} batch shape:", batch_videos.shape)  # (B, T, C, H, W)
        print(f"{split_name} labels:", batch_labels)
        break  # one batch per split is enough

Train batch shape: torch.Size([4, 16, 3, 224, 224])
Train labels: tensor([0, 0, 0, 0])
Train batch shape: torch.Size([4, 16, 3, 224, 224])
Train labels: tensor([1, 1, 1, 0])
Train batch shape: torch.Size([4, 16, 3, 224, 224])
Train labels: tensor([1, 1, 0, 1])


# Model Build

**Video → Normalized Frames → Video Swin-T (Swin3D-T) Transformer (train the head, the last stage and the final norm; freeze the rest) → Prediction**

In [23]:
# Swin3D (Video Swin) extends the 2D Swin Transformer: temporal and spatial dependencies are computed jointly, instead of
# the independent frame-by-frame computation of 2D Swin (which needs an additional LSTM/Transformer for temporal dependencies).

from torchvision.models.video import swin3d_t

class VDC(nn.Module):
    """Video classifier built on a Swin3D-T backbone with a small MLP head.

    The backbone is frozen except for its last feature stage
    (``features[6]``) and its final normalisation layer; the replacement head
    is trained from scratch.

    Args:
        num_classes: Number of output logits (``1`` for binary classification
            with ``BCEWithLogitsLoss``).
        pretrained: Load torchvision's default pretrained weights into the
            backbone. Freezing the backbone only makes sense with pretrained
            weights; pass ``False`` to skip the download (e.g. offline, or
            when a checkpoint is loaded right afterwards).
    """

    def __init__(self, num_classes=1, pretrained=True):
        super().__init__()

        # Without an explicit `weights` argument torchvision builds a randomly
        # initialised network, which would then be frozen in that state.
        self.backbone = swin3d_t(weights="DEFAULT" if pretrained else None, progress=True)

        # Freeze the whole backbone ...
        for param in self.backbone.parameters():
            param.requires_grad = False

        # ... then unfreeze the last feature stage
        for param in self.backbone.features[6].parameters():
            param.requires_grad = True

        # ... and the final normalization layer.
        for param in self.backbone.norm.parameters():
            param.requires_grad = True

        # Newly created layers have requires_grad=True, so the head is trained.
        # The head receives the pooled 768-dimensional feature vector of the
        # backbone (a vector per clip, not a CNN-style feature map).
        self.backbone.head = nn.Sequential(
            nn.Linear(768, 192),
            nn.BatchNorm1d(192),
            nn.ReLU(),
            nn.Linear(192, 32),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Linear(32, num_classes),
        )

    def forward(self, x):
        """Return the logits for a batch of clips.

        The singleton class dimension is squeezed away, so with
        ``num_classes=1`` the result has one logit per clip.
        """
        return self.backbone(x).squeeze(1)

# Prefer the GPU when one is visible to PyTorch.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

# Build the classifier and move its parameters to the selected device.
model = VDC()
model = model.to(device)

cuda


In [24]:
# List every parameter that will receive gradient updates.
print("Trainanble layers are:")
trainable_names = [
    param_name
    for param_name, tensor in model.named_parameters()
    if tensor.requires_grad
]
for param_name in trainable_names:
    print(param_name)

Trainanble layers are:
backbone.features.6.0.norm1.weight
backbone.features.6.0.norm1.bias
backbone.features.6.0.attn.relative_position_bias_table
backbone.features.6.0.attn.qkv.weight
backbone.features.6.0.attn.qkv.bias
backbone.features.6.0.attn.proj.weight
backbone.features.6.0.attn.proj.bias
backbone.features.6.0.norm2.weight
backbone.features.6.0.norm2.bias
backbone.features.6.0.mlp.0.weight
backbone.features.6.0.mlp.0.bias
backbone.features.6.0.mlp.3.weight
backbone.features.6.0.mlp.3.bias
backbone.features.6.1.norm1.weight
backbone.features.6.1.norm1.bias
backbone.features.6.1.attn.relative_position_bias_table
backbone.features.6.1.attn.qkv.weight
backbone.features.6.1.attn.qkv.bias
backbone.features.6.1.attn.proj.weight
backbone.features.6.1.attn.proj.bias
backbone.features.6.1.norm2.weight
backbone.features.6.1.norm2.bias
backbone.features.6.1.mlp.0.weight
backbone.features.6.1.mlp.0.bias
backbone.features.6.1.mlp.3.weight
backbone.features.6.1.mlp.3.bias
backbone.norm.weight


# Training and Validation

In [None]:
# Hyper Parameters

loss_fn = nn.BCEWithLogitsLoss()  # sigmoid is built into the loss, so the model emits raw logits
base_lr = .001
# The unfrozen backbone stage is fine-tuned with half the learning rate of the
# final norm layer and the freshly initialised head.
optimizer = torch.optim.AdamW([
    {"params": model.backbone.features[6].parameters(), "lr": base_lr * 0.5},
    {"params": model.backbone.norm.parameters(), "lr": base_lr},
    {"params": model.backbone.head.parameters(), "lr": base_lr}
], lr=base_lr)
# `verbose` is deprecated in recent PyTorch releases; learning-rate changes are
# reported explicitly after each scheduler step instead.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3)
epochs = 100
patience = 5
best_val_acc = 0.0
patience_counter = 0
best_model_state = None
checkpoint_path = 'VD_classification.pt'

# Metrics
train_metric = Accuracy(task="binary").to(device)
val_metric = Accuracy(task="binary").to(device)

# Training loop
for epoch in range(1, epochs + 1):
    model.train()
    train_metric.reset()
    total_train_loss = 0.0

    train_loader_tqdm = tqdm(train_loader, desc=f"Epoch {epoch}/{epochs} [Training]", leave=False)
    for batch_idx, (x, y) in enumerate(train_loader_tqdm):
        x = x.to(device).permute(0, 2, 1, 3, 4)  # [B,T,C,H,W] -> [B,C,T,H,W]
        y = y.float().to(device)  # the loss needs float targets (labels arrive as long)

        optimizer.zero_grad()
        output = model(x)
        loss = loss_fn(output, y)
        loss.backward()
        optimizer.step()

        # Feed explicit probabilities to the metric: raw logits that all fall
        # inside [0, 1] are indistinguishable from probabilities.
        train_metric.update(torch.sigmoid(output.detach()), y.long())
        total_train_loss += loss.item()

        if (batch_idx + 1) % 10 == 0:
            train_loader_tqdm.set_postfix(loss=loss.item())

    train_acc = train_metric.compute().item()
    avg_train_loss = total_train_loss / len(train_loader)

    # Validation
    model.eval()
    val_metric.reset()
    total_val_loss = 0.0

    with torch.no_grad():
        val_loader_tqdm = tqdm(val_loader, desc=f"Epoch {epoch}/{epochs} [Validation]", leave=False)
        for x, y in val_loader_tqdm:
            x = x.to(device).permute(0, 2, 1, 3, 4)  # [B,T,C,H,W] -> [B,C,T,H,W]
            y = y.float().to(device)

            preds = model(x)
            val_loss = loss_fn(preds, y)
            total_val_loss += val_loss.item()
            val_metric.update(torch.sigmoid(preds), y.long())

    val_acc = val_metric.compute().item()
    avg_val_loss = total_val_loss / len(val_loader)

    # Reduce LR scheduler (driven by the validation loss)
    lrs_before = [group["lr"] for group in optimizer.param_groups]
    scheduler.step(avg_val_loss)
    lrs_after = [group["lr"] for group in optimizer.param_groups]
    if lrs_after != lrs_before:
        print(f"Epoch {epoch}: reducing learning rates to {lrs_after}")

    # Logging
    if epoch % 2 == 0 or epoch == 1:
        print(f"Epoch {epoch}/{epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
              f"Train Acc: {train_acc:.4f} | Val ACC: {val_acc:.4f}")

    # Early stopping (driven by the validation accuracy)
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
        # state_dict() returns references to the live parameters, which keep
        # changing as training continues; snapshot a copy so the "best" state
        # really is the best one.
        best_model_state = {
            key: value.detach().cpu().clone()
            for key, value in model.state_dict().items()
        }
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"\nEarly stopping at epoch {epoch}. Best Val ACC: {best_val_acc:.4f}")
            break

# Persist and restore the best weights whether training stopped early or ran
# through every epoch.
if best_model_state is not None:
    torch.save(best_model_state, checkpoint_path)
    model.load_state_dict(best_model_state)
# This is a work-in-progress, experimental notebook. The best accuracy observed so far was 96%, but results may vary from run to run.
# Proper tuning for your own dataset may exceed this accuracy.
# Future updates to this repo will focus on the model's consistency and are planned to add zero-shot learning capability.

# Testing

In [None]:
# Evaluate on the held-out test split.
# Explicitly switch to inference mode instead of relying on the state the
# training cell happened to leave the model in (the head contains BatchNorm).
model.eval()
test_metric = Accuracy(task="binary").to(device)

with torch.no_grad():
    for x, y in test_loader:
        x = x.to(device).permute(0, 2, 1, 3, 4)  # [B,T,C,H,W] -> [B,C,T,H,W]
        y = y.to(device)

        logits = model(x)

        # The architecture has no sigmoid (it lives in the training loss), so
        # apply it explicitly before thresholding.
        predictions = (torch.sigmoid(logits) >= .5).long()
        print(predictions)

        test_metric.update(predictions, y)

print(f"Test ACC: {test_metric.compute().item():.4f}")