<a href="https://colab.research.google.com/github/MeghdeepMondal/web-site/blob/master/Unsupervised_Abnormal_Activity_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# PHASE 1 --> DATA PROCESSING
# 1.1 IMPORTING FILES
import os

rar_path = "/content/drive/MyDrive/training.rar"  # Update with your actual path
extract_path = "/content/Training"  # Where you want to extract it

os.makedirs(extract_path, exist_ok=True)  # Create folder if it doesn't exist

!unrar x -o+ "{rar_path}" "{extract_path}"

print("Extraction complete!")



UNRAR 6.11 beta 1 freeware      Copyright (c) 1993-2022 Alexander Roshal


Extracting from /content/drive/MyDrive/training.rar

Creating    /content/Training/training                                OK
Creating    /content/Training/training/videos                         OK
Extracting  /content/Training/training/videos/01_001.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_002.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_003.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_004.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_005.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_006.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_007.avi                   0%  OK 
Extracting  /content/Training/training/videos/01_008.avi                   1%

In [None]:
import os
video_dir = "/content/Training/training/videos"
videos = os.listdir(video_dir)               # all videos (name) List
print(videos[:5])  # Display some video files


['02_005.avi', '13_007.avi', '08_036.avi', '10_010.avi', '07_004.avi']


In [None]:
# 1.2 Convert videos into frames

import cv2
import numpy as np

def extract_frames(video_path, frame_size=(64, 64), max_frames=50):
    cap = cv2.VideoCapture(video_path)
    frames = []
    count = 0
    while cap.isOpened() and count < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, frame_size)  # Resize to 64x64
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
        frame = frame / 255.0  # Normalize
        frames.append(frame)
        count += 1
    cap.release()
    return np.array(frames)

# Example usage
video_path = "/content/Training/training/videos/01_001.avi"
frames = extract_frames(video_path)
print(frames.shape)  # Should be (max_frames, 64, 64)


(50, 64, 64)


In [None]:
#1.3 CONVERTING fRAMES into Datasets
import os
import numpy as np

video_folder = "/content/Training/training/videos"
X_train = []

for video in os.listdir(video_folder):
    video_path = os.path.join(video_folder, video)
    frames = extract_frames(video_path)
    if frames.shape[0] == 50:  # Keep only full sequences
        X_train.append(frames)

X_train = np.array(X_train).reshape(-1, 50, 64, 64, 1)  # Shape (samples, timesteps, H, W, C)
print("Dataset shape:", X_train.shape)

Dataset shape: (330, 50, 64, 64, 1)


In [None]:
# PHASE 2 --> bUILD THE AUTOENCODER MODEL [CAE + LSTM]
#  deFINE MODEL
import tensorflow as tf
from tensorflow.keras.layers import Conv3D, Conv3DTranspose, LSTM, TimeDistributed, BatchNormalization, Flatten
from tensorflow.keras.models import Model

input_shape = (50, 64, 64, 1)                                # (frames, height, width, channels)
inputs = tf.keras.Input(shape=input_shape)

#  Encoder
x = Conv3D(32, (3, 3, 3), activation="relu", padding="same")(inputs)
x = BatchNormalization()(x)
x = Conv3D(64, (3, 3, 3), activation="relu", padding="same")(x)
x = BatchNormalization()(x)
x = Conv3D(128, (3, 3, 3), activation="relu", padding="same")(x)
x = BatchNormalization()(x)

#  Latent Representation (LSTM)
x = TimeDistributed(Flatten())(x)
x = LSTM(128, return_sequences=True)(x)

#  Decoder
x = Conv3DTranspose(128, (3, 3, 3), activation="relu", padding="same")(inputs)
x = BatchNormalization()(x)
x = Conv3DTranspose(64, (3, 3, 3), activation="relu", padding="same")(x)
x = BatchNormalization()(x)
outputs = Conv3DTranspose(1, (3, 3, 3), activation="sigmoid", padding="same")(x)

autoencoder = Model(inputs, outputs)
autoencoder.compile(optimizer="adam", loss="mse")
autoencoder.summary()


In [None]:
# pHASE 3 --> TRAIN THE AUTOENCODER
autoencoder.fit(X_train, X_train, epochs=30, batch_size=8, shuffle=True, validation_split=0.2)


Epoch 1/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m63s[0m 1s/step - loss: 0.0635 - val_loss: 0.0291
Epoch 2/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 923ms/step - loss: 0.0017 - val_loss: 0.0321
Epoch 3/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 984ms/step - loss: 9.2855e-04 - val_loss: 0.0323
Epoch 4/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 956ms/step - loss: 6.9445e-04 - val_loss: 0.0302
Epoch 5/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 961ms/step - loss: 9.4620e-04 - val_loss: 0.0259
Epoch 6/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 980ms/step - loss: 8.0982e-04 - val_loss: 0.0263
Epoch 7/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 956ms/step - loss: 8.3112e-04 - val_loss: 0.0248
Epoch 8/30
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 965ms/step - loss: 6.5656e-04 - val_loss: 0.0259
Epoch 9/30


<keras.src.callbacks.history.History at 0x7e9c84f3e510>

In [None]:
# import TEST data via kaggle

#UPLOAD KAGGLE JSON FILE FOR API ACCESS
from google.colab import files
files.upload()


Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"mohammedabyan","key":"a202e736de2a10d99c4ef79594c06ee7"}'}

In [None]:
# CONFIGURE KAGGLE
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [None]:
# download the TEST data from kaggle
!kaggle datasets download -d mohammedabyan/shanghaitech-testing-dataset  # uploaded on my kaggle account

Dataset URL: https://www.kaggle.com/datasets/mohammedabyan/shanghaitech-testing-dataset
License(s): unknown
Downloading shanghaitech-testing-dataset.zip to /content
 99% 4.20G/4.22G [00:51<00:00, 81.2MB/s]
100% 4.22G/4.22G [00:51<00:00, 88.2MB/s]


In [None]:
# extract Test data
!unzip shanghaitech-testing-dataset.zip -d /content/TEST

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/TEST/testing/frames/10_0042/365.jpg  
  inflating: /content/TEST/testing/frames/10_0042/366.jpg  
  inflating: /content/TEST/testing/frames/10_0042/367.jpg  
  inflating: /content/TEST/testing/frames/10_0042/368.jpg  
  inflating: /content/TEST/testing/frames/10_0042/369.jpg  
  inflating: /content/TEST/testing/frames/10_0042/370.jpg  
  inflating: /content/TEST/testing/frames/10_0042/371.jpg  
  inflating: /content/TEST/testing/frames/10_0042/372.jpg  
  inflating: /content/TEST/testing/frames/10_0042/373.jpg  
  inflating: /content/TEST/testing/frames/10_0042/374.jpg  
  inflating: /content/TEST/testing/frames/10_0042/375.jpg  
  inflating: /content/TEST/testing/frames/10_0042/376.jpg  
  inflating: /content/TEST/testing/frames/10_0042/377.jpg  
  inflating: /content/TEST/testing/frames/10_0042/378.jpg  
  inflating: /content/TEST/testing/frames/10_0042/379.jpg  
  inflating: /content/TEST/testing/

In [None]:
# Phase 4 --> ANOMALY detection [testing] by using reconstruction error
"""test_video_path = ""
test_frames = extract_frames(test_video_path)
test_frames = test_frames.reshape(1, 50, 64, 64, 1)  # Reshape for model

reconstructed_frames = autoencoder.predict(test_frames)
error = np.mean(np.abs(test_frames - reconstructed_frames))

print("Reconstruction Error:", error)


threshold = 0.02  # DO Adjust based on model behaviour

if error > threshold:
    print(" Anomaly Detected!")
else:
    print(" Normal Activity")
"""

In [None]:
# Phase 5 --> Model Testing with the ground truth annotations from TEST data
"""
import os
import cv2
import numpy as np

# Paths to test dataset
test_frames_path = ""
test_frame_masks_path = ""
test_pixel_masks_path = ""

# Function to load test frames, frame masks, and pixel masks
def load_test_data(test_frames_path, test_frame_masks_path, test_pixel_masks_path, frame_size=(64, 64)):
    test_data, frame_masks, pixel_masks = [], [], []

    for file in sorted(os.listdir(test_frames_path)):                                                     # Sort to ensure order
        frame_path = os.path.join(test_frames_path, file)
        mask_path = os.path.join(test_frame_masks_path, file)                                             # Frame-level mask
        pixel_mask_path = os.path.join(test_pixel_masks_path, file)                                       # Pixel-level mask

        # Load frame
        frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
        frame = cv2.resize(frame, frame_size) / 255.0                                     # Normalize
        test_data.append(frame)

        # Load frame-level mask
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (1, 1)) / 255.0                                         # Reduce to a single value (0 or 1)
        frame_masks.append(mask)

        # Load pixel-level mask
        pixel_mask = cv2.imread(pixel_mask_path, cv2.IMREAD_GRAYSCALE)
        pixel_mask = cv2.resize(pixel_mask, frame_size) / 255.0                          # Normalize
        pixel_masks.append(pixel_mask)

    return np.array(test_data).reshape(-1, 64, 64, 1), np.array(frame_masks).reshape(-1, 1), np.array(pixel_masks)

# Load test data
X_test, frame_level_ground_truth, pixel_level_ground_truth = load_test_data(
    test_frames_path, test_frame_masks_path, test_pixel_masks_path
)

print("Test Data Shape:", X_test.shape)  # (num_frames, 64, 64, 1)
print("Frame-Level Mask Shape:", frame_level_ground_truth.shape)  # (num_frames, 1)
print("Pixel-Level Mask Shape:", pixel_level_ground_truth.shape)  # (num_frames, 64, 64)
"""

In [None]:
import os
import cv2
import numpy as np

# Paths to test dataset
test_frames_path = "/content/TEST/testing/frames"
test_frame_masks_path = "/content/TEST/testing/test_frame_mask"
test_pixel_masks_path = "/content/TEST/testing/test_pixel_mask"

# Function to load test frames, frame masks, and pixel masks
def load_test_data(test_frames_path, test_frame_masks_path, test_pixel_masks_path, frame_size=(64, 64)):
    test_data, frame_masks, pixel_masks = [], [], []

    # Iterate over all subfolders in the test_frames_path
    for video_folder in sorted(os.listdir(test_frames_path)):
        video_frame_path = os.path.join(test_frames_path, video_folder)
        mask_frame_path = os.path.join(test_frame_masks_path, video_folder)
        pixel_mask_path = os.path.join(test_pixel_masks_path, video_folder)

        if not (os.path.isdir(video_frame_path) and os.path.isdir(mask_frame_path) and os.path.isdir(pixel_mask_path)):
            continue  # Skip if any folder does not exist

        # Process each frame within the current video subfolder
        for file in sorted(os.listdir(video_frame_path)):
            frame_path = os.path.join(video_frame_path, file)
            mask_path = os.path.join(mask_frame_path, file)
            pixel_mask_path = os.path.join(pixel_mask_path, file)

            if not (os.path.exists(frame_path) and os.path.exists(mask_path) and os.path.exists(pixel_mask_path)):
                continue  # Skip if any file is missing

            # Load and preprocess frame
            frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
            frame = cv2.resize(frame, frame_size) / 255.0  # Normalize
            test_data.append(frame)

            # Load and preprocess frame-level mask
            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            mask = cv2.resize(mask, (1, 1)) / 255.0  # Reduce to a single value (0 or 1)
            frame_masks.append(mask)

            # Load and preprocess pixel-level mask
            pixel_mask = cv2.imread(pixel_mask_path, cv2.IMREAD_GRAYSCALE)
            pixel_mask = cv2.resize(pixel_mask, frame_size) / 255.0  # Normalize
            pixel_masks.append(pixel_mask)

    return (
        np.array(test_data).reshape(-1, 64, 64, 1),
        np.array(frame_masks).reshape(-1, 1),
        np.array(pixel_masks)
    )

# Load test data
X_test, frame_level_ground_truth, pixel_level_ground_truth = load_test_data(
    test_frames_path, test_frame_masks_path, test_pixel_masks_path
)

print("Test Data Shape:", X_test.shape)  # (num_frames, 64, 64, 1)
print("Frame-Level Mask Shape:", frame_level_ground_truth.shape)  # (num_frames, 1)
print("Pixel-Level Mask Shape:", pixel_level_ground_truth.shape)  # (num_frames, 64, 64)


Test Data Shape: (0, 64, 64, 1)
Frame-Level Mask Shape: (0, 1)
Pixel-Level Mask Shape: (0,)


In [None]:
# NEW DATA LOADER FUNCTION TRIAL-01
import os
import cv2
import numpy as np
# Paths to test dataset
test_frames_path = "/content/TEST/testing/frames"
test_frame_masks_path = "/content/TEST/testing/test_frame_mask"
test_pixel_masks_path = "/content/TEST/testing/test_pixel_mask"

def load_test_data_debug(test_frames_path, test_frame_masks_path, test_pixel_masks_path, frame_size=(64, 64)):
    test_data, frame_masks, pixel_masks = [], [], []

    for video_folder in sorted(os.listdir(test_frames_path)):
        video_frame_path = os.path.join(test_frames_path, video_folder)
        mask_frame_path = os.path.join(test_frame_masks_path, video_folder)
        pixel_mask_path = os.path.join(test_pixel_masks_path, video_folder)

        if not (os.path.isdir(video_frame_path) and os.path.isdir(mask_frame_path) and os.path.isdir(pixel_mask_path)):
            print(f"Skipping missing directory: {video_folder}")
            continue

        for file_name in sorted(os.listdir(video_frame_path)):
            frame_path = os.path.join(video_frame_path, file_name)
            frame_mask_path = os.path.join(mask_frame_path, file_name)
            pixel_mask_path = os.path.join(pixel_mask_path, file_name)

            if not (os.path.exists(frame_path) and os.path.exists(frame_mask_path) and os.path.exists(pixel_mask_path)):
                print(f"Missing file: {frame_path} or corresponding masks")
                continue

            # Load frame
            frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
            if frame is None:
                print(f"Error reading frame: {frame_path}")
                continue

            frame = cv2.resize(frame, frame_size) / 255.0  # Normalize
            test_data.append(frame)

            # Load frame-level mask
            mask = cv2.imread(frame_mask_path, cv2.IMREAD_GRAYSCALE)
            mask = cv2.resize(mask, (1, 1)) / 255.0
            frame_masks.append(mask)

            # Load pixel-level mask
            pixel_mask = cv2.imread(pixel_mask_path, cv2.IMREAD_GRAYSCALE)
            pixel_mask = cv2.resize(pixel_mask, frame_size) / 255.0
            pixel_masks.append(pixel_mask)

    return np.array(test_data).reshape(-1, 64, 64, 1), np.array(frame_masks).reshape(-1, 1), np.array(pixel_masks)

    # Load test data
X_test, frame_level_ground_truth, pixel_level_ground_truth = load_test_data(
    test_frames_path, test_frame_masks_path, test_pixel_masks_path
)

print("Test Data Shape:", X_test.shape)  # (num_frames, 64, 64, 1)
print("Frame-Level Mask Shape:", frame_level_ground_truth.shape)  # (num_frames, 1)
print("Pixel-Level Mask Shape:", pixel_level_ground_truth.shape)  # (num_frames, 64, 64)


Test Data Shape: (0, 64, 64, 1)
Frame-Level Mask Shape: (0, 1)
Pixel-Level Mask Shape: (0,)


In [None]:
# NEW DATA LOADER FUNCTION TRIAL-02

import os
import cv2
import numpy as np

# Paths to test dataset
test_frames_path = "/content/TEST/testing/frames"
test_frame_masks_path = "/content/TEST/testing/test_frame_mask"
test_pixel_masks_path = "/content/TEST/testing/test_pixel_mask"

# Function to load test frames, frame masks, and pixel masks
def load_test_data(test_frames_path, test_frame_masks_path, test_pixel_masks_path, frame_size=(64, 64)):
    test_data, frame_masks, pixel_masks = [], [], []

    # Iterate over subfolders in test_frames_path
    for video_folder in sorted(os.listdir(test_frames_path)):
        video_frame_path = os.path.join(test_frames_path, video_folder)
        mask_frame_path = os.path.join(test_frame_masks_path, f"{video_folder}.npy")
        pixel_mask_path = os.path.join(test_pixel_masks_path, f"{video_folder}.npy")

        if not (os.path.isdir(video_frame_path) and os.path.exists(mask_frame_path) and os.path.exists(pixel_mask_path)):
            print(f"Skipping missing directory or mask files for: {video_folder}")
            continue

        # Load and preprocess frame-level mask (single file for the entire folder)
        frame_mask = np.load(mask_frame_path)
        frame_mask = np.resize(frame_mask, (1,))  # Ensure single value for frame-level mask

        # Load and preprocess pixel-level mask (single file for the entire folder)
        pixel_mask = np.load(pixel_mask_path)
        pixel_mask = cv2.resize(pixel_mask, frame_size) / 255.0

        # Process each frame within the current video subfolder
        for file_name in sorted(os.listdir(video_frame_path)):
            frame_path = os.path.join(video_frame_path, file_name)

            if not os.path.exists(frame_path):
                print(f"Missing frame file: {frame_path}")
                continue

            # Load and preprocess frame
            frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
            if frame is None:
                print(f"Error reading frame: {frame_path}")
                continue

            frame = cv2.resize(frame, frame_size) / 255.0  # Normalize
            test_data.append(frame)
            frame_masks.append(frame_mask)  # Apply the same mask for all frames
            pixel_masks.append(pixel_mask)  # Apply the same pixel mask for all frames

    return (
        np.array(test_data).reshape(-1, 64, 64, 1),
        np.array(frame_masks).reshape(-1, 1),
        np.array(pixel_masks)
    )

# Load test data
X_test, frame_level_ground_truth, pixel_level_ground_truth = load_test_data(
    test_frames_path, test_frame_masks_path, test_pixel_masks_path
)

print("Test Data Shape:", X_test.shape)  # (num_frames, 64, 64, 1)
print("Frame-Level Mask Shape:", frame_level_ground_truth.shape)  # (num_frames, 1)
print("Pixel-Level Mask Shape:", pixel_level_ground_truth.shape)  # (num_frames, 64, 64)


error: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/resize.cpp:3845: error: (-215:Assertion failed) !dsize.empty() in function 'resize'


In [None]:
# TEST data loader -03.......................IT WORKS !!!!!!!!!

import os
import cv2
import numpy as np

# Paths to test dataset
test_frames_path = "/content/TEST/testing/frames"
test_frame_masks_path = "/content/TEST/testing/test_frame_mask"
test_pixel_masks_path = "/content/TEST/testing/test_pixel_mask"

# Function to load test frames, frame masks, and pixel masks
def load_test_data(test_frames_path, test_frame_masks_path, test_pixel_masks_path, frame_size=(64, 64)):
    test_data, frame_masks, pixel_masks = [], [], []

    # Iterate over subfolders in test_frames_path
    for video_folder in sorted(os.listdir(test_frames_path)):
        video_frame_path = os.path.join(test_frames_path, video_folder)
        mask_frame_path = os.path.join(test_frame_masks_path, f"{video_folder}.npy")  # Corrected path
        pixel_mask_path = os.path.join(test_pixel_masks_path, f"{video_folder}.npy")  # Corrected path

        # Check if directory and mask files exist
        if not (os.path.isdir(video_frame_path) and os.path.exists(mask_frame_path) and os.path.exists(pixel_mask_path)):
            print(f"Skipping missing directory or mask files for: {video_folder}")
            continue

        # Load frame-level mask (single file for the entire folder)
        frame_mask = np.load(mask_frame_path)
        # Reshape frame_mask to (number_of_frames, 1) if necessary
        frame_mask = frame_mask.reshape(-1, 1)

        # Load pixel-level mask (single file for the entire folder)
        pixel_mask = np.load(pixel_mask_path)

        # Process each frame within the current video subfolder
        for i, file_name in enumerate(sorted(os.listdir(video_frame_path))):
            frame_path = os.path.join(video_frame_path, file_name)

            if not os.path.exists(frame_path):
                print(f"Missing frame file: {frame_path}")
                continue

            # Load and preprocess frame
            frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
            if frame is None:
                print(f"Error reading frame: {frame_path}")
                continue

            frame = cv2.resize(frame, frame_size) / 255.0  # Normalize
            test_data.append(frame)

            # Append the corresponding frame-level mask
            frame_masks.append(frame_mask[i])

            # Resize and append the pixel-level mask for the current frame
            resized_pixel_mask = cv2.resize(pixel_mask[i], frame_size) / 255.0  # Resize for each frame
            pixel_masks.append(resized_pixel_mask)

    return (
        np.array(test_data).reshape(-1, 64, 64, 1),
        np.array(frame_masks).reshape(-1, 1),
        np.array(pixel_masks)
    )

# Load test data
X_test, frame_level_ground_truth, pixel_level_ground_truth = load_test_data(
    test_frames_path, test_frame_masks_path, test_pixel_masks_path
)

print("Test Data Shape:", X_test.shape)  # (num_frames, 64, 64, 1)
print("Frame-Level Mask Shape:", frame_level_ground_truth.shape)  # (num_frames, 1)
print("Pixel-Level Mask Shape:", pixel_level_ground_truth.shape)  # (num_frames, 64, 64)

Test Data Shape: (40791, 64, 64, 1)
Frame-Level Mask Shape: (40791, 1)
Pixel-Level Mask Shape: (40791, 64, 64)


# **ITS WORKINNNGNGGGGGGGGG.................LETS GOOOOOOOOOOOOOOOO!!!!!!!!!!!!!!!!**

In [None]:
#DEBUGGING........!!!!!!!!
import os

print("Test frames path:", test_frames_path)
print("Subfolders in test_frames_path:", os.listdir(test_frames_path))

print("Test frame masks path:", test_frame_masks_path)
print("Subfolders in test_frame_masks_path:", os.listdir(test_frame_masks_path))

print("Test pixel masks path:", test_pixel_masks_path)
print("Subfolders in test_pixel_masks_path:", os.listdir(test_pixel_masks_path))



VALID_EXTENSIONS = (".png", ".jpg", ".jpeg")

for file in sorted(os.listdir(test_frames_path)):
    if not file.lower().endswith(VALID_EXTENSIONS):
        continue


Test frames path: /content/TEST/testing/frames
Subfolders in test_frames_path: ['03_0041', '08_0158', '01_0054', '07_0007', '03_0035', '12_0174', '03_0033', '01_0130', '12_0143', '06_0153', '08_0079', '04_0004', '04_0050', '05_0022', '07_0005', '10_0075', '01_0014', '05_0021', '08_0157', '01_0135', '02_0128', '01_0129', '08_0078', '05_0023', '08_0156', '03_0060', '12_0148', '01_0134', '07_0048', '04_0010', '07_0049', '02_0161', '07_0006', '05_0020', '01_0030', '01_0056', '12_0152', '01_0139', '06_0145', '04_0003', '08_0159', '01_0055', '01_0076', '03_0061', '08_0077', '01_0136', '02_0164', '10_0038', '10_0042', '01_0052', '01_0029', '05_0018', '08_0080', '01_0051', '07_0047', '08_0058', '12_0175', '01_0177', '04_0046', '03_0039', '11_0176', '01_0027', '04_0011', '10_0037', '03_0059', '07_0009', '08_0179', '10_0074', '03_0032', '01_0131', '04_0012', '03_0031', '01_0132', '01_0138', '01_0133', '01_0163', '01_0026', '06_0147', '12_0173', '05_0017', '09_0057', '04_0001', '01_0016', '06_015

In [None]:
!ls /content/TEST/testing/frames
!ls /content/TEST/testing/test_frame_mask
!ls /content/TEST/testing/test_pixel_mask


01_0014  01_0052  01_0130  01_0141  03_0035  04_0010  05_0021  07_0005	08_0077  09_0057  12_0149
01_0015  01_0053  01_0131  01_0162  03_0036  04_0011  05_0022  07_0006	08_0078  10_0037  12_0151
01_0016  01_0054  01_0132  01_0163  03_0039  04_0012  05_0023  07_0007	08_0079  10_0038  12_0152
01_0025  01_0055  01_0133  01_0177  03_0041  04_0013  05_0024  07_0008	08_0080  10_0042  12_0154
01_0026  01_0056  01_0134  02_0128  03_0059  04_0046  06_0144  07_0009	08_0156  10_0074  12_0173
01_0027  01_0063  01_0135  02_0161  03_0060  04_0050  06_0145  07_0047	08_0157  10_0075  12_0174
01_0028  01_0064  01_0136  02_0164  03_0061  05_0017  06_0147  07_0048	08_0158  11_0176  12_0175
01_0029  01_0073  01_0138  03_0031  04_0001  05_0018  06_0150  07_0049	08_0159  12_0142
01_0030  01_0076  01_0139  03_0032  04_0003  05_0019  06_0153  08_0044	08_0178  12_0143
01_0051  01_0129  01_0140  03_0033  04_0004  05_0020  06_0155  08_0058	08_0179  12_0148
01_0014.npy  01_0064.npy  01_0163.npy  04_0003.npy  06_01

In [None]:
X_test.size
print("Test Data Shape:", X_test.shape)  # ISSUE!!!!!! ................ DATA NOT LOADED
print("Frame-Level Mask Shape:", frame_level_ground_truth.shape)  # ISSUE!!!!!! ................ DATA NOT LOADED
print("Pixel-Level Mask Shape:", pixel_level_ground_truth.shape)# ISSUE!!!!!! ................ DATA NOT LOADED

Test Data Shape: (40791, 64, 64, 1)
Frame-Level Mask Shape: (40791, 1)
Pixel-Level Mask Shape: (40791, 64, 64)


In [None]:
# SIZE INCOMPATIBILITY ERROR WHILE INFERENCING..... DEBUGGING

import os
import cv2
import numpy as np

# Paths to test dataset
test_frames_path = "/content/TEST/testing/frames"

def load_test_frames(video_folder, frame_size=(64, 64), sequence_length=50):
    frames = []
    filenames = sorted(os.listdir(video_folder))  # Ensure correct order

    for filename in filenames:
        frame_path = os.path.join(video_folder, filename)
        frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)  # Read in grayscale
        frame = cv2.resize(frame, frame_size)  # Resize
        frame = frame / 255.0  # Normalize
        frames.append(frame)

    frames = np.array(frames)

    # Ensure we only take full sequences of 50 frames
    num_sequences = len(frames) // sequence_length
    X_test = frames[: num_sequences * sequence_length].reshape(num_sequences, sequence_length, frame_size[0], frame_size[1], 1)

    return X_test

# Example: Load test frames for a sample video
video_name = "sample_video"
video_folder = os.path.join(test_frames_path, video_name)

X_test = load_test_frames(video_folder)
print("Fixed Test Data Shape:", X_test.shape)  # Should be (batch, 50, 64, 64, 1)


FileNotFoundError: [Errno 2] No such file or directory: '/content/TEST/testing/frames/sample_video'

In [None]:
# Phase 6 --> INFERENCE  AND COMPUTE RECONSTRuction error [ANOMALY DETECTION by using TEST data]

# Pass test frames through the trained autoencoder
reconstructed_frames = autoencoder.predict(X_test)

# Compute frame-wise reconstruction error
frame_reconstruction_error = np.mean(np.abs(X_test - reconstructed_frames), axis=(1,2,3))

# Compute pixel-wise reconstruction error
pixel_reconstruction_error = np.abs(X_test - reconstructed_frames)


ValueError: Input 0 of layer "functional" is incompatible with the layer: expected shape=(None, 50, 64, 64, 1), found shape=(32, 64, 64)

In [None]:
#PHASE 7 --> VISUALISING   ANOMALIES

import matplotlib.pyplot as plt
import seaborn as sns

def show_anomaly_visualization(original, reconstructed, pixel_error, pixel_mask, frame_index=10):
    plt.figure(figsize=(12, 6))

    # Original Frame
    plt.subplot(1, 4, 1)
    plt.imshow(original[frame_index].squeeze(), cmap="gray")
    plt.title("Original Frame")
    plt.axis("off")

    # Reconstructed Frame
    plt.subplot(1, 4, 2)
    plt.imshow(reconstructed[frame_index].squeeze(), cmap="gray")
    plt.title("Reconstructed Frame")
    plt.axis("off")

    # Error Heatmap
    plt.subplot(1, 4, 3)
    sns.heatmap(pixel_error[frame_index].squeeze(), cmap="jet", cbar=True)
    plt.title("Error Heatmap")
    plt.axis("off")

    # Ground Truth Pixel Mask
    plt.subplot(1, 4, 4)
    plt.imshow(pixel_mask[frame_index].squeeze(), cmap="gray")
    plt.title("Ground Truth Anomaly")
    plt.axis("off")

    plt.show()

# Show anomaly visualization for a sample frame
show_anomaly_visualization(X_test, reconstructed_frames, pixel_reconstruction_error, pixel_level_ground_truth, frame_index=10)


In [None]:
# pHASE 8 --> MODEL PERFORMANCE EVALUATION

from sklearn.metrics import precision_recall_fscore_support, roc_auc_score

# Set anomaly threshold (adjust based on training error distribution)
threshold = np.percentile(frame_reconstruction_error, 95)  # Top 5% of errors = anomalies

# Convert errors to binary anomaly predictions
predicted_anomalies = (frame_reconstruction_error > threshold).astype(int)

# Compute precision, recall, F1-score
precision, recall, f1, _ = precision_recall_fscore_support(frame_level_ground_truth, predicted_anomalies, average="binary")

# Compute AUC score
auc_score = roc_auc_score(frame_level_ground_truth, frame_reconstruction_error)

print(f"Precision: {precision:.3f}")
print(f"Recall: {recall:.3f}")
print(f"F1 Score: {f1:.3f}")
print(f"AUC Score: {auc_score:.3f}")
