In [None]:
!pip install torchsummary

In [None]:
#-- Imports -------------------------------------------------------------------------------------------------------
import torch

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
import torchvision.transforms as transforms

from torchsummary import summary

import os
import shutil

import cv2
import matplotlib.pyplot as plt

import numpy as np

import copy
#------------------------------------------------------------------------------------------------------------------

In [None]:
#-- Initialize ---------------------------------------------------------------------------------------------------
#-- Dataset locations: zipped input and the folder it is extracted into --
ds_zip_path = '/kaggle/input/ucf-get-fights-and-normals/data.zip'
ds_unzip_path = '/kaggle/working/data'

#-- Reproducibility: seed the global torch / numpy generators before any model or loader is built --
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

#-- Compute device: GPU when available, CPU otherwise --
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {DEVICE}')

#-- Training hyper-parameters --
BATCH_SIZE = 8
NUM_EPOCHS = 10

#-- Clip geometry: frames sampled per video and the size every frame is resized to --
NUM_FRAMES = 16
FRAME_W = 64
FRAME_H = 64
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- unzip data ---------------------------------------------------------------------------------------------------
# Extract the zipped dataset into the working directory.
shutil.unpack_archive(filename=ds_zip_path, extract_dir=ds_unzip_path, format='zip')
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to count number of samples in ds --------------------------------------------------------------------
def count_samples(data_dir):
    """Count the directory entries of every split/class folder under `data_dir`.

    The expected layout is ``<data_dir>/<subset>/<category>`` with subsets
    'train', 'val', 'test' and categories 'Normal', 'Fighting'.

    Args:
        data_dir: Root folder of the extracted dataset.

    Returns:
        Nested dict ``{subset: {category: number_of_entries}}``. Every entry
        returned by ``os.listdir`` is counted, whatever its type or extension.

    Raises:
        OSError: Propagated from ``os.listdir`` when a folder cannot be listed
            (for example, when it does not exist).
    """
    return {
        subset: {
            category: len(os.listdir(os.path.join(data_dir, subset, category)))
            for category in ['Normal', 'Fighting']
        }
        for subset in ['train', 'val', 'test']
    }
#------------------------------------------------------------------------------------------------------------------

In [None]:
#-- Dataset size -------------------------------------------------------------------------------------------------
sample_counts = count_samples(ds_unzip_path)

#-- Report the per-class totals of every split --
for subset, categories in sample_counts.items():
    print(f"{subset} set:")
    for category in categories:
        count = categories[category]
        print(f"  {category}: {count} samples")
#------------------------------------------------------------------------------------------------------------------

In [None]:
# #-- Check Number of Frames and Resolution of Videos -------------------------------------------------------------
# for root, dirs, files in os.walk(ds_unzip_path):
    
#     for filename in files:
#         file_path = os.path.join(root, filename)   
        
#         if file_path.endswith(('.mp4')):          
#             cap = cv2.VideoCapture(file_path)        

#             width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#             height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

#             num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

#             print(f"Video: {filename} | number of frames: {num_frames} - Resolution: {width} x {height}")

#             # Release the video capture object
#             cap.release()
# #------------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Convert Video to Numpy Array ---------------------------------------------------------------------
def load_video_frames(video_path, num_frames=NUM_FRAMES, resize=(FRAME_W, FRAME_H)):
    """Sample `num_frames` evenly spaced RGB frames from a video file.

    Args:
        video_path: Path of the video to decode.
        num_frames: Number of frames to sample across the clip.
        resize: Target frame size as ``(width, height)``, the order expected
            by ``cv2.resize``.

    Returns:
        ``np.ndarray`` of dtype uint8 and shape ``(num_frames, height, width, 3)``.
        Sampling stops at the first frame that cannot be read; that slot and all
        later ones are left as all-zero (black) frames.
    """
    width, height = resize

    # Pre-allocate the whole clip as black frames. Slots that are never filled act as
    # padding and are guaranteed to share the shape (H, W, 3) and dtype of real frames.
    frames = np.zeros((num_frames, height, width, 3), dtype=np.uint8)

    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        for i in range(num_frames):
            # Evenly spread the sampled positions over the whole video.
            frame_id = int(i * total_frames / num_frames)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.resize(frame, (width, height))
            frames[i] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    finally:
        # Release the capture even when decoding or resizing raises.
        cap.release()

    return frames
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Dataset class for loading the videos ------------------------------------------------------------------------
class VideoDataset(Dataset):
    """Map-style dataset of ``.mp4`` clips stored in one sub-folder per class.

    `root_dir` must contain a 'Fighting' and a 'Normal' folder; the label of a
    clip is the index of its folder in ``self.classes`` (Fighting=0, Normal=1).

    Args:
        root_dir: Folder holding the class sub-folders.
        transform: Optional callable applied to the float clip tensor.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['Fighting', 'Normal']
        self.samples = []  # list of (video_path, class_idx)
        for class_idx, class_name in enumerate(self.classes):
            class_path = os.path.join(self.root_dir, class_name)
            # os.listdir returns entries in arbitrary order; sort so that sample
            # indices are stable from one run (or machine) to the next.
            for video_file in sorted(os.listdir(class_path)):
                if video_file.endswith('.mp4'):
                    self.samples.append((os.path.join(class_path, video_file), class_idx))

    def __len__(self):
        """Return the number of indexed video files."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(clip, class_idx)`` for sample `idx`.

        ``clip`` is a float32 tensor in channels-first layout (C, T, H, W),
        obtained by dividing the loaded frame values by 255.
        """
        video_path, class_idx = self.samples[idx]
        video_frames = load_video_frames(video_path)  # (T, H, W, C)

        #-- Convert to float32, move channels first and scale by 1/255
        video_frames = torch.from_numpy(video_frames).float()
        video_frames = video_frames.permute(3, 0, 1, 2) / 255.0

        if self.transform:
            video_frames = self.transform(video_frames)
        return video_frames, class_idx

#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Load Data ----------------------------------------------------------------------------------------------------
#-- Set the paths to dataset (derived from the extraction root so they cannot drift apart) --
train_dir = os.path.join(ds_unzip_path, 'train')
val_dir = os.path.join(ds_unzip_path, 'val')
test_dir = os.path.join(ds_unzip_path, 'test')


#-- Create datasets for train, val, and test --
train_dataset = VideoDataset(root_dir=train_dir)
val_dataset = VideoDataset(root_dir=val_dir)
test_dataset = VideoDataset(root_dir=test_dir)

#-- Create DataLoaders (only the training split is shuffled) --
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
#-----------------------------------------------------------------------------------------------------------------

In [None]:
# #-- Check Data Loaders -------------------------------------------------------------------------------------------
# for i, (videos, labels) in enumerate(train_loader):
#         print(videos.shape, labels.shape)
#         break
# #-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- 3DCNN based Model for Fight Detection ------------------------------------------------------------------------
class FightNet(nn.Module):
    """Small 3D-CNN clip classifier: three conv+ReLU+max-pool stages and two linear layers.

    Input is a batch of clips shaped ``(N, 3, num_frames, frame_h, frame_w)``;
    output is ``(N, num_classes)`` raw logits (no softmax is applied).

    Args:
        num_classes: Number of output logits.
        num_frames: Temporal length of the input clips.
        frame_h: Height of the input frames.
        frame_w: Width of the input frames.

    The three size arguments default to 16/64/64, which gives the same layer
    sizes as the previously hard-coded ``256 * 2 * 8 * 8`` classifier input.

    Raises:
        ValueError: If any input dimension is smaller than 8, i.e. would be
            reduced to zero by the three pooling stages.
    """

    def __init__(self, num_classes=2, num_frames=16, frame_h=64, frame_w=64):
        super().__init__()

        # The convolutions (kernel 3, stride 1, padding 1) keep the size and each of the
        # three pooling stages halves every dimension (rounding down): overall size // 8.
        depth, height, width = num_frames // 8, frame_h // 8, frame_w // 8
        if min(depth, height, width) < 1:
            raise ValueError(
                'num_frames, frame_h and frame_w must all be >= 8, got '
                f'({num_frames}, {frame_h}, {frame_w})'
            )

        self.conv1 = nn.Conv3d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv3d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv3d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)

        self.fc1 = nn.Linear(256 * depth * height * width, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.pool = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))

    def forward(self, x):
        """Return class logits for a batch of clips shaped (N, 3, T, H, W)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        x = x.flatten(start_dim=1)  #-- Flatten everything but the batch dimension
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Function to Train Model --------------------------------------------------------------------------------------
def train(model, loader, optimizer, criterion):
    """Run one optimisation pass over `loader` and report its metrics.

    Args:
        model: Network to optimise; it is switched to training mode.
        loader: Iterable yielding ``(videos, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        ``(loss, accuracy)``: the mean of the per-batch loss values and the
        fraction of samples whose highest-scoring output matches the label.
    """
    model.train()

    loss_sum, num_correct, num_seen = 0.0, 0, 0

    for videos, labels in loader:
        videos = videos.to(DEVICE)
        labels = labels.to(DEVICE)

        #-- Forward / backward / update
        optimizer.zero_grad()
        logits = model(videos)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        #-- Bookkeeping (predictions come from the pre-update forward pass)
        loss_sum += batch_loss.item()
        predicted = logits.max(1)[1]
        num_seen += labels.size(0)
        num_correct += (predicted == labels).sum().item()

    return loss_sum / len(loader), num_correct / num_seen
#-----------------------------------------------------------------------------------------------------------------

In [None]:
# #-- Function to Evaluate Model -----------------------------------------------------------------------------------
def evaluate(model, loader, criterion):
    """Measure loss and accuracy of `model` on `loader` without updating it.

    Args:
        model: Network to score; it is switched to evaluation mode.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        ``(loss, accuracy)``: the mean of the per-batch loss values and the
        fraction of samples whose highest-scoring output matches the label.
    """
    model.eval()

    loss_sum, num_correct, num_seen = 0.0, 0, 0

    # Gradients are not needed for scoring.
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)

            logits = model(inputs)
            loss_sum += criterion(logits, labels).item()

            predicted = logits.max(1)[1]
            num_seen += labels.size(0)
            num_correct += predicted.eq(labels).sum().item()

    return loss_sum / len(loader), num_correct / num_seen
# #-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Create and Initialize Model and its Params -------------------------------------------------------------------
# Two-class network, moved to the selected device before the optimizer is built on its parameters.
model = FightNet(num_classes=2)
model = model.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Display the model summary ------------------------------------------------------------------------------------
# Input size is (channels, frames, height, width), built from the config constants so the
# summary always matches the clip geometry produced by the data pipeline.
summary(model, input_size=(3, NUM_FRAMES, FRAME_H, FRAME_W))
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Train Model -------------------------------------------------------------------------------------------------
#-- Per-epoch history --
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

#-- Best checkpoint so far (by validation accuracy) --
best_model = None
# Start below any reachable accuracy so the first epoch always produces a checkpoint;
# starting at 0 left best_model as None when validation accuracy never rose above 0.
best_acc = float('-inf')

for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    # Keep an independent copy: `model` keeps changing in later epochs.
    if val_acc > best_acc:
        best_acc = val_acc
        best_model = copy.deepcopy(model)

    print(f'Epoch {epoch+1}/{NUM_EPOCHS}, '
          f'Train Loss: {train_loss:.4f}, Train ACC: {train_acc:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val ACC: {val_acc:.4f}')

#-- Snapshot of the weights after the final epoch --
last_model = copy.deepcopy(model)
#-----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Plot ACC and Loss --------------------------------------------------------------------------------------------
# Derive the x-axis from the recorded history rather than NUM_EPOCHS, so the plot still
# works when training stopped early or NUM_EPOCHS was edited after the run.
epochs = range(1, len(train_losses) + 1)

fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(14, 5))

# Plot training & validation loss
loss_ax.plot(epochs, train_losses, 'bo-', label='Train Loss')
loss_ax.plot(epochs, val_losses, 'ro-', label='Val Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

# Plot training & validation accuracy
acc_ax.plot(epochs, train_accuracies, 'bo-', label='Train Accuracy')
acc_ax.plot(epochs, val_accuracies, 'ro-', label='Val Accuracy')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

plt.show()
#-----------------------------------------------------------------------------------------------------------------

In [None]:
# #--Evaluate Model On Test Data ---------------------------------------------------------------------------------
# Score the checkpoint with the best validation accuracy on the held-out test split.
test_loss, test_acc = evaluate(model=best_model, loader=test_loader, criterion=criterion)
print(f'Final Test Accuracy: {test_acc:.4f}')
# #----------------------------------------------------------------------------------------------------------------

In [None]:
#-- Compare Last Model and Best Model ----------------------------------------------------------------------------

#-- Score the last and the best checkpoint on every split with one shared report --
# (the last-model report used to print test_loss under the 'Test ACC' label)
for title, candidate in (('Last Weights', last_model), ('Best Weights', best_model)):
    train_loss, train_acc = evaluate(candidate, train_loader, criterion)
    val_loss, val_acc = evaluate(candidate, val_loader, criterion)
    test_loss, test_acc = evaluate(candidate, test_loader, criterion)

    print(f'{title}:\n'
          f'Train Loss: {train_loss: .4f} - Train ACC: {train_acc: .4f}\n'
          f'Val Loss: {val_loss: .4f} - Val ACC: {val_acc: .4f}\n'
          f'Test Loss: {test_loss: .4f} - Test ACC: {test_acc: .4f}')
#----------------------------------------------------------------------------------------------------------------