<a href="https://colab.research.google.com/github/RaunakBidesi/SwiftSheild/blob/main/SwiftShield.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-only step: mount Google Drive at /content/drive so the dataset and
# saved models referenced by the later cells are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os

# Path to your dataset inside Google Drive
base_path = "/content/drive/MyDrive/SwiftSheild"
fight_path = os.path.join(base_path, "fight")  # Path to Fight folder
non_fight_path = os.path.join(base_path, "nofight")  # Path to Non-Fight folder


def list_videos(folder, extension=".mp4"):
    """Return the full paths of the video files in ``folder``, sorted by name.

    Parameters:
        folder (str): Directory to scan (not recursive).
        extension (str): File extension to keep; matched case-insensitively so
            files such as ``clip.MP4`` are not silently dropped.

    Returns:
        list[str]: Sorted paths. Sorting matters because os.listdir returns
        entries in arbitrary order, which would make every later step depend
        on the file system's listing order.
    """
    wanted = extension.lower()
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if name.lower().endswith(wanted)
    )


# Get list of video files
fight_videos = list_videos(fight_path)
non_fight_videos = list_videos(non_fight_path)

print(f"Found {len(fight_videos)} fight videos and {len(non_fight_videos)} non-fight videos.")


Found 150 fight videos and 150 non-fight videos.


In [None]:
import cv2
import os
import numpy as np

# Extracted frames go into two sibling folders under base_path, one per class.
fight_frames_path, non_fight_frames_path = (
    os.path.join(base_path, folder_name)
    for folder_name in ("fight_frames", "non_fight_frames")
)

# exist_ok=True keeps this cell safe to re-run when the folders already exist.
for frames_dir in (fight_frames_path, non_fight_frames_path):
    os.makedirs(frames_dir, exist_ok=True)

# Function to extract frames dynamically based on video length
def extract_frames(video_path, output_folder, fps_factor=10, img_size=(224, 224)):
    """Sample evenly spaced frames from a video and save them as JPEG files.

    About ``fps_factor`` frames are taken per second of video (never more than
    the video contains). Each frame is resized to ``img_size`` and written to
    ``output_folder`` as ``<video name>_frame<idx>.jpg``.

    Parameters:
        video_path (str): Path of the video to read.
        output_folder (str): Existing directory that receives the JPEG files.
        fps_factor (int | float): Target number of sampled frames per second.
        img_size (tuple[int, int]): Size passed to cv2.resize.

    Returns:
        None. Problems opening the video or reading a frame are reported with
        print() and the affected video/frame is skipped.

    Note:
        Frames are saved in OpenCV's native BGR order, which is what
        cv2.imwrite expects. Earlier versions converted to RGB first, which
        stored JPEGs with red and blue swapped; frames extracted that way
        should be regenerated before training.
    """
    cap = cv2.VideoCapture(video_path)

    try:
        if not cap.isOpened():
            print(f"Error: Could not open {video_path}")
            return

        # Get video properties
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Keep the frame rate as a float: truncating it (29.97 -> 29) skews the
        # duration, and a rate below 1 FPS would truncate to 0 and divide by zero.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # Default to 30 FPS if unknown
        video_duration = total_frames / fps  # Video length in seconds

        # Number of frames to extract; clamped at 0 because some containers
        # report a non-positive frame count and np.linspace rejects negatives.
        num_frames = max(min(int(video_duration * fps_factor), total_frames), 0)

        # Get evenly spaced frame indices
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)

        # splitext removes only the final extension, so "a.b.mp4" stays "a.b"
        # and videos whose names share a dotted prefix do not overwrite each other.
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        for idx, frame_idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))  # Move to specific frame
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame {frame_idx} in {video_path}. Skipping...")
                continue

            frame = cv2.resize(frame, img_size)
            # No BGR->RGB conversion here: cap.read() yields BGR and
            # cv2.imwrite expects BGR, so the file on disk has correct colors.
            frame_path = os.path.join(output_folder, f"{video_name}_frame{idx}.jpg")
            cv2.imwrite(frame_path, frame)
    finally:
        # Always free the decoder handle, even if a frame operation raises.
        cap.release()

# Run the extraction class by class: each job pairs a list of source videos
# with the folder that receives their frames (fight first, then non-fight).
extraction_jobs = (
    (fight_videos, fight_frames_path),
    (non_fight_videos, non_fight_frames_path),
)

for class_videos, frames_dir in extraction_jobs:
    for video in class_videos:
        extract_frames(video, frames_dir, fps_factor=10)

print("Frame extraction complete!")

Frame extraction complete!


In [None]:
import os
import cv2

def count_images(folder_path):
    """Return how many entries of ``folder_path`` have a name ending in '.jpg'."""
    jpg_total = 0
    for entry in os.listdir(folder_path):
        if entry.endswith('.jpg'):
            jpg_total += 1
    return jpg_total

def check_corrupt_images(folder_path):
    """Return the paths of '.jpg' files in ``folder_path`` that fail to load.

    A file is reported when cv2.imread gives back None for it. Entries are
    visited in os.listdir order and only names ending in '.jpg' are checked.
    """
    jpg_paths = (
        os.path.join(folder_path, name)
        for name in os.listdir(folder_path)
        if name.endswith('.jpg')
    )
    return [path for path in jpg_paths if cv2.imread(path) is None]

# How many frames ended up in each class folder
fight_images_count = count_images(fight_frames_path)
non_fight_images_count = count_images(non_fight_frames_path)

print(f"Fight images: {fight_images_count}")
print(f"Non-Fight images: {non_fight_images_count}")

# Collect the files that could not be loaded, per class
corrupt_fight = check_corrupt_images(fight_frames_path)
corrupt_non_fight = check_corrupt_images(non_fight_frames_path)

all_images_readable = not (corrupt_fight or corrupt_non_fight)
if all_images_readable:
    print("✅ No corrupt images found!")
else:
    print("⚠️ Warning: Found corrupt images!")
    print(f"Corrupt Fight Images: {len(corrupt_fight)}")
    print(f"Corrupt Non-Fight Images: {len(corrupt_non_fight)}")


Fight images: 3374
Non-Fight images: 3092
✅ No corrupt images found!


In [None]:
import os  # OS module to handle file paths and directory operations
import shutil  # shutil is used to move files into different folders
import random  # Random module to shuffle images before splitting

# Root folder of the train/val/test split inside Google Drive
dataset_base = "/content/drive/MyDrive/SwiftSheild/dataset"

# Folders that hold the extracted frames of each class
fight_frames_path = os.path.join(base_path, "fight_frames")
non_fight_frames_path = os.path.join(base_path, "non_fight_frames")

# One directory per split, all directly under dataset_base
train_dir, val_dir, test_dir = (
    os.path.join(dataset_base, split_name)
    for split_name in ("train", "val", "test")
)

# Every split gets a 'fight' and a 'non_fight' sub-folder; exist_ok=True
# makes the cell safe to re-run.
for split in [train_dir, val_dir, test_dir]:
    for class_name in ("fight", "non_fight"):
        os.makedirs(os.path.join(split, class_name), exist_ok=True)

# Function to split dataset into train, validation, and test sets
def split_data(source_folder, train_dest, val_dest, test_dest, train_ratio=0.8, val_ratio=0.1, seed=None):
    """
    Moves images from the source folder into train, validation, and test folders,
    keeping all frames of one video in the same split.

    Frames are named ``<video name>_frame<idx>.jpg`` by the extraction step.
    Consecutive frames of a video are nearly identical, so shuffling individual
    frames would place near-duplicates of training images into the validation
    and test sets and inflate their scores. The shuffle and the ratios are
    therefore applied to whole videos; the resulting frame proportions only
    approximate the ratios. A file without ``_frame`` in its name is treated
    as its own single-frame video.

    Parameters:
        source_folder (str): Path to the folder containing images (fight or non-fight).
        train_dest (str): Path to the train folder for this class.
        val_dest (str): Path to the validation folder for this class.
        test_dest (str): Path to the test folder for this class.
        train_ratio (float): Fraction of videos to use for training.
        val_ratio (float): Fraction of videos to use for validation.
        seed (int | None): Seed for a private random generator so the split is
            reproducible. None keeps the previous behaviour of shuffling with
            the global ``random`` module state.

    The test ratio is automatically calculated as: `1 - (train_ratio + val_ratio)`.

    Raises:
        ValueError: If a ratio is outside [0, 1] or the two ratios sum to more than 1.
    """
    ratios_in_range = 0 <= train_ratio <= 1 and 0 <= val_ratio <= 1
    # Small tolerance so pairs such as 0.7 + 0.3 are not rejected by float rounding.
    if not ratios_in_range or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            f"Invalid split ratios: train_ratio={train_ratio}, val_ratio={val_ratio}"
        )

    # Group the image files by the video they were extracted from. Sorting
    # first removes the dependence on os.listdir's arbitrary ordering.
    frames_by_video = {}
    for file in sorted(f for f in os.listdir(source_folder) if f.endswith(".jpg")):
        video_name = file.rsplit("_frame", 1)[0]
        frames_by_video.setdefault(video_name, []).append(file)

    # Shuffle whole videos, not frames
    video_names = sorted(frames_by_video)
    rng = random.Random(seed) if seed is not None else random
    rng.shuffle(video_names)

    # Compute split indices over the number of videos
    train_split = int(len(video_names) * train_ratio)
    val_split = int(len(video_names) * (train_ratio + val_ratio))

    for i, video_name in enumerate(video_names):
        if i < train_split:
            dest_folder = train_dest
        elif i < val_split:
            dest_folder = val_dest
        else:
            dest_folder = test_dest

        # Move every frame of this video to the chosen split
        for file in frames_by_video[video_name]:
            shutil.move(os.path.join(source_folder, file), os.path.join(dest_folder, file))

# Split each class in turn; a class's frames land in the sub-folder of the
# same name inside the train, val and test directories.
for class_name, class_frames_path in (
    ("fight", fight_frames_path),
    ("non_fight", non_fight_frames_path),
):
    split_data(
        class_frames_path,
        *(os.path.join(split_root, class_name) for split_root in (train_dir, val_dir, test_dir)),
    )

print("✅ Dataset organization complete!")


✅ Dataset organization complete!


In [None]:
import tensorflow as tf

# Define dataset directory
dataset_base = "/content/drive/MyDrive/SwiftSheild/dataset"


def load_split(split_name, shuffle):
    """Load one split folder as a tf.data dataset of 224x224 images in batches of 32.

    Labels are inferred from the sub-folder names. Keras assigns class indices
    in alphanumeric order of those names, so with the folders created earlier
    "fight" is label 0 and "non_fight" is label 1.
    """
    return tf.keras.utils.image_dataset_from_directory(
        directory=os.path.join(dataset_base, split_name),
        image_size=(224, 224),
        batch_size=32,
        shuffle=shuffle,
    )


# Training and validation batches are shuffled; the test set keeps file order
train_dataset = load_split("train", shuffle=True)
val_dataset = load_split("val", shuffle=True)
test_dataset = load_split("test", shuffle=False)

# Scale pixel values from the 0-255 range to 0-1.
# NOTE(review): the Keras documentation for the ImageNet ResNet50 weights
# describes tf.keras.applications.resnet50.preprocess_input as the matching
# preprocessing rather than 0-1 scaling. This notebook uses 0-1 scaling in
# every training, evaluation and inference cell, so any switch has to be made
# in all of them together - confirm before changing.
normalization_layer = tf.keras.layers.Rescaling(1./255)


def rescale(images, labels):
    """Apply the rescaling layer to the images and pass the labels through."""
    return normalization_layer(images), labels


train_dataset = train_dataset.map(rescale)
val_dataset = val_dataset.map(rescale)
test_dataset = test_dataset.map(rescale)

print("✅ Dataset successfully loaded and normalized!")


Found 5172 files belonging to 2 classes.
Found 646 files belonging to 2 classes.
Found 648 files belonging to 2 classes.
✅ Dataset successfully loaded and normalized!


In [None]:
# Sanity check: pull a single batch and report the image and label tensor shapes.
for batch in train_dataset.take(1):
    images, labels = batch
    print(f"Batch shape: {images.shape}, Labels shape: {labels.shape}")


Batch shape: (32, 224, 224, 3), Labels shape: (32,)


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50

# ImageNet-pretrained ResNet50 used as a feature extractor: include_top=False
# drops its original classification layers.
base_model = ResNet50(include_top=False, weights="imagenet", input_shape=(224, 224, 3))

# Keep the pretrained weights fixed while the new head is trained
base_model.trainable = False

# New classification head stacked on top of the backbone
classifier_head = [
    layers.GlobalAveragePooling2D(),  # Feature maps -> one vector per image
    layers.Dense(128, activation="relu"),  # Fully connected layer
    layers.Dropout(0.5),  # Dropout to prevent overfitting
    layers.Dense(1, activation="sigmoid"),  # Single probability (binary classification)
]
model = models.Sequential([base_model, *classifier_head])

# Binary cross-entropy matches the single sigmoid output
head_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)  # Lower LR for stability
model.compile(
    loss="binary_crossentropy",
    optimizer=head_optimizer,
    metrics=["accuracy"],
)

# Display model summary
model.summary()


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 0us/step


In [None]:
# Train the model (only the new head learns; the ResNet50 base was frozen
# when the model was built).
# batch_size is deliberately not passed: train_dataset is a tf.data.Dataset
# that already yields batches of 32, and Keras documents that batch_size must
# not be specified for dataset inputs.
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10,  # Train for 10 epochs first
)

# Save the model in Google Drive to prevent losing it again
model_save_path = "/content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_initial.h5"
model.save(model_save_path)
print(f"✅ Model saved at: {model_save_path}")


Epoch 1/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1409s[0m 9s/step - accuracy: 0.5065 - loss: 0.7659 - val_accuracy: 0.5728 - val_loss: 0.6852
Epoch 2/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1041s[0m 6s/step - accuracy: 0.5373 - loss: 0.6904 - val_accuracy: 0.5418 - val_loss: 0.6783
Epoch 3/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1044s[0m 6s/step - accuracy: 0.5547 - loss: 0.6843 - val_accuracy: 0.5975 - val_loss: 0.6727
Epoch 4/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1032s[0m 6s/step - accuracy: 0.6056 - loss: 0.6701 - val_accuracy: 0.5898 - val_loss: 0.6693
Epoch 5/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1036s[0m 6s/step - accuracy: 0.6103 - loss: 0.6701 - val_accuracy: 0.6223 - val_loss: 0.6646
Epoch 6/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m999s[0m 6s/step - accuracy: 0.6083 - loss: 0.6642 - val_accuracy: 0.6068 - val_loss: 0.6637
Epoch 7/10
[1m16



✅ Model saved at: /content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_initial.h5


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50

# Reload the head-only model from Drive so the evaluation uses the saved weights.
# NOTE(review): this rebinds `model` to a newly deserialized object, while
# `base_model` from the model-building cell still refers to the earlier
# in-memory network. Later cells that touch `base_model` should confirm they
# act on the model they intend to.
initial_model_path = "/content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_initial.h5"
model = tf.keras.models.load_model(initial_model_path)

# Attach optimizer, loss and metric again so evaluate() reports accuracy
eval_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(
    loss="binary_crossentropy",
    optimizer=eval_optimizer,
    metrics=["accuracy"],
)

# Score the model on the held-out test split
evaluation = model.evaluate(test_dataset)
test_loss, test_accuracy = evaluation
print(f"✅ Test Accuracy: {test_accuracy:.4f}, Test Loss: {test_loss:.4f}")




[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m141s[0m 6s/step - accuracy: 0.7778 - loss: 0.6004
✅ Test Accuracy: 0.6867, Test Loss: 0.6437


In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix

# Collect true labels and thresholded predictions batch by batch so the two
# lists stay aligned.
y_true = []
y_pred = []

for images, labels in test_dataset:
    # verbose=0 avoids printing one progress bar per batch
    preds = model.predict(images, verbose=0)  # Predicted probability of class 1
    y_true.extend(labels.numpy())  # Store true labels
    y_pred.extend(np.round(preds).flatten())  # Convert probabilities to 0 or 1

# Convert to integer numpy arrays for analysis
y_true = np.array(y_true).astype(int)
y_pred = np.array(y_pred).astype(int)

# image_dataset_from_directory numbers the class folders in alphanumeric
# order, so "fight" is label 0 and "non_fight" is label 1. Listing the labels
# as [1, 0] puts "fight" last, which makes it the positive class in the
# tn/fp/fn/tp unpacking below. Passing labels explicitly also guarantees a
# 2x2 matrix even if one class never occurs in y_true or y_pred.
cm = confusion_matrix(y_true, y_pred, labels=[1, 0])
tn, fp, fn, tp = cm.ravel()

# Print results
print(f"True Positives (TP): {tp}")   # Correctly predicted fights
print(f"False Positives (FP): {fp}")  # Mistakenly predicted fights (false alarms)
print(f"False Negatives (FN): {fn}")  # Missed fights (dangerous)
print(f"True Negatives (TN): {tn}")   # Correctly predicted non-fights


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 8s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 8s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 8s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 8s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
[1m1/1[0m [32m━━━

In [None]:
# Take the backbone from the model that is actually being trained. `model` may
# have been rebound by load_model() since it was built, in which case the
# original `base_model` variable points at a different object and unfreezing
# its layers would leave this model untouched. ResNet50 is the first layer of
# the Sequential model.
backbone = model.layers[0]

# Re-enable training on the backbone as a whole, then freeze everything except
# the last 20 layers. Setting the parent flag first follows the Keras
# fine-tuning guide and avoids depending on how a frozen container treats
# individually unfrozen sub-layers.
backbone.trainable = True
for layer in backbone.layers[:-20]:
    layer.trainable = False

# Among the last 20 layers (adjust as needed), BatchNormalization layers stay
# frozen so their ImageNet moving statistics are not overwritten by small
# fine-tuning batches, as the Keras transfer-learning guide recommends.
for layer in backbone.layers[-20:]:
    layer.trainable = not isinstance(layer, tf.keras.layers.BatchNormalization)

# Compile with a lower learning rate for fine-tuning; recompiling is required
# for the changed trainable flags to take effect.
fine_tune_lr = 0.00001  # 10x smaller than the original learning rate
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=fine_tune_lr),
    loss="binary_crossentropy",
    metrics=["accuracy"]
)

# Fine-tune for 10 more epochs. batch_size is not passed because the datasets
# are already batched.
history_fine = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10,
)

# Save the fine-tuned model
model.save("/content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_finetuned.h5")


Epoch 1/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2574s[0m 16s/step - accuracy: 0.6203 - loss: 0.6493 - val_accuracy: 0.4830 - val_loss: 0.6948
Epoch 2/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1351s[0m 8s/step - accuracy: 0.8880 - loss: 0.3301 - val_accuracy: 0.5124 - val_loss: 0.6726
Epoch 3/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1306s[0m 8s/step - accuracy: 0.9649 - loss: 0.1517 - val_accuracy: 0.9861 - val_loss: 0.1412
Epoch 4/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1287s[0m 8s/step - accuracy: 0.9907 - loss: 0.0687 - val_accuracy: 0.9783 - val_loss: 0.0771
Epoch 5/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1313s[0m 8s/step - accuracy: 0.9913 - loss: 0.0440 - val_accuracy: 0.9706 - val_loss: 0.0869
Epoch 6/10
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1312s[0m 8s/step - accuracy: 0.9931 - loss: 0.0301 - val_accuracy: 0.9907 - val_loss: 0.0168
Epoch 7/10
[1m



In [None]:
import tensorflow as tf
import os

# Define dataset directory
dataset_base = "/content/drive/MyDrive/SwiftSheild/dataset"
test_split_dir = os.path.join(dataset_base, "test")

# Rebuild the test dataset with the same settings as the initial load;
# shuffle=False keeps the files in a fixed order.
test_dataset = tf.keras.utils.image_dataset_from_directory(
    directory=test_split_dir,
    batch_size=32,
    image_size=(224, 224),
    shuffle=False,
)

# Scale pixel values to the 0-1 range, as was done for training
normalization_layer = tf.keras.layers.Rescaling(1./255)


def rescale_test_batch(images, labels):
    """Apply the rescaling layer to the images and pass the labels through."""
    return normalization_layer(images), labels


test_dataset = test_dataset.map(rescale_test_batch)

print("✅ Test dataset successfully reloaded and normalized!")


Found 648 files belonging to 2 classes.
✅ Test dataset successfully reloaded and normalized!


In [None]:
# Load the fine-tuned model
# This is the .h5 file written at the end of the fine-tuning cell. Loading it
# rebinds the notebook-wide `model`, so the cells below work with the saved
# weights.
model_path = "/content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_finetuned.h5"
model = tf.keras.models.load_model(model_path)

print(f"✅ Model successfully loaded from: {model_path}")




✅ Model successfully loaded from: /content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_finetuned.h5


In [None]:
# Attach optimizer, loss and metric again before evaluating the model that
# was loaded from the .h5 file.
fine_tuned_optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001)  # Keep the fine-tuned learning rate
model.compile(
    loss="binary_crossentropy",
    optimizer=fine_tuned_optimizer,
    metrics=["accuracy"],
)

# Score the fine-tuned model on the test split
evaluation = model.evaluate(test_dataset)
test_loss, test_accuracy = evaluation
print(f"✅ Test Accuracy: {test_accuracy:.4f}, Test Loss: {test_loss:.4f}")


[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m123s[0m 5s/step - accuracy: 0.9972 - loss: 0.0083
✅ Test Accuracy: 0.9969, Test Loss: 0.0086


In [None]:
# Video used for the single-video inference demo; the frame-extraction cell
# below assigns the same path again.
test_video_path = "/content/drive/MyDrive/SwiftSheild/hooligan.violance.mp4"


In [None]:
import cv2
import numpy as np

# Function to extract frames from a single test video
def extract_test_video_frames(video_path, fps_factor=10, img_size=(224, 224)):
    """Sample evenly spaced frames from a video and return them ready for prediction.

    About ``fps_factor`` frames are taken per second of video (never more than
    the video contains). Each frame is resized to ``img_size``, converted from
    OpenCV's BGR order to RGB and scaled to the 0-1 range.

    Parameters:
        video_path (str): Path of the video to read.
        fps_factor (int | float): Target number of sampled frames per second.
        img_size (tuple[int, int]): Size passed to cv2.resize, i.e. (width, height).

    Returns:
        numpy.ndarray | None: Array of shape (num_frames, height, width, 3), or
        None when the video cannot be opened. If no frame could be read the
        array has zero frames but keeps the same four dimensions.
    """
    cap = cv2.VideoCapture(video_path)

    try:
        if not cap.isOpened():
            print(f"Error: Could not open {video_path}")
            return None

        # Get video properties
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Keep the frame rate as a float: truncating it (29.97 -> 29) skews the
        # duration, and a rate below 1 FPS would truncate to 0 and divide by zero.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # Default to 30 FPS if unknown
        video_duration = total_frames / fps  # Video length in seconds

        # Number of frames to extract; clamped at 0 because some containers
        # report a non-positive frame count and np.linspace rejects negatives.
        num_frames = max(min(int(video_duration * fps_factor), total_frames), 0)

        # Get evenly spaced frame indices
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)

        frame_list = []  # List to store extracted frames

        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))  # Move to specific frame
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame {frame_idx} in {video_path}. Skipping...")
                continue

            frame = cv2.resize(frame, img_size)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
            frame_list.append(frame / 255.0)  # Normalize pixel values
    finally:
        # Always free the decoder handle, even if a frame operation raises.
        cap.release()

    if not frame_list:
        # np.array([]) would be one-dimensional; keep the 4-D layout so
        # callers can rely on the shape even when nothing was decoded.
        return np.empty((0, img_size[1], img_size[0], 3))

    # Return frames as a NumPy array for prediction
    return np.array(frame_list)

# Video to run through the classifier
test_video_path = "/content/drive/MyDrive/SwiftSheild/hooligan.violance.mp4"

# Sample its frames; None signals that extraction did not produce an array
frames = extract_test_video_frames(test_video_path)

if frames is None:
    print("⚠️ Frame extraction failed!")
else:
    print(f"✅ Extracted {frames.shape[0]} frames from the test video!")


✅ Extracted 52 frames from the test video!


In [None]:
import tensorflow as tf
import os
# Load the fine-tuned model
# Same file and call as the earlier loading cell; it rebinds `model` again so
# the prediction cell below uses the weights stored in the .h5 file.
model_path = "/content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_finetuned.h5"
model = tf.keras.models.load_model(model_path)

print(f"✅ Model successfully loaded from: {model_path}")




✅ Model successfully loaded from: /content/drive/MyDrive/SwiftSheild/resnet50_fight_detection_finetuned.h5


In [None]:
# Fail early with a clear message instead of an opaque error inside predict()
# when frame extraction returned None or produced no frames.
if frames is None or len(frames) == 0:
    raise ValueError("No frames available for prediction; check the frame extraction step.")

# Make predictions
# The sigmoid output is the probability of class 1. The training labels come
# from image_dataset_from_directory, which numbers the class folders in
# alphanumeric order: "fight" is 0 and "non_fight" is 1. A value above 0.5
# therefore means NON-fight.
predictions = model.predict(frames)
predicted_labels = (predictions > 0.5).astype(int)  # 0 (Fight) or 1 (Non-Fight) per frame

# Count fight vs. non-fight frames
non_fight_frames = np.sum(predicted_labels)
fight_frames = len(predicted_labels) - non_fight_frames

print(f"✅ Fight Frames Detected: {fight_frames}")
print(f"✅ Non-Fight Frames Detected: {non_fight_frames}")

# Final decision: majority vote over the sampled frames
if fight_frames > non_fight_frames:
    print("🔥 Fight Detected in the Video! 🔥")
else:
    print("✅ No Fight Detected in the Video.")


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 6s/step
✅ Fight Frames Detected: 50
✅ Non-Fight Frames Detected: 2
🔥 Fight Detected in the Video! 🔥
