In [None]:
# Install the required libraries.
!pip install pafy moviepy

In [None]:
# Import the required libraries.
import os
import cv2
import pafy
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt
from moviepy.editor import *
%matplotlib inline

In [None]:
from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model

# Use one fixed seed for every random number generator the notebook relies on
# (Python's `random`, NumPy and TensorFlow) so that repeated runs draw the same values.
seed_constant = 27
random.seed(seed_constant)
np.random.seed(seed_constant)
tf.random.set_seed(seed_constant)

## Download the dataset

In [None]:
# Download the UCF50 Dataset
!wget --no-check-certificate https://www.crcv.ucf.edu/data/UCF50.rar

#Extract the Dataset
!unrar x UCF50.rar

In [None]:
# Preview: show the first frame of one random video from up to 20 random classes.

# Create a Matplotlib figure and specify the size of the figure.
plt.figure(figsize = (20, 20))

# Get the names of all classes/categories in UCF50. os.listdir returns entries
# in arbitrary order, so sort them: otherwise the seeded sampling below (and any
# later "first N classes" selection) would differ from machine to machine.
all_classes_names = sorted(os.listdir('UCF50'))

# Pick distinct random class indices in [0, number of classes). At most 20 are
# drawn (the grid below has 5 x 4 cells); fewer if the dataset has fewer classes,
# because random.sample raises when asked for more items than exist.
sample_size = min(20, len(all_classes_names))
random_range = random.sample(range(len(all_classes_names)), sample_size)

# Iterating through all the generated random values.
for counter, random_index in enumerate(random_range, 1):

    # Retrieve a Class Name using the Random Index.
    selected_class_Name = all_classes_names[random_index]

    # Retrieve the (sorted, for reproducibility) list of all the video files
    # present in the randomly selected Class Directory.
    video_files_names_list = sorted(os.listdir(f'UCF50/{selected_class_Name}'))

    # Randomly select a video file from that list.
    selected_video_file_name = random.choice(video_files_names_list)

    # Initialize a VideoCapture object to read from the video File.
    video_reader = cv2.VideoCapture(f'UCF50/{selected_class_Name}/{selected_video_file_name}')

    # Read the first frame of the video file, then release the reader right away.
    success, bgr_frame = video_reader.read()
    video_reader.release()

    # A failed read returns no frame; skip this cell of the grid instead of
    # crashing in cvtColor.
    if not success:
        print(f'Could not read a frame from {selected_class_Name}/{selected_video_file_name}; skipping.')
        continue

    # Convert the frame from BGR into RGB format.
    rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)

    # Write the class name on the video frame.
    cv2.putText(rgb_frame, selected_class_Name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

    # Display the frame in its cell of the 5 x 4 grid.
    plt.subplot(5, 4, counter)
    plt.imshow(rgb_frame)
    plt.axis('off')

## Data Pre-processing

In [None]:
# Configure the target dimensions for preprocessing video frames
IMAGE_HEIGHT , IMAGE_WIDTH = 64, 64

# Define how many consecutive frames from each video will be used as input to create temporal sequences
SEQUENCE_LENGTH = 20

# Set the path to the root directory where the UCF50 action recognition dataset is stored
DATASET_DIR = "UCF50"

# Number of action classes to train on; a subset of the dataset keeps training quick.
NUM_CLASSES = 10

# Build the list of action class names for model training: the first NUM_CLASSES
# entries of all_classes_names (fewer if the dataset has fewer classes).
CLASSES_LIST = all_classes_names[:NUM_CLASSES]

In [None]:
import os

# Root directory of the extracted dataset.
DATASET_DIR = "UCF50"

# Put the already selected class names in alphabetical order. The position of a
# name in this list is what later code uses as its integer label.
CLASSES_LIST = sorted(CLASSES_LIST)

# Report which classes will be used and how many there are.
print("Classes found:", CLASSES_LIST)
print("Total classes:", len(CLASSES_LIST))

## Set Up the Data Pipeline

In [None]:
# ------------------ to extract frames ------------------
def extract_video_frames(video_path):
    """Sample up to SEQUENCE_LENGTH evenly spaced frames from a video file.

    The reported frame count is divided into SEQUENCE_LENGTH equal windows and
    the first frame of each window is read, resized to IMAGE_WIDTH x IMAGE_HEIGHT
    and divided by 255.0.

    Args:
        video_path: Path of the video file to read.

    Returns:
        list: The sampled frames, in order. Channel order is left as decoded by
        OpenCV (no BGR-to-RGB conversion is applied). The list holds fewer than
        SEQUENCE_LENGTH frames (possibly none) when a read fails, e.g. for an
        unreadable or too short video; callers must check the length.
    """
    frames_list = []
    video_reader = cv2.VideoCapture(video_path)

    try:
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Distance between sampled frames; at least 1 so that short videos
        # (or an unknown frame count) still advance frame by frame.
        skip_frames_window = max(video_frames_count // SEQUENCE_LENGTH, 1)

        for frame_counter in range(SEQUENCE_LENGTH):
            # Seek to the start of the current window before reading.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()
            if not success:
                break

            # cv2.resize expects dsize as (width, height), which yields an array
            # of shape (IMAGE_HEIGHT, IMAGE_WIDTH, channels).
            resized_frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT))
            normalized_frame = resized_frame / 255.0
            frames_list.append(normalized_frame)
    finally:
        # Release the capture even if reading or resizing raised.
        video_reader.release()

    return frames_list


# ------------------ Build list of video paths + labels ------------------
# video_paths[i] is a file inside the directory of class CLASSES_LIST[class_indices[i]].
video_paths, class_indices = [], []

for class_idx, class_name in enumerate(CLASSES_LIST):
    class_dir = os.path.join(DATASET_DIR, class_name)
    # os.listdir returns entries in arbitrary, filesystem-dependent order.
    # Sorting fixes the input order so that the seeded split below produces
    # the same partition on every machine.
    for file_name in sorted(os.listdir(class_dir)):
        video_paths.append(os.path.join(class_dir, file_name))
        class_indices.append(class_idx)

# One-hot encode the integer class indices (one row per video).
labels = to_categorical(class_indices, num_classes=len(CLASSES_LIST))

# Train-test split: 80% / 20%, shuffled with a fixed seed.
train_paths, test_paths, train_labels, test_labels = train_test_split(
    video_paths, labels, test_size=0.2, random_state=seed_constant, shuffle=True
)

print(f"Training samples: {len(train_paths)}, Testing samples: {len(test_paths)}")

In [None]:
def video_generator(video_paths, labels):
    """Build a zero-argument generator function over (frames, label) samples.

    Args:
        video_paths: Iterable of video file paths.
        labels: Iterable of labels, paired with video_paths by position.

    Returns:
        A callable that, each time it is called, returns a fresh generator.
        The generator decodes one video at a time with extract_video_frames
        and yields (float32 array of the frames, label). Videos that do not
        produce exactly SEQUENCE_LENGTH frames are skipped.
    """
    def generate_samples():
        for clip_path, clip_label in zip(video_paths, labels):
            clip_frames = extract_video_frames(clip_path)
            # Incomplete clips cannot be stacked into a fixed-length sequence.
            if len(clip_frames) != SEQUENCE_LENGTH:
                continue
            yield np.array(clip_frames, dtype=np.float32), clip_label

    return generate_samples



# Element structure shared by both splits: a float32 frame sequence of shape
# (SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3) and a float32 label vector
# with one entry per class.
sample_signature = (
    tf.TensorSpec(shape=(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3), dtype=tf.float32),
    tf.TensorSpec(shape=(len(CLASSES_LIST),), dtype=tf.float32),
)

# Wrap the generators in tf.data datasets so videos are decoded as they are
# consumed, one sample at a time.
train_dataset = tf.data.Dataset.from_generator(
    video_generator(train_paths, train_labels),
    output_signature=sample_signature,
)

test_dataset = tf.data.Dataset.from_generator(
    video_generator(test_paths, test_labels),
    output_signature=sample_signature,
)

In [None]:
# Number of videos per batch.
BATCH_SIZE = 4

# Training stream: shuffle through a 100-sample buffer, then batch and prefetch.
# NOTE: both assignments rebind the dataset names to their batched versions, so
# running this cell a second time without rebuilding the datasets batches twice.
train_dataset = (
    train_dataset
    .shuffle(100)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Evaluation stream: same batching and prefetching, but order is left untouched.
test_dataset = (
    test_dataset
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import ConvLSTM2D, MaxPooling3D, TimeDistributed, Dense, Flatten, Dropout

# ------------------ ConvLSTM Model ------------------
def build_convlstm_network():
    """Assemble the (uncompiled) ConvLSTM classifier.

    The network stacks four identical stages that differ only in filter count
    (4, 8, 14, 16). Each stage is a ConvLSTM2D layer (3x3 kernel, tanh,
    recurrent dropout 0.2, returning the full sequence), followed by spatial
    2x2 max pooling (pool size 1 along time) and per-time-step Dropout(0.2).
    The result is flattened and fed to a softmax layer with one unit per entry
    of CLASSES_LIST.

    Returns:
        A Sequential model expecting inputs of shape
        (SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3).
    """
    network = Sequential()
    stage_filters = (4, 8, 14, 16)

    for stage_index, filter_count in enumerate(stage_filters):
        # Only the very first layer declares the input shape.
        first_layer_kwargs = {}
        if stage_index == 0:
            first_layer_kwargs['input_shape'] = (SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)

        network.add(ConvLSTM2D(filters=filter_count, kernel_size=(3, 3), activation='tanh',
                               recurrent_dropout=0.2, return_sequences=True,
                               **first_layer_kwargs))
        network.add(MaxPooling3D(pool_size=(1, 2, 2), padding='same'))
        network.add(TimeDistributed(Dropout(0.2)))

    # Classification head.
    network.add(Flatten())
    network.add(Dense(len(CLASSES_LIST), activation="softmax"))

    return network


# Instantiate the network and print its layer-by-layer summary.
model = build_convlstm_network()
model.summary()

# ------------------ Compile & Train ------------------
# Categorical cross-entropy matches the one-hot labels; accuracy is tracked as a metric.
model.compile(
    optimizer="Adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# Early stopping: halt once the validation loss has not improved for 10 epochs
# and roll the model back to the weights of its best epoch.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=10,
    restore_best_weights=True,
)

In [None]:
# Train for at most 20 epochs; early stopping may end the run sooner.
# NOTE(review): the held-out test split doubles as the validation set here, so
# the early-stopping decision (and the restored weights) are tuned on it.
history = model.fit(
    train_dataset,
    epochs=20,
    validation_data=test_dataset,
    callbacks=[early_stopping_callback],
)