## References
- https://youtu.be/QmtSkq3DYko?si=6VzZc_NH5glCPi0m
- https://learnopencv.com/introduction-to-video-classification-and-human-activity-recognition/

In [None]:
from random import seed as randomSeed, choice
from numpy import asarray, array
from numpy.random import seed as numpyRandomSeed
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import (
    ConvLSTM2D,
    MaxPooling3D,
    TimeDistributed,
    Dropout,
    Flatten,
    Dense
)
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.random import set_seed as tensorflowRandomSeed
from os import listdir
from os.path import join
from cv2 import (
    VideoCapture, 
    cvtColor, 
    COLOR_BGR2RGB,
    putText,
    FONT_HERSHEY_SIMPLEX,
    CAP_PROP_FRAME_COUNT,
    CAP_PROP_POS_FRAMES,
    resize,
)
from matplotlib.pyplot import (
    figure, 
    subplot, 
    plot, 
    imshow, 
    axis, 
    title, 
    legend
)

%matplotlib inline

Hardcoding SEED to make results consistent with every execution

In [None]:
# single seed value shared by every random number generator used below
SEED = 27
# seed NumPy's global generator
numpyRandomSeed(SEED)
# seed Python's built-in `random` module (source of `choice`)
randomSeed(SEED)
# seed TensorFlow's global generator
tensorflowRandomSeed(SEED)

Downloading and extracting UCF50 dataset<br>
source: https://www.crcv.ucf.edu/data/UCF50.rar

In [None]:
%%capture

# !wget --no-check-certificate https://www.crcv.ucf.edu/data/UCF50.rar

# uncomment this to unrar the rar dataset file,
# or use some unpacking software like 7-Zip, as I did
# !unrar x UCF50.rar

Visualizing the UCF50 dataset (optional step, not required for training)

In [None]:
# number of columns in the preview grid; the row count is derived from the
# number of classes so that every class gets its own cell
PREVIEW_COLUMNS = 4

allClassNames = listdir("UCF50")
print(allClassNames)
samplesInEachClass = [len(listdir(join("UCF50", i))) for i in allClassNames]
print(samplesInEachClass)

# ceiling division: a fixed 5x4 grid overflows as soon as there are more than
# 20 classes (UCF50 ships 50), which makes `subplot` raise
previewRows = max(
    (len(allClassNames) + PREVIEW_COLUMNS - 1) // PREVIEW_COLUMNS, 1
)
# 4 inches of height per row (equals the former 20x20 figure for 5 rows)
figure(figsize=(20, 4 * previewRows))

for i, className in enumerate(allClassNames):
    # getting paths of all the videos in the current class
    classDir = join("UCF50", className)
    allVideosInClass = listdir(classDir)
    # nothing to preview for an empty class folder (`choice` would raise)
    if not allVideosInClass:
        continue
    # pick a random video to show onto the subplot
    randomSelectedVideo = choice(allVideosInClass)
    videoReader = VideoCapture(join(classDir, randomSelectedVideo))
    success, bgrFrame = videoReader.read()
    # release before inspecting `success` so the capture is never leaked
    videoReader.release()
    # an unreadable video only skips its own cell instead of ending the preview
    if not success:
        continue
    # converting frame from BGR to RGB
    rgbFrame = cvtColor(bgrFrame, COLOR_BGR2RGB)
    # writing class label on the sample image
    putText(
        rgbFrame,
        className,
        (10, 30),
        FONT_HERSHEY_SIMPLEX,
        1,
        (255, 255, 255),
        2,
    )
    # putting the frame with class label onto the subplot
    subplot(previewRows, PREVIEW_COLUMNS, i + 1)
    imshow(rgbFrame)
    axis("off")

In [None]:
# frame dimensions every extracted frame is resized to
IMAGE_WIDTH = 64
IMAGE_HEIGHT = 64
# (width, height) pair handed to `resize` inside `frameExtraction`
IMAGE_DIMENSION = (IMAGE_WIDTH, IMAGE_HEIGHT)

# number of frames present in one feature
SEQUENCE_LENGTH = 20

# dataset name: the folder holding one sub-folder per class; also reused as
# the file name stem of the saved model
DATASET_DIR = "UCF50"

# classes to train upon; a class's position in this list is its numeric label
CLASSES = ["BenchPress", "CleanAndJerk", "Diving", "BreastStroke"]

In [None]:
def frameExtraction(videoPath):
    """
    @desc: extract frames from a video at videoPath
    @param {string} videoPath: path of a video
    @returns {list} frames: up to `SEQUENCE_LENGTH` frames taken at a fixed \
        stride across the video, each resized to `IMAGE_DIMENSION` and \
        divided by 255; fewer frames are returned when a read fails \
        (callers must check the length)
    """
    frames = []
    videoReader = VideoCapture(videoPath)
    try:
        # total number of frames present in the video
        frameCount = int(videoReader.get(CAP_PROP_FRAME_COUNT))
        # distance between two sampled frames; at least 1 so that short
        # videos still advance frame by frame
        skipFrameWindow = max(frameCount // SEQUENCE_LENGTH, 1)
        for i in range(SEQUENCE_LENGTH):
            # jump to the i-th sampling position before reading
            videoReader.set(CAP_PROP_POS_FRAMES, i * skipFrameWindow)
            success, frame = videoReader.read()
            # if not successful in reading the frame break from the loop
            if not success:
                break
            # append the frame on frames after resizing and scaling
            frames.append(resize(frame, IMAGE_DIMENSION) / 255)
    finally:
        # always free the capture, even if reading or resizing raised
        videoReader.release()
    return frames

Extracting features and labels from `CLASSES` (the training classes)
- *{array} features*: one entry per video; each entry is a sequence of `SEQUENCE_LENGTH` frames, giving the shape (videos, `SEQUENCE_LENGTH`, `IMAGE_HEIGHT`, `IMAGE_WIDTH`, 3)
- *{2D array} oneHotEncodedLabels*: one one-hot encoded label per feature
  - Ex. [1 0 0 0]: meaning that the corresponding feature belongs to class[0]

In [None]:
features, labels = [], []
for classId, className in enumerate(CLASSES):
    print(f"Extracting Data of Class: {className}")
    files = listdir(join(DATASET_DIR, className))
    for file in files:
        videoFilePath = join(DATASET_DIR, className, file)
        frames = frameExtraction(videoFilePath)
        # videos that did not yield a full sequence are dropped so that every
        # feature has the same number of frames
        if len(frames) == SEQUENCE_LENGTH:
            features.append(frames)
            labels.append(classId)
features = asarray(features)
labels = array(labels)
# pin the one-hot width to the number of classes: without `num_classes` the
# width is inferred from the largest label present, which would no longer
# match the model's output layer if the last class contributed no samples
oneHotEncodedLabels = to_categorical(labels, num_classes=len(CLASSES))

Splitting the features and labels into train and test dataset with test_size = 0.2 and shuffling enabled

In [None]:
# hold out 20% of the samples for testing; the shuffle is seeded with SEED so
# the same split is produced on every run
(featuresTrain, featuresTest, labelsTrain, labelsTest) = train_test_split(
    features,
    oneHotEncodedLabels,
    test_size=0.2,
    random_state=SEED,
    shuffle=True,
)

Model: "sequential"

| Layer (type) | Output Shape | Param |   
| :----------- | :----------- | :---- |
| conv_lstm2d (ConvLSTM2D) | (None, 20, 62, 62, 4) | 1024 |            
| max_pooling3d (MaxPooling3D) | (None, 20, 31, 31, 4) | 0 |
| time_distributed (TimeDistributed) | (None, 20, 31, 31, 4) | 0 | 
| conv_lstm2d_1 (ConvLSTM2D) | (None, 20, 29, 29, 8) | 3488 |      
| max_pooling3d_1 (MaxPooling3D) | (None, 20, 15, 15, 8) | 0 |                                                                    
| time_distributed_1 (TimeDistributed) | (None, 20, 15, 15, 8) | 0 |         
| conv_lstm2d_2 (ConvLSTM2D) | (None, 20, 13, 13, 14) | 11144 |    
| max_pooling3d_2 (MaxPooling3D) | (None, 20, 7, 7, 14) | 0 |                                                        
| time_distributed_2 (TimeDistributed) | (None, 20, 7, 7, 14) | 0 |                                                      
| conv_lstm2d_3 (ConvLSTM2D) | (None, 20, 5, 5, 16) | 17344 |    
| max_pooling3d_3 (MaxPooling3D) | (None, 20, 3, 3, 16) | 0 |                                                            
| flatten (Flatten) | (None, 2880) | 0 |        
| dense (Dense) | (None, 4) | 11524 |    
                                                                 
- Total params: 44524 (173.92 KB)
- Trainable params: 44524 (173.92 KB)
- Non-trainable params: 0 (0.00 Byte)

In [None]:
def createModelArchitecture():
    """
    @desc: builds the (un-compiled) ConvLSTM classifier: four \
        ConvLSTM2D + MaxPooling3D stages with 4, 8, 14 and 16 filters, a \
        time-distributed dropout after each of the first three stages, then \
        Flatten and a softmax Dense layer with one unit per entry in `CLASSES`
    @returns {Sequential} model: the assembled model; its summary is printed \
        as a side effect
    """
    filterCounts = (4, 8, 14, 16)
    lastStage = len(filterCounts) - 1
    layers = []
    for stage, filters in enumerate(filterCounts):
        convLstmOptions = dict(
            filters=filters,
            kernel_size=(3, 3),
            activation="tanh",
            data_format="channels_last",
            recurrent_dropout=0.2,
            return_sequences=True,
        )
        # only the very first layer declares the input shape
        if stage == 0:
            convLstmOptions["input_shape"] = (
                SEQUENCE_LENGTH,
                IMAGE_HEIGHT,
                IMAGE_WIDTH,
                3,
            )
        layers.append(ConvLSTM2D(**convLstmOptions))
        # pool size 1 on the first axis: only the two spatial axes are halved
        layers.append(
            MaxPooling3D(
                pool_size=(1, 2, 2), padding="same", data_format="channels_last"
            )
        )
        # the last stage feeds Flatten directly, without dropout
        if stage != lastStage:
            layers.append(TimeDistributed(Dropout(0.2)))
    layers.append(Flatten())
    layers.append(Dense(len(CLASSES), activation="softmax"))
    model = Sequential(layers)
    # `summary()` prints by itself and returns None, so it is not wrapped in
    # `print` (that would emit a stray "None" line)
    model.summary()
    return model

In [None]:
# model object: build the network (this also prints its summary); it is
# compiled and trained in the following cell
model = createModelArchitecture()

Compiling and training the model

In [None]:
# categorical cross-entropy matches the one-hot labels; accuracy is reported
# next to the loss
model.compile(
    optimizer="Adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# for info on early stopping callback refer the references mentioned at top
# NOTE(review): patience (10) is larger than epochs (7), so the stop condition
# presumably can never trigger in this run - confirm the intended epoch count
earlyStoppingCallback = EarlyStopping(
    monitor="val_loss",
    mode="min",
    patience=10,
    restore_best_weights=True,
)

# 20% of the training split is set aside for validation during fitting
modelTrainingHistory = model.fit(
    featuresTrain,
    labelsTrain,
    batch_size=4,
    epochs=7,
    validation_split=0.2,
    shuffle=True,
    callbacks=[earlyStoppingCallback],
)

In [None]:
def plot_metric(modelTrainingHistory, metricName1, metricName2, plotName):
    """
    @desc: draws two recorded training metrics against the epoch index on \
        the current plot
    @param {History} modelTrainingHistory: object whose `history` mapping \
        holds the recorded values of each metric
    @param {string} metricName1: key of the metric drawn in blue
    @param {string} metricName2: key of the metric drawn in red
    @param {string} plotName: title shown above the plot
    """
    recorded = modelTrainingHistory.history
    # look both series up before drawing anything
    firstSeries = recorded[metricName1]
    secondSeries = recorded[metricName2]
    # x axis: one point per recorded value of the first metric
    epochs = range(len(firstSeries))
    for series, colour, seriesLabel in (
        (firstSeries, "blue", metricName1),
        (secondSeries, "red", metricName2),
    ):
        plot(epochs, series, colour, label=seriesLabel)
    title(str(plotName))
    legend()

In [None]:
# compare training loss ("loss") with validation loss ("val_loss") per epoch
plot_metric(
    modelTrainingHistory, "loss", "val_loss", "Total Loss vs Total Validation Loss"
)

![training_history](ucf_training_history.png)

In [None]:
# save the trained model as "<DATASET_DIR>.h5" (relative path)
model.save(f"{DATASET_DIR}.h5")

In [None]:
# reload the model from the file written by the save cell; this rebinds
# `model`, so the following cells use the loaded copy
model = load_model(f"{DATASET_DIR}.h5")

In [None]:
# score the model on the held-out test split; the two returned values are the
# compiled loss and the "accuracy" metric, in that order
loss, accuracy = model.evaluate(featuresTest, labelsTest)
print(loss)
print(accuracy)

In [None]:
def predictVideo(videoPath):
    """
    @desc: predicts the video based on the model trained above
    @param {string} videoPath: the path to the video
    @returns {string} className: class that the model thinks given video belongs to
    @raises {ValueError}: when fewer than `SEQUENCE_LENGTH` frames could be \
        extracted from the video
    """
    frames = frameExtraction(videoPath)
    # the model only accepts full-length sequences; fail with a clear message
    # instead of an opaque shape error from inside `predict`
    if len(frames) != SEQUENCE_LENGTH:
        raise ValueError(
            f"expected {SEQUENCE_LENGTH} frames from {videoPath!r}, "
            f"got {len(frames)}"
        )
    # `predict` returns one row of per-class confidences for each input
    # sequence; exactly one video was passed, so row 0 holds its scores
    confidences = model.predict(asarray([frames]))[0]
    # index of the highest confidence (the first one wins on ties)
    bestClassId = max(
        range(len(confidences)), key=lambda classId: confidences[classId]
    )
    return CLASSES[bestClassId]

In [None]:
# classify one sample clip and print the predicted class name
# NOTE(review): this clip lives inside DATASET_DIR, so it was presumably part
# of the extracted features (train or test split) - use an unseen video for an
# unbiased check
prediction = predictVideo(r"UCF50/BenchPress/v_BenchPress_g01_c01.avi")
print(prediction)