<a href="https://colab.research.google.com/github/DotunOluyade/ShootingVideoClassifier/blob/main/VideoAnomalyDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# SwinAnomaly Transformer-Model for Shooting Anomaly Detection




### Run This Cell First to Import the Custom Python Files (Google Drive must already be mounted)

In [55]:
# Copy every *.py file from the project's Google Drive folder into /content.
# Presumably these are the helper modules imported by later cells
# (config, video_utils, callbacks, data_processor, mil_loss) -- confirm.
# The source path is under /content/drive, so Drive has to be mounted beforehand.
!cp /content/drive/MyDrive/VideoAnomalyDetection/*.py /content/

# Bare string recording the library versions this notebook was run with;
# it has no effect other than being echoed as the cell's output.
"""
TensorFlow version: 2.15.0
Keras version: 2.15.0
"""

'\nTensorFlow version: 2.15.0\nKeras version: 2.15.0\n'

## Fine-Tuned Pretrained VideoSwin Model

In [56]:
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import TimeDistributed, Dense, Flatten, GlobalMaxPooling1D

def build_finetuned_model(pretrained_model_path, segment_count=32):
    """
    Assemble the shooting-anomaly model on top of a frozen pretrained network.

    The saved model is loaded without its compile state, marked as
    non-trainable, and wrapped in ``TimeDistributed`` so that it is applied to
    each of the ``segment_count`` segments of an input sample. A one-unit
    sigmoid ``Dense`` layer then scores every segment, and
    ``GlobalMaxPooling1D`` keeps the highest segment score as the output.

    Args:
        pretrained_model_path (str): Path to the saved Video Swin Transformer
            model to load.
        segment_count (int): Number of segments per input sample; it becomes
            the leading (time) dimension of the model input, in front of the
            pretrained model's own per-sample input shape. Defaults to 32.

    Returns:
        keras.Model: The uncompiled ``Sequential`` model named
        ``'anomaly_detection_model'``.
    """

    print("Loading the pretrained model from:", pretrained_model_path)
    backbone = load_model(pretrained_model_path, compile=False)

    print("Freeze all layers to prevent updates to weights")
    # A frozen backbone keeps its pretrained weights untouched during training
    backbone.trainable = False

    print("Display pretrained base model architecture:")
    backbone.summary()

    # Attach the downstream head that turns backbone outputs into a single
    # shooting-anomaly score per input sample
    print("Build Fine-tuned model")

    # Drop the batch dimension of the backbone input and prepend the segment axis
    per_segment_shape = backbone.input_shape[1:]
    stacked_input_shape = (segment_count,) + per_segment_shape

    segment_backbone = TimeDistributed(backbone, input_shape=stacked_input_shape)
    # Sigmoid gives one binary score per segment
    segment_scorer = TimeDistributed(Dense(1, activation='sigmoid'))
    # Max pooling over the segment axis selects the highest score as the output
    top_score = GlobalMaxPooling1D()

    model = Sequential(
        [segment_backbone, segment_scorer, top_score],
        name='anomaly_detection_model',
    )

    print("Display fine-tuned model architecture:")
    model.summary()
    return model


In [58]:
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall, AUC, Accuracy, F1Score
import pickle
import os
import config
import numpy as np
#import model
import video_utils as vu
import callbacks
import time
import data_processor as dp
import mil_loss as mil
from tensorflow.keras.models import load_model


# --- Paths (all located under the mounted Google Drive project folder) ---
train_file_path = '/content/drive/MyDrive/VideoAnomalyDetection/data/UCF-Crime/Anomaly_Train_2.txt'
segments_dir = '/content/drive/MyDrive/VideoAnomalyDetection/output/preprocessed_videos'
base_dataset_dir = '/content/drive/MyDrive/VideoAnomalyDetection/data/UCF-Crime/'
base_directory = '/content/drive/MyDrive/VideoAnomalyDetection/'
pretrained_model_path = '/content/drive/MyDrive/VideoAnomalyDetection/pretrained/Video Swin Transformer/TFVideoSwinB_K600_IN22K_P244_W877_32x224'

# The checkpoint file name carries a timestamp, so each run gets its own name
# inside the shared training_1 directory.
model_chkpt_filename = f"training_1/vad_{int(time.time())}.ckpt"
checkpoint_path = os.path.join(base_directory, model_chkpt_filename)
checkpoint_dir = os.path.dirname(checkpoint_path)

# --- Hyperparameters ---
batch_size = 32
epoch_size = 10
videos_per_class = 4
resize_shape = (224, 224)
segment_length = 32  # forwarded to data_processor; exact meaning is defined there
validation_split = 0.2
# Leading (time) dimension of the model input. The model used to be built with
# `batch_size` here, which only worked because both values happen to be 32.
segment_count = 32

# Read video paths from the file
video_paths = vu.read_video_paths_from_file(train_file_path, base_dataset_dir)

# Use "Shooting" and "Normal" to distinguish class types
anomaly_videos = vu.filter_video_paths(video_paths, 'Shooting')
normal_videos = vu.filter_video_paths(video_paths, 'Normal')

anomaly_videos = vu.select_random_videos(anomaly_videos, videos_per_class)
normal_videos = vu.select_random_videos(normal_videos, videos_per_class)

video_paths = anomaly_videos + normal_videos

# Annotate anomalous (1) and normal (0) videos
labels = [1] * len(anomaly_videos) + [0] * len(normal_videos)

# Alternate normal and anomalous videos to ensure pairs are passed along for training
video_paths, labels = vu.alternate_video_and_labels(video_paths, labels)

# NOTE(review): the last recorded run of this cell ended with
# "ValueError: not enough values to unpack (expected 4, got 2)", which
# presumably originates from this call -- confirm that
# data_processor.create_train_val_datasets returns the four values unpacked here.
train_dataset, val_dataset, total_train_samples, total_val_samples = dp.create_train_val_datasets(
    video_paths,
    labels,
    resize_shape=resize_shape,
    segment_length=segment_length,
    batch_size=batch_size,
    validation_split=validation_split,
)

# Calculate steps per epoch for training and validation. Floor division yields
# 0 when there are fewer samples than one batch, so run at least one step.
steps_per_epoch = max(1, total_train_samples // batch_size)
validation_steps = max(1, total_val_samples // batch_size)

# Shuffle the training data only. The validation set keeps a fixed order so
# that repeated passes over it (e.g. predictions vs. labels) stay aligned.
train_dataset = train_dataset.shuffle(buffer_size=total_train_samples, reshuffle_each_iteration=True)

# Fetch batches while model is training
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)

# Configure the Adam optimizer with a conventional learning rate
adam_optimizer = Adam(learning_rate=0.0001)

# Build the model on top of the pretrained Video Swin model
finetuned_model = build_finetuned_model(pretrained_model_path, segment_count)

# If there exists a previous checkpoint, load it
latest = tf.train.latest_checkpoint(checkpoint_dir)
if latest:
    print(f"Resuming training from {latest}")
    finetuned_model.load_weights(latest)

# Compile model with MIL loss, adam optimizer and metrics
finetuned_model.compile(
    optimizer=adam_optimizer,
    loss=mil.mil_ranking_loss,
    #loss='binary_crossentropy',
    metrics=[
        # Accuracy() counts exact equality between labels and raw sigmoid
        # scores; BinaryAccuracy thresholds the scores (0.5 by default) first.
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        Precision(name='precision'),
        Recall(name='recall'),
        AUC(name='auc')
    ]
)

history = finetuned_model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epoch_size,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps,
    callbacks=callbacks.callbacks_list,
    verbose=1
)

# Save the per-epoch metric history so it can be analysed in a later cell
with open('model_history.pkl', 'wb') as file:
    pickle.dump(history.history, file)

ValueError: not enough values to unpack (expected 4, got 2)

In [49]:
# Mount Google Drive at /content/drive, the root of the Drive paths used by the
# other cells in this notebook.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Train Fine-tuned Model

Epoch 1/10
Epoch 1: val_loss improved from inf to 0.12093, saving model to /content/drive/MyDrive/VideoAnomalyDetection/training_1/vad_1713886874.ckpt
Epoch 2/10
Epoch 2: val_loss improved from 0.12093 to 0.12002, saving model to /content/drive/MyDrive/VideoAnomalyDetection/training_1/vad_1713886874.ckpt
Epoch 3/10

# Post-Training Analysis

In [None]:
import pickle  # imported here so the cell does not depend on an earlier cell's imports

import pandas as pd
import matplotlib.pyplot as plt


# Load the history from a pickle file.
# NOTE: pickle.load can execute arbitrary code; only load files written by this notebook.
with open('model_history.pkl', 'rb') as file:
    loaded_history = pickle.load(file)

print(loaded_history)


# One column per recorded metric, one row per epoch
history_df = pd.DataFrame(loaded_history)

# Number of metrics to plot
num_metrics = len(history_df.columns)
# Define how many columns of subplots
num_columns = 2
# Ceiling division, with at least one row so an empty history does not break subplots()
num_rows = max(1, -(-num_metrics // num_columns))

# Create a figure with subplots; squeeze=False always yields a 2-D array of axes
fig, axes = plt.subplots(num_rows, num_columns, figsize=(15, num_rows * 5), squeeze=False)
axes = axes.flatten()

for ax, col in zip(axes, history_df.columns):
    history_df[[col]].plot(ax=ax)
    ax.set_title(col)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(col)

# Hide panels left over when the metric count does not fill the grid
for ax in axes[num_metrics:]:
    ax.set_visible(False)

# Show the plot
plt.show()

# Display the metrics in a tabular format using DataFrame
print(history_df)


FileNotFoundError: [Errno 2] No such file or directory: 'model_history.pkl'

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
import itertools


# Class ids used when labelling the videos: 0 = normal, 1 = anomalous
class_ids = [0, 1]
# Scores at or above this value are counted as the anomalous class
decision_threshold = 0.5

# Collect scores and labels in a single pass over the validation data so each
# prediction stays paired with its label even if the dataset order changes
# between iterations.
score_batches = []
label_batches = []
for batch_inputs, batch_labels in val_dataset:
    batch_scores = finetuned_model.predict_on_batch(batch_inputs)
    score_batches.append(np.asarray(batch_scores).reshape(-1))
    label_batches.append(batch_labels.numpy().reshape(-1))

val_predictions = np.concatenate(score_batches)
# assumes the labels are stored as 0/1 values -- TODO confirm against data_processor
true_labels = np.concatenate(label_batches).astype(int)

# The model ends in a single sigmoid unit, so argmax over axis 1 would always
# return 0; threshold the score instead.
val_pred_labels = (val_predictions >= decision_threshold).astype(int)

# Passing the labels explicitly keeps the matrix 2x2 even if a class is absent
cm = confusion_matrix(true_labels, val_pred_labels, labels=class_ids)

# Using matplotlib
plt.figure(figsize=(8, 6))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(len(class_ids))  # Number of classes
plt.xticks(tick_marks, class_ids, rotation=45)
plt.yticks(tick_marks, class_ids)

# Write each count into its cell, switching text colour on dark cells for contrast
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, cm[i, j],
             horizontalalignment="center",
             color="white" if cm[i, j] > thresh else "black")

plt.ylabel('True label')
plt.xlabel('Predicted label')
# Lay out after the axis labels exist so they are included in the layout
plt.tight_layout()
plt.show()

# Using seaborn
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", linewidths=.5, cmap="Blues")
plt.title('Confusion Matrix')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()
