In [1]:
import os
import cv2
import numpy as np
import random
import tensorflow as tf
from collections import deque

import cvzone
from cvzone.SelfiSegmentationModule import SelfiSegmentation

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [2]:
dataset_path = "C:/Users/batuh/Downloads/Compressed/Dataset_3/"
actions = os.listdir("C:/Users/batuh/Downloads/Compressed/Dataset_3/")

In [3]:
frame_count = 20

frame_width = 64
frame_height = 64

seed_constant = 86
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

segmentor = SelfiSegmentation()

In [4]:
def take_video_names(dataset_path):

  actions = os.listdir(dataset_path)
  video_paths = []
  video_labels = []

  for single_action in actions:

    videos = os.listdir(dataset_path + "/" + single_action)

    for single_video in videos:

      single_video_path = dataset_path + "/" + single_action + "/" + single_video

      video_paths.append(single_video_path)
      video_labels.append(single_action)

  print(f"There is {len(actions)} action in the dataset.")
  print(f"There is {len(video_paths)} video in the dataset\n")

  return np.array(video_paths), np.array(video_labels)

In [5]:
def video_preprocess(frame):

  resized_frame = cv2.resize(frame, (frame_height, frame_width))

  normalized_frame = resized_frame / 255

  return normalized_frame

In [6]:
def load_video (video_path):

  frames_list = []

  cap = cv2.VideoCapture(video_path)

  ok, frame = cap.read()

  video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

  skip_ratio = max(int(video_frame_count/frame_count), 1)

  for i in range(frame_count):

    cap.set(cv2.CAP_PROP_POS_FRAMES, i * skip_ratio)

    oke, frame = cap.read()

    if not oke:
      break

    # frame = segmentor.removeBG(frame,(0,0,0), threshold=0.1)

    frame = video_preprocess(frame)

    frames_list.append(frame)

  if len(frames_list) != frame_count:
    print(f"There is not enough number of frames in this video {video_path}")

  cap.release()

  return frames_list

In [7]:
feature_s = []

count = 0

videos, labels = take_video_names(dataset_path)

for one_video in videos:

  count = count + 1

  feature_s.append(load_video(one_video))

  print (f"{count/len(videos)*100} percent complete ")

feature_s = np.array(feature_s)

There is 4 action in the dataset.
There is 1944 video in the dataset

0.051440329218107 percent complete 
0.102880658436214 percent complete 
0.15432098765432098 percent complete 
0.205761316872428 percent complete 
0.257201646090535 percent complete 
0.30864197530864196 percent complete 
0.360082304526749 percent complete 
0.411522633744856 percent complete 
0.4629629629629629 percent complete 
0.51440329218107 percent complete 
0.565843621399177 percent complete 
0.6172839506172839 percent complete 
0.668724279835391 percent complete 
0.720164609053498 percent complete 
0.7716049382716049 percent complete 
0.823045267489712 percent complete 
0.8744855967078189 percent complete 
0.9259259259259258 percent complete 
0.977366255144033 percent complete 
1.02880658436214 percent complete 
1.0802469135802468 percent complete 
1.131687242798354 percent complete 
1.183127572016461 percent complete 
1.2345679012345678 percent complete 
1.286008230452675 percent complete 
1.337448559670782 per

In [None]:
print(len(feature_s))
print(type(feature_s))

In [None]:
le = LabelEncoder()
labels = le.fit_transform(labels)

one_hot_encoded_labels = to_categorical(labels)

In [None]:
features_train, features_test, labels_train, labels_test = train_test_split(feature_s, 
                                                                            one_hot_encoded_labels, 
                                                                            test_size = 0.25, 
                                                                            shuffle = True, 
                                                                            random_state = seed_constant)

In [None]:
del feature_s
del labels
del count

In [None]:
def create_LRCN_model():

    model = Sequential()
    
    model.add(TimeDistributed(Conv2D(16, (3, 3), padding='same',activation = 'relu'),
                              input_shape = (frame_count, frame_height, frame_width, 3)))
    
    model.add(TimeDistributed(MaxPooling2D((4, 4)))) 
    model.add(TimeDistributed(Dropout(0.25)))
    
    model.add(TimeDistributed(Conv2D(32, (3, 3), padding='same',activation = 'relu')))
    model.add(TimeDistributed(MaxPooling2D((4, 4))))
    model.add(TimeDistributed(Dropout(0.25)))
    
    model.add(TimeDistributed(Conv2D(64, (3, 3), padding='same',activation = 'relu')))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    model.add(TimeDistributed(Dropout(0.25)))
    
    model.add(TimeDistributed(Conv2D(64, (3, 3), padding='same',activation = 'relu')))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
                                      
    model.add(TimeDistributed(Flatten()))
                                      
    model.add(LSTM(32))
                                      
    model.add(Dense(len(actions), activation = 'softmax'))


    model.summary()
    
    return model

In [None]:
LRCN_model = create_LRCN_model()

print("Model Created Successfully!")

In [None]:
early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience = 60, mode = 'min', restore_best_weights = True)

LRCN_model.compile(loss = 'categorical_crossentropy', optimizer = 'Adam', metrics = ["accuracy"])

LRCN_model_training_history = LRCN_model.fit(x = features_train, y = labels_train, epochs = 70, batch_size = 16 ,
                                             shuffle = True, validation_split = 0.2, 
                                             callbacks = [early_stopping_callback])

In [None]:
model_evaluation_history = LRCN_model.evaluate(features_test, labels_test)

In [None]:
del LRCN_model

In [None]:
def predict_on_camera():

    cam = cv2.VideoCapture(0)
    
    cam_frame_list = deque(maxlen = frame_count)

    while True:

        ok, cam_frame = cam.read()
    
        if ok == True:

            # img_out = segmentor.removeBG(cam_frame,(0,0,0), threshold=0.1)
        
            normalized_cam_frame = video_preprocess(img_out)

            cam_frame_list.append(normalized_cam_frame)

        if len(cam_frame_list) == 20:

            predicted_labels_probabilities = LRCN_model.predict(np.expand_dims(cam_frame_list, axis = 0))[0]
            
            for i in range(len(actions)):
                
                if predicted_labels_probabilities[i] > 0.95 and predicted_labels_probabilities[i] < 1.0:
                    
                    predicted_label = np.argmax(predicted_labels_probabilities)

                    predicted_class_name = actions[predicted_label]

                    cv2.putText(cam_frame, predicted_class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            
        cv2.imshow("Frame", cam_frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
    cam.release()

In [None]:
while True:
    
    predict_on_camera()