In [None]:
# Expose Google Drive inside the Colab filesystem; the dataset paths used
# later in this notebook live under this mount point.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, TimeDistributed, Conv2D, MaxPooling2D, UpSampling2D, Flatten, LSTM, Dense, RepeatVector, Reshape

# **Data Preprocessing**

In [None]:
# Clip geometry shared by the data-loading calls and the model definition.
n_frames = 10    # frames sampled from each video
height = 64      # frame height in pixels
width = 64       # frame width in pixels
channels = 3     # colour channels per frame (RGB)

In [None]:
def load_video_frames(video_path, n_frames=10, frame_size=(64, 64)):
    """Sample ``n_frames`` evenly spaced RGB frames from a video file.

    Args:
        video_path: Path of the video, handed to ``cv2.VideoCapture``.
        n_frames: Number of frames to return.
        frame_size: Target size forwarded unchanged to ``cv2.resize`` as its
            ``dsize`` argument, i.e. ``(width, height)``.

    Returns:
        A ``float32`` array holding ``n_frames`` resized RGB frames with pixel
        values scaled to [0, 1]. When some of the sampled positions cannot be
        decoded, the last decoded frame is repeated to fill the sequence.

    Raises:
        OSError: If the video cannot be opened.
        ValueError: If frames were requested but none could be decoded.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        if not cap.isOpened():
            raise OSError(f"Could not open video file: {video_path!r}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Some containers report a frame count of 0 (or less); clamp so we never
        # ask the decoder to seek to a negative position.
        last_index = max(total_frames - 1, 0)
        # Select n evenly spaced frame indices
        frame_indices = np.linspace(0, last_index, n_frames).astype(int)

        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if ret:
                frame = cv2.resize(frame, frame_size)
                # OpenCV decodes to BGR; the rest of the notebook works in RGB.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(frame)
    finally:
        # Release the decoder even if resizing/conversion raised.
        cap.release()

    if len(frames) < n_frames:
        if not frames:
            raise ValueError(f"No frames could be decoded from {video_path!r}")
        # If not enough frames are read, repeat the last frame
        frames.extend([frames[-1]] * (n_frames - len(frames)))

    return np.array(frames, dtype=np.float32) / 255.0  # Normalize to [0, 1]


In [None]:
def load_dataset(dataset_dir, n_frames=10, frame_size=(64, 64)):
    """Load every ``.mp4`` / ``.avi`` video in a directory as a frame sequence.

    Args:
        dataset_dir: Directory that is scanned (non-recursively) for videos.
        n_frames: Number of frames sampled per video; forwarded to
            ``load_video_frames``.
        frame_size: Per-frame target size, forwarded to ``load_video_frames``
            (which hands it to ``cv2.resize``, so it is ``(width, height)``).

    Returns:
        Array of shape ``(num_videos, n_frames, height, width, channels)``.
        Videos appear in sorted filename order so repeated runs produce the
        same dataset ordering. If the directory contains no matching file, an
        empty ``float32`` array with that same rank is returned.
    """
    video_extensions = ('.mp4', '.avi')

    video_sequences = []
    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    for filename in sorted(os.listdir(dataset_dir)):
        # Compare case-insensitively so files such as "clip.AVI" are not skipped.
        if not filename.lower().endswith(video_extensions):
            continue
        video_path = os.path.join(dataset_dir, filename)
        video_sequences.append(load_video_frames(video_path, n_frames, frame_size))

    if not video_sequences:
        # Keep the 5-D layout even when nothing was found; load_video_frames
        # always yields 3-channel RGB frames.
        frame_width, frame_height = frame_size
        return np.empty((0, n_frames, frame_height, frame_width, 3), dtype=np.float32)

    # Final shape: (num_videos, n_frames, height, width, channels)
    return np.array(video_sequences)


In [None]:
# Folder holding the Avenue training clips on the mounted Drive.
dataset_dir = '/content/drive/My Drive/dataset/Avenue_Dataset/Avenue_Dataset/training_videos'
# The size tuple ends up as cv2.resize's dsize, which is (width, height);
# passing it in that order yields frames shaped (height, width, channels),
# which is the layout the model input expects.
video_sequences = load_dataset(dataset_dir, n_frames, (width, height))
print("Dataset shape:", video_sequences.shape)


Dataset shape: (16, 10, 64, 64, 3)


# **CNN-LSTM Autoencoder Model**

In [None]:
def build_autoencoder(n_frames, height, width, channels):
    """Build and compile the CNN-LSTM sequence autoencoder.

    Each frame is encoded by two Conv2D + MaxPooling2D stages, the per-frame
    features are summarised by an LSTM into one 128-d vector, and a mirrored
    LSTM / upsampling stack reconstructs the whole clip.

    Args:
        n_frames: Number of frames per input clip.
        height: Frame height.
        width: Frame width.
        channels: Number of channels per frame.

    Returns:
        The compiled Keras ``Model`` (Adam optimizer, MSE loss). The model
        summary is printed as a side effect.

    Note:
        The encoder halves the spatial size twice ('same'-padded pooling) and
        the decoder doubles it twice, so the reconstruction has the same
        spatial size as the input only when ``height`` and ``width`` are
        multiples of 4.
    """
    def per_frame(layer, tensor):
        # Run a frame-level layer independently on every time step.
        return TimeDistributed(layer)(tensor)

    clip_in = Input(shape=(n_frames, height, width, channels))

    # --- Spatial encoder (applied frame by frame) ---
    feats = per_frame(Conv2D(32, (3, 3), activation='relu', padding='same'), clip_in)
    feats = per_frame(MaxPooling2D((2, 2), padding='same'), feats)
    feats = per_frame(Conv2D(64, (3, 3), activation='relu', padding='same'), feats)
    feats = per_frame(MaxPooling2D((2, 2), padding='same'), feats)

    # Remember the per-frame feature-map geometry so the decoder can restore it.
    _, _, map_h, map_w, map_c = tf.keras.backend.int_shape(feats)
    flattened_dim = map_h * map_w * map_c

    # One feature vector per frame.
    feats = per_frame(Flatten(), feats)

    # --- Temporal bottleneck ---
    # Encoder LSTM collapses the sequence into a single 128-d code ...
    code = LSTM(128, activation='relu', return_sequences=False)(feats)
    # ... which is copied once per frame and unrolled by the decoder LSTM.
    code_per_step = RepeatVector(n_frames)(code)
    seq = LSTM(128, activation='relu', return_sequences=True)(code_per_step)

    # Project every time step back to the flattened feature-map size, then
    # restore the (rows, cols, filters) layout.
    seq = per_frame(Dense(flattened_dim, activation='relu'), seq)
    seq = per_frame(Reshape((map_h, map_w, map_c)), seq)

    # --- Spatial decoder (applied frame by frame) ---
    seq = per_frame(UpSampling2D((2, 2)), seq)
    seq = per_frame(Conv2D(64, (3, 3), activation='relu', padding='same'), seq)
    seq = per_frame(UpSampling2D((2, 2)), seq)
    # Sigmoid keeps the reconstructed pixels in [0, 1].
    clip_out = per_frame(Conv2D(channels, (3, 3), activation='sigmoid', padding='same'), seq)

    autoencoder = Model(inputs=clip_in, outputs=clip_out)
    autoencoder.compile(optimizer='adam', loss='mse')
    autoencoder.summary()
    return autoencoder

# Instantiate the model for the configured clip geometry (this also prints
# the layer summary).
autoencoder = build_autoencoder(
    n_frames=n_frames,
    height=height,
    width=width,
    channels=channels,
)


# **Model Training**

In [None]:
# Autoencoder training: the clips serve as both input and target, and a 10%
# slice of them is held out as validation data.
history = autoencoder.fit(
    x=video_sequences,
    y=video_sequences,
    batch_size=4,
    epochs=10,
    validation_split=0.1,
)

Epoch 1/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 76ms/step - loss: 0.0090 - val_loss: 0.0127
Epoch 2/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step - loss: 0.0082 - val_loss: 0.0114
Epoch 3/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 51ms/step - loss: 0.0076 - val_loss: 0.0112
Epoch 4/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step - loss: 0.0071 - val_loss: 0.0112
Epoch 5/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step - loss: 0.0076 - val_loss: 0.0109
Epoch 6/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step - loss: 0.0072 - val_loss: 0.0110
Epoch 7/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step - loss: 0.0069 - val_loss: 0.0103
Epoch 8/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 49ms/step - loss: 0.0067 - val_loss: 0.0103
Epoch 9/10
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

# **Anomaly Detection**

In [None]:
# Score one test clip by how badly the autoencoder reconstructs it.
test_video = load_video_frames(
    '/content/drive/My Drive/dataset/Avenue_Dataset/Avenue_Dataset/testing_videos/01.avi',
    n_frames,
    (width, height),  # forwarded to cv2.resize, whose dsize is (width, height)
)
test_video = np.expand_dims(test_video, axis=0)  # add batch dimension

reconstructed_video = autoencoder.predict(test_video)
# Mean squared error over every pixel of every frame in the clip.
reconstruction_error = np.mean((test_video - reconstructed_video) ** 2)
print("Reconstruction error:", reconstruction_error)



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
Reconstruction error: 0.01403531
