In [None]:
import os
import pickle
from time import time
from collections import deque

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf


import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt

# tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Test video reader & image channels

In [None]:
# Open a sample clip and grab its first frame. The reader is intentionally left
# open on success so the following cells can query it; it is released later.
video_reader = cv2.VideoCapture('UCF50/BaseballPitch/v_BaseballPitch_g01_c03.avi')
flag, bgr_frame = video_reader.read()
if not flag:
    # read() returns (False, None) when the file is missing or cannot be decoded;
    # fail here instead of letting a None frame reach imshow/shape further down.
    video_reader.release()
    raise IOError('Could not read a frame from UCF50/BaseballPitch/v_BaseballPitch_g01_c03.avi')

In [None]:
# Frame rate (frames per second) reported for the currently open clip.
video_reader.get(cv2.CAP_PROP_FPS)

In [None]:
# Show the frame exactly as decoded. The array is in BGR channel order while
# matplotlib interprets the last axis as RGB, so red and blue look swapped here.
plt.imshow(bgr_frame)

In [None]:
# Dimensions of the decoded frame; the last axis holds the colour channels.
bgr_frame.shape

In [None]:
# Pull the three colour planes out of the BGR frame (last axis: 0=blue, 1=green, 2=red).
b, g, r = (bgr_frame[..., channel] for channel in range(3))

# Re-assemble the planes in RGB order so matplotlib renders the true colours.
plt.imshow(np.stack((r, g, b), axis=-1))

In [None]:
# cv2.imshow('image', bgr_frame)
  
# # waits for user to press any key
# # (this is necessary to avoid the Python kernel from crashing)
# cv2.waitKey(0)
  
# # closing all open windows
# cv2.destroyAllWindows()

In [None]:
# Free the capture handle now that the single-frame inspection is done.
video_reader.release()

In [None]:
# Check the reader state after release() (expected to report False).
video_reader.isOpened()

# Explore data

In [None]:
# Every sub-directory of the dataset root is one action class. Keep directories
# only (stray files such as OS metadata would break the per-class listdir calls
# in later cells) and sort them, because os.listdir returns entries in arbitrary order.
all_classes_names = sorted(
    entry for entry in os.listdir('UCF50')
    if os.path.isdir(os.path.join('UCF50', entry))
)
print(all_classes_names)

In [None]:
# Show the first frame of one random video from each of (up to) four random classes.
plt.figure(figsize = (12, 12))
# replace=False: with the default sampling-with-replacement the same class can be
# drawn more than once. The size is capped so small datasets do not raise.
selected_classes = np.random.choice(all_classes_names, min(4, len(all_classes_names)), replace=False)
for i, class_name in enumerate(selected_classes, 1):
    video_list = os.listdir(os.path.join('UCF50', class_name))
    video_name = np.random.choice(video_list)

    video_reader = cv2.VideoCapture(os.path.join('UCF50', class_name, video_name))
    flag, bgr_frame = video_reader.read()
    video_reader.release()

    if not flag:
        # Unreadable clip: bgr_frame is None and putText would raise on it.
        print(f'Could not read {class_name}/{video_name}, skipping')
        continue

    # Stamp the class name onto the frame (colour is given in BGR, i.e. red).
    cv2.putText(bgr_frame, text=class_name, org=(5, 25), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=1, color=(0, 0, 255), thickness=2)
    rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)

    # 2x2 grid: one cell per sampled class (a 4x2 grid left half of the figure empty).
    plt.subplot(2, 2, i)
    plt.imshow(rgb_frame)
    plt.axis('off')

In [None]:
# Print the total number of frames reported for one sample clip, then close it.
video_reader = cv2.VideoCapture('UCF50/BaseballPitch/v_BaseballPitch_g01_c01.avi')
print(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
video_reader.release()

In [None]:
# Collect the reported frame count of every video in each selected class.
# len_df maps class name -> list of frame counts (a plain dict, despite the name).
len_df = {}
for class_name in selected_classes:
    video_list = os.listdir(os.path.join('UCF50', class_name))

    len_list = []
    for video_name in video_list:
        video_reader = cv2.VideoCapture(os.path.join('UCF50', class_name, video_name))
        len_list.append(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        video_reader.release()

    len_df[class_name] = len_list

# Overlay one semi-transparent histogram per class. Labels and a legend are needed
# to tell the classes apart; iterating the dict draws each class exactly once even
# if selected_classes contains a repeated name.
for class_name, len_list in len_df.items():
    plt.hist(len_list, alpha = 0.5, label = str(class_name))
plt.xlabel('frames per video')
plt.ylabel('number of videos')
plt.legend()
plt.show()

# Data Preparation

In [None]:
# Fixed set of classes used from here on; this replaces the random sample drawn
# during exploration. A name's position in this list is used as its integer label.
selected_classes = ['HighJump', 'Lunges', 'JumpRope', 'Punch']

In [None]:
# Spatial size every frame is resized to; also the model's per-frame input size.
HEIGHT, WIDTH = 64, 64
# Number of frames sampled per video; also the length of the model's time axis.
NUM_FRAMES  = 30

In [None]:
def get_frames(video_path):
    """Sample up to NUM_FRAMES evenly spaced, preprocessed frames from a video.

    Frames are taken every ``int(frame_count / NUM_FRAMES)`` positions (at least
    1), converted from BGR to RGB, resized to WIDTH x HEIGHT and divided by 255.

    Args:
        video_path: Path of the video file to read.

    Returns:
        List of frames, each of shape (HEIGHT, WIDTH, 3). The list holds fewer
        than NUM_FRAMES entries (possibly none) when the video cannot be opened
        or a read fails part-way, so callers should check its length.
    """
    video_reader = cv2.VideoCapture(video_path)
    try:
        frame_count = video_reader.get(cv2.CAP_PROP_FRAME_COUNT)
        # Stride between sampled frames; never below 1 so short clips still advance.
        skip_frames = max(int(frame_count / NUM_FRAMES), 1)

        frame_list = []
        for counter in range(NUM_FRAMES):
            # Seek to the next sampling position before decoding.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, counter * skip_frames)
            flag, bgr_frame = video_reader.read()
            if not flag:
                break
            frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)  # change color sequence
            # cv2.resize expects dsize as (width, height), not (height, width).
            frame = cv2.resize(frame, (WIDTH, HEIGHT)) / 255  # resize & normalise the frame
            frame_list.append(frame)
        return frame_list
    finally:
        # Release the capture even if decoding or preprocessing raised.
        video_reader.release()

In [None]:
# Smoke-test get_frames on one clip: print how many frames came back and the
# shape of the first one.
frame_list = get_frames('./UCF50/BaseballPitch/v_BaseballPitch_g01_c01.avi')
print(len(frame_list), frame_list[0].shape)

In [None]:
# X, y, paths = [], [], []
# for class_index, class_name in enumerate(selected_classes):
#     video_list = os.listdir(os.path.join('UCF50', class_name))
#     for video_name in video_list:
#         video_path = os.path.join('UCF50', class_name, video_name)
#         frame_list = get_frames(video_path)
#         if len(frame_list)==NUM_FRAMES:
#             X.append(frame_list)
#             y.append(class_index)
#             paths.append(video_path)

# with open('data_temp.pkl', 'wb') as file:
#     pickle.dump({'X':X,'y':y,'paths':paths}, file)

In [None]:
# Load the cached dataset (presumably written by the commented-out extraction cell).
# NOTE(review): pickle.load can execute arbitrary code; only load files produced locally.
with open('data_temp.pkl', 'rb') as file:
    obj = pickle.load(file)
X, y, paths = obj['X'], obj['y'], obj['paths']
# Map each stored label to its position in selected_classes.
# NOTE(review): this assumes the pickle stores class *names*. The commented-out
# extraction code appends integer class indices, for which .index() would raise
# ValueError — confirm what data_temp.pkl actually contains.
y = [selected_classes.index(each) for each in y]

In [None]:
# Turn the Python lists into NumPy arrays for indexing, splitting and training.
X, y, paths = (np.array(seq) for seq in (X, y, paths))

In [None]:
# Number of samples per integer class label, as {label: count}.
dict(zip(*np.unique(y, return_counts=True)))

In [None]:
from tensorflow.keras.utils import to_categorical

# One-hot encode the integer labels. num_classes is passed explicitly so the
# encoded width always equals the model's output size, even when the
# highest-indexed class has no samples (otherwise it is inferred as max(y) + 1).
y = to_categorical(y, num_classes=len(selected_classes))

In [None]:
from sklearn.model_selection import train_test_split

# Hold out a quarter of the clips for testing; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.25,
    shuffle=True,
    random_state=43,
)

In [None]:
# Training set dimensions; the model below is built for inputs of shape
# (NUM_FRAMES, HEIGHT, WIDTH, 3) per sample.
X_train.shape

# Model building

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPool2D, LSTM, Dense, Flatten, Dropout
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
# Sanity check: apply the same Conv2D layer directly and through TimeDistributed
# (layer2 wraps layer1, so both share weights) and compare the two outputs.
layer1 = Conv2D(8, kernel_size=(3,3))
layer2 = TimeDistributed(layer1)

inp = X_train[:5]
print(inp.shape)

opt1 = layer1(inp)
opt2 = layer2(inp)
print(opt1.shape, opt2.shape)
# Largest absolute element-wise difference. A plain signed sum could report ~0
# even when the outputs differ, because positive and negative errors cancel.
print(np.max(np.abs(np.asarray(opt1 - opt2))))

# Author's shorthand for how each layer splits the axes of its input
# (interpretation — confirm against the Keras version in use):
# conv2d  = 0,1,2,3, | 4,5,6  ; rest  | img[3dim]   -> last 3 axes are the image, all leading axes act as batch
# flatten = 0,|1,2,3,4,5,6    ; batch[1dim] | rest  -> only the first axis is batch, everything after it is flattened

In [None]:
# LRCN is a class of models that is both spatially and temporally deep:
# a small CNN is applied to every frame (via TimeDistributed) and the resulting
# per-frame feature vectors are fed, in order, to an LSTM.
model = Sequential([
    # Convolutional block 1 (declares the per-clip input shape).
    TimeDistributed(
        Conv2D(32, (3, 3), padding='same', activation='relu'),
        input_shape=(NUM_FRAMES, HEIGHT, WIDTH, 3),
    ),
    TimeDistributed(MaxPool2D((4, 4))),
    TimeDistributed(Dropout(0.25)),

    # Convolutional block 2.
    TimeDistributed(Conv2D(64, (3, 3), padding='same', activation='relu')),
    TimeDistributed(MaxPool2D((4, 4))),
    TimeDistributed(Dropout(0.25)),

    # Convolutional block 3.
    TimeDistributed(Conv2D(32, (3, 3), padding='same', activation='relu')),
    TimeDistributed(MaxPool2D((2, 2))),
    TimeDistributed(Dropout(0.25)),

    # One flat feature vector per frame, keeping the time axis for the LSTM.
    TimeDistributed(Flatten()),

    # Temporal model over the frame sequence; only the final state is returned.
    LSTM(32),

    # One probability per selected class.
    Dense(len(selected_classes), activation='softmax'),
])

model.summary()

In [None]:
# Stop once the validation loss has not improved for 15 epochs and roll the
# model back to the best weights seen.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=15,
    restore_best_weights=True,
)

model.compile(
    optimizer='Adam',
    loss='categorical_crossentropy',
    metrics=["accuracy"],
)

# 20% of the training data is held out for validation (validation_split).
history = model.fit(
    X_train,
    y_train,
    batch_size=4,
    epochs=70,
    shuffle=True,
    validation_split=0.2,
    callbacks=[early_stopping_callback],
)

In [None]:
# Persist the trained model to an .h5 file so it can be reloaded without retraining.
model.save('./LRCN.h5')

In [None]:
# Loss and accuracy (the compiled metric) on the held-out test split.
model.evaluate(X_test, y_test)

# Test on a video - sliding window

In [None]:
# Reload the saved model (presumably so this section can run without retraining).
model = tf.keras.models.load_model('./LRCN.h5')

In [None]:
# Sample and preprocess frames from a single clip with the same helper used for training data.
test_frames = get_frames('./UCF50/Lunges/v_Lunges_g01_c01.avi')

In [None]:
# Add the leading batch axis the model expects: (frames, H, W, C) -> (1, frames, H, W, C).
# Reshaping from the trailing four axes (instead of expand_dims) keeps this cell
# idempotent: running it a second time does not stack another batch axis.
test_frames = np.asarray(test_frames)
test_frames = test_frames.reshape((1,) + test_frames.shape[-4:])
test_frames.shape

In [None]:
# Forward pass on the single clip; the softmax output layer yields one score per selected class.
pred = model(test_frames).numpy()
pred

In [None]:
# Translate the highest-scoring output index back into its class name.
selected_classes[np.argmax(pred)]

In [None]:
# Play a video and overlay the model's prediction for a sliding window made of
# the NUM_FRAMES most recent frames, together with the measured processing rate.
# NOTE(review): the window holds consecutive frames, whereas get_frames samples
# frames spread across the whole clip — confirm this mismatch is acceptable.
video_reader = cv2.VideoCapture('./UCF50/Lunges/v_Lunges_g01_c01.avi')
frame_list = deque(maxlen=NUM_FRAMES)  # once full, appending drops the oldest frame
predicted_class = ''
t1 = time()
try:
    while video_reader.isOpened():
        flag, frame = video_reader.read()

        if not flag:
            break

        processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # change color sequence
        # Use the shared size constants rather than a hard-coded (64, 64);
        # cv2.resize takes dsize as (width, height).
        processed_frame = cv2.resize(processed_frame, (WIDTH, HEIGHT))/255

        frame_list.append(processed_frame)

        # Predict only once a full window is available; until then the label stays empty.
        if len(frame_list)==NUM_FRAMES:
            test_frames = np.expand_dims(np.array(frame_list), 0)
            pred = model(test_frames).numpy()
            predicted_class = selected_classes[np.argmax(pred)]

        t2 = time()
        # Clamp the interval: two iterations can fall within the timer's resolution,
        # which would otherwise make 1/delta raise ZeroDivisionError.
        delta = max(t2-t1, 1e-6)
        t1=t2

        cv2.putText(frame, text=predicted_class, org=(5, 25), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=1, color=(0, 0, 255), thickness=2)
        cv2.putText(frame, text=f'{round(1/delta, 2)} fps', org=(5, 50), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=1, color=(0, 0, 255), thickness=2)

        cv2.imshow('playback', frame)

        # waitKey gives the window time to refresh; pressing 'q' stops playback.
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Always release the capture and close the window, even if the loop raised
    # or the kernel was interrupted.
    video_reader.release()
    cv2.destroyAllWindows()