In [None]:
#-- Install Libraries -------------------------------------------------------------------------------------------
!pip install torchsummary
!pip install pytorchvideo

from IPython import display
display.clear_output()
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Imports ------------------------------------------------------------------------------------------------------
import torch

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
import torchvision.transforms as transforms

from torchsummary import summary

import cv2
import matplotlib.pyplot as plt

import numpy as np
import random

import os
import shutil
import copy
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Initialize ---------------------------------------------------------------------------------------------------
ds_zip_path = '/kaggle/input/fight-detection-x3d-preprocess/data.zip'
ds_unzip_path = '/kaggle/working/data/'

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {DEVICE}')

BATCH_SIZE = 8
NUM_EPOCHS = 100

NUM_FRAMES = 16
FRAME_W = 256
FRAME_H = 256
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- unzip data ---------------------------------------------------------------------------------------------------
shutil.unpack_archive(ds_zip_path, ds_unzip_path, 'zip')
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to count number of samples is ds --------------------------------------------------------------------
def count_samples(data_dir):
    counts = {}
    
    for subset in ['train', 'val', 'test']:
        subset_path = os.path.join(data_dir, subset)
        counts[subset] = {}
        
        for category in ['Violence', 'NonViolence']:
            category_path = os.path.join(subset_path, category)
            num_samples = len(os.listdir(category_path))
            counts[subset][category] = num_samples
    
    return counts
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Dataset size -------------------------------------------------------------------------------------------------
sample_counts = count_samples(ds_unzip_path)

# Print the sample counts
for subset, categories in sample_counts.items():
    print(f"{subset} set:")
    for category, count in categories.items():
        print(f"  {category}: {count} samples")
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Dataset class for loading the videos ------------------------------------------------------------------------
class VideoDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['Violence', 'NonViolence']
        self.frame_paths = []
        self.labels = []

        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            for frame_file in os.listdir(class_dir):
                frame_path = os.path.join(class_dir, frame_file)
                self.frame_paths.append(frame_path)
                self.labels.append(label)

    def __len__(self):
        return len(self.frame_paths)

    def __getitem__(self, idx):
        frame_path = self.frame_paths[idx]
        label = self.labels[idx]

        frames = np.load(frame_path)

        if self.transform:
            frames = self.transform(frames)

        return frames, label
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Class To Convert Frames to normalized Tensors ---------------------------------------------------------------
class ToTensor(object):
    def __call__(self, sample):
        frames = sample
        
        #-- Convert to Tensor (C, T, H, W) --
        frames = frames.transpose((3, 0, 1, 2))
        frames = torch.from_numpy(frames).float()

        #-- Normalize --
        frames = frames / 255.0
        return frames
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Load Data ----------------------------------------------------------------------------------------------------
#-- Set the paths to dataset --
train_dir = ds_unzip_path + 'train'
val_dir = ds_unzip_path + 'val'
test_dir = ds_unzip_path + 'test'


#-- Data transformations --
data_transforms = {
    'train': transforms.Compose([ToTensor()]),
    'val': transforms.Compose([ToTensor()]),
    'test': transforms.Compose([ToTensor()]),
}

#-- Create datasets for train, val, and test --
train_dataset = VideoDataset(root_dir=train_dir, transform=data_transforms['train'])
val_dataset = VideoDataset(root_dir=val_dir, transform=data_transforms['val'])
test_dataset = VideoDataset(root_dir=test_dir, transform=data_transforms['test'])

#-- Create DataLoaders --
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Check Data Loaders -------------------------------------------------------------------------------------------
for i, (videos, labels) in enumerate(train_loader):
        print(videos.shape, labels.shape)
        break
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Modify X3D for finetuning ------------------------------------------------------------------------
def custome_X3D(num_classes):
    #-- load X3D model --
    model_name = 'x3d_m'
    model = torch.hub.load('facebookresearch/pytorchvideo', model_name, pretrained=True)   
    
    #-- set last layer for custome classification --
    input_size = model.blocks[-1].proj.in_features
    model.blocks[-1].proj = nn.Linear(in_features=input_size, out_features=num_classes)

    return model
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Train Model --------------------------------------------------------------------------------------
def train(model, loader, optimizer, criterion):
    model.train()
    
    running_loss = 0.0
    correct = 0
    total = 0
    
    for videos, labels in loader:
        
        videos, labels = videos.to(DEVICE), labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(videos)       
        
        
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        

    loss = running_loss / len(loader)
    accuracy = correct / total
    return loss , accuracy
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Evaluate Model -----------------------------------------------------------------------------------
def evaluate(model, loader, criterion):
    model.eval()
    
    correct = 0
    total = 0
    running_loss = 0.0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    loss = running_loss / len(loader)
    acc = correct / total
    return loss , acc
#-----------------------------------------------------------------------------------------------------------------

In [None]:
# #-- Create and Initialize Model and its Params -------------------------------------------------------------------
num_classes = 2  # For Fighting and Normal
model = custome_X3D(num_classes).to(DEVICE)


optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# #-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Display the model summary ------------------------------------------------------------------------------------
summary(model, input_size=(3,16,256,256), device=DEVICE.type, batch_size=-1)
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Train Model -------------------------------------------------------------------------------------------------
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

best_model = None
best_acc = 0

for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)    
    
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    
    if val_acc> best_acc:
        best_acc = val_acc
        best_model = copy.deepcopy(model)
        
    
    print(f'Epoch {epoch+1}/{NUM_EPOCHS}, '
          f'Train Loss: {train_loss:.4f}, Train ACC: {train_acc:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val ACC: {val_acc:.4f}')
    
last_model = copy.deepcopy(model)
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Plot ACC and Loss --------------------------------------------------------------------------------------------
epochs = range(1, NUM_EPOCHS + 1)

plt.figure(figsize=(14, 5))

# Plot training & validation loss
plt.subplot(1, 2, 1)
plt.plot(epochs, train_losses, 'bo-', label='Train Loss')
plt.plot(epochs, val_losses, 'ro-', label='Val Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Plot training & validation accuracy
plt.subplot(1, 2, 2)
plt.plot(epochs, train_accuracies, 'bo-', label='Train Accuracy')
plt.plot(epochs, val_accuracies, 'ro-', label='Val Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()
#-----------------------------------------------------------------------------------------------------------------

In [None]:
# #--Evaluate Model On Test Data --------------------------------------------------------------------------------
test_loss, test_acc = evaluate(best_model, test_loader, criterion)
print(f'Final Test Accuracy: {test_acc:.4f}')
# #---------------------------------------------------------------------------------------------------------------

In [None]:
#-- Compare Last Model and Best Model ----------------------------------------------------------------------------

#-- last model --
train_loss, train_acc = evaluate(last_model, train_loader, criterion)   
val_loss, val_acc = evaluate(last_model, val_loader, criterion)   
test_loss, test_acc = evaluate(last_model, test_loader, criterion)

print('Last Weights:\n'
         f'Train Loss: {train_loss: .4f} - Train ACC: {train_acc: .4f}\n'
         f'Val Loss: {val_loss: .4f} - Val ACC: {val_acc: .4f}\n'
         f'Test Loss: {test_loss: .4f} - Test ACC: {test_acc: .4f}')

#-- best model --
train_loss, train_acc = evaluate(best_model, train_loader, criterion)   
val_loss, val_acc = evaluate(best_model, val_loader, criterion)   
test_loss, test_acc = evaluate(best_model, test_loader, criterion)

print('\n\nBest Weights:\n'
         f'Train Loss: {train_loss: .4f} - Train ACC: {train_acc: .4f}\n'
         f'Val Loss: {val_loss: .4f} - Val ACC: {val_acc: .4f}\n'
         f'Test Loss: {test_loss: .4f} - Test ACC: {test_acc: .4f}')
#----------------------------------------------------------------------------------------------------------------