In [160]:
import numpy as np
import torch
import cv2
import os

In [161]:
# Load test features
y_speed_test = torch.tensor(np.load("y_speed_feature_test.npy"), dtype=torch.float32)  # (N, D)
y_ang_test = torch.tensor(np.load("y_ang_features_test.npy"), dtype=torch.float32)      # (N, D)
y_bkg_test = torch.tensor(np.load("y_bkg_features_test.npy"), dtype=torch.float32)      # (N, D)

In [162]:
# Stack motion features: shape (N, 3D)
test_features = torch.cat([y_speed_test, y_ang_test, y_bkg_test], dim=1)

# Load exemplar features: shape (M, 3D)
exemplars = torch.tensor(np.load("exemplar_motion_features.npy"), dtype=torch.float32)

In [163]:
# Get dimensions
D = y_speed_test.shape[1]

In [164]:
# Normalization factors (from validation)
Z_speed = torch.max(torch.cdist(y_speed_test, y_speed_test)).item()
Z_ang = torch.max(torch.cdist(y_ang_test, y_ang_test)).item()
Z_bkg = torch.max(torch.cdist(y_bkg_test, y_bkg_test)).item()

In [165]:
Z_speed = Z_speed if Z_speed > 0 else 1
Z_ang = Z_ang if Z_ang > 0 else 1
Z_bkg = Z_bkg if Z_bkg > 0 else 1

In [166]:
# Split exemplar features
ex_speed = exemplars[:, :D]
ex_ang = exemplars[:, D:2*D]
ex_bkg = exemplars[:, 2*D:]

In [167]:
# Split test features
te_speed = test_features[:, :D]
te_ang = test_features[:, D:2*D]
te_bkg = test_features[:, 2*D:]

In [168]:
# Compute pairwise distances for each component: shape (N_test, N_exemplars)
d_speed = torch.cdist(te_speed, ex_speed) / Z_speed
d_ang   = torch.cdist(te_ang, ex_ang) / Z_ang
d_bkg   = torch.cdist(te_bkg, ex_bkg) / Z_bkg

In [169]:
# Total distance = sum of normalized distances
total_dist = d_speed + d_ang + d_bkg  # Shape: (N_test, N_exemplars)

In [170]:
# Get minimum distance to any exemplar = anomaly score
anomaly_scores = torch.min(total_dist, dim=1).values.cpu().numpy()

In [171]:
# Save anomaly scores
np.save("anomaly_scores.npy", anomaly_scores)
print(f"Saved anomaly scores for {len(anomaly_scores)} test volumes.")
print("Score stats: min =", np.min(anomaly_scores), "max =", np.max(anomaly_scores), "mean =", np.mean(anomaly_scores))

Saved anomaly scores for 13880 test volumes.
Score stats: min = 0.3804205 max = 1.5737062 mean = 0.7931127


In [172]:
# === STEP 2: Create heatmap === #

patch_size = 128
volume_depth = 10
stride_t = 10
stride_h = patch_size
stride_w = patch_size

def get_video_shape(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError("Could not open video file.")
    frame_count = 0
    frame_height = frame_width = None
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_height is None:
            frame_height, frame_width = frame.shape[:2]
        frame_count += 1
    cap.release()
    return (frame_count, frame_height, frame_width)

def create_anomaly_heatmap(video_path, anomaly_scores_path, save_path):
    T, H, W = get_video_shape(video_path)
    print(f"Detected video shape: {T} frames of size {H}x{W}")
    anomaly_scores = np.load(anomaly_scores_path)
    print(f"Loaded anomaly scores: {anomaly_scores.shape}")

    heatmap = np.zeros((T, H, W), dtype=np.float32)
    volume_index = 0
    for t in range(0, T - volume_depth + 1, stride_t):
        for y in range(0, H - patch_size + 1, stride_h):
            for x in range(0, W - patch_size + 1, stride_w):
                if volume_index >= len(anomaly_scores):
                    continue
                score = anomaly_scores[volume_index]
                volume_index += 1
                for dt in range(volume_depth):
                    frame_idx = t + dt
                    heatmap[frame_idx, y:y+patch_size, x:x+patch_size] = np.maximum(
                        heatmap[frame_idx, y:y+patch_size, x:x+patch_size],
                        score
                    )
    np.save(save_path, heatmap)
    print(f"Saved anomaly heatmap to {save_path} with shape {heatmap.shape}")

In [181]:
# === STEP 3: Overlay visualization === #

def overlay_binary_heatmap(video_path, heatmap_path, output_dir, threshold=0.9, alpha=0.5):
    heatmap = np.load(heatmap_path)
    T, H, W = heatmap.shape
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError("Could not open video file.")
    os.makedirs(output_dir, exist_ok=True)

    patch_size = 128
    frame_idx = 0
    BLUE = np.array([255, 0, 0], dtype=np.uint8)
    DARK_RED = np.array([0, 0, 139], dtype=np.uint8)

    while frame_idx < T:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (W, H))
        raw_map = heatmap[frame_idx]
        color_map = np.ones((H, W, 3), dtype=np.uint8) * BLUE

        for y in range(0, H, patch_size):
            for x in range(0, W, patch_size):
                patch = raw_map[y:y+patch_size, x:x+patch_size]
                if patch.max() > threshold:
                    color_map[y:y+patch_size, x:x+patch_size] = DARK_RED

        blended = cv2.addWeighted(frame, 1 - alpha, color_map, alpha, 0)
        out_path = os.path.join(output_dir, f"frame_{frame_idx:04d}.jpg")
        cv2.imwrite(out_path, blended)
        frame_idx += 1

    cap.release()
    print(f"✅ Saved refined binary overlay frames to: {output_dir}")

In [182]:
# === STEP 4: Convert to video === #

def frames_to_video(frames_dir, output_video_path, fps=25):
    frame_files = sorted([f for f in os.listdir(frames_dir) if f.endswith(".jpg")])
    if not frame_files:
        raise ValueError("No .jpg frames found in the given directory.")
    first_frame = cv2.imread(os.path.join(frames_dir, frame_files[0]))
    height, width, _ = first_frame.shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    for file in frame_files:
        frame = cv2.imread(os.path.join(frames_dir, file))
        video_writer.write(frame)
    video_writer.release()
    print(f"✅ Final video saved at: {output_video_path}")

In [184]:
# === RUN EVERYTHING === #

video_path = "C:\\Users\\hci\\Desktop\\data_test\\05.avi"
anomaly_scores_path = "anomaly_scores.npy"
heatmap_path = "anomaly_heatmap.npy"
overlay_dir = "overlay_frames"
output_video_path = "anomaly_overlay_output.mp4"

create_anomaly_heatmap(video_path, anomaly_scores_path, heatmap_path)
overlay_binary_heatmap(video_path, heatmap_path, overlay_dir, threshold=0.9)
frames_to_video(overlay_dir, output_video_path, fps=25)

Detected video shape: 1007 frames of size 360x640
Loaded anomaly scores: (13880,)
Saved anomaly heatmap to anomaly_heatmap.npy with shape (1007, 360, 640)
✅ Saved refined binary overlay frames to: overlay_frames
✅ Final video saved at: anomaly_overlay_output.mp4
