In [None]:
!pip install pafy youtube-dl==2020.12.2 moviepy

In [None]:
!pip3 install imageio==2.4.1

In [None]:
import os
import cv2
import pafy
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt
from moviepy.editor import *
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical,plot_model
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
# Seed every random number generator the notebook uses so sampling and training are repeatable.
seed_constant=27
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

# No figure is opened here: with the inline backend a figure created in this cell is
# rendered (empty) and closed when the cell finishes, so a later cell cannot draw into it.

# os.listdir returns entries in arbitrary order; sorting keeps the seeded sample below
# identical from run to run.
all_classes_names=sorted(os.listdir('/content/drive/MyDrive/data/UCF50'))

# Pick up to 20 class indices to preview; random.sample raises ValueError when asked
# for more items than the population holds, hence the min().
random_range=random.sample(range(len(all_classes_names)),min(20,len(all_classes_names)))
print(all_classes_names)
random_range


In [None]:
# Dataset configuration shared by the preview below and by the extraction/model cells.
image_height,image_width=64,64
sequence_length=20
# Same absolute location the class folders are listed from, so listing and reading
# the videos cannot disagree depending on the current working directory.
dataset_dir='/content/drive/MyDrive/data/UCF50'
classes_list=['BaseballPitch', 'Basketball', 'BenchPress', 'Biking', 'Billiards', 'BreastStroke', 'CleanAndJerk', 'Diving', 'Drumming', 'Fencing', 'GolfSwing', 'HighJump', 'HorseRace', 'HorseRiding', 'HulaHoop', 'JavelinThrow', 'JugglingBalls', 'JumpingJack', 'JumpRope', 'Kayaking', 'Lunges', 'MilitaryParade', 'Mixing', 'Nunchucks', 'PizzaTossing', 'PlayingGuitar', 'PlayingPiano', 'PlayingTabla', 'PlayingViolin', 'PoleVault', 'PommelHorse', 'PullUps', 'Punch', 'PushUps', 'RockClimbingIndoor', 'RopeClimbing', 'Rowing', 'SalsaSpin', 'SkateBoarding', 'Skiing', 'Skijet', 'SoccerJuggling', 'Swing', 'TaiChi', 'TennisSwing', 'ThrowDiscus', 'TrampolineJumping', 'VolleyballSpiking', 'WalkingWithDog', 'YoYo']

# Preview grid: the first frame of one randomly chosen video per sampled class.
# The figure is created in the same cell that draws into it.
plt.figure(figsize=(20,20))
for counter, random_index in enumerate(random_range,1):
    selected_class_name=all_classes_names[random_index]
    class_dir=os.path.join(dataset_dir,selected_class_name)
    print(selected_class_name)
    video_files_names_list=os.listdir(class_dir)
    selected_video_file_name=random.choice(video_files_names_list)

    video_reader=cv2.VideoCapture(os.path.join(class_dir,selected_video_file_name))
    success,bgr_frame=video_reader.read()
    video_reader.release()
    if not success:
        # An unreadable file yields frame=None, which would make cvtColor raise.
        print(f'could not read a frame from {selected_video_file_name}, skipping')
        continue

    # OpenCV decodes to BGR; matplotlib expects RGB.
    rgb_frame=cv2.cvtColor(bgr_frame,cv2.COLOR_BGR2RGB)
    cv2.putText(rgb_frame,selected_class_name,(10,30),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
    # cv2.imshow/cv2.waitKey need a desktop GUI window and are not usable from a
    # hosted notebook kernel, so the frame is shown through matplotlib only.
    plt.subplot(5,4,counter)
    plt.imshow(rgb_frame)
    plt.axis('off')


In [None]:
def frames_extraction(video_path):
    """Sample, resize and normalise evenly spaced frames from one video.

    Up to ``sequence_length`` frames are read, spaced
    ``max(frame_count // sequence_length, 1)`` frames apart. Each frame is
    resized to ``image_width`` x ``image_height`` and scaled by 1/255.

    Args:
        video_path: path of the video file to read.

    Returns:
        A list of normalised frames. It holds fewer than ``sequence_length``
        items (possibly none) when a read fails before enough frames were
        collected.
    """
    frames_list=[]
    video_reader=cv2.VideoCapture(video_path)
    try:
        video_frames_count=int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        # Never step by 0, even for clips shorter than sequence_length.
        skip_frames_window=max(int(video_frames_count/sequence_length),1)
        for frame_counter in range(sequence_length):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES,frame_counter*skip_frames_window)
            success,frame=video_reader.read()
            if not success:
                break
            # cv2.resize takes dsize as (width, height), not (height, width).
            resized_frame=cv2.resize(frame,(image_width,image_height))
            normalized_frame=resized_frame/255
            frames_list.append(normalized_frame)
    finally:
        # Release the capture handle even if decoding or resizing raises.
        video_reader.release()
    return frames_list


In [None]:
def create_dataset():
    """Build the feature/label arrays for every class in ``classes_list``.

    Each file under ``dataset_dir/<class>`` is passed to ``frames_extraction``;
    only videos that produced exactly ``sequence_length`` frames are kept.

    Returns:
        A tuple ``(features, labels, video_files_paths)``: the kept frame
        sequences as a NumPy array, their class indices as a NumPy array, and
        the list of the corresponding file paths.
    """
    kept_sequences,kept_labels,kept_paths=[],[],[]

    for label_id,label_name in enumerate(classes_list):
        print(f'extracting data of class {label_name}')
        for entry in os.listdir(os.path.join(dataset_dir,label_name)):
            clip_path=os.path.join(dataset_dir,label_name,entry)
            clip_frames=frames_extraction(clip_path)
            # Skip clips that could not supply a full-length sequence.
            if len(clip_frames)!=sequence_length:
                continue
            kept_sequences.append(clip_frames)
            kept_labels.append(label_id)
            kept_paths.append(clip_path)

    return np.asarray(kept_sequences),np.asarray(kept_labels),kept_paths


In [None]:
features,labels,video_files_paths=create_dataset()
# Fix the one-hot width to the number of classes so it always matches the model's
# output layer, even if the highest-indexed classes contributed no usable videos.
one_hot_encoded_labels=to_categorical(labels,num_classes=len(classes_list))

# 75/25 train/test split; the keyword is `shuffle` (the misspelt `shuffel` raised TypeError).
xtrain,xtest,ytrain,ytest=train_test_split(features,one_hot_encoded_labels,test_size=0.25,shuffle=True,random_state=seed_constant)


In [None]:
def create_model():
    """Assemble the ConvLSTM classifier and print its summary.

    The network stacks four ConvLSTM2D stages (4, 8, 14 and 16 filters), each
    followed by spatial max pooling; every stage except the last is also
    followed by a time-distributed dropout. A Flatten + softmax Dense layer
    with one unit per entry of ``classes_list`` closes the model.

    Returns:
        The (uncompiled) Sequential model.
    """
    stage_filters=(4,8,14,16)
    last_stage=len(stage_filters)-1

    net=Sequential()
    for stage,n_filters in enumerate(stage_filters):
        # Only the first layer declares the input shape.
        extra_kwargs={}
        if stage==0:
            extra_kwargs['input_shape']=(sequence_length,image_height,image_width,3)

        net.add(ConvLSTM2D(filters=n_filters,
                           kernel_size=(3,3),
                           activation='tanh',
                           data_format="channels_last",
                           recurrent_dropout=0.2,
                           return_sequences=True,
                           **extra_kwargs))
        # Pool only over the two spatial axes; the time axis keeps its length.
        net.add(MaxPooling3D(pool_size=(1,2,2),padding='same',data_format='channels_last'))
        if stage!=last_stage:
            net.add(TimeDistributed(Dropout(0.2)))

    net.add(Flatten())
    net.add(Dense(len(classes_list),activation='softmax'))

    net.summary()
    return net


In [None]:
# create_model() prints the layer summary itself. Model.summary() returns None, so the
# former print(model.summary()) repeated the table and then printed a stray "None".
model=create_model()
print('model created')


In [None]:
# Write a diagram of the architecture to model.png.
plot_model(
    model,
    to_file='model.png',
    show_shapes=True,
    show_layer_names=True,
)

# Stop once val_loss has not improved for 10 epochs and roll back to the best weights.
early_stopping_callback=EarlyStopping(
    monitor='val_loss',
    patience=10,
    mode='min',
    restore_best_weights=True,
)

model.compile(
    loss='categorical_crossentropy',
    optimizer='Adam',
    metrics=['accuracy'],
)

# 20% of the training split is held out for validation during fitting.
history=model.fit(
    x=xtrain,
    y=ytrain,
    epochs=50,
    batch_size=4,
    shuffle=True,
    validation_split=0.2,
    callbacks=[early_stopping_callback],
)


In [None]:

def plot_loss_curves(history):
    """Draw training/validation curves for loss and for accuracy.

    The loss curves go onto the current figure; the accuracy curves go onto a
    newly created figure.

    Args:
        history: mapping with the keys 'loss', 'val_loss', 'accuracy' and
            'val_accuracy', each holding one value per epoch.
    """
    # Look every series up first so a missing key fails before anything is drawn.
    train_loss,valid_loss=history['loss'],history['val_loss']
    train_acc,valid_acc=history['accuracy'],history['val_accuracy']
    epoch_axis=range(len(history['loss']))

    panels=(
        ('Loss',train_loss,'training_loss',valid_loss,'val_loss'),
        ('Accuracy',train_acc,'training_accuracy',valid_acc,'val_accuracy'),
    )
    for panel_no,(heading,train_series,train_label,valid_series,valid_label) in enumerate(panels):
        # Every panel after the first gets a figure of its own.
        if panel_no:
            plt.figure()
        plt.plot(epoch_axis,train_series,label=train_label)
        plt.plot(epoch_axis,valid_series,label=valid_label)
        plt.title(heading)
        plt.xlabel('Epochs')
        plt.legend()


In [None]:
plot_loss_curves(history.history)

# The model was compiled with metrics=['accuracy'], so evaluate() yields [loss, accuracy].
# (The result used to be stored under the misspelt name `evalutaion_history`, which made
# the unpacking below fail with NameError.)
evaluation_history=model.evaluate(xtest,ytest)

loss,acc=evaluation_history


# Save the trained model under a name carrying the timestamp and the test metrics.
date_format='%Y_%m_%d__%H_%M_%S'
curr_date=dt.datetime.now()
curr_date_str=dt.datetime.strftime(curr_date,date_format)
model.save(f'activity_model{curr_date_str}_loss{loss}_accuracy{acc}.h5')




In [None]:
def create_lstm_cnn_model():
    """Assemble the CNN + LSTM classifier and print its summary.

    A per-frame convolutional feature extractor (four Conv2D/MaxPooling2D
    stages wrapped in TimeDistributed, with dropout after the first three) is
    flattened per frame and fed to an LSTM, followed by a softmax Dense layer
    with one unit per entry of ``classes_list``.

    Returns:
        The (uncompiled) Sequential model.
    """
    # (filters, pool size) per convolutional stage. Four 4x4 poolings would shrink a
    # 64x64 frame to 1x1 after the third stage and leave nothing for the fourth
    # pooling window, so the last two stages pool by 2: 64 -> 16 -> 4 -> 2 -> 1.
    conv_stages=((16,(4,4)),(32,(4,4)),(64,(2,2)),(64,(2,2)))
    last_stage=len(conv_stages)-1

    model=Sequential()
    for stage,(n_filters,pool_size) in enumerate(conv_stages):
        conv_layer=Conv2D(n_filters,(3,3),padding='same',activation='relu')
        if stage==0:
            # Only the first layer declares the input shape.
            model.add(TimeDistributed(conv_layer,
                                      input_shape=(sequence_length,image_height,image_width,3)))
        else:
            model.add(TimeDistributed(conv_layer))
        model.add(TimeDistributed(MaxPooling2D(pool_size)))
        # No dropout after the final convolutional stage.
        if stage!=last_stage:
            model.add(TimeDistributed(Dropout(0.25)))

    model.add(TimeDistributed(Flatten()))
    # The Keras layer class is `LSTM`; the lowercase `lstm` is not defined.
    model.add(LSTM(32))
    model.add(Dense(len(classes_list),activation='softmax'))

    model.summary()

    return model

model=create_lstm_cnn_model()
print('model created')
# Stop once val_loss has not improved for 15 epochs and roll back to the best weights.
early_stopping_callback=EarlyStopping(monitor='val_loss',patience=15,mode='min',restore_best_weights=True)
model.compile(loss='categorical_crossentropy',optimizer='Adam',metrics=['accuracy'])
history=model.fit(x=xtrain,y=ytrain,epochs=70,batch_size=4,shuffle=True,validation_split=0.2,callbacks=[early_stopping_callback])
plot_loss_curves(history.history)

# The model was compiled with metrics=['accuracy'], so evaluate() yields [loss, accuracy].
# (The result used to be stored under the misspelt name `evalutaion_history`, which made
# the unpacking below fail with NameError.)
evaluation_history=model.evaluate(xtest,ytest)

loss,acc=evaluation_history

# Save the trained model under a name carrying the timestamp and the test metrics.
date_format='%Y_%m_%d__%H_%M_%S'
curr_date=dt.datetime.now()
curr_date_str=dt.datetime.strftime(curr_date,date_format)
model.save(f'activity_model2_lstm_and_cnn{curr_date_str}_loss{loss}_accuracy{acc}.h5')


In [None]:
def download_videos(url,out_dir):
    """Download the best available stream of a YouTube video as an .mp4 file.

    Args:
        url: address of the video to download.
        out_dir: directory the file is written into.

    Returns:
        The name (without extension) the file was saved under. This is the
        video title with path separators replaced by underscores, so
        ``f'{out_dir}/{name}.mp4'`` is always the downloaded file. For titles
        without separators it is the title itself.
    """
    video=pafy.new(url)
    # A title containing '/' (or the platform separator) would otherwise be read
    # as a sub-directory of out_dir and break the download path.
    title=video.title.replace('/','_').replace(os.sep,'_')
    video_best=video.getbest()
    output_path=f'{out_dir}/{title}.mp4'
    video_best.download(filepath=output_path,quiet=True)
    return title


In [None]:
# Folder that receives the downloaded test clip.
test_video_dir='test_videos'
os.makedirs(test_video_dir,exist_ok=True)
# download_videos resolves the URL itself, so the separate pafy.new() lookup that used
# to precede this call (an extra network request whose result was never used) is gone.
video_title=download_videos('https://www.youtube.com/watch?v=x2AWg_Z5WkA',
                            test_video_dir)
input_video_file_path=f'{test_video_dir}/{video_title}.mp4'


In [None]:
def predict_on_video(path,out_path,sequence_length):
    """Annotate every frame of a video with the model's predicted class.

    A sliding window of the most recent ``sequence_length`` preprocessed frames
    is kept; once it is full, the module-level ``model`` is run on it for each
    new frame and the predicted class name is drawn on the frame before it is
    written to ``out_path``. Frames seen before the window fills are written
    with an empty label.

    Args:
        path: video file to read.
        out_path: file the annotated video is written to.
        sequence_length: number of consecutive frames fed to the model.
    """
    video_reader=cv2.VideoCapture(path)
    og_w=int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    og_h=int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # The output keeps the source frame rate and frame size.
    video_writer=cv2.VideoWriter(out_path,cv2.VideoWriter_fourcc('M','P','4','V'),
                                 video_reader.get(cv2.CAP_PROP_FPS),
                                 (og_w,og_h))
    # Use the `sequence_length` parameter; the upper-case SEQUENCE_LENGTH is not defined.
    frames_q=deque(maxlen=sequence_length)
    predicted_class=''
    try:
        while video_reader.isOpened():
            ok,frame=video_reader.read()
            if not ok:
                break
            # cv2.resize takes dsize as (width, height), not (height, width).
            resized_frame=cv2.resize(frame,(image_width,image_height))
            normalized_frame=resized_frame/255
            frames_q.append(normalized_frame)
            if len(frames_q)==sequence_length:
                # Predict on the queued window (the undefined name `frames` was used
                # here before); expand_dims adds the batch axis.
                window=np.expand_dims(np.asarray(frames_q),axis=0)
                predicted_labels=model.predict(window)[0]
                predicted_label=np.argmax(predicted_labels)
                predicted_class=classes_list[predicted_label]

            cv2.putText(frame,predicted_class,(10,30),cv2.FONT_HERSHEY_SIMPLEX,1,(0,255,0),2)
            video_writer.write(frame)
    finally:
        # Always close both handles so the output file is finalised even on error.
        video_reader.release()
        video_writer.release()
