In [None]:
#-- Unfreez  Last Block --
#-- Fold 1 --

In [None]:
#-- Install Libraries -------------------------------------------------------------------------------------------
!pip install torchsummary
!pip install pytorchvideo

from IPython import display
display.clear_output()
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Imports ------------------------------------------------------------------------------------------------------
import torch

import torchvision
import torchvision.transforms as transforms

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from torchsummary import summary

import cv2
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image

from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

import numpy as np
import random

import os
import shutil
import copy
import pickle
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Initialize ---------------------------------------------------------------------------------------------------
folds_indices_file = '/kaggle/input/fight-detection-x3d-32-v2-1-create-folds/folds_indices.pkl'
data_file = '/kaggle/input/fight-detection-x3d-32-v2-1-create-folds/video_paths_labels.pkl'

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {DEVICE}')

BATCH_SIZE = 8
NUM_EPOCHS = 50

NUM_FRAMES = 32

CLASS_NAMES = ['Violence','NonViolence']
NUM_Classes = 2  

FOLD_NUMBER = 1

train_metrics_file = f'{FOLD_NUMBER}_train_metrics.pkl'
val_metrics_file = f'{FOLD_NUMBER}_val_metrics.pkl'
test_metrics_file = f'{FOLD_NUMBER}_test_metrics.pkl'
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Load data ---------------------------------------------------------------------------------------------------
with open(folds_indices_file, 'rb') as f:
    all_folds_indices = pickle.load(f)
    
with open(data_file, 'rb') as f:
    all_video_paths, all_labels = pickle.load(f)


print(f'Number of Videos: {len(all_video_paths)}\nNumber of Labels:{len(all_labels)}')
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Define the Custom Dataset for videos -------------------------------------------------------------------------
class VideoDataset(Dataset):
    def __init__(self, frames_paths, labels, transform=None):
        self.frames_paths = frames_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.frames_paths)

    def __getitem__(self, idx):
        frames_paths = self.frames_paths[idx]
        frames = np.load(frames_paths)  #-- videos are saved as numpy arrays of shape (NUM_FRAMES, H, W, C) --

        if self.transform:
            frames = self.transform(frames)

        label = self.labels[idx]
        return frames, label  
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Class To Convert Frames to Tensors --------------------------------------------------------------------------
class ToTensor(object):
    def __call__(self, frames):        
        
        #-- Convert to Tensor (C, T, H, W) --
        frames = frames.transpose((3, 0, 1, 2))
        frames = torch.from_numpy(frames).float()

        return frames
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to evaluate the model ------------------------------------------------------------------------------
def calculate_metrics(model, data_loader):
    model.eval()
    predictions = []
    true_labels = []

    with torch.no_grad():
        for videos, labels in data_loader:
            videos = videos.to(DEVICE)
            labels = labels.to(DEVICE)

            outputs = model(videos)
            _, preds = torch.max(outputs, 1)

            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
    
    predictions = np.array(predictions)
    true_labels = np.array(true_labels)
    
    acc = accuracy_score(true_labels, predictions)
    precision = precision_score(true_labels, predictions, average='binary')
    recall = recall_score(true_labels, predictions, average='binary')
    f1 = f1_score(true_labels, predictions, average='binary')
    cm = confusion_matrix(true_labels, predictions)

    return acc, precision, recall, f1, cm
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Modify X3D for finetuning ------------------------------------------------------------------------
def custome_X3D(num_classes):
    #-- load X3D model --
    model_name = 'x3d_m'
    model = torch.hub.load('facebookresearch/pytorchvideo', model_name, pretrained=True)   
    
     #-- Freeze all layers --
    for param in model.parameters():
        param.requires_grad = False
    
    #-- set last layer for custome classification --
    input_size = model.blocks[-1].proj.in_features
    model.blocks[-1].proj = nn.Linear(in_features=input_size, out_features=num_classes)
    
    #-- Unfreeze Last Block --
    for param in model.blocks[-1].parameters():
        param.requires_grad = True

    return model
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Train Model --------------------------------------------------------------------------------------
def train(model, loader, optimizer, criterion):
    model.train()
    
    running_loss = 0.0
    correct = 0
    total = 0
    
    for videos, labels in loader:
        
        videos, labels = videos.to(DEVICE), labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(videos)       
        
        
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        

    loss = running_loss / len(loader)
    accuracy = correct / total
    return loss , accuracy
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Evaluate Model -----------------------------------------------------------------------------------
def evaluate(model, loader, criterion):
    model.eval()
    
    correct = 0
    total = 0
    running_loss = 0.0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    loss = running_loss / len(loader)
    acc = correct / total
    return loss , acc
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Plot ACC and Loss --------------------------------------------------------------------------------------------
def plot_acc_and_loss(train_losses, val_losses, train_accuracies, val_accuracies):
    
    epochs = range(1, NUM_EPOCHS + 1)

    plt.figure(figsize=(14, 5))

    #-- Plot training & validation loss --
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, 'bo-', label='Train Loss')
    plt.plot(epochs, val_losses, 'ro-', label='Val Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    #-- Plot training & validation accuracy --
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_accuracies, 'bo-', label='Train Accuracy')
    plt.plot(epochs, val_accuracies, 'ro-', label='Val Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Run k-Fold --------------------------------------------------------------------------------------------------
print(f'Running Fold {FOLD_NUMBER} ==================================================') 

#-- set fold indices --
train_idx, test_idx = all_folds_indices[FOLD_NUMBER-1]
    
#-- Get train and test data for this fold --
train_videos = [all_video_paths[i] for i in train_idx]
train_labels = [all_labels[i] for i in train_idx]
    
test_videos = [all_video_paths[i] for i in test_idx]
test_labels = [all_labels[i] for i in test_idx]   
    
#-- Split train data into train and validation sets (70% train, 30% validation) --
train_videos, val_videos, train_labels, val_labels = train_test_split(train_videos,
                                                                          train_labels,
                                                                          test_size=0.3,
                                                                          random_state=42)    
print('Dataset Size: '
        f'\tTrain = {len(train_videos)}'
        f'\tVal = {len(val_videos)}'
        f'\tTest = {len(test_videos)}')
    
#-- Create datasets --
train_dataset = VideoDataset(train_videos,train_labels, transforms.Compose([ToTensor()]))
val_dataset = VideoDataset(val_videos,val_labels, transforms.Compose([ToTensor()]))
test_dataset = VideoDataset(test_videos,test_labels, transforms.Compose([ToTensor()]))

#-- Create dataloaders --
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

#-- Initialize model --
model = custome_X3D(NUM_Classes).to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
    
#-- Train model --
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

best_model = None
best_acc = 0
best_epoch =0

for epoch in range(NUM_EPOCHS):        
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)    

    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    if val_acc> best_acc:
        best_acc = val_acc
        best_model = copy.deepcopy(model)
        best_epoch = epoch          


    print(f'Epoch {epoch+1}/{NUM_EPOCHS}, '
            f'Train Loss: {train_loss:.4f}, Train ACC: {train_acc:.4f}, '
            f'Val Loss: {val_loss:.4f}, Val ACC: {val_acc:.4f}')

last_model = copy.deepcopy(model)    
    
#-- plot ACC and LOSS --
plot_acc_and_loss(train_losses, val_losses, train_accuracies, val_accuracies)
    
#-- Calculate metrics for train, val and test data --
train_acc, train_precision, train_recall, train_f1, train_cm = calculate_metrics(best_model, train_loader)
val_acc, val_precision, val_recall, val_f1, val_cm = calculate_metrics(best_model, val_loader)
test_acc, test_precision, test_recall, test_f1, test_cm = calculate_metrics(best_model, test_loader)
    
# -- Print metrics --
print(f'Train: .......................\n'
        f'Accuracy: {train_acc:.4f}\n'
        f'Precision: {train_precision:.4f}\n'
        f'Recall: {train_recall:.4f}\n'
        f'F1 Score: {train_f1:.4f}')
    
print(f'Val: .......................\n'
        f'Accuracy: {val_acc:.4f}\n'
        f'Precision: {val_precision:.4f}\n'
        f'Recall: {val_recall:.4f}\n'
        f'F1 Score: {val_f1:.4f}')
    
print(f'Test: .......................\n'
        f'Accuracy: {test_acc:.4f}\n'
        f'Precision: {test_precision:.4f}\n'
        f'Recall: {test_recall:.4f}\n'
        f'F1 Score: {test_f1:.4f}')   
    
    

#-- Plot Train confusion matrix --
plt.figure(figsize=(4, 3))
sns.heatmap(train_cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Fight', 'Normal'], yticklabels=['Fight', 'Normal'],
            cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title(f'Train Confusion Matrix for Fold {FOLD_NUMBER}')
plt.show()
    
#-- Plot Val confusion matrix --
plt.figure(figsize=(4, 3))
sns.heatmap(val_cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Fight', 'Normal'], yticklabels=['Fight', 'Normal'],
            cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title(f'Val Confusion Matrix for Fold {FOLD_NUMBER}')
plt.show()
    
#-- Plot Test confusion matrix --
plt.figure(figsize=(4, 3))
sns.heatmap(test_cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Fight', 'Normal'], yticklabels=['Fight', 'Normal'],
            cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title(f'Test Confusion Matrix for Fold {FOLD_NUMBER}')
plt.show()
    
#-- Save the metrics --
train_metrics = {
    'train_acc': train_acc,
    'train_precision': train_precision,
    'train_recall': train_recall,
    'train_f1': train_f1,
    'train_cm': train_cm
}

val_metrics = {
    'val_acc': val_acc,
    'val_precision': val_precision,
    'val_recall': val_recall,
    'val_f1': val_f1,
    'val_cm': val_cm
}

test_metrics = {
    'test_acc': test_acc,
    'test_precision': test_precision,
    'test_recall': test_recall,
    'test_f1': test_f1,
    'test_cm': test_cm
}

with open(train_metrics_file, 'wb') as f:
    pickle.dump(train_metrics_file, f)
    
with open(val_metrics_file, 'wb') as f:
    pickle.dump(val_metrics_file, f)
    
with open(test_metrics_file, 'wb') as f:
    pickle.dump(test_metrics_file, f)
    
#----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Save model ---------------------------------------------------------------------------------------------------
torch.save(last_model.state_dict(), f'{FOLD_NUMBER}_last.pth')
torch.save(best_model.state_dict(), f'{FOLD_NUMBER}_best.pth')
#-----------------------------------------------------------------------------------------------------------------