# Title: Violence Detection in Videos
The prevalence of videos containing violent or sensitive content on social media platforms can
have significant negative impacts on individuals' mental health. As such, it is crucial to develop
systems that can detect and flag such content, allowing users to be warned before viewing it.
This project aims to create a deep learning model capable of detecting violence in videos and
automatically generating a trigger warning to inform users of potential violent content.

In [5]:
import os
from pathlib import Path
import shutil
from sklearn.model_selection import train_test_split

# Paste your paths
train_videos = "/kaggle/input/project-data/Complete Dataset/train/HockeyFight"
val_videos = "/kaggle/input/project-data/Complete Dataset/val/HockeyFight"

# New base dir for extracted frames
base_out_dir = "/kaggle/working/frames_split"

# Classes
classes = ['Violent', 'NonViolent']  # Your dataset must have 2 classes


The snippet above only sets up configuration for the cells that follow:

Points train_videos and val_videos at your existing train/HockeyFight and val/HockeyFight directories (no files are collected yet).

Defines a writable output root (/kaggle/working/frames_split) where extracted frames will be stored.

Specifies the two target classes, Violent and NonViolent, so that downstream code can split and label each video appropriately.

In [6]:
all_videos = []

# Collect from both train and val
for class_name in classes:
    for folder in [train_videos, val_videos]:
        class_folder = os.path.join(folder, class_name)
        if os.path.exists(class_folder):
            video_files = list(Path(class_folder).glob("*.avi"))
            all_videos.extend([(str(f), class_name) for f in video_files])


> This block initializes an empty list, all_videos, and then walks through both your training and validation directories, looking specifically for folders named Violent or NonViolent (as defined in classes). Whenever it finds one, it gathers all .avi files in that folder and appends a tuple of (video_path, class_name) to all_videos.

By the end of this loop, all_videos lists every video found under those class subfolders, each paired with its label. Note that with the paths defined above the loop looks for folders such as train/HockeyFight/Violent; if the dataset has no such subfolders, the list stays empty. The next cell handles that case by mapping the dataset's own folder names (HockeyFight, NonFight) to labels.

In [7]:
from pathlib import Path

# Dataset root
train_dir = '/kaggle/input/project-data/Complete Dataset/train'
val_dir   = '/kaggle/input/project-data/Complete Dataset/val'

# Folder → Label mapping
folder_to_label = {
    'HockeyFight': 'Violent',
    'NonFight': 'NonViolent'  # Change this if it's named differently
}

# Collect videos with correct label
all_videos = []

for root_dir in [train_dir, val_dir]:
    for folder_name, label in folder_to_label.items():
        folder_path = Path(root_dir) / folder_name
        if folder_path.exists():
            videos = list(folder_path.glob("*.avi"))
            print(f"Found {len(videos)} videos in {folder_path} ({label})")
            all_videos.extend([(str(video), label) for video in videos])
        else:
            print(f"⚠️ Folder not found: {folder_path}")


Found 400 videos in /kaggle/input/project-data/Complete Dataset/train/HockeyFight (Violent)
Found 1000 videos in /kaggle/input/project-data/Complete Dataset/train/NonFight (NonViolent)
Found 100 videos in /kaggle/input/project-data/Complete Dataset/val/HockeyFight (Violent)
Found 275 videos in /kaggle/input/project-data/Complete Dataset/val/NonFight (NonViolent)


> This snippet walks through your train and val directories, looks for the two subfolders you’ve defined—HockeyFight and NonFight—and for each one:

> Checks existence of the folder (so you’re alerted if it’s missing).

> Globs all .avi video files inside it.

> Prints how many videos it found in each (with their assigned label).

> Appends each file path plus its label ("Violent" or "NonViolent") into the all_videos list.

In [8]:
from sklearn.model_selection import train_test_split

# Split into 85% train+val, 15% test
train_val_videos, test_videos = train_test_split(
    all_videos,
    test_size=0.15,
    stratify=[label for _, label in all_videos],
    random_state=42
)

# From train+val, take 15% as validation
train_videos, val_videos = train_test_split(
    train_val_videos,
    test_size=0.1765,  # 0.1765 * 85% ≈ 15% of total
    stratify=[label for _, label in train_val_videos],
    random_state=42
)

print(f"Total: {len(all_videos)}")
print(f"Train: {len(train_videos)}, Val: {len(val_videos)}, Test: {len(test_videos)}")


Total: 1775
Train: 1241, Val: 267, Test: 267


**Combine & Shuffle**

all_videos is randomly shuffled and then split so that 15% of your total videos go into test_videos, and the remaining 85% go into train_val_videos.

Stratification (stratify=[label for _, label in all_videos]) ensures each split preserves the same ratio of “Violent” vs. “NonViolent” as the original.

**Train / Validation Split**

From the 85% “train+val” pool, a second split takes 17.65% of that subset for val_videos.

Since 17.65% of 85% ≈ 15% of the total, you end up with roughly:

70% train

15% validation

15% test

**Final Counts Printout**

Total: shows the overall count of videos.

Train:, Val:, and Test: report how many videos ended up in each split.
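
The printed counts can be reproduced by hand. The sketch below assumes that a fractional test size is rounded up and the remainder goes to the training side, which is how scikit-learn's train_test_split is understood to behave; the helper name split_sizes is hypothetical and not part of the notebook.

```python
import math

def split_sizes(n_samples, test_fraction):
    """Return (n_train, n_test) for a fractional test size.

    Assumption: the test count is rounded up and the rest goes to train.
    """
    n_test = math.ceil(test_fraction * n_samples)
    return n_samples - n_test, n_test

total = 1775  # 400 + 1000 + 100 + 275 videos found earlier

# First split: 15% of all videos become the test set
n_train_val, n_test = split_sizes(total, 0.15)

# Second split: 17.65% of the remaining pool becomes the validation set
n_train, n_val = split_sizes(n_train_val, 0.1765)

print(n_train, n_val, n_test)
print(n_train / total, n_val / total, n_test / total)
```

Under that rounding assumption the sketch gives 1241 / 267 / 267, the same counts the notebook printed, and the fractions come out close to 70% / 15% / 15%.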

In [9]:
import cv2

def extract_and_save_frames(video_list, split_name, frames_per_video=10):
    for video_path, label in video_list:
        output_dir = os.path.join(base_out_dir, split_name, label)
        os.makedirs(output_dir, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(1, total_frames // frames_per_video)
        
        i = 0
        count = 0
        while cap.isOpened() and count < frames_per_video:
            ret, frame = cap.read()
            if not ret:
                break
            if i % step == 0:
                filename = f"{Path(video_path).stem}_frame{count}.jpg"
                frame_path = os.path.join(output_dir, filename)
                cv2.imwrite(frame_path, frame)
                count += 1
            i += 1
        cap.release()


This extract_and_save_frames function takes a list of (video_path, label) pairs and, for each video:

Creates the output directory under /kaggle/working/frames_split/{split_name}/{label}/.

Opens the video with OpenCV and reads its total frame count.

Computes a uniform sampling interval (step) so you grab frames_per_video frames spread evenly across the video’s length.

Iterates through the video frames:

Whenever the frame index i matches a sampling point (i % step == 0), it saves that frame as a JPEG named {video_stem}_frame{count}.jpg.

Stops after saving the desired number of frames (frames_per_video).

Releases the video capture when done.
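
To see which frames the step-based sampling picks, the loop can be replayed on frame indices alone. This is a minimal sketch that mirrors the logic of extract_and_save_frames without opening a video; the helper name sampled_frame_indices is hypothetical, and it assumes the frame count reported by OpenCV matches the number of frames that can actually be read.

```python
def sampled_frame_indices(total_frames, frames_per_video=10):
    """Indices that the step-based loop would save, without opening a video."""
    step = max(1, total_frames // frames_per_video)
    indices = []
    for i in range(total_frames):
        if len(indices) >= frames_per_video:
            break  # same stop condition as `count < frames_per_video`
        if i % step == 0:
            indices.append(i)
    return indices

print(sampled_frame_indices(41))  # step = 41 // 10 = 4
print(sampled_frame_indices(49))  # step is still 4, so the last sample is frame 36
print(sampled_frame_indices(6))   # fewer frames than requested: every frame is kept
```

Because step is rounded down, the samples are evenly spaced but may stop short of the end of the clip: with 49 frames the last 12 are never sampled. An index list built with evenly spaced endpoints would cover the full length if that matters.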

In [10]:
extract_and_save_frames(train_videos, 'train')
extract_and_save_frames(val_videos, 'val')
extract_and_save_frames(test_videos, 'test')


These three lines run the frame extraction for each of your dataset splits:

extract_and_save_frames(train_videos, 'train')

Goes through every (video_path, label) in your train_videos list.

For each video, it samples a fixed number of frames (10 by default) evenly across its duration.

Saves those frames as JPEGs under /kaggle/working/frames_split/train/{label}/.

extract_and_save_frames(val_videos, 'val')

Does exactly the same for your validation set, writing frames into /kaggle/working/frames_split/val/{label}/.

extract_and_save_frames(test_videos, 'test')

Processes your test split in the same way, saving frames in /kaggle/working/frames_split/test/{label}/.

In [11]:
from torchvision import transforms  # needed for the transforms.* calls below
from torchvision.transforms import RandomAffine

data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        RandomAffine(degrees=0, shear=10, translate=(0.1, 0.1)),  # Add shear for motion
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
}

Shear (RandomAffine with shear=10) simulates an off-axis camera angle or slanted viewpoint.

Translation (translate=(0.1,0.1)) shifts the image up/down or left/right by up to 10%, emulating small framing changes.

Combined with your existing flips, rotations, color jitter, and random crops, these augmentations are meant to make the model more robust to camera shake, framing changes, and lighting variations commonly found in real-world video frames. None of them simulates motion blur directly.


In [12]:
from torchvision import datasets
from torch.utils.data import DataLoader

train_dataset = datasets.ImageFolder('/kaggle/working/frames_split/train', transform=data_transforms['train'])
val_dataset   = datasets.ImageFolder('/kaggle/working/frames_split/val', transform=data_transforms['val'])
test_dataset  = datasets.ImageFolder('/kaggle/working/frames_split/test', transform=data_transforms['val'])  # use same as val

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
val_loader   = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2)
test_loader  = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2)


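The code above wraps the extracted frame folders in ImageFolder datasets and creates a DataLoader for each split; the batch size (16) is a hyperparameter. The test set reuses the validation transforms, so no augmentation is applied during evaluation.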
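
ImageFolder assigns integer labels from the sorted names of the class subfolders, which determines which output index means "Violent" in the later cells. The sketch below emulates that rule with the standard library only, so it runs without torchvision; the helper name class_to_idx_from_folders is hypothetical. In the notebook itself, train_dataset.class_to_idx reports the actual mapping.

```python
import os
import tempfile

def class_to_idx_from_folders(root):
    """Map each class subfolder of root to an index, in sorted name order."""
    classes = sorted(entry.name for entry in os.scandir(root) if entry.is_dir())
    return {name: idx for idx, name in enumerate(classes)}

# Throwaway directory with the same class folder names as frames_split/train
with tempfile.TemporaryDirectory() as root:
    for name in ["Violent", "NonViolent"]:
        os.makedirs(os.path.join(root, name))
    mapping = class_to_idx_from_folders(root)

print(mapping)  # {'NonViolent': 0, 'Violent': 1}
```

With these folder names, index 0 is NonViolent and index 1 is Violent, regardless of the order in which the folders were created.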

In [13]:
import os

def count_frames(root_dir):
    total = 0
    print(f"\nCounting frames in: {root_dir}")
    for split in ['train', 'val', 'test']:
        split_path = os.path.join(root_dir, split)
        if not os.path.exists(split_path):
            print(f"{split} directory not found.")
            continue
        
        split_total = 0
        print(f"\n📁 {split.upper()}:")

        for class_name in os.listdir(split_path):
            class_path = os.path.join(split_path, class_name)
            if os.path.isdir(class_path):
                num_images = len([f for f in os.listdir(class_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))])
                split_total += num_images
                print(f"  - {class_name}: {num_images} frames")

        print(f"🔢 Total {split} frames: {split_total}")
        total += split_total

    print(f"\n✅ Overall total frames: {total}")

# Run the function
count_frames('/kaggle/working/frames_split')



Counting frames in: /kaggle/working/frames_split

📁 TRAIN:
  - NonViolent: 8910 frames
  - Violent: 3500 frames
🔢 Total train frames: 12410

📁 VAL:
  - NonViolent: 1920 frames
  - Violent: 750 frames
🔢 Total val frames: 2670

📁 TEST:
  - NonViolent: 1920 frames
  - Violent: 750 frames
🔢 Total test frames: 2670

✅ Overall total frames: 17750


The code above counts the frames extracted for the train, val, and test splits, broken down by class.
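
One quick sanity check is to compare these totals against the video counts from the split. The sketch below copies the numbers from the outputs above and assumes the default of 10 frames per video; if a video had been unreadable or shorter than 10 frames, the totals would fall short.

```python
FRAMES_PER_VIDEO = 10  # default of extract_and_save_frames

# Values copied from the notebook outputs above
videos_per_split = {"train": 1241, "val": 267, "test": 267}
frames_per_split = {"train": 12410, "val": 2670, "test": 2670}

for split, n_videos in videos_per_split.items():
    expected = n_videos * FRAMES_PER_VIDEO
    status = "ok" if frames_per_split[split] == expected else "mismatch"
    print(f"{split}: expected {expected}, found {frames_per_split[split]} ({status})")
```

All three splits match, so every video contributed exactly 10 frames.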

In [14]:
import torch
import torch.nn as nn
from torchvision import models
from tqdm import tqdm


In [17]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_classes = 2  # Violent, NonViolent


In [16]:
class_weights = torch.tensor([1.0, 2.0]).to(device)  # Higher weight for violent class (1)
criterion = nn.CrossEntropyLoss(weight=class_weights)
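
The weights [1.0, 2.0] above are set by hand. For comparison, the sketch below derives inverse-frequency weights from the training frame counts printed earlier. It is not what the notebook uses; the total / (num_classes * count) rule is one common heuristic among several, the helper name is hypothetical, and the index order assumes 0 = NonViolent and 1 = Violent.

```python
# Training frame counts from the count_frames output.
# Assumed index order: 0 = NonViolent, 1 = Violent.
train_counts = [8910, 3500]

def inverse_frequency_weights(counts):
    """Weight each class by total / (num_classes * count)."""
    total = sum(counts)
    return [total / (len(counts) * c) for c in counts]

weights = inverse_frequency_weights(train_counts)
ratio = weights[1] / weights[0]
print([round(w, 3) for w in weights], round(ratio, 2))
```

By this heuristic the Violent class would be weighted about 2.5 times as heavily as NonViolent, so the hand-picked ratio of 2.0 is in the same range but slightly milder.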

In [18]:

# Load pretrained ResNet50
model = models.resnet50(pretrained=True)

# Freeze all layers
for param in model.parameters():
    param.requires_grad = False

# Replace final FC layer with a new head (new layers are trainable by default)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, num_classes)  # 2 classes

model = model.to(device)


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 198MB/s] 


# Loading the Pretrained Model

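1st load the pretrained model 
2nd Freeze all layers 
3rd Replace the final fully connected layer with a new 2-class head, which is the only part left trainable 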

In [19]:

optimizer = torch.optim.Adam(model.fc.parameters(), lr=1e-3) 

from torch.optim.lr_scheduler import StepLR
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)


torch.optim.Adam: An adaptive learning rate optimization algorithm that adjusts the learning rate for each parameter. It's widely used because of its performance and efficiency.

model.fc.parameters(): Specifies that only the parameters of the final fully connected (fc) layer of the model will be optimized. This is common in transfer learning, where pretrained layers are frozen and only the final classifier layer is trained.

lr=1e-3: Sets the initial learning rate to 0.001, which controls how much the model's parameters are updated during each step.

The second statement, StepLR(optimizer, step_size=5, gamma=0.1), sets up a learning rate scheduler that adjusts the learning rate as training progresses.

StepLR: A scheduler that decreases the learning rate by a factor every few epochs.

optimizer: The optimizer whose learning rate will be updated.

step_size=5: Every 5 epochs, the scheduler will adjust the learning rate.

gamma=0.1: The learning rate is multiplied by 0.1 each time the scheduler fires, that is, once every step_size epochs. For example:

Epochs 0–4: LR = 1e-3

Epochs 5–9: LR = 1e-4

Epochs 10–14: LR = 1e-5, and so on.
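
The schedule above follows a closed form: the learning rate during a given epoch is the base rate times gamma raised to the number of completed steps. The function below is a minimal sketch of that formula in plain Python, not PyTorch's implementation; the name step_lr is hypothetical.

```python
def step_lr(base_lr, epoch, step_size=5, gamma=0.1):
    """Learning rate in effect during a 0-indexed epoch under a step schedule."""
    return base_lr * gamma ** (epoch // step_size)

for epoch in (0, 4, 5, 9, 10):
    print(epoch, f"{step_lr(1e-3, epoch):.0e}")
```

Since each training phase in this notebook runs for 5 epochs with step_size=5, the first decay would only take effect after the last epoch of a phase, so the learning rate stays constant within each phase.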

In [20]:
def train_phase(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=5):
    print("🔒 Phase 1: Training classifier head (frozen base)")
    best_acc = 0.0
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += (preds == labels).sum().item()

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects / len(train_loader.dataset)

        # Validation
        model.eval()
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

        val_acc = val_correct / val_total
        print(f"Train Loss: {epoch_loss:.4f} | Train Acc: {epoch_acc:.4f} | Val Acc: {val_acc:.4f}")
        scheduler.step()


In [21]:
for param in model.parameters():
    param.requires_grad = False

# Unfreeze from layer4 onward
for param in model.layer4.parameters():
    param.requires_grad = True

# Use lower learning rate for fine-tuning
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-4)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)


for param in model.parameters():
    param.requires_grad = False
This loop freezes all layers of the model by setting requires_grad = False.

This ensures no gradient will be computed for these layers, and their weights will not be updated during training.

Commonly used in transfer learning to preserve pretrained features.

# Unfreeze from layer4 onward
for param in model.layer4.parameters():
    param.requires_grad = True
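This loop unfreezes only the last residual block (layer4) of the model.

Gradients will be computed for these parameters, allowing layer4 to be updated during training. Note that the first loop also froze the new fc head and this cell does not re-enable it, so after this cell layer4 is the only trainable part of the network. Add model.fc to the unfreeze loop if the head should keep training.

This technique allows you to:

Use the power of pretrained features from earlier layers.

Adapt deeper layers to your specific dataset (here, video frames labeled Violent or NonViolent).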



In [22]:
def fine_tune(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=10):
    print("🔓 Phase 2: Fine-tuning full model")
    best_acc = 0.0
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Fine-Tuning"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += (preds == labels).sum().item()

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects / len(train_loader.dataset)

        # Validation
        model.eval()
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

        val_acc = val_correct / val_total
        print(f"Train Loss: {epoch_loss:.4f} | Train Acc: {epoch_acc:.4f} | Val Acc: {val_acc:.4f}")
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_resnet50_finetuned.pth')
            print("✅ Best fine-tuned model saved.")
        scheduler.step()


In [23]:
train_phase(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=5)


🔒 Phase 1: Training classifier head (frozen base)

Epoch 1/5


Training: 100%|██████████| 776/776 [01:08<00:00, 11.38it/s]


Train Loss: 0.1684 | Train Acc: 0.9486 | Val Acc: 0.9805

Epoch 2/5


Training: 100%|██████████| 776/776 [01:06<00:00, 11.63it/s]


Train Loss: 0.0647 | Train Acc: 0.9808 | Val Acc: 0.9843

Epoch 3/5


Training: 100%|██████████| 776/776 [01:08<00:00, 11.37it/s]


Train Loss: 0.0434 | Train Acc: 0.9869 | Val Acc: 0.9850

Epoch 4/5


Training: 100%|██████████| 776/776 [01:07<00:00, 11.49it/s]


Train Loss: 0.0320 | Train Acc: 0.9909 | Val Acc: 0.9873

Epoch 5/5


Training: 100%|██████████| 776/776 [01:07<00:00, 11.42it/s]


Train Loss: 0.0210 | Train Acc: 0.9952 | Val Acc: 0.9865


In [24]:
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5, weight_decay=1e-4)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)
fine_tune(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=5)

🔓 Phase 2: Fine-tuning full model

Epoch 1/5


Fine-Tuning: 100%|██████████| 776/776 [01:08<00:00, 11.38it/s]


Train Loss: 0.0403 | Train Acc: 0.9887 | Val Acc: 0.9835
✅ Best fine-tuned model saved.

Epoch 2/5


Fine-Tuning: 100%|██████████| 776/776 [01:07<00:00, 11.48it/s]


Train Loss: 0.0203 | Train Acc: 0.9946 | Val Acc: 0.9899
✅ Best fine-tuned model saved.

Epoch 3/5


Fine-Tuning: 100%|██████████| 776/776 [01:07<00:00, 11.42it/s]


Train Loss: 0.0167 | Train Acc: 0.9964 | Val Acc: 0.9906
✅ Best fine-tuned model saved.

Epoch 4/5


Fine-Tuning: 100%|██████████| 776/776 [01:07<00:00, 11.49it/s]


Train Loss: 0.0097 | Train Acc: 0.9973 | Val Acc: 0.9869

Epoch 5/5


Fine-Tuning: 100%|██████████| 776/776 [01:08<00:00, 11.39it/s]


Train Loss: 0.0086 | Train Acc: 0.9977 | Val Acc: 0.9918
✅ Best fine-tuned model saved.


In [25]:
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [26]:
from torchvision import models
import torch.nn as nn
import torch

# Load ResNet50 again and adjust the FC layer
model = models.resnet50(pretrained=False)
model.fc = nn.Linear(model.fc.in_features, 2)
model.load_state_dict(torch.load("best_resnet50_finetuned.pth"))
model = model.to(device)
model.eval()




ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [27]:
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
import numpy as np

all_labels = []
all_preds = []
all_probs = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)

        probs = torch.softmax(outputs, dim=1)
        preds = torch.argmax(probs, dim=1)

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())
        all_probs.extend(probs[:, 1].cpu().numpy())  # Probability of "Violent" class


In [28]:
from sklearn.metrics import classification_report

report = classification_report(all_labels, all_preds, target_names=['NonViolent', 'Violent'])
report_path = "/kaggle/working/classification_report.txt"

with open(report_path, "w") as f:
    f.write("Classification Report\n")
    f.write("======================\n")
    f.write(report)

print(f"✅ Classification report saved at: {report_path}")


✅ Classification report saved at: /kaggle/working/classification_report.txt


In [29]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(5, 4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['NonViolent', 'Violent'],
            yticklabels=['NonViolent', 'Violent'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')

cm_path = "/kaggle/working/confusion_matrix.png"
plt.savefig(cm_path)
plt.close()

print(f"✅ Confusion matrix saved at: {cm_path}")


✅ Confusion matrix saved at: /kaggle/working/confusion_matrix.png


In [30]:
from sklearn.metrics import roc_curve, roc_auc_score

fpr, tpr, thresholds = roc_curve(all_labels, all_probs)
auc = roc_auc_score(all_labels, all_probs)

plt.figure()
plt.plot(fpr, tpr, label=f'ROC curve (AUC = {auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.grid()

roc_path = "/kaggle/working/roc_curve.png"
plt.savefig(roc_path)
plt.close()

print(f"✅ ROC curve saved at: {roc_path}")


✅ ROC curve saved at: /kaggle/working/roc_curve.png


In [2]:
!pip install -q gradio



In [32]:
import gradio as gr
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.models import resnet50
from PIL import Image
import numpy as np
import cv2
import tempfile
import os
import random
from pathlib import Path

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Path to your training frames
FRAMES_DIR = "/kaggle/working/frames_split"

def load_sample_frames(split="train"):
    """Load a few sample frames per class from the extracted frames"""
    # Frames were written to FRAMES_DIR/<split>/<label>/ by extract_and_save_frames
    split_path = Path(FRAMES_DIR) / split
    sample_frames = []
    
    # Load samples from each class folder
    for class_dir in ["NonViolent", "Violent"]:
        class_path = split_path / class_dir
        if class_path.exists():
            frame_files = list(class_path.glob("*.jpg")) + list(class_path.glob("*.png"))
            if frame_files:
                # Get a few random samples
                samples = random.sample(frame_files, min(3, len(frame_files)))
                for sample in samples:
                    sample_frames.append((str(sample), class_dir))
    
    return sample_frames

# Load the model
def load_model(model_path):
    """Load the fine-tuned ResNet50 model"""
    model = resnet50(pretrained=False)
    # Binary classification: HockeyFight (Violent) vs NonFight (NonViolent)
    model.fc = nn.Linear(model.fc.in_features, 2)
    
    # Load the saved weights
    try:
        checkpoint = torch.load(model_path, map_location=device)
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            model.load_state_dict(checkpoint['model_state_dict'])
        else:
            model.load_state_dict(checkpoint)
        model.to(device)
        model.eval()
        print(f"Model loaded successfully from {model_path}")
        return model
    except Exception as e:
        print(f"Error loading model: {e}")
        return None

# Image preprocessing - Make sure this matches your training preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                       std=[0.229, 0.224, 0.225])
])

# Alternative preprocessing (try if above doesn't work)
transform_alternative = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                       std=[0.229, 0.224, 0.225])
])

# Load the model
MODEL_PATH = "/kaggle/working/best_resnet50_finetuned.pth"
model = load_model(MODEL_PATH)

# Load sample frames for testing
sample_frames = load_sample_frames()

def extract_frames(video_path, num_frames=16, max_frames=None):
    """Extract frames from video for classification"""
    cap = cv2.VideoCapture(video_path)
    frames = []
    
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if max_frames:
        total_frames = min(total_frames, max_frames)
    
    # Calculate frame indices to extract evenly distributed frames
    if total_frames > num_frames:
        frame_indices = np.linspace(0, total_frames-1, num_frames, dtype=int)
    else:
        frame_indices = list(range(total_frames))
    
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
            
        if frame_count in frame_indices:
            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame_rgb)
            
        frame_count += 1
        
        if len(frames) >= num_frames:
            break
    
    cap.release()
    return frames

def classify_image(image, use_alternative_transform=False):
    """Classify a single frame/image as Hockey Fight (Violent) or Non-Fight (NonViolent)"""
    if model is None:
        return {"Error": 1.0}, "Model not loaded properly"
    
    try:
        # Convert to PIL Image if needed
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        elif isinstance(image, str):
            # If it's a file path, load the image
            image = Image.open(image)
        
        # Convert to RGB if needed
        if image.mode != 'RGB':
            image = image.convert('RGB')
        
        # Choose preprocessing based on parameter
        current_transform = transform_alternative if use_alternative_transform else transform
        
        # Preprocess the image
        input_tensor = current_transform(image).unsqueeze(0).to(device)
        
        # Make prediction
        with torch.no_grad():
            outputs = model(input_tensor)
            probabilities = torch.nn.functional.softmax(outputs[0], dim=0)
            
            # Get prediction
            _, predicted = torch.max(outputs, 1)
            
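            # Check both possible class orders. ImageFolder sorts class folders
            # alphabetically, so training used 0=NonViolent, 1=Violent (Option 1);
            # Option 2 is kept only as a debugging aid.
            # Option 1: 0=NonFight, 1=HockeyFight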
            result_v1 = {
                "NonViolent (NonFight)": float(probabilities[0].item()),
                "Violent (HockeyFight)": float(probabilities[1].item())
            }
            
            # Option 2: 0=HockeyFight, 1=NonFight (in case class order is different)
            result_v2 = {
                "NonViolent (NonFight)": float(probabilities[1].item()),
                "Violent (HockeyFight)": float(probabilities[0].item())
            }
            
            # Determine classification for both interpretations
            pred_class_v1 = "NonViolent (NonFight)" if predicted.item() == 0 else "Violent (HockeyFight)"
            conf_v1 = float(probabilities[predicted.item()].item())
            
            pred_class_v2 = "Violent (HockeyFight)" if predicted.item() == 0 else "NonViolent (NonFight)"
            # The predicted index is the same under either naming, so the confidence is too
            conf_v2 = conf_v1
            
            # Return both interpretations for debugging
            classification_text = f"""
🎯 **Raw Model Output**: Class {predicted.item()} with confidence {conf_v1:.2%}

**Interpretation 1** (0=NonFight, 1=HockeyFight):
- Classification: {pred_class_v1}
- Confidence: {conf_v1:.2%}

**Interpretation 2** (0=HockeyFight, 1=NonFight):
- Classification: {pred_class_v2}
- Confidence: {conf_v2:.2%}

**Raw Probabilities**: [{probabilities[0].item():.3f}, {probabilities[1].item():.3f}]
**Transform Used**: {'Alternative' if use_alternative_transform else 'Standard'}
            """
            
            return result_v1, classification_text.strip()
            
    except Exception as e:
        return {"Error": 1.0}, f"Error during classification: {str(e)}"

def classify_training_frame(frame_path):
    """Classify one of your actual training frames"""
    if not frame_path:
        return {"": 0}, "No frame selected"
    
    try:
        # Get the true label from the parent folder name (Violent / NonViolent);
        # a substring test would misfire because "NonViolent" contains "Violent"
        true_label = Path(frame_path).parent.name
        
        # Classify with both transforms
        result1, details1 = classify_image(frame_path, use_alternative_transform=False)
        result2, details2 = classify_image(frame_path, use_alternative_transform=True)
        
        analysis_text = f"""
📁 **Training Frame Analysis**
🏷️ **True Label**: {true_label}
📍 **File**: {os.path.basename(frame_path)}

**Standard Transform Results:**
{details1}

**Alternative Transform Results:**
{details2}

**Recommendation**: Compare both results with the true label to see which transform/interpretation works better.
        """
        
        return result1, analysis_text.strip()
        
    except Exception as e:
        return {"Error": 1.0}, f"Error: {str(e)}"

def classify_video(video_path, classification_method="average"):
    """Classify a video by extracting frames and analyzing each frame individually"""
    # Error returns use the same (label dict, message) shape as the other returns
    if model is None:
        return {"Error": 1.0}, "Model not loaded properly"
    
    if video_path is None:
        return {"Error": 1.0}, "No video uploaded"
    
    try:
        # Extract frames from video (similar to training data preparation)
        frames = extract_frames(video_path, num_frames=16)
        
        if not frames:
            return {"Error": 1.0}, "Could not extract frames from video"
        
        # Process each frame individually (like training data)
        frame_predictions = []
        frame_probabilities = []
        
        for i, frame in enumerate(frames):
            # Convert to PIL Image
            pil_frame = Image.fromarray(frame)
            
            # Classify this frame (same as training approach)
            result, _ = classify_image(pil_frame)
            
            if "Error" not in result:
                non_violent_prob = result["NonViolent (NonFight)"]
                violent_prob = result["Violent (HockeyFight)"]
                
                frame_probabilities.append([non_violent_prob, violent_prob])
                frame_predictions.append(1 if violent_prob > non_violent_prob else 0)
        
        if not frame_probabilities:
            return {"Error": 1.0}, "No frames could be processed"
        
        # Aggregate frame predictions
        frame_probabilities = np.array(frame_probabilities)
        
        if classification_method == "average":
            # Average probabilities across all frames
            avg_probabilities = np.mean(frame_probabilities, axis=0)
            final_prediction = np.argmax(avg_probabilities)
        elif classification_method == "majority":
            # Majority vote across frames
            final_prediction = np.bincount(frame_predictions).argmax()
            avg_probabilities = np.mean(frame_probabilities, axis=0)
        else:
            # Max confidence approach
            max_conf_idx = np.argmax(np.max(frame_probabilities, axis=1))
            final_prediction = frame_predictions[max_conf_idx]
            avg_probabilities = frame_probabilities[max_conf_idx]
        
        # Create result dictionary
        result = {
            "NonViolent (NonFight)": float(avg_probabilities[0]),
            "Violent (HockeyFight)": float(avg_probabilities[1])
        }
        
        # Final classification
        class_names = ["NonViolent (NonFight)", "Violent (HockeyFight)"]
        predicted_class = class_names[final_prediction]
        confidence = float(avg_probabilities[final_prediction])
        
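        # Detailed analysis: count only frames that were actually classified,
        # since frames that returned an error are skipped in the loop above
        num_frames_processed = len(frame_predictions)
        violent_frames = sum(1 for pred in frame_predictions if pred == 1)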
        
        classification_text = f"""
🎯 **Final Classification**: {predicted_class}
📊 **Confidence**: {confidence:.2%}
🎬 **Frames Analyzed**: {num_frames_processed}
⚡ **Violent Frames**: {violent_frames}/{num_frames_processed} ({violent_frames/num_frames_processed*100:.1f}%)
📈 **Aggregation Method**: {classification_method.title()}

**Frame-by-Frame Analysis:**
Each frame was classified individually (same as training approach),
then results were aggregated for final video classification.
        """
        
        return result, classification_text.strip()
        
    except Exception as e:
        error_msg = f"Error during video classification: {str(e)}"
        return {"Error": 1.0}, error_msg

# Create Gradio interface
def create_interface():
    with gr.Blocks(title="Hockey Fight Detection", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 🏒 Hockey Fight Detection System
            
            Upload images/videos OR test with your actual training frames to debug classification issues.
            This interface helps identify preprocessing or class mapping problems.
            
            **Supported formats**: Images (.jpg, .png) and Videos (.avi, .mp4, .mov, .mkv)
            **Debug Mode**: Test with actual training frames to verify model behavior
            """
        )
        
        with gr.Row():
            with gr.Column(scale=1):
                # Tabs for different input types
                with gr.Tabs():
                    with gr.TabItem("🖼️ Upload Image/Frame"):
                        image_input = gr.Image(
                            label="Upload Hockey Frame/Image",
                            type="pil",
                            height=300
                        )
                        use_alt_transform = gr.Checkbox(
                            label="Use Alternative Transform (Resize→CenterCrop)",
                            value=False
                        )
                        image_submit_btn = gr.Button("🔍 Classify Frame", variant="primary")
                    
                    with gr.TabItem("🎯 Test Training Frames"):
                        if sample_frames:
                            training_frame_dropdown = gr.Dropdown(
                                choices=[f"{os.path.basename(path)} ({label})" for path, label in sample_frames],
                                label="Select a Training Frame",
                                value=f"{os.path.basename(sample_frames[0][0])} ({sample_frames[0][1]})" if sample_frames else None
                            )
                            training_frame_btn = gr.Button("🔍 Test Training Frame", variant="secondary")
                        else:
                            gr.Markdown("⚠️ No training frames found in `/kaggle/working/frames_split`")
                    
                    with gr.TabItem("🎬 Full Video"):
                        video_input = gr.Video(
                            label="Upload Hockey Video",
                            height=300
                        )
                        method_selector = gr.Radio(
                            choices=["average", "majority", "max_confidence"],
                            value="average",
                            label="Frame Aggregation Method",
                            info="How to combine individual frame predictions"
                        )
                        video_submit_btn = gr.Button("🔍 Analyze Video", variant="primary")
                
            with gr.Column(scale=1):
                # Probability output
                prob_output = gr.Label(
                    label="Classification Probabilities",
                    num_top_classes=2
                )
                
                # Text output for detailed results
                text_output = gr.Textbox(
                    label="Detailed Analysis & Debug Info",
                    lines=12,
                    max_lines=20
                )
        
        # Information sections
        with gr.Row():
            with gr.Column():
                with gr.Accordion("🔧 Debugging Tips", open=True):
                    gr.Markdown(
                        """
                        **Common Issues & Solutions:**
                        1. **Wrong Class Order**: Model might have 0=HockeyFight, 1=NonFight (check both interpretations)
                        2. **Preprocessing Mismatch**: Try both Standard and Alternative transforms
                        3. **Class Mapping**: Verify which class index corresponds to which label
                        
                        **Test Strategy:**
                        - Use "Test Training Frames" tab to verify model works on known data
                        - Compare results with true labels to identify the issue
                        - Check raw probabilities and model outputs
                        """
                    )
                    
            with gr.Column():
                with gr.Accordion("ℹ️ Model Information", open=False):
                    gr.Markdown(
                        """
                        **Training Approach**: 
                        - Model trained on individual frames extracted from hockey videos
                        - Each frame labeled as HockeyFight or NonFight
                        - ResNet50 architecture fine-tuned for frame-level classification
                        
                        **Classes**: 
                        - NonViolent (NonFight): Regular hockey gameplay frames
                        - Violent (HockeyFight): Fighting/aggressive behavior frames
                        
                        **Input**: 224x224 RGB images (extracted video frames)
                        **Video Processing**: Extracts frames → Classifies each → Aggregates results
                        """
                    )
        
        with gr.Accordion("⚠️ Usage Guidelines", open=False):
            gr.Markdown(
                """
                **Best Results:**
                - Clear hockey game footage
                - Good lighting and resolution
                - Videos under 2 minutes for faster processing
                
                **Limitations:**
                - Designed specifically for hockey content
                - May not work well on other sports or contexts
                - Results should be interpreted by domain experts
                
                **Disclaimer**: This model is for research and educational purposes only.
                """
            )
        
        # Event handlers
        image_submit_btn.click(
            fn=lambda img, alt_transform: classify_image(img, alt_transform),
            inputs=[image_input, use_alt_transform],
            outputs=[prob_output, text_output]
        )
        
        if sample_frames:
            training_frame_btn.click(
                fn=lambda selected: classify_training_frame(
                    next((path for path, label in sample_frames 
                         if f"{os.path.basename(path)} ({label})" == selected), None)
                ),
                inputs=training_frame_dropdown,
                outputs=[prob_output, text_output]
            )
        
        video_submit_btn.click(
            fn=lambda video, method: classify_video(video, method),
            inputs=[video_input, method_selector],
            outputs=[prob_output, text_output]
        )
        
        # Auto-classify on upload
        image_input.change(
            fn=lambda img, alt_transform: classify_image(img, alt_transform) if img else ({}, ""),
            inputs=[image_input, use_alt_transform],
            outputs=[prob_output, text_output]
        )
    
    return demo

# Launch the interface
if __name__ == "__main__":
    demo = create_interface()
    # Kaggle-friendly launch configuration
    demo.launch(
        share=True,  # Create a public link for Kaggle
        debug=False,  # Disable debug in Kaggle
        quiet=True,   # Reduce console output
        height=700,   # Set height for Kaggle display
        show_error=True
    )



Model loaded successfully from /kaggle/working/best_resnet50_finetuned.pth
* Running on public URL: https://70738e541d03d1a354.gradio.live
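
The three aggregation options offered in the "Full Video" tab (average, majority, max_confidence) can give different answers on the same clip. The sketch below isolates that logic from `classify_video` so it can be checked without a model or a video. The helper name `aggregate_frame_probs` and the sample probabilities are illustrative assumptions, not part of the notebook; the column order `[NonViolent, Violent]` follows the order used in `classify_video`.

```python
import numpy as np

def aggregate_frame_probs(frame_probs, method="average"):
    """Combine per-frame [non_violent, violent] probabilities into one result.

    Hypothetical helper mirroring the branches in classify_video.
    Returns (predicted_index, probabilities), where 0 = NonViolent, 1 = Violent.
    """
    probs = np.asarray(frame_probs, dtype=float)
    if probs.ndim != 2 or probs.shape[1] != 2 or probs.shape[0] == 0:
        raise ValueError("expected a non-empty array of shape (num_frames, 2)")

    # Per-frame hard decision; ties resolve to index 0 (NonViolent)
    frame_preds = probs.argmax(axis=1)

    if method == "average":
        # Mean probability per class, then pick the larger one
        mean_probs = probs.mean(axis=0)
        return int(mean_probs.argmax()), mean_probs
    if method == "majority":
        # Most common per-frame decision; averaged probabilities are reported alongside
        pred = int(np.bincount(frame_preds, minlength=2).argmax())
        return pred, probs.mean(axis=0)
    if method == "max_confidence":
        # Trust the single frame with the highest top-class probability
        idx = int(probs.max(axis=1).argmax())
        return int(frame_preds[idx]), probs[idx]
    raise ValueError(f"unknown method: {method}")

# Made-up example: two mildly non-violent frames and one strongly violent frame
sample = [[0.55, 0.45], [0.55, 0.45], [0.05, 0.95]]
for name in ("average", "majority", "max_confidence"):
    pred, out = aggregate_frame_probs(sample, name)
    print(name, pred, np.round(out, 3))
```

On this sample, averaging and max-confidence pick class 1 (Violent) while majority vote picks class 0 (NonViolent). Note that with majority voting the reported probabilities are still the averages, so the "confidence" shown for the winning class can fall below 50%, as it does here.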
