In [None]:
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np

from collections import deque
from sklearn.model_selection import train_test_split

import tensorflow as tf 
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Dropout, BatchNormalization, Activation, GlobalAveragePooling2D, AveragePooling2D, Input, Concatenate, LeakyReLU, Add, Multiply
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import plot_model, Sequence
from tensorflow.keras.callbacks import EarlyStopping

import seaborn as sns
from sklearn.preprocessing import LabelEncoder

# Utils

In [None]:
# plot training history function
def plot_training_history(history):
    """Draw accuracy and loss curves (training vs. validation) side by side.

    Args:
        history: Object whose ``history`` attribute maps the keys
            'accuracy', 'val_accuracy', 'loss' and 'val_loss' to per-epoch
            sequences (this is what ``model.fit`` returns).
    """
    # One entry per panel: (metric key, y label, y limits, legend corner, title).
    panels = [
        ('accuracy', 'Accuracy', [0, 1], 'lower right', 'Training and Validation Accuracy'),
        ('loss', 'Loss', [0, 3], 'upper right', 'Training and Validation Loss'),
    ]

    plt.figure(figsize=(10, 5))
    for position, (metric, y_label, y_limits, legend_corner, title) in enumerate(panels, start=1):
        val_metric = f'val_{metric}'
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric], label=metric)
        plt.plot(history.history[val_metric], label=val_metric)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.ylim(y_limits)
        plt.legend(loc=legend_corner)
        plt.title(title)
    plt.show()

In [None]:
# Root folder of the dataset; the "train" and "val" splits are sub-folders of it.
# NOTE(review): "RWF-2001" may be a typo for "RWF-2000" — confirm against the folder on disk.
DIR_PATH = "./RWF-2001"
TRAIN_DIR = os.path.join(DIR_PATH, "train")
# The "val" split is what the notebook evaluates on.
TEST_DIR = os.path.join(DIR_PATH, "val")

In [None]:
# Class names are the sub-directories of the training split.
# os.listdir() returns entries in arbitrary order and also lists stray files
# (e.g. .DS_Store) and hidden folders (e.g. .ipynb_checkpoints), so keep only
# visible directories and sort them for a stable order across runs/machines.
CLASS_NAMES = sorted(
    entry for entry in os.listdir(TRAIN_DIR)
    if not entry.startswith(".") and os.path.isdir(os.path.join(TRAIN_DIR, entry))
)

# Seed the NumPy and TensorFlow global generators so that shuffling and
# weight initialisation are repeatable after "Restart & Run All".
SEED = 1337
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Number of frames load_videos() assumes each source clip has when it picks
# evenly spaced frame indices.
ORIGINAL_FRAMES_PER_VIDEO = 150
# Frames sampled per clip. The "+ 1" is because the model's frame-difference
# step turns N frames into N - 1 differences.
FRAMES_PER_VIDEO = 50 + 1
VIDEO_WIDTH, VIDEO_HEIGHT = 100, 100
N_CHANNELS = 3

# Model-side names for the same quantities; derived from the values above so
# the loader and the model input shape cannot drift apart.
SEQUENCE_LENGTH = FRAMES_PER_VIDEO
IMG_HEIGHT = VIDEO_HEIGHT
IMG_WIDTH = VIDEO_WIDTH
IMG_SIZE = (IMG_HEIGHT, IMG_WIDTH)
BATCH_SIZE = 10

## Load Data

In [None]:
# get list of labels and video path
def get_labels_and_video_paths(data_dir):
    """Walk ``data_dir/<class>/`` for every class and list what is inside.

    Args:
        data_dir: Directory containing one sub-directory per entry of the
            module-level ``CLASS_NAMES``.

    Returns:
        ``(labels, video_paths)``: two parallel lists where ``labels[i]`` is
        the class-directory name of ``video_paths[i]``.
    """
    # Build (label, path) pairs class by class, then split them into the two
    # parallel lists the callers expect.
    labelled_paths = [
        (class_name, os.path.join(data_dir, class_name, file_name))
        for class_name in CLASS_NAMES
        for file_name in os.listdir(os.path.join(data_dir, class_name))
    ]
    labels = [class_name for class_name, _ in labelled_paths]
    video_paths = [path for _, path in labelled_paths]
    return labels, video_paths

In [None]:
import gc
from tensorflow.keras import backend as K

def load_videos(video_IDs: list, video_labels: dict, video_frames: int = FRAMES_PER_VIDEO, video_width: int = VIDEO_WIDTH, video_height: int = VIDEO_HEIGHT,
                video_channels: int = N_CHANNELS, dtype = np.float32, normalize: bool = False) -> tuple:
    """Decode a batch of video files into a fixed-size array plus labels.

    Frames are picked at evenly spaced positions, assuming each clip has
    ``ORIGINAL_FRAMES_PER_VIDEO`` frames, and resized to
    ``(video_width, video_height)``. No colour conversion is applied.

    A clip that yields fewer than ``video_frames`` frames (shorter than
    assumed, truncated, or unreadable) no longer aborts the batch: the missing
    tail is filled by repeating the last decoded frame (or left as zeros when
    nothing could be decoded) and a ``RuntimeWarning`` is issued.

    Args:
        video_IDs: Paths of the videos to load.
        video_labels: Mapping from video path to its integer label.
        video_frames: Number of frames to return per video.
        video_width: Output frame width in pixels.
        video_height: Output frame height in pixels.
        video_channels: Number of channels of the output array.
        dtype: dtype of the returned video array.
        normalize: If True, divide pixel values by 255.

    Returns:
        ``(videos, labels)`` where ``videos`` has shape
        ``(len(video_IDs), video_frames, video_height, video_width, video_channels)``
        and ``labels`` is an int8 array of shape ``(len(video_IDs),)``.

    Raises:
        KeyError: If a path in ``video_IDs`` is missing from ``video_labels``.
    """
    import warnings

    # Zero-initialised (not np.empty) so padded regions never hold garbage.
    videos = np.zeros((len(video_IDs), video_frames, video_height, video_width, video_channels), dtype=dtype)
    labels = np.empty((len(video_IDs),), dtype=np.int8)

    # Indexes of frames to be kept to comply with video_frames
    frames_idx = set(np.round(np.linspace(0, ORIGINAL_FRAMES_PER_VIDEO - 1, video_frames)).astype(int))
    frames_wanted = len(frames_idx)

    for i, video_ID in enumerate(video_IDs):
        cap = cv2.VideoCapture(video_ID)
        frames = []
        index = 0
        try:
            # Stop as soon as every wanted frame is collected; decoding the
            # rest of the file cannot add anything.
            while len(frames) < frames_wanted:
                ret, frame = cap.read()
                if not ret:
                    break
                if index in frames_idx:
                    frame = cv2.resize(frame, (video_width, video_height)).astype(dtype)
                    if normalize:
                        frame /= 255.0
                    frames.append(frame)
                index += 1
        finally:
            # Release the decoder even if resizing/conversion raised.
            cap.release()

        frames_read = len(frames)
        if frames_read:
            videos[i, :frames_read] = np.stack(frames)
            # Pad a short clip by repeating its last frame (no-op when full).
            videos[i, frames_read:] = frames[-1]
        if frames_read < video_frames:
            warnings.warn(
                f"{video_ID}: decoded {frames_read} of {video_frames} requested frames; "
                "the remainder was padded",
                RuntimeWarning,
            )

        labels[i] = video_labels[video_ID]

    return videos, labels

class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that decodes batches of videos on demand.

    Each item is the ``(videos, labels)`` pair returned by ``load_videos`` for
    one slice of ``video_IDs``.

    Args:
        video_IDs: Paths of all videos served by this generator. The list is
            copied, so shuffling never reorders the caller's list.
        video_labels: Mapping from video path to integer label.
        batch_size: Number of videos per batch (must be positive).
        video_width: Output frame width in pixels.
        video_height: Output frame height in pixels.
        video_frames: Number of frames per video.
        video_channels: Number of channels per frame.
        dtype: dtype of the returned video arrays.
        normalize: If True, pixel values are divided by 255.
        shuffle: If True, the video order is shuffled once at construction
            and again after every epoch.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, video_IDs: list, video_labels: dict, batch_size: int, video_width: int = VIDEO_WIDTH, video_height: int = VIDEO_HEIGHT,
                video_frames: int = FRAMES_PER_VIDEO, video_channels: int = N_CHANNELS, dtype = np.float32, normalize: bool = False, shuffle: bool = True):
        # Let the Keras base class set up its own state (newer Keras versions
        # warn when this call is missing).
        super().__init__()
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        # Private copy: on_epoch_end() shuffles in place.
        self.video_IDs = list(video_IDs)
        self.video_labels = video_labels
        self.batch_size = batch_size
        self.video_width = video_width
        self.video_height = video_height
        self.video_frames = video_frames
        self.video_channels = video_channels
        self.dtype = dtype
        self.normalize = normalize
        self.shuffle = shuffle

        # Shuffle before the first epoch too; otherwise the first pass runs in
        # the order the paths were supplied (typically grouped by class).
        if self.shuffle:
            np.random.shuffle(self.video_IDs)

    def __len__(self):
        """Number of batches per epoch, counting a final partial batch."""
        return int(np.ceil(len(self.video_IDs) / self.batch_size))

    def __getitem__(self, idx):
        """Load and return batch number ``idx`` as ``(videos, labels)``."""
        batch_IDs = self.video_IDs[idx*self.batch_size:(idx+1)*self.batch_size]

        return load_videos(batch_IDs, self.video_labels, self.video_frames, self.video_width, self.video_height, self.video_channels, self.dtype, self.normalize)

    def on_epoch_end(self):
        """Reshuffle the video order (if enabled) and run a garbage collection."""
        if self.shuffle:
            np.random.shuffle(self.video_IDs)
        # Clear memory after epochs
        gc.collect()

In [None]:
import glob


def video_label(video_path):
    """Return 0 for a clip stored in a 'NonFight' class folder, else 1.

    Only the immediate parent directory is inspected, so a file name or a
    higher-level folder that happens to contain 'NonFight' cannot flip the label.
    """
    class_dir = os.path.basename(os.path.dirname(video_path))
    return 0 if class_dir == 'NonFight' else 1


# Every file under <split>/<class>/; sorted so the listing does not depend on
# the order the filesystem happens to return.
train_video_IDs = sorted(glob.glob(os.path.join(TRAIN_DIR, "*", "*")))
test_video_IDs = sorted(glob.glob(os.path.join(TEST_DIR, "*", "*")))

# Fail early with a clear message instead of training on empty generators.
assert train_video_IDs, f"no training videos found under {TRAIN_DIR}"
assert test_video_IDs, f"no validation videos found under {TEST_DIR}"

train_video_labels = {video: video_label(video) for video in train_video_IDs}
test_video_labels = {video: video_label(video) for video in test_video_IDs}

# The sorted listing is grouped by class; mix it once (reproducibly) so the
# very first epoch does not see single-class batches.
np.random.default_rng(SEED).shuffle(train_video_IDs)

# NOTE(review): frames are fed as raw 0-255 floats because `normalize` defaults
# to False — consider passing normalize=True to both generators.
train_generator = DataGenerator(train_video_IDs, train_video_labels, batch_size=BATCH_SIZE)
# Evaluation does not benefit from shuffling, so keep the validation order fixed.
test_generator = DataGenerator(test_video_IDs, test_video_labels, batch_size=BATCH_SIZE, shuffle=False)

# Training Model

In [None]:
def tf_frame_diff(video):
    """Return the difference between each frame and the one before it.

    Slices along the first axis, so an input with N entries on that axis
    yields N - 1 differences.
    """
    following = video[1:]
    preceding = video[:-1]
    return following - preceding

In [None]:
def conv_lstm_block(x, filters, return_sequences=True):
    """ConvLSTM2D + BatchNormalization, plus pooling/dropout when the time axis is kept.

    Args:
        x: 5-D tensor (batch, time, height, width, channels).
        filters: Number of ConvLSTM2D filters.
        return_sequences: If True the block keeps the time axis and then halves
            the spatial size (MaxPooling3D with pool (1, 2, 2)) and applies
            per-timestep dropout. If False the ConvLSTM returns only its last
            output and no pooling/dropout is applied.

    Returns:
        The transformed tensor.
    """
    x = layers.ConvLSTM2D(
        filters=filters,
        kernel_size=(3, 3),
        padding='same',
        return_sequences=return_sequences,
        recurrent_dropout=0.3,
        kernel_regularizer=tf.keras.regularizers.l2(0.01),
        data_format='channels_last'
    )(x)
    x = layers.BatchNormalization()(x)
    if return_sequences:
        x = layers.MaxPooling3D(pool_size=(1, 2, 2), padding='same', data_format='channels_last')(x)
        x = layers.TimeDistributed(layers.Dropout(0.3))(x)
    return x


# Input layer: (frames, height, width, channels) per sample
inputs = layers.Input(shape=(SEQUENCE_LENGTH, IMG_HEIGHT, IMG_WIDTH, N_CHANNELS))

# Frame Difference Calculation
# Consecutive-frame differences along the time axis (axis 1), computed for the
# whole batch with one vectorized slice instead of a per-sample tf.map_fn loop.
# The result has SEQUENCE_LENGTH - 1 time steps.
inputs_diff = layers.Lambda(
    lambda video: video[:, 1:] - video[:, :-1],
    name='frame_difference',
)(inputs)

# Three ConvLSTM2D blocks with 4, 8 and 16 filters; the last one collapses the
# time axis and returns a single feature map per sample.
x = conv_lstm_block(inputs_diff, filters=4)
x = conv_lstm_block(x, filters=8)
x = conv_lstm_block(x, filters=16, return_sequences=False)

# Depthwise Convolution
x = layers.DepthwiseConv2D(kernel_size=(3, 3), depth_multiplier=2, activation='relu', data_format='channels_last')(x)

# Global Average Pooling
x = layers.GlobalAveragePooling2D(data_format='channels_last')(x)

# Fully Connected Dense Layers
x = layers.Dense(units=128, activation='relu')(x)
x = layers.Dense(units=16, activation='relu')(x)

# Output layer: one sigmoid unit for the binary label
outputs = layers.Dense(units=1, activation='sigmoid')(x)

# Define and compile the model
model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.summary()



In [None]:
# Training callbacks, both driven by validation accuracy.
# They only take effect when passed to model.fit(callbacks=[...]).

# Stop after 5 epochs without improvement and restore the best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=5,
    restore_best_weights=True,
)

# Halve the learning rate after 3 epochs without improvement.
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    patience=3,
)

In [None]:
# Upper bound on the number of epochs; early stopping may end training sooner.
EPOCH = 30

# train the model
# The callbacks defined earlier were created but never handed to fit(), so
# neither early stopping nor the learning-rate schedule was active.
train_hist = model.fit(
    train_generator,
    validation_data=test_generator,
    epochs=EPOCH,
    callbacks=[early_stopping, lr_scheduler],
)

In [None]:
# Show the accuracy and loss curves recorded by model.fit().
plot_training_history(train_hist)

In [None]:
# Highest validation accuracy reached in any epoch of the training run.
max_val_acc = max(train_hist.history['val_accuracy'])

print(f"max val accuracy: {max_val_acc:.2f}")