<a href="https://colab.research.google.com/github/AbhishekU05/SoC_Real-Time_Anomaly_Detection_in_Surveillance_Videos/blob/main/Model_testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tracking + Anomaly Detection in Avenue Dataset

## Dataset used: **Avenue Dataset for Abnormal Event Detection**



This dataset accompanies paper "Abnormal Event Detection at 150 FPS in Matlab, Cewu Lu, Jianping Shi, Jiaya Jia, International Conference on Computer Vision, (ICCV), 2013"









### Importing required libraries

In [1]:
import os
import cv2
import torch
import shutil
import random
import numpy as np
from glob import glob
from tqdm import tqdm
import matplotlib.pyplot as plt

np.random.seed(42)

### Mount google drive


In [None]:
from google.colab import drive
drive.mount('/content/drive')

```
Avenue Dataset
 └── testing_videos
 └── testing_vol
 └── training_videos
 └── training_vol
     
```

In [None]:
if not os.path.exists('yolov5'):
    !git clone https://github.com/ultralytics/yolov5.git

In [None]:
%cd yolov5/
!pwd

In [None]:
!pip install -r requirements.txt

In [None]:
%cd /content

#### Use the best YOLOv5 model u have trained

In [None]:
model_path = 'drive/MyDrive/SoC/best.pt'

In [None]:
# Load YOLOv5 model
model = torch.hub.load('yolov5', 'custom', path= model_path, source='local')
model.conf = 0.35
model.iou = 0.45

In [None]:
! pip install deep_sort_realtime opencv-python tqdm

In [None]:
from deep_sort_realtime.deepsort_tracker import DeepSort

In [None]:
output_video_folder = os.makedirs('output_video_folder', exist_ok = True)

In [None]:
input_video_path = '/content/drive/MyDrive/SoC/Avenue_Dataset/Avenue Dataset/testing_videos/01.avi'
output_video_path = './output_video_folder/lol.mp4'

### Tune the below parameters

In [None]:
N_history = 12            # how many frames to use for velocity calc
smoothing_frames = 5     # Smoothing: once RED, stay RED for N frames
velocity_threshold = 235   # Velocity threshold (pixels per second)

In [None]:
# Init DeepSORT tracker
tracker = DeepSort(max_age=30)

# Input video
cap = cv2.VideoCapture(input_video_path)

# Get video info
fps = cap.get(cv2.CAP_PROP_FPS)
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video: {fps} FPS, {width}x{height}, {frame_count} frames")

# Prepare output video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

read_count = 0
frame_idx = 0
pbar = tqdm(total=frame_count)

track_memory = {}     # Memory for tracking: store last N positions
anomaly_memory = {}   # store last anomaly state (for smoothing)

saved_anomaly_ids = set()   # to avoid saving duplicates
max_anomaly_frames = 500    # limit on the frames saved

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # YOLO Inference
    results = model(frame)
    detections = results.xyxy[0].cpu().numpy()  # (xmin, ymin, xmax, ymax, conf, cls)

    # Format detections for DeepSORT
    formatted_detections = []
    for d in detections:
        xmin, ymin, xmax, ymax, conf, cls = d
        width_box = xmax - xmin
        height_box = ymax - ymin
        box = [xmin, ymin, width_box, height_box]
        formatted_detections.append([box, conf])

    # Update tracker
    tracks = tracker.update_tracks(formatted_detections, frame=frame)

    # Draw tracks
    for track in tracks:
        if not track.is_confirmed():
            continue

        track_id = track.track_id   # unique ID assigned to the pedestrian
        ltrb = track.to_ltrb()      # gives [left, top, right, bottom] of bounding box
        xmin, ymin, xmax, ymax = map(int, ltrb)

        # Center point
        center_x = (xmin + xmax) / 2
        center_y = (ymin + ymax) / 2

        # Update history
        track_memory.setdefault(track_id, []).append((frame_idx, center_x, center_y))
        track_memory[track_id] = track_memory[track_id][-N_history:]

        # Calculate velocity
        if len(track_memory[track_id]) >= 2:
            f1, x1, y1 = track_memory[track_id][0]
            f2, x2, y2 = track_memory[track_id][-1]

            dt = (f2 - f1) / fps  # time in seconds
            dx = x2 - x1
            dy = y2 - y1
            distance = np.sqrt(dx**2 + dy**2)

            velocity = distance / (dt + 1e-6)  # avoid zero div

            # Anomaly flag
            if velocity > velocity_threshold:
                anomaly = True
            else:
                anomaly = False
        else:
            anomaly = False


        if anomaly:
            anomaly_memory[track_id] = frame_idx  # mark when anomaly seen

        if track_id in anomaly_memory:
            if frame_idx - anomaly_memory[track_id] <= smoothing_frames:
                anomaly = True  # force RED

        # Drawing boxes only around the anomaly
        if anomaly:

          label = f"ID {track_id} {'ANOMALY'}"
          color = (0, 0, 255)

          cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), color, 1)
          cv2.putText(frame, label, (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)


    # Write frame
    out.write(frame)

    frame_idx += 1
    read_count += 1
    pbar.update(1)

pbar.close()
cap.release()
out.release()

print(f"Frames read: {read_count} / {frame_count}")
print(f"\n DONE — output saved to: {output_video_path}")



####  Check for different test videos

In [None]:
import os
import cv2


# Check if file exists
if os.path.exists(input_video_path):
    print(f"File exists at: {input_video_path}")
else:
    print(f"File not found at: {input_video_path}")

# Try opening a dummy video file (if possible, or just check if VideoCapture works)
cap_test = cv2.VideoCapture()
print(f"OpenCV VideoCapture can be initialized: {cap_test.isOpened()}")
cap_test.release()

In [None]:
import os

output_folder_path = './output_video_folder/'

if os.path.exists(output_folder_path):
    print(f"Contents of {output_folder_path}:")
    for item in os.listdir(output_folder_path):
        print(item)
else:
    print(f"The directory {output_folder_path} does not exist.")