In [None]:
%%capture

!wget --no-check-certificate https://www.crcv.ucf.edu/data/UCF50.rar

!unrar x UCF50.rar

In [None]:
%%capture

!pip install pafy youtube-dl moviepy

In [None]:
import os
import cv2
import pafy
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt

from moviepy.editor import *
%matplotlib inline

from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical ,plot_model
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
seed_constant = 27
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

In [None]:
plt.figure(figsize=(20,20))

all_classes_name = os.listdir('UCF50')

random_range = random.sample(range(len(all_classes_name)) ,20)

for counter ,random_index in enumerate(random_range ,1):

   selected_class_name = all_classes_name[random_index]

   video_files_names_list = os.listdir(f'UCF50/{selected_class_name}')

   selected_video_file_name = random.choice(video_files_names_list)

   video_reader = cv2.VideoCapture(f'UCF50/{selected_class_name}/{selected_video_file_name}')

   _ ,bgr_frame = video_reader.read()

   video_reader.release()

   rgb_frame = cv2.cvtColor(bgr_frame ,cv2.COLOR_BGR2RGB)

   cv2.putText(rgb_frame ,selected_class_name ,(10,30) ,cv2.FONT_HERSHEY_SIMPLEX ,1 ,(255,255,255) ,2)

   plt.subplot(5, 4, counter)
   plt.imshow(rgb_frame)
   plt.axis('off')


In [None]:
IMAGE_HEIGHT ,IMAGE_WIDTH = 64 ,64

SEQUENCE_LENGTH = 20

DATASET_DIR = 'UCF50'

CLASSES_LIST = ['Biking','PullUps','PushUps','Swing']

In [None]:
def frame_extraction(video_path):

    frames_list = []

    video_reader = cv2.VideoCapture(video_path)

    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH) ,1)

    for frame_counter in range(SEQUENCE_LENGTH):

        video_reader.set(cv2.CAP_PROP_POS_FRAMES ,frame_counter * skip_frames_window)

        success ,frame = video_reader.read()

        if not success:
            break

        resized_frame = cv2.resize(frame ,(IMAGE_HEIGHT ,IMAGE_WIDTH))

        normalized_frame = resized_frame / 255

        frames_list.append(normalized_frame)

    video_reader.release()

    return frames_list


In [None]:
def create_dataset():

    features = []
    labels = []
    video_file_paths = []

    for class_index ,class_name in enumerate(CLASSES_LIST):

        print(f'Extracting Data of Class : {class_name}')

        files_list = os.listdir(os.path.join(DATASET_DIR ,class_name))

        for file_name in files_list:

            video_file_path = os.path.join(DATASET_DIR ,class_name ,file_name)

            frames = frame_extraction(video_file_path)

            if len(frames)==SEQUENCE_LENGTH:

                features.append(frames)
                labels.append(class_index)
                video_file_paths.append(video_file_path)

    features = np.asarray(features)
    labels = np.array(labels)

    return features ,labels ,video_file_paths


In [None]:
features ,labels ,video_file_paths = create_dataset()

In [None]:
features

In [None]:
labels

In [None]:
one_hot_encoded_labels = to_categorical(labels)

In [None]:
one_hot_encoded_labels

In [None]:
X_train , X_test ,Y_train ,Y_test = train_test_split(features ,one_hot_encoded_labels ,
                                                     test_size=0.2 ,shuffle=True ,
                                                     random_state=seed_constant)

In [None]:
len(features) ,len(X_train) ,len(X_test)

In [None]:
def create_convlstm_model():

    model = Sequential([

        # layer 1
        ConvLSTM2D(filters=4 ,kernel_size=(3,3) ,activation='tanh' ,data_format='channels_last' ,
                  recurrent_dropout=0.2 ,return_sequences=True ,input_shape=(SEQUENCE_LENGTH ,IMAGE_HEIGHT ,IMAGE_WIDTH ,3)),

        MaxPooling3D(pool_size=(1,2,2) ,padding='same' ,data_format='channels_last'),

        TimeDistributed(Dropout(0.2)),

        # layer 2
        ConvLSTM2D(filters=8 ,kernel_size=(3,3) ,activation='tanh' ,data_format='channels_last' ,
                  recurrent_dropout=0.2 ,return_sequences=True),

        MaxPooling3D(pool_size=(1,2,2) ,padding='same' ,data_format='channels_last'),

        TimeDistributed(Dropout(0.2)),

        # layer 3
        ConvLSTM2D(filters=14 ,kernel_size=(3,3) ,activation='tanh' ,data_format='channels_last' ,
                  recurrent_dropout=0.2 ,return_sequences=True),

        MaxPooling3D(pool_size=(1,2,2) ,padding='same' ,data_format='channels_last'),

        TimeDistributed(Dropout(0.2)),

        # layer 4
        ConvLSTM2D(filters=16 ,kernel_size=(3,3) ,activation='tanh' ,data_format='channels_last' ,
                  recurrent_dropout=0.2 ,return_sequences=True),

        MaxPooling3D(pool_size=(1,2,2) ,padding='same' ,data_format='channels_last'),

        # TimeDistributed(Dropout(0.2)),

        Flatten(),

        Dense(len(CLASSES_LIST) ,activation='softmax')

    ])

    return model

In [None]:
convlstm_model = create_convlstm_model()
convlstm_model.summary()

In [None]:
plot_model(convlstm_model ,to_file='convlstm_model_structure_plot.png' ,show_shapes=True ,show_layer_names=True)

In [None]:
early_stoping_callbacks = EarlyStopping(monitor = 'val_loss' ,patience=10 ,mode = 'min' ,restore_best_weights = True)

convlstm_model.compile(loss='categorical_crossentropy' ,optimizer='Adam' ,metrics=['accuracy'])

convlstm_model_training_history = convlstm_model.fit( x=X_train , y=Y_train ,
                                                     epochs=50 , batch_size=4 ,
                                                      shuffle=True , validation_split=0.2 ,
                                                      callbacks=[early_stoping_callbacks])

In [None]:
import pickle
with open('/content/convlstm_model.pickle','wb') as file:
  pickle.dump(convlstm_model,file)

In [None]:
model_evalution_history = convlstm_model.evaluate(X_test ,Y_test)

In [None]:
model_evalution_history

In [None]:
convlstm_model_training_history

In [None]:
convlstm_model.save('convlstm_model.h5')

In [None]:
def plot_metric(model_training_history ,metric_name1 ,metric_name2 ,plot_name):

  metric_value1 = model_training_history.history[metric_name1]
  metric_value2 = model_training_history.history[metric_name2]

  epochs = range(len(metric_value1))

  plt.plot(epochs ,metric_value1 ,'blue' ,label=metric_name1)
  plt.plot(epochs ,metric_value2 ,'red' ,label=metric_name2)

  plt.title(str(plot_name))

  plt.legend()

In [None]:
plot_metric(convlstm_model_training_history ,'loss' ,'val_loss' ,'Total Loss vs Total Validation Loss')

In [None]:
plot_metric(convlstm_model_training_history ,'accuracy' ,'val_accuracy' ,'Total Accuracy vs Total Validation Accuracy')

In [None]:
def create_lrcn_model():

    model = Sequential([

        TimeDistributed(Conv2D(16 ,(3,3) ,padding='same' ,activation='relu'),
                               input_shape=(SEQUENCE_LENGTH ,IMAGE_HEIGHT ,IMAGE_WIDTH ,3)),
        TimeDistributed(MaxPooling2D((4,4))),
        TimeDistributed(Dropout(0.2)),

        TimeDistributed(Conv2D(32 ,(3,3) ,padding='same' ,activation='relu')),
        TimeDistributed(MaxPooling2D((4,4))),
        TimeDistributed(Dropout(0.2)),

        TimeDistributed(Conv2D(64 ,(3,3) ,padding='same' ,activation='relu')),
        TimeDistributed(MaxPooling2D((2,2))),
        TimeDistributed(Dropout(0.2)),

        TimeDistributed(Conv2D(64 ,(3,3) ,padding='same' ,activation='relu')),
        TimeDistributed(MaxPooling2D((2,2))),
        # TimeDistributed(Dropout(0.2)),

        TimeDistributed(Flatten()),

        Bidirectional(LSTM(32 ,return_sequences=True)),
        Bidirectional(LSTM(64 ,return_sequences=True)),
        Bidirectional(LSTM(64 ,return_sequences=False)),
        # LSTM(32),

        Dense(len(CLASSES_LIST) ,activation='softmax')
    ])

    return model

In [None]:
lrcn_model = create_lrcn_model()
lrcn_model.summary()

In [None]:
plot_model(lrcn_model ,to_file='lrcn_model_structure_plot.png' ,show_shapes=True ,show_layer_names=True)

In [None]:
early_stoping_callbacks = EarlyStopping(monitor = 'val_loss' ,patience=15 ,mode = 'min' ,restore_best_weights = True)

lrcn_model.compile(loss='categorical_crossentropy' ,optimizer='Adam' ,metrics=['accuracy'])

lrcn_model_training_history = lrcn_model.fit( x=X_train , y=Y_train ,
                                                     epochs=70 , batch_size=4 ,
                                                      shuffle=True , validation_split=0.2 ,
                                                      callbacks=[early_stoping_callbacks])

In [None]:
model_evalution_history = lrcn_model.evaluate(X_test ,Y_test)

In [None]:
import pickle
with open('/content/lrcn_model.pickle','wb') as file:
  pickle.dump(lrcn_model,file)

In [None]:
lrcn_model.save('lrcn_model.h5')

In [None]:
plot_metric(lrcn_model_training_history ,'loss' ,'val_loss' ,'Total Loss vs Total Validation Loss')

In [None]:
plot_metric(lrcn_model_training_history ,'accuracy' ,'val_accuracy' ,'Total Accuracy vs Total Validation Accuracy')

In [None]:
def download_youtube_videos(youtube_video_url ,output_directory):

    video = pafy.new(youtube_video_url)

    title = video.title

    video_best = video.getbest()

    output_file_path = f'{output_directory}/{title}.mp4'

    video_best.download(filepath = output_file_path ,quiet=True)

    return title

In [None]:
# test_video_dir = 'test_videos'

# os.makedirs(test_video_dir ,exist_ok =True)

# video_title = download_youtube_videos('https://www.youtube.com/watch?v=aAggnpPyR6E',test_video_dir)

# input_video_file_path = f'{test_video_dir}/{video_title}.mp4'

In [14]:
import pickle

with open('models/lrcn_model.pickle' ,'rb') as file:
    lrcn_model = pickle.load(file)

2024-01-23 11:31:23.369808: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-01-23 11:31:23.369901: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-01-23 11:31:23.409103: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-01-23 11:31:23.494413: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [15]:
lrcn_model.summary()

Model: "sequential_8"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 time_distributed_69 (TimeD  (None, 20, 64, 64, 16)    448       
 istributed)                                                     
                                                                 
 time_distributed_70 (TimeD  (None, 20, 16, 16, 16)    0         
 istributed)                                                     
                                                                 
 time_distributed_71 (TimeD  (None, 20, 16, 16, 16)    0         
 istributed)                                                     
                                                                 
 time_distributed_72 (TimeD  (None, 20, 16, 16, 32)    4640      
 istributed)                                                     
                                                                 
 time_distributed_73 (TimeD  (None, 20, 4, 4, 32)     

In [16]:
import cv2
import numpy as np
from collections import deque
SEQUENCE_LENGTH = 20
IMAGE_HEIGHT ,IMAGE_WIDTH = 64 ,64
CLASSES_LIST = ['Biking','PullUps','PushUps','Swing']

In [17]:

def predict_on_single_action(video_path):
        frame_deque = deque(maxlen=SEQUENCE_LENGTH)

        video_reader = cv2.VideoCapture(video_path)

        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH) ,1)

        for frame_counter in range(SEQUENCE_LENGTH):

            video_reader.set(cv2.CAP_PROP_POS_FRAMES ,frame_counter * skip_frames_window)

            success ,frame = video_reader.read()

            if not success:
                break

            resized_frame = cv2.resize(frame ,(IMAGE_HEIGHT ,IMAGE_WIDTH))

            normalized_frame = resized_frame / 255

            frame_deque.append(normalized_frame)

        video_reader.release()

        predicted_probs = lrcn_model.predict(np.expand_dims(frame_deque ,axis=0))[0]

        predicted_label =  np.argmax(predicted_probs)

        predicted_class_name = CLASSES_LIST[predicted_label]

        print(predicted_class_name ,predicted_probs[predicted_label])

In [18]:
predict_on_single_action('test_video.mp4')

Biking 0.97140527


In [22]:
import imageio
import cv2
import numpy as np
from collections import deque

def predict_on_multiple_actions(video_path, output_path, SEQUENCE_LENGTH):
    video_reader = cv2.VideoCapture(video_path)
    
    video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    
    video_writer = imageio.get_writer(output_path + '.mp4', fps=video_reader.get(cv2.CAP_PROP_FPS))
    
    frame_deque = deque(maxlen=SEQUENCE_LENGTH)
    predicted_class_name = ''

    while video_reader.isOpened():
        success, frame = video_reader.read()

        if not success:
            break

        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))
        normalized_frame = resized_frame / 255
        frame_deque.append(normalized_frame)

        if len(frame_deque) == SEQUENCE_LENGTH:
            predicted_probs = lrcn_model.predict(np.expand_dims(frame_deque, axis=0))[0]
            predicted_label = np.argmax(predicted_probs)
            predicted_class_name = CLASSES_LIST[predicted_label]

        cv2.putText(frame, predicted_class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        video_writer.append_data(frame)

    video_reader.release()
    video_writer.close()

predict_on_multiple_actions('test_video.mp4', 'output_video', SEQUENCE_LENGTH)





In [27]:
def predict_single_action(video_path ,SEQUENCE_LENGTH):
    
    video_reader = cv2.VideoCapture(video_path)
    
    video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    
    frames_list = []
    
    predicted_class_name = ''
    
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH) ,1)

    for frame_counter in range(SEQUENCE_LENGTH):

        video_reader.set(cv2.CAP_PROP_POS_FRAMES ,frame_counter * skip_frames_window)

        success ,frame = video_reader.read()

        if not success:
            break

        resized_frame = cv2.resize(frame ,(IMAGE_HEIGHT ,IMAGE_WIDTH))

        normalized_frame = resized_frame / 255

        frames_list.append(normalized_frame)

    predicted_probs = lrcn_model.predict(np.expand_dims(frame_deque ,axis=0))[0]

    predicted_label =  np.argmax(predicted_probs)

    predicted_class_name = CLASSES_LIST[predicted_label]
    
    print(predicted_class_name ,predicted_probs[predicted_label])
    
    video_reader.release()


In [31]:
predict_single_action('test_video.mp4' ,SEQUENCE_LENGTH)

PullUps 0.99910563


In [2]:
import cv2

def cut_video(input_path, output_path, start_time, end_time):
    """
    Cut (trim) a video based on specified start and end times.

    Parameters:
        - input_path (str): Path to the input video file.
        - output_path (str): Path to the output (trimmed) video file.
        - start_time (float): Start time in seconds.
        - end_time (float): End time in seconds.
    """

    # Open the video file
    video_capture = cv2.VideoCapture(input_path)

    # Get the frames per second (fps) and total number of frames
    fps = int(video_capture.get(cv2.CAP_PROP_FPS))
    total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate start and end frame numbers based on start_time and end_time
    start_frame = int(start_time * fps)
    end_frame = int(end_time * fps)

    # Ensure the end_frame is within the total_frames
    end_frame = min(end_frame, total_frames - 1)

    # Set the starting frame position
    video_capture.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    # Create VideoWriter object to write the trimmed video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Adjust the codec as needed
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (int(video_capture.get(3)), int(video_capture.get(4))))

    # Read and write frames within the specified range
    for frame_counter in range(start_frame, end_frame + 1):
        success, frame = video_capture.read()
        if not success:
            break
        video_writer.write(frame)

    # Release video capture and writer objects
    video_capture.release()
    video_writer.release()

if __name__ == "__main__":
    input_video_path = 'biking.mp4'
    output_video_path = 'test_video.mp4'
    start_time_seconds = 3  # Specify start time in seconds
    end_time_seconds = 7    # Specify end time in seconds

    cut_video(input_video_path, output_video_path, start_time_seconds, end_time_seconds)
