In [6]:
import os
import subprocess

import joblib
import numpy as np
import pandas as pd
from skvideo.io import ffprobe
from skvideo.io import vread,vwrite,FFmpegWriter,FFmpegReader

In [1]:
# Fixing the number of frames in each video so that it fits our model

In [2]:
#function to automate the fixing process
def fixVideo(frames,video_name,startFrames=0,endFrames=0,middleFrames=0):
    """Copy a video to ``<name>_out.mp4`` while writing selected frames twice.

    The source is read frame by frame and written to a new file in the same
    folder. A frame is written twice when it falls in one of three regions,
    checked in this order (a frame is never written more than twice):

    1. the first ``startFrames`` frames,
    2. the first ``middleFrames`` frames after the midpoint,
    3. the last ``endFrames`` frames (located through ``frames``).

    Parameters
    ----------
    frames : int
        Frame count of the source video; it is trusted, not measured, and is
        used to find the midpoint and the start of the tail.
    video_name : str
        Path of the source video.
    startFrames, endFrames, middleFrames : int
        Number of frames to duplicate in each region (0 disables the region).

    Notes
    -----
    The output name is built with ``os.path`` so the function no longer
    depends on Windows ``\\`` separators, and only the final extension is
    replaced (``a.b.mp4`` -> ``a.b_out.mp4``).
    """
    folder_name,base_name=os.path.split(video_name)
    file_name=os.path.splitext(base_name)[0]+"_out.mp4"
    # ceil(frames/2): the same frame the original even/odd checks selected
    # (frames/counter==2 for even counts, frames/(counter-0.5)==2 for odd ones),
    # computed with integers so no external isEven helper is required.
    midpoint=(frames+1)//2
    reader=FFmpegReader(video_name)
    try:
        writer=FFmpegWriter(os.path.join(folder_name,file_name))
        try:
            counter=0
            reachMiddle=False
            for frame in reader.nextFrame():
                if startFrames!=0:
                    copies=2
                    startFrames-=1
                elif middleFrames!=0 and reachMiddle:
                    copies=2
                    middleFrames-=1
                elif endFrames!=0 and frames-counter==endFrames:
                    # counter and endFrames move in lockstep from here on,
                    # so this stays true for every remaining tail frame
                    copies=2
                    endFrames-=1
                else:
                    copies=1
                for _ in range(copies):
                    writer.writeFrame(frame)
                counter+=1
                if counter==midpoint:
                    reachMiddle=True
        finally:
            # always flush/close the output, even if reading or writing failed
            writer.close()
    finally:
        reader.close()

In [None]:
# Read the metadata of every video to get its number of frames.
# Layout expected by the loop: <working dir>/<folder>/<clip>.mp4
data={"Name":[],"Frames":[]}
for folder in os.listdir():
    folder_path=os.path.join(os.curdir,folder)
    # plain files in the working directory cannot be listed, so skip them
    if not os.path.isdir(folder_path):
        continue
    for file in os.listdir(folder_path):
        if file.endswith(".mp4"):
            metadata=ffprobe(os.path.join(folder_path,file))
            data["Name"].append(os.path.join(folder,file))
            # ffprobe returns the value as text; it is cast to int32 below
            data["Frames"].append(metadata['video']['@nb_frames'])
df=pd.DataFrame(data)
df["Frames"]=df["Frames"].astype(np.int32)
df.head()

After reading the number of frames in each video, we grouped the videos into categories by frame count and handled each category separately.

In [None]:
# Videos with more than 30 frames: ffmpeg's mpdecimate filter drops
# duplicated frames and setpts re-stamps the remaining ones.
for video in df[df["Frames"] > 30]["Name"]:
    # os.path keeps this working with either path separator
    folder_name,base_name=os.path.split(video)
    file_name=os.path.splitext(base_name)[0]+"_out.mp4"
    output_path=os.path.join(folder_name,file_name)
    ret=subprocess.run(["ffmpeg","-i",video,"-vf","mpdecimate,setpts=N/FRAME_RATE/TB",output_path])
    if ret.returncode==0:
        # the "_out" file replaces the source clip
        os.remove(video)
    else:
        print(f"Error with {video}")

In [None]:
# Each frame-count category is fixed by hand; this run handles the 26-frame videos.
variable=26
videos_to_fix=df.loc[df["Frames"]==variable,"Name"]
for video in videos_to_fix:
    # 1 start + 2 middle + 1 end duplicated frames = 4 extra frames per video
    fixVideo(variable,video,startFrames=1,middleFrames=2,endFrames=1)
    # fixVideo wrote a separate "_out.mp4" file, so the source video is deleted
    os.remove(video)

In [None]:
# Increase the number of videos: every clip is written once per augmentation below.
# NOTE(review): earlier cells delete/replace clips on disk, so `df` must be
# rebuilt from the current files before this cell is run.
import imgaug.augmenters as iaa
augs=[iaa.Rotate(5),iaa.Rotate(10),iaa.Rotate(15),
      iaa.Rotate(-5),iaa.Rotate(-10),iaa.Rotate(-15),
      iaa.ShearX(5),iaa.ShearX(10),iaa.ShearX(-5),
      iaa.ShearX(-10),iaa.ScaleY(1.1),iaa.ScaleY(0.9),
      iaa.TranslateX(px=5),iaa.TranslateY(px=5),
      iaa.Sequential([iaa.TranslateY(px=5),iaa.TranslateX(px=5)])]
# the dataframes built in this notebook store the clip paths in "Name"
for video in df["Name"]:
    video_file=vread(video)
    stem=os.path.splitext(video)[0]
    for index,aug in enumerate(augs):
        output=aug.augment_images(video_file)
        # one output file per augmentation so results do not overwrite each other
        vwrite(f'{stem}_aug{index}.mp4',output)

In [3]:
# Feature extraction is done with MediaPipe; keep short handles to the two
# solution modules referenced by the notebook.
import mediapipe as mp
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [None]:
# started by writing our feature extraction function which extracts specific points from the pose estimator and the hand estimator
def extract_keypoints(results):
    """Gather arm and hand landmarks of one holistic result into a single array.

    Rows are ``[x, y, z]`` triples, concatenated in this order:
    pose landmarks 13 and 15, the left-hand landmarks, pose landmarks 14 and
    16, the right-hand landmarks. A pose landmark whose ``visibility`` is not
    above 0.2 becomes ``[0, 0, 0]``; a missing pose contributes 2 zero rows
    per arm and a missing hand contributes 21 zero rows.
    """
    def arm_rows(indices):
        # two selected pose landmarks, zeroed when they are not visible enough
        pose=results.pose_landmarks
        if not pose:
            return np.zeros((2,3))
        chosen=np.array(pose.landmark)[indices]
        return np.array([[point.x,point.y,point.z] if point.visibility > 0.2 else [0,0,0] for point in chosen])

    def hand_rows(hand):
        # every landmark of one hand, or zeros when the hand was not detected
        if not hand:
            return np.zeros((21,3))
        return np.array([[point.x,point.y,point.z] for point in hand.landmark])

    left_arm=arm_rows([13,15])
    left_hand=hand_rows(results.left_hand_landmarks)
    right_arm=arm_rows([14,16])
    right_hand=hand_rows(results.right_hand_landmarks)
    return np.concatenate([left_arm,left_hand,right_arm,right_hand])

In [None]:
# Build the dataset index: one row per video, labelled with the name of its folder.
data={"Name":[],"Label":[]}
# `label` instead of `dir`: the loop variable stays in the notebook namespace
# after the cell finishes and would otherwise hide the built-in dir().
for label in os.listdir():
    if not os.path.isdir(os.path.join(os.curdir,label)):
        continue
    for video in os.listdir(label):
        if video.endswith(".mp4"):
            data["Name"].append(os.path.join(label,video))
            data["Label"].append(label)
df=pd.DataFrame(data)
df.head()

In [None]:
# Run the holistic estimator on every frame of every video and cache the
# keypoints of each video in a .npy file stored next to it.
# NOTE(review): tqdm is not imported in any visible cell - confirm it is in scope.
with mp_holistic.Holistic(min_detection_confidence=0.001,min_tracking_confidence=0.001) as holistic:
    for video in tqdm(df["Name"]):
        # splitext only strips the final extension, so dots elsewhere in the path are safe
        features_path=os.path.splitext(video)[0]+".npy"
        if os.path.isfile(features_path):
            # already extracted in a previous run
            continue
        reader=FFmpegReader(video)
        try:
            results_arr=[extract_keypoints(holistic.process(frame)) for frame in reader.nextFrame()]
        finally:
            reader.close()
        np.save(features_path,np.array(results_arr))

In [None]:
# Collect the cached keypoints of every video into X with shape
# (videos, 30, 46, 3) and keep the matching labels.
features=[]
labels=[]
for name,label in tqdm(list(zip(df["Name"],df["Label"]))):
    # the extraction cell saved "<video path without extension>.npy"
    temp=np.load(os.path.splitext(name)[0]+".npy")
    # videos where nothing was detected carry no information; this filter now
    # also applies to the first video, which used to bypass it
    if np.all(temp==0):
        continue
    # raises for clips that do not have exactly 30 frames of 46 points
    features.append(temp.reshape((30,46,3)))
    labels.append(label)

# stack once instead of re-copying X on every iteration
X=np.stack(features) if features else np.zeros((0,30,46,3))
y=np.array(labels)

In [None]:
# Encode the string labels as integer class ids.
from sklearn.preprocessing import LabelEncoder
encoder=LabelEncoder()
# Fit on the raw `labels` list rather than on `y`: with y=f(y), running this
# cell a second time would refit the encoder on integers and lose the class names.
y=encoder.fit_transform(labels)

In [None]:
# Persist the features, the encoded targets and the fitted encoder for later sessions.
# np.save appends ".npy" to the given names.
for stem,array in (("Features",X),("Target",y)):
    np.save(stem,array)
joblib.dump(encoder,"encoder.pkl")

In [None]:
# Two stratified splits with the same seed: 10% of all samples become the
# test set, then 20% of the remainder becomes the validation set.
from sklearn.model_selection import train_test_split
X_rest,X_test,y_rest,y_test=train_test_split(
    X,y,test_size=0.1,random_state=42,stratify=y)
X_train,X_valid,y_train,y_valid=train_test_split(
    X_rest,y_rest,test_size=0.2,random_state=42,stratify=y_rest)
# the intermediate remainder is not needed once it has been split again
del X_rest,y_rest

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential,load_model
from tensorflow.keras.layers import LSTM, Dense, Input, Dropout , LeakyReLU,Conv1D,MaxPool1D,GlobalMaxPool1D,TimeDistributed,Reshape,BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint,EarlyStopping,TensorBoard

In [7]:
from tensorflow.keras.models import load_model

In [None]:
# Model 1 ("CNNLSTM"): Conv1D layers are applied to every frame through
# TimeDistributed, the per-frame result goes through two LSTM layers, and a
# dense head with a softmax output produces the class probabilities.
n_classes=np.unique(y).shape[0]
model=Sequential([
    TimeDistributed(Conv1D(64,kernel_size=3,padding="same",activation="relu"),input_shape=X_train.shape[1:]),
    TimeDistributed(MaxPool1D()),
    TimeDistributed(Conv1D(96,kernel_size=3,padding="same",activation="relu")),
    TimeDistributed(MaxPool1D()),
    TimeDistributed(Conv1D(128,kernel_size=3,padding="same",activation="relu")),
    TimeDistributed(GlobalMaxPool1D()),
    LSTM(90,dropout=0.4,return_sequences=True),
    LSTM(45,dropout=0.4),
    Dense(100,activation="relu"),
    BatchNormalization(),
    Dropout(0.4),
    Dense(50,activation="relu"),
    BatchNormalization(),
    Dropout(0.4),
    Dense(n_classes,activation="softmax"),
],name="CNNLSTM")
# labels are integer ids, hence the sparse categorical loss
model.compile(optimizer="nadam",loss="sparse_categorical_crossentropy",metrics=['accuracy'])

In [None]:
# Model 2 ("LstmModel"): LSTM layers directly on the flattened keypoints of each frame.
# The input shape is derived from X_train so this cell no longer needs
# X_train_lstm, which is only created in a later cell; it equals
# X_train_lstm.shape[1:] because that array is X_train with its last two axes merged.
# assumes X_train is (videos, frames, points, coords) - TODO confirm
lstm_input_shape=(X_train.shape[1],int(np.prod(X_train.shape[2:])))
model2=Sequential(name="LstmModel")
model2.add(LSTM(256,return_sequences=True,input_shape=lstm_input_shape))
model2.add(LSTM(128,dropout=0.3))
model2.add(Dense(100,activation="relu"))
model2.add(Dropout(0.2))
model2.add(Dense(128,activation="relu"))
model2.add(Dropout(0.2))
model2.add(Dense(np.unique(y).shape[0],activation="softmax"))
# labels are integer ids, hence the sparse categorical loss
model2.compile(optimizer="nadam",loss="sparse_categorical_crossentropy",metrics=['accuracy'])

In [None]:
# Model 1 needs no data preparation: it extracts the features itself and
# passes them to its LSTM layers.
# Model 2 gets two prepared inputs: a PCA projection and the flattened features.
# NOTE(review): tqdm is not imported in any visible cell - confirm it is in scope.
from sklearn.decomposition import IncrementalPCA
IPCA=IncrementalPCA(n_components=1)
# the PCA is fitted on the training split only, one frame per partial_fit call
for clip in tqdm(X_train):
    for keypoints in clip:
        IPCA.partial_fit(keypoints)

def return_pca(data):
    """Project every frame of every video in ``data`` with the fitted ``IPCA``.

    ``IPCA.transform`` is called once per frame; the collected projections
    are returned as one array reshaped to ``(-1, 30, 46)``.
    """
    projected=[
        np.array([IPCA.transform(frame) for frame in video])
        for video in tqdm(data)
    ]
    return np.array(projected).reshape((-1,30,46))

# PCA variant: the coordinates of every point are projected by return_pca.
X_train_pca,X_test_pca,X_valid_pca=(
    return_pca(split) for split in (X_train,X_test,X_valid)
)

# Flattened variant: the 46 points x 3 coordinates of a frame become one vector.
X_train_lstm,X_test_lstm,X_valid_lstm=(
    split.reshape((-1,30,3*46)) for split in (X_train,X_test,X_valid)
)

In [4]:
# After training, we found that the CNN model (CNNLSTM) was the most accurate one

In [5]:
# NOTE(review): hard-coded absolute path - this cell only works where this directory exists; adjust it before re-running.
cd D:\GP\Final\Model

D:\GP\Final\Model


In [None]:
# Reload the saved features/targets, the label encoder and the trained model.
X,y=np.load("Features.npy"),np.load("Target.npy")
# NOTE(review): joblib.load unpickles the file - only load an encoder.pkl you trust.
encoder=joblib.load("encoder.pkl")
model=load_model("Final_model.h5")

In [12]:
# Loss and accuracy of the reloaded model on the training split.
# NOTE(review): X_train/y_train are not created by the reload cell; the split cell has to be run first.
model.evaluate(x=X_train,y=y_train)



[0.002786200726404786, 0.9990403056144714]

In [13]:
# Loss and accuracy of the reloaded model on the validation split.
model.evaluate(x=X_valid,y=y_valid)



[0.0014376712497323751, 1.0]

In [14]:
# Loss and accuracy of the reloaded model on the held-out test split.
# NOTE(review): augmented copies are written next to the source clips before the
# dataset is indexed and split, so variants of one recording may end up in both
# train and test; this score is presumably optimistic - TODO confirm with a
# split done per source clip.
model.evaluate(x=X_test,y=y_test)



[0.026527803391218185, 0.9965457916259766]