📂 1. Import Required Libraries

In [None]:
# 📌 Import necessary libraries
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import Sequence
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

🧠 2. Set Configuration Variables

In [None]:
# 📌 Set this to True if you need to convert videos to .npy files
CONVERT_VIDEOS = True

📦 3. Define the Data Generator

In [None]:
# 📌 DataGenerator class to load and process videos
class DataGenerator(Sequence):
    def __init__(self, data_folder, batch_size=32, dim=(30, 150, 150), n_channels=3, n_classes=2, shuffle=True):
        self.dim = dim
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.data_folder = data_folder
        
        self.files = []
        self.labels = []
        
        for label in ['Fight', 'NonFight']:
            folder = os.path.join(data_folder, label)
            for file in os.listdir(folder):
                if file.endswith('.npy'):
                    self.files.append(os.path.join(folder, file))
                    self.labels.append(1 if label == 'Fight' else 0)
        
        self.indexes = np.arange(len(self.files))
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.files) / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        X, y = self.__data_generation(indexes)
        return X, y

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.files))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, indexes):
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty(self.batch_size, dtype=int)

        for i, idx in enumerate(indexes):
            X[i,] = np.load(self.files[idx]) / 255.0  # Load and normalize
            y[i] = self.labels[idx]

        return X, y

🎥 4. Convert Videos to .npy Format

In [None]:
# 📌 Function to convert videos to .npy format
def convert_videos_to_npy(input_path, output_path):
    class_counters = {"Fight": 1, "NonFight": 1}  # Counters for naming files

    for class_name in os.listdir(input_path):
        class_dir = os.path.join(input_path, class_name)
        class_output_dir = os.path.join(output_path, class_name)

        # Create output subdirectory for the class
        os.makedirs(class_output_dir, exist_ok=True)

        if os.path.isdir(class_dir):
            for video_name in os.listdir(class_dir):
                video_path = os.path.join(class_dir, video_name)
                if video_name.endswith(('.mp4', '.avi', '.mkv')):  # Add more formats if needed
                    cap = cv2.VideoCapture(video_path)
                    frames = []

                    # Read frames from the video
                    while True:
                        ret, frame = cap.read()
                        if not ret:
                            break
                        frame = cv2.resize(frame, (150, 150))  
                        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        frames.append(frame)

                    cap.release()

                    # Convert frames to a numpy array
                    frames_array = np.array(frames, dtype=np.uint8)

                    # Generate filename based on class and counter
                    file_prefix = "F" if class_name == "Fight" else "NF"
                    output_file = os.path.join(class_output_dir, f"{file_prefix}_{class_counters[class_name]}.npy")

                    np.save(output_file, frames_array)
                    print(f"Converted and saved: {output_file}")

                    class_counters[class_name] += 1

🛠 5. Load Pretrained Model


In [None]:
# 📌 Load the pre-trained violence detection model
model = load_model('violence_detection_model2.h5')
print("Pretrained Violence Model Loaded Successfully!")

📂 6. Prepare Dataset for Training

In [None]:
# 📌 Paths for Road Rage dataset
road_rage_dataset_folder = "Numpy_Videos/RoadRageDataset"
output_folder = "Numpy_Videos"

# 📌 Convert videos to .npy format if needed
if CONVERT_VIDEOS:
    convert_videos_to_npy(road_rage_dataset_folder, output_folder)

# 📌 Create data generators
print("Creating data generators for the road rage dataset...")
train_generator = DataGenerator(output_folder, batch_size=16)
validation_generator = DataGenerator(output_folder, batch_size=16, shuffle=False)
print("Data generators created successfully!")

⚖ 7. Handle Class Imbalance

In [None]:
# 📌 Compute class weights for imbalanced dataset
y_labels = np.concatenate([label for _, label in train_generator])
class_weights = compute_class_weight('balanced', classes=np.unique(y_labels), y=y_labels)
class_weights_dict = {i: class_weights[i] for i in range(len(class_weights))}

🎯 8. Fine-Tune the Model

In [None]:
# 📌 Fine-tune the model on the new dataset
print("Starting model fine-tuning process...")
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3)

# 📌 Display model summary
print("Model Summary:")
model.summary()

# 📌 Train the model
history = model.fit(train_generator, 
                    validation_data=validation_generator,
                    epochs=20,
                    class_weight=class_weights_dict,
                    callbacks=[early_stopping, lr_scheduler])
print("Model fine-tuning process completed!")

💾 9. Save the Fine-Tuned Model

In [None]:
# 📌 Save the newly trained model
model.save('road_rage_detection_model2.h5')
print("Fine-Tuned Road Rage Model Saved as 'road_rage_detection_model2.h5'")

📊 10. Evaluate Model Performance

In [None]:
# 📌 Generate predictions for the test set
y_pred = model.predict(validation_generator)
y_pred_classes = (y_pred > 0.5).astype(int).flatten()

# 📌 Collect true labels
y_true = np.concatenate([label for _, label in validation_generator])
y_true = y_true[:len(y_pred_classes)]  # Ensure lengths match

# 📌 Print classification report
print("\nClassification Report:")
print(classification_report(y_true, y_pred_classes))

📈 11. Plot Confusion Matrix

In [None]:
# 📌 Compute confusion matrix
cm = confusion_matrix(y_true, y_pred_classes)

# 📌 Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['NonFight', 'Fight'], yticklabels=['NonFight', 'Fight'])
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()