# Real-time Anomaly Detection using Optical Flow and Background Subtraction

This notebook implements an anomaly detection system based on motion analysis. It learns a "normal speed profile" from training data and identifies deviations (anomalies) in test videos, such as vehicles on pedestrian paths or high-speed movements.

In [26]:
import cv2
import numpy as np
import glob
import os

In [27]:
# Configuration Parameters
DATASETS = ["./data/UCSDped1"]

# Image dimensions for processing and display
RESIZE_W, RESIZE_H = 238, 158
DISPLAY_W, DISPLAY_H = 960, 540

# Algorithm Sensitivity Tweaks
CLAHE_CLIP = 1.5            # Contrast enhancement limit
SENSITIVITY = 1.2           # Multiplier for the max speed model (lower = more sensitive)
MIN_AREA = 200              # Minimum contour area to be flagged as an anomaly
MOTION_DIFF_THRESH = 0.1    # Threshold for speed deviation
SMOOTHING = 7               # Kernel size for Gaussian blur and morphology

In [28]:
train_dirs = []
test_dirs = []
for base in DATASETS:
    train_dirs.extend(sorted(glob.glob(os.path.join(base, "Train", "Train*"))))
    test_dirs.extend(sorted(glob.glob(os.path.join(base, "Test", "Test*"))))

## 1. Building the Normal Speed Model

In this phase, we process the **Train** folders to calculate the maximum normal speed for every pixel.
We use the **Farneback Optical Flow** algorithm to estimate motion between consecutive frames and maintain a `max_speed` map.

In [29]:
print("[*] Building model from training folders...")
max_speed = np.zeros((RESIZE_H, RESIZE_W), dtype=np.float32)
for folder in train_dirs:
    frames = sorted(glob.glob(os.path.join(folder, "*.tif*")))
    prev = None
    for i in range(0, len(frames), 2):
        img = cv2.imread(frames[i], cv2.IMREAD_GRAYSCALE)
        if img is None:
            continue
        g = cv2.resize(img, (RESIZE_W, RESIZE_H))
        if prev is not None:
            # Calculate Optical Flow between previous and current frame
            flow = cv2.calcOpticalFlowFarneback(prev, g,
                                                None,
                                                pyr_scale=0.5, levels=3,
                                                winsize=15, iterations=3,
                                                poly_n=5, poly_sigma=1.2, flags=0)
            mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])

            # Update the pixel-wise maximum speed model
            max_speed = np.maximum(max_speed, mag)
        prev = g
# smooth model to remove outlier spikes
max_speed = cv2.GaussianBlur(max_speed, (SMOOTHING, SMOOTHING), 0)
# normalize to [0,1] for stability
if max_speed.max() > 0:
    max_speed /= max_speed.max()
print("[*] Model built.")

[*] Building model from training folders...
[*] Model built.


In [30]:
def enhance_contrast(gray):
    """Applies CLAHE to improve visibility in low-contrast grayscale frames."""
    clahe = cv2.createCLAHE(clipLimit=CLAHE_CLIP, tileGridSize=(8, 8))
    return clahe.apply(gray)


In [31]:
def visualize_heatmap(mag_norm):
    """Converts motion magnitude into a JET color-map for visualization."""
    heat = np.uint8(np.clip(mag_norm * 255.0, 0, 255))
    heat = cv2.applyColorMap(heat, cv2.COLORMAP_JET)
    return heat

## 2. Anomaly Detection Logic

The system identifies anomalies by combining two masks:
1. **Background Subtraction (MOG2):** Detects any moving objects in the scene.
2. **Speed Thresholding:** Compares the current motion magnitude with the learned `max_speed` model.
If an object moves significantly faster than the learned threshold, it is enclosed in a **red bounding box**.

In [32]:
def process_test_folder(folder, max_speed_model):
    print(f"[*] Processing test folder: {folder}")
    frames = sorted(glob.glob(os.path.join(folder, "*.tif*")))
    if not frames:
        return

    backsub = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16, detectShadows=True)
    prev_gray = None

    for idx, fpath in enumerate(frames):
        bgr = cv2.imread(fpath)
        if bgr is None:
            continue

        gray_full = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
        gray = cv2.resize(gray_full, (RESIZE_W, RESIZE_H))
        gray = enhance_contrast(gray)

        # Apply MOG2 Background Subtraction
        fg_mask_full = backsub.apply(gray_full)
        motion_mask = np.zeros_like(gray, dtype=np.uint8)
        mag_norm = None

        if prev_gray is not None:
            # Calculate current motion magnitude
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, _ = cv2.cartToPolar(flow[...,0], flow[...,1])
            mag_norm = mag / (mag.max() + 1e-8) if mag.max() > 0 else mag

            # Logic: Highlight areas where current speed exceeds the learned model * sensitivity
            diff = mag_norm - (max_speed_model * SENSITIVITY)
            diff = np.clip(diff, 0, 1)
            motion_mask = (diff > MOTION_DIFF_THRESH).astype(np.uint8) * 255

        motion_full = cv2.resize(motion_mask, (gray_full.shape[1], gray_full.shape[0]), interpolation=cv2.INTER_NEAREST)

        # Combine Background mask with Speed mask (Logical AND)
        combined = cv2.bitwise_and(fg_mask_full, motion_full)
        contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        display_right = cv2.resize(bgr.copy(), (DISPLAY_W, DISPLAY_H))
        w_ratio = DISPLAY_W / gray_full.shape[1]
        h_ratio = DISPLAY_H / gray_full.shape[0]

        for cnt in contours:
            if cv2.contourArea(cnt) > MIN_AREA:
                x, y, w, h = cv2.boundingRect(cnt)
                cv2.rectangle(display_right, (int(x*w_ratio), int(y*h_ratio)),
                              (int((x+w)*w_ratio), int((y+h)*h_ratio)), (0, 0, 255), 2)

        if mag_norm is not None:
            heat = visualize_heatmap(cv2.resize(mag_norm, (DISPLAY_W, DISPLAY_H)))
            display_right = cv2.addWeighted(display_right, 0.7, heat, 0.3, 0)

        display_left = cv2.resize(bgr, (DISPLAY_W, DISPLAY_H))
        side_by_side = np.hstack([display_left, display_right])

        cv2.imshow("Original vs Analysis", side_by_side)

        prev_gray = gray
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cv2.destroyAllWindows()

## 3. Results Visualization

The output displays the **Original Frame** on the left and the **Analysis Overlay** on the right.
The heatmap shows the intensity of motion, while red rectangles highlight the specific anomalous regions.

In [33]:
process_folders = [f for f in test_dirs if not f.endswith('_gt')]
for f in process_folders:
    process_test_folder(f, max_speed)

[*] Processing test folder: ./data/UCSDped1/Test/Test001
[*] Processing test folder: ./data/UCSDped1/Test/Test002
[*] Processing test folder: ./data/UCSDped1/Test/Test003
[*] Processing test folder: ./data/UCSDped1/Test/Test004
[*] Processing test folder: ./data/UCSDped1/Test/Test005
[*] Processing test folder: ./data/UCSDped1/Test/Test006
[*] Processing test folder: ./data/UCSDped1/Test/Test007
[*] Processing test folder: ./data/UCSDped1/Test/Test008
[*] Processing test folder: ./data/UCSDped1/Test/Test009
[*] Processing test folder: ./data/UCSDped1/Test/Test010
[*] Processing test folder: ./data/UCSDped1/Test/Test011
[*] Processing test folder: ./data/UCSDped1/Test/Test012
[*] Processing test folder: ./data/UCSDped1/Test/Test013
[*] Processing test folder: ./data/UCSDped1/Test/Test014
[*] Processing test folder: ./data/UCSDped1/Test/Test015
[*] Processing test folder: ./data/UCSDped1/Test/Test016
[*] Processing test folder: ./data/UCSDped1/Test/Test017
[*] Processing test folder: ./d