In [None]:
import os
import cv2
import numpy as np
import random
import tensorflow as tf
from moviepy.editor import *
import keras
from keras.applications.vgg16 import VGG16
from keras.layers import Dense, Input, RepeatVector, Dropout
from keras.models import Model
from keras.optimizers import Nadam, Adam
from keras.layers import LSTM, GRU, Bidirectional, Attention
from keras.layers import TimeDistributed
from keras.layers import GlobalAveragePooling2D
from keras.layers import Conv2D, BatchNormalization, MaxPool2D, GlobalMaxPool2D
from keras.layers import TimeDistributed, GRU, Dense, Dropout

In [None]:
from tensorflow.keras.utils import to_categorical

from tensorflow.keras.layers import *

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import plot_model
from keras.models import Sequential, Model

In [None]:
import json

# 'file_paths_test.json' was produced earlier with
#     json.dump(file_paths_test, file)
# so this cell only has to read the stored list back in.
with open('file_paths_test.json', 'r') as paths_file:
    loaded_list = json.load(paths_file)

print(loaded_list)

In [None]:
from tensorflow.keras.models import load_model

# Saved Keras model used by predict_video(); the variable keeps its historical
# name `mobileNetGRU` because predict_video() looks it up as a global.
MODEL_FILE = 'VGG+GRU_best_weights.keras'
mobileNetGRU = load_model(MODEL_FILE)
mobileNetGRU.summary()

In [None]:
def predict_video(video_file_path, SEQUENCE_LENGTH):
    """Classify one video from evenly spaced frames.

    Up to ``SEQUENCE_LENGTH`` frames are sampled at a fixed stride, resized to
    ``IMAGE_WIDTH`` x ``IMAGE_HEIGHT``, scaled to [0, 1] and passed as a single
    batch to the global model ``mobileNetGRU``. The class name is looked up in
    the global ``class_categories_list``.

    Args:
        video_file_path: Path of the video file to classify.
        SEQUENCE_LENGTH: Number of frames to sample from the video.

    Returns:
        Tuple ``(predicted_class_name, confidence)`` where ``confidence`` is the
        model output for the winning class.

    Raises:
        ValueError: If not a single frame could be read from the video.
    """
    video_reader = cv2.VideoCapture(video_file_path)

    # Pre-processed frames that will form the model input sequence.
    frames_list = []

    try:
        # Get the number of frames in the video.
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Stride between sampled frames; never below 1 so short videos still advance.
        skip_frames_window = max(int(video_frames_count / SEQUENCE_LENGTH), 1)

        for frame_counter in range(SEQUENCE_LENGTH):
            # Jump to the next sampled frame position.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

            success, frame = video_reader.read()
            if not success:
                # Ran past the end of the video (or the read failed): keep what we have.
                break

            # cv2.resize takes dsize as (width, height), not (height, width).
            resized_frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT))

            # Scale pixel values to [0, 1] and store the frame.
            frames_list.append(resized_frame / 255)
    finally:
        # Always free the capture handle, including on errors and early exits.
        video_reader.release()

    if not frames_list:
        raise ValueError(f"No frames could be read from video: {video_file_path}")

    # Add the batch dimension and get the class probabilities for this video.
    predicted_labels_probabilities = mobileNetGRU.predict(np.expand_dims(frames_list, axis=0))[0]

    # Index and name of the most probable class.
    predicted_label = np.argmax(predicted_labels_probabilities)
    predicted_class_name = class_categories_list[predicted_label]
    confidence = predicted_labels_probabilities[predicted_label]

    # Display the predicted class along with the prediction confidence.
    print(f'Predicted: {predicted_class_name}\nConfidence: {confidence}')
    return predicted_class_name, confidence
    

In [None]:
import os

# Folder that holds the test videos.
folder_path = "TestSet/"

# Every regular file directly inside folder_path whose name ends with ".mp4",
# stored as folder_path-prefixed paths in os.listdir() order.
loaded_list_new = [
    os.path.join(folder_path, entry)
    for entry in os.listdir(folder_path)
    if entry.endswith(".mp4") and os.path.isfile(os.path.join(folder_path, entry))
]

print("Len of the test set:", len(loaded_list_new))
print(loaded_list_new)

In [None]:
# Derive one ground-truth label per test video from its path:
# 1 when the path contains "No_Gun", 0 otherwise.
import numpy as np

ground_truth_array = np.array(
    [int("No_Gun" in video_path) for video_path in loaded_list_new]
)
print(type(ground_truth_array))
ground_truth_array

In [None]:
import time
import torch
import numpy as np

# Settings read as globals by predict_video().
SEQUENCE_LENGTH = 30
IMAGE_HEIGHT = IMAGE_WIDTH = 224
# Index i is the class name for label i; ground_truth_array uses 1 for "No_Gun" paths.
class_categories_list = ["Gun", "NoGun"]

# Number of test videos per ground-truth class.
GunCount = 0
NoGunCount = 0

# Video paths grouped by confusion-matrix cell ("Gun" is the positive class).
true_positives = []  # Correctly predicted "Gun"
false_positives = []  # Predicted "Gun" but actually "NoGun"
false_negatives = []  # Predicted "NoGun" but actually "Gun"
true_negatives = []  # Correctly predicted "NoGun"

# Wall-clock seconds spent in predict_video() for each video.
time_per_video = []
for ii in range(len(loaded_list_new)):
    start_time = time.time()
    predicted_class_name, predicted_labels_probabilities = predict_video(loaded_list_new[ii], SEQUENCE_LENGTH)
    end_time = time.time()
    execution_time = end_time - start_time
    time_per_video.append(execution_time)

    # GPU memory currently allocated through torch, in MB. Formatted separately so
    # the "N/A" fallback is never pushed through a float format spec.
    if torch.cuda.is_available():
        gpu_memory_usage = torch.cuda.memory_allocated() / (1024 * 1024)
        gpu_memory_text = f"{gpu_memory_usage:.2f} MB"
    else:
        gpu_memory_usage = "N/A"
        gpu_memory_text = gpu_memory_usage

    print(f"Video {ii+1}: {loaded_list_new[ii]}")
    print("Prediction: ", predicted_class_name, "Confidence: ", predicted_labels_probabilities)
    print(f"Execution Time: {execution_time:.2f} seconds")
    print(f"GPU Memory Usage: {gpu_memory_text}")
    print("-" * 50)

    # Get ground truth label for current video
    actual_class_name = class_categories_list[ground_truth_array[ii]]

    # Keep the per-class totals that the summary line reports.
    if actual_class_name == "Gun":
        GunCount += 1
    else:
        NoGunCount += 1

    if predicted_class_name == "Gun":
        if actual_class_name == "Gun":  # True Positive
            true_positives.append(loaded_list_new[ii])
        else:  # False Positive
            false_positives.append(loaded_list_new[ii])
    else:
        if actual_class_name == "Gun":  # False Negative
            false_negatives.append(loaded_list_new[ii])
        else:  # True Negative
            true_negatives.append(loaded_list_new[ii])


# Print summary
print(f"Total Gun Videos: {GunCount}, Total NoGun Videos: {NoGunCount}")
print("True Positives (TP):", len(true_positives))
print("False Positives (FP):", len(false_positives))
print("False Negatives (FN):", len(false_negatives))
print("True Negatives (TN):", len(true_negatives))

In [None]:
# Total wall-clock seconds spent inside predict_video() across all test videos
# (sum of the per-video timings collected in the evaluation loop).
sum(time_per_video)

In [None]:
# NOTE(review): hand-typed constants with no visible source — presumably two
# separately measured timings (seconds) being added together; confirm and
# replace with the variables they were copied from.
145.46+38.5284

In [None]:
# Sizes of the two lists that hold ground-truth "Gun" videos: those the
# classifier labelled "Gun" (true positives) and those it missed (false negatives).
len(true_positives), len(false_negatives)

In [None]:
import json
import os
import time
import psutil
from ultralytics import YOLO
from sklearn.metrics import precision_score, recall_score, average_precision_score, f1_score
import numpy as np

def calculate_iou(bbox1, bbox2):
    """Return the intersection-over-union of two boxes.

    Each box is a 4-item sequence ``(x_min, y_min, width, height)``.
    Returns the integer 0 when the boxes do not overlap (touching edges count
    as no overlap), otherwise intersection area divided by union area.
    """
    left1, top1, width1, height1 = bbox1
    left2, top2, width2, height2 = bbox2

    # Corners of the overlap rectangle (only meaningful if the boxes intersect).
    overlap_left, overlap_top = max(left1, left2), max(top1, top2)
    overlap_right = min(left1 + width1, left2 + width2)
    overlap_bottom = min(top1 + height1, top2 + height2)

    # Empty or degenerate overlap: nothing shared between the boxes.
    if overlap_left >= overlap_right or overlap_top >= overlap_bottom:
        return 0

    overlap_area = (overlap_right - overlap_left) * (overlap_bottom - overlap_top)
    combined_area = width1 * height1 + width2 * height2 - overlap_area
    return overlap_area / combined_area

def match_predictions(predictions, annotations, iou_threshold=0.5):
    """Match predicted boxes to ground-truth boxes frame by frame.

    Args:
        predictions: Dict mapping a frame id to a list of predictions, each
            ``[x_min, y_min, width, height, confidence]``.
        annotations: List of dicts with at least ``'image_id'`` and ``'bbox'``
            (``[x_min, y_min, width, height]``). An annotation belongs to a
            frame when its ``'image_id'`` equals the frame id.
        iou_threshold: Minimum IoU for a prediction to claim a ground-truth box.

    Returns:
        Tuple ``(y_true, y_pred, scores, mean_iou)``:
        * matched prediction          -> y_true 1, y_pred 1, score = confidence
        * unmatched prediction (FP)   -> y_true 0, y_pred 1, score = confidence
        * unmatched ground truth (FN) -> y_true 1, y_pred 0, score = 0
        ``mean_iou`` is the mean IoU over matched pairs, or 0 if none matched.

    Matching is greedy: each prediction, in list order, takes the first unused
    ground-truth box whose IoU reaches the threshold. Annotations for frames
    that are not keys of ``predictions`` are ignored.
    """
    # Index the annotations by frame once, instead of rescanning the whole
    # annotation list for every frame.
    annos_by_frame = {}
    if predictions:
        for anno in annotations:
            annos_by_frame.setdefault(anno['image_id'], []).append(anno)

    y_true = []
    y_pred = []
    scores = []
    matched_ious = []

    for frame_id, pred_bboxes in predictions.items():
        gt_bboxes = [anno['bbox'] for anno in annos_by_frame.get(frame_id, [])]
        gt_used = set()

        for pred in pred_bboxes:
            for idx, gt_bbox in enumerate(gt_bboxes):
                if idx in gt_used:
                    continue
                iou = calculate_iou(pred[:4], gt_bbox)

                if iou >= iou_threshold:
                    y_true.append(1)  # True positive
                    y_pred.append(1)
                    scores.append(pred[4])  # Confidence score
                    matched_ious.append(iou)
                    gt_used.add(idx)
                    break
            else:
                # No free ground-truth box was claimed (also covers frames
                # without any ground truth): false positive.
                y_true.append(0)
                y_pred.append(1)
                scores.append(pred[4])  # Confidence score

        # Ground-truth boxes that no prediction claimed are false negatives.
        missed = len(gt_bboxes) - len(gt_used)
        y_true.extend([1] * missed)
        y_pred.extend([0] * missed)
        scores.extend([0] * missed)

    return y_true, y_pred, scores, np.mean(matched_ious) if matched_ious else 0

# Load the YOLO detector from the weights file of training run "train3".
model_path = os.path.join('.', 'runs', 'detect', 'train3', 'weights', 'best.pt')
model = YOLO(model_path)

# Folder searched for "*_label.json" annotation files.
label_dir = "TestSet/"

# Videos to run the detector on: the classifier's false negatives first,
# followed by its true positives.
videos_list1 = true_positives
videos_list2 = false_negatives
videos = videos_list2 + videos_list1

# Map "<file stem with every '_label' removed>" -> annotation file name.
labels = {}
for label_filename in os.listdir(label_dir):
    if label_filename.endswith('_label.json'):
        label_key = os.path.splitext(label_filename)[0].replace('_label', '')
        labels[label_key] = label_filename

# Box-level results pooled over all videos, plus one IoU value per video.
all_y_true, all_y_pred, all_scores, all_ious = [], [], [], []

# Per-video metric values, later averaged across videos.
averaged_metrics = {
    metric_name: []
    for metric_name in ("Precision", "Recall", "F1 Score", "Average Precision (AP)", "IoU")
}

# Per-video timing information keyed by video path.
time_and_storage_metrics = {}

# Run the YOLO detector on every video, compare its boxes with the ground-truth
# annotations (when a label file is found) and collect per-video and pooled metrics.
for counter, video_file in enumerate(videos, start=1):
    video_path = video_file
    video_name = os.path.splitext(os.path.basename(video_file))[0]

    # Identify matching annotation file based on naming pattern
    # NOTE(review): videos_list2 is the classifier's false_negatives list, i.e. videos
    # whose ground truth is "Gun". They are evaluated here WITHOUT annotations, so every
    # detection in them is scored as a false positive — confirm this is intended.
    if video_file in videos_list2:
        label_file = None  # No annotation file for videos without the desired object
    elif video_name.endswith('_video'):
        # NOTE(review): replace() removes every '_video' occurrence in the name, not only the suffix.
        label_file = labels.get(video_name.replace('_video', ''))  # Match video with corresponding annotation file
    else:
        label_file = None  # No annotation file for videos without the desired object

    # Start memory tracking
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / (1024 ** 2)  # Memory in MB

    # Initialize frame-level time tracking
    total_time_taken = 0  # Initialize total inference time
    
    # Predict on the video
    # predictions maps frame index -> list of [x_min, y_min, width, height, conf].
    predictions = {}
    for idx, result in enumerate(model.predict(source=video_path, show=False, save=False, conf=0.5, line_width=2, show_labels=True, show_conf=True, classes=[0], stream=True)):
        # Only the "inference" entry of result.speed is summed; other entries are not counted.
        frame_time_ms = result.speed["inference"]  # Extract inference time specifically
        total_time_taken += frame_time_ms
    
        # Frame ids are the 0-based enumerate index; match_predictions compares them with
        # each annotation's 'image_id'. NOTE(review): confirm the label files number frames from 0.
        frame_id = idx
        frame_preds = []
    
        for pred in result.boxes.data:
            # Each row unpacks to corner coordinates, confidence and class id;
            # corners are converted to (x_min, y_min, width, height) for calculate_iou.
            x_min, y_min, x_max, y_max, conf, class_id = pred
            width, height = x_max - x_min, y_max - y_min
            frame_preds.append([x_min.item(), y_min.item(), width.item(), height.item(), conf.item()])
    
        predictions[frame_id] = frame_preds


    # Calculate memory used
    # Difference in process RSS before/after inference; printed below but not stored.
    final_memory = process.memory_info().rss / (1024 ** 2)  # Memory in MB
    memory_used = final_memory - initial_memory

    # Store time and storage metrics for the video
    time_and_storage_metrics[video_file] = {
        "Total Time Taken (ms)": total_time_taken
        #"Memory Used (MB)": memory_used
    }

    # Load ground truth annotations
    annotations = []
    if label_file:
        label_path = os.path.join(label_dir, label_file)
        with open(label_path) as f:
            gt_data = json.load(f)
            annotations = gt_data.get('annotations', [])  # Default to empty if no annotations exist

    # Match predictions with ground truths and calculate metrics
    y_true, y_pred, scores, iou = match_predictions(predictions, annotations)

    # Calculate individual video metrics
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    # AP is defined as 0.0 when the video produced no box-level entries at all.
    average_precision = average_precision_score(y_true, scores) if scores else 0.0

    video_metrics = {
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1,
        "Average Precision (AP)": average_precision,
        "IoU": iou
    }

    # Accumulate global metrics
    all_y_true.extend(y_true)
    all_y_pred.extend(y_pred)
    all_scores.extend(scores)
    # One IoU value per video (0 when the video had no matched boxes).
    all_ious.append(iou)

    # Collect metrics for averaging
    for key in averaged_metrics:
        averaged_metrics[key].append(video_metrics[key])

    # Progress report; the last three prints show the cumulative pooled lists,
    # so their output grows with every video.
    print(f"Counter: {counter}")    
    print(f"Processing Video: {video_file}, Annotation File: {label_file if label_file else 'No Annotations'}")
    print(f"Metrics for Video {video_file}: {video_metrics}")
    print(f"Total Time Taken: {total_time_taken:.2f} milliseconds")
    print(f"Memory Used: {memory_used:.2f} MB")
    print(f"Ground_Truth length: {len(all_y_true)}, Prediction length: {len(all_y_pred)}")
    print(f"Ground_Truth: {all_y_true}")
    print(f"Predictions: {all_y_pred}")

# Pooled ("holistic") metrics: computed once over the box-level results of all videos.
holistic_precision, holistic_recall, holistic_f1 = (
    metric_fn(all_y_true, all_y_pred, zero_division=0)
    for metric_fn in (precision_score, recall_score, f1_score)
)
if all_scores:
    holistic_average_precision = average_precision_score(all_y_true, all_scores)
else:
    holistic_average_precision = 0.0

# The "mAP" entry repeats the AP value; "IoU" is the mean of the per-video IoU values.
holistic_metrics = {
    "Precision": holistic_precision,
    "Recall": holistic_recall,
    "F1 Score": holistic_f1,
    "Average Precision (AP)": holistic_average_precision,
    "Mean Average Precision (mAP)": holistic_average_precision,
    "IoU": np.mean(all_ious) if all_ious else 0,
}

# Unweighted mean of each per-video metric.
averaged_metrics_result = {}
for key, values in averaged_metrics.items():
    averaged_metrics_result[key] = np.mean(values)

print("\nHolistic Metrics:", holistic_metrics, sep="\n")
print("\nAveraged Metrics Across Videos:", averaged_metrics_result, sep="\n")

print("\nTime and Storage Metrics for Each Video:")
for video, metrics in time_and_storage_metrics.items():
    print(f"{video}: {metrics}")

In [None]:
# Value left in `counter` by the detector loop: the 1-based index of the last
# video it processed.
counter

In [None]:
# Show the pooled metrics and the per-video timing table again.
print("\nHolistic Metrics:", holistic_metrics, sep="\n")

print("\nTime and Storage Metrics for Each Video:")
for video, metrics in time_and_storage_metrics.items():
    print(f"{video}: {metrics}")

In [None]:
# `metrics` is still bound to the last entry visited by the previous loop over
# time_and_storage_metrics; this prints that entry's total time.
print(metrics['Total Time Taken (ms)'])

# Split the per-video totals by whether the path contains "No_Gun_video".
timesGun, timesNoGun = [], []
for video, metrics in time_and_storage_metrics.items():
    bucket = timesNoGun if "No_Gun_video" in video else timesGun
    bucket.append(metrics['Total Time Taken (ms)'])

# Video counts per group, then each group's total converted from ms to seconds.
print(len(timesGun), len(timesNoGun))
print(sum(timesGun)/1000, sum(timesNoGun)/1000)