In [5]:
import os
import pandas as pd
import numpy as np
from skvideo.io import ffprobe, vread,vwrite,FFmpegWriter,FFmpegReader
import imgaug.augmenters as iaa
import re
import mediapipe as mp
from sklearn.preprocessing import LabelEncoder
import joblib
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Conv1D,MaxPool1D,GlobalMaxPool1D,TimeDistributed, BatchNormalization
import datetime as dt
from tensorflow.keras.callbacks import ModelCheckpoint,EarlyStopping,TensorBoard

# Data Preprocessing

### Resizing The Videos (The Same Quality For All Data Videos): 256x256

In [None]:
PATH = '---'
SAVE_PATH = '---'

# A single fixed-size Resize augmenter is reused for every video.
RESIZER = iaa.Resize({"height": 256, "width": 256})


def _resize_video(src_path, dst_path):
    """Read the video at src_path, resize all frames to 256x256, write to dst_path."""
    # makedirs creates missing parent folders too (os.mkdir failed when
    # SAVE_PATH itself did not exist yet); exist_ok replaces the manual check.
    os.makedirs(os.path.dirname(dst_path), exist_ok=True)
    vwrite(dst_path, RESIZER.augment_images(vread(src_path)))
    print(dst_path, "is resized successfully")


dataset_folder = os.listdir(PATH)

print("--- Starting Resizing All Videos ---\n")
for word_folder in dataset_folder:
    word_dir = os.path.join(PATH, word_folder)
    if not os.path.isdir(word_dir):
        # Stray files next to the word folders cannot be listed; skip them.
        continue
    for vid in os.listdir(word_dir):
        print(vid)
        entry = os.path.join(word_dir, vid)
        if os.path.isdir(entry):
            # Nested folder: mirror it under SAVE_PATH and resize each file inside.
            for v in os.listdir(entry):
                _resize_video(os.path.join(entry, v),
                              os.path.join(SAVE_PATH, word_folder, vid, v))
        elif vid.endswith('.mp4'):
            _resize_video(entry, os.path.join(SAVE_PATH, word_folder, vid))
        # Anything else (neither a folder nor an .mp4) is ignored instead of
        # being passed to os.listdir, which would raise NotADirectoryError.

print("*** Resizing Finished ***")


### Normalizing Every Video To Exactly 30 Frames

In [2]:
def isEven(number):
    """Return True when `number` leaves no remainder after division by 2."""
    remainder = number % 2
    return remainder == 0

def fixVideo(frames,video_name,startFrames=0,endFrames=0,middleFrames=0):
    """Write a copy of a video in which selected frames are duplicated.

    The copy is saved next to the original as ``<name>_out.mp4``. Each selected
    frame is written twice, so the output has
    ``frames + startFrames + middleFrames + endFrames`` frames when all
    requested duplications are reached.

    Parameters
    ----------
    frames : int
        Number of frames in the input video.
    video_name : str
        Path of the input video.
    startFrames : int
        How many frames to duplicate, counted from the first frame.
    endFrames : int
        How many frames to duplicate at the end of the video.
    middleFrames : int
        How many frames to duplicate, starting right after the middle frame.
    """
    # os.path handles the platform separator and nested folders; the previous
    # split on '\\' only worked for Windows paths of the form "folder\\file".
    folder_name, base_name = os.path.split(video_name)
    file_name = os.path.splitext(base_name)[0] + "_out.mp4"

    reader = FFmpegReader(video_name)
    try:
        writer = FFmpegWriter(os.path.join(folder_name, file_name))
        try:
            counter = 0
            reachMiddle = False
            for frame in reader.nextFrame():
                if startFrames != 0:
                    copies = 2
                    startFrames -= 1
                elif middleFrames != 0 and reachMiddle:
                    copies = 2
                    middleFrames -= 1
                elif endFrames != 0 and frames - counter == endFrames:
                    # Once this triggers it keeps triggering for every remaining
                    # frame, because counter rises as endFrames falls.
                    copies = 2
                    endFrames -= 1
                else:
                    copies = 1
                for _ in range(copies):
                    writer.writeFrame(frame)
                counter += 1
                # Middle reached after frames/2 frames (even count) or
                # (frames+1)/2 frames (odd count); integer form of the original
                # float comparison.
                if 2 * counter - (0 if isEven(frames) else 1) == frames:
                    reachMiddle = True
        finally:
            writer.close()
    finally:
        # Release the input so the caller can delete the original file
        # (the caller removes it right after this function returns).
        reader.close()

In [None]:
cd ---

Getting Videos' Metadata: To Get Number of Frames of Each Video

In [None]:
# Build a table with the relative path and the frame count of every .mp4
# found one level below the current working directory.
data = {"Name": [], "Frames": []}

for folder in os.listdir():
    folder_path = os.path.join(os.curdir, folder)
    # Plain files in the working directory (e.g. a previously saved data.csv)
    # cannot be listed; skip them instead of raising NotADirectoryError.
    if not os.path.isdir(folder_path):
        continue
    for file in os.listdir(folder_path):
        if file.endswith(".mp4"):
            metadata = ffprobe(os.path.join(folder_path, file))
            data["Name"].append(os.path.join(folder, file))
            # ffprobe reports the count as a string; it is cast to int below.
            data["Frames"].append(metadata['video']['@nb_frames'])

df = pd.DataFrame(data)
df["Frames"] = df["Frames"].astype(np.int32)
df.head()

In [None]:
# Persist the Name/Frames table; the "utf-8-sig" codec prepends a UTF-8 byte-order mark.
df.to_csv("data.csv",encoding="utf-8-sig")

In [None]:
# Summary statistics of the numeric columns (the frame counts).
df.describe()

In [None]:
# Count the videos of each word (label = containing folder), smallest first.
# The result is kept in its own variable: `df` has only the "Name" and "Frames"
# columns at this point, and the following cells still need them, so it must
# not be replaced by the aggregated table.
videos_per_label = (
    df.assign(Label=df["Name"].map(os.path.dirname))
      .groupby("Label")["Name"]
      .count()
      .rename("Video")
      .sort_values()
      .reset_index()
)
videos_per_label.head()

Fixing Each Case Separately

1. Increasing # Frames

In [None]:
variable = 26 ## Change The Number For Each Case

# Names of all videos whose frame count equals the case being handled.
short_videos = df.loc[df["Frames"] == variable, "Name"]

for video in short_videos:
    # Duplicate 1 leading, 2 middle and 1 trailing frame.
    fixVideo(variable, video, startFrames=1, middleFrames=2, endFrames=1)
    # fixVideo wrote a new "_out" file, so the original is deleted.
    os.remove(video)

2. Reducing # Frames

In [None]:
# Display the videos that have more than 60 frames.
df[df["Frames"] > 60]

In [None]:
# Select the names stored at row labels 3091 and 3112.
# NOTE(review): `videos` is not used by any later cell visible here and the
# labels are hard-coded — confirm which reduction step should follow.
videos = df.loc[[3091,3112],"Name"]

3. Removing The Outliers

In [None]:
# Row labels removed from the table (the files themselves are left on disk).
outlier_rows = [1434, 5568, 5569]
df.drop(labels=outlier_rows, axis=0, inplace=True)

# Row label 386: the file is deleted from disk (the row stays in the table).
for video in df["Name"].loc[[386]]:
    os.remove(video)

### Applying Augmentation: Increasing The Number of Videos With Added Variation To Help The Model Generalize

In [None]:
# Candidate augmenters. NOTE(review): this list is defined but not applied in
# this cell — only the horizontal flip below is written to disk.
augs = [iaa.Rotate(5), iaa.Rotate(10), iaa.Rotate(15), iaa.Rotate(-5), iaa.Rotate(-10), iaa.Rotate(-15),
        iaa.ShearX(5), iaa.ShearX(10), iaa.ShearX(-5), iaa.ShearX(-10),
        iaa.ScaleY(1.1), iaa.ScaleY(0.9),
        iaa.TranslateX(px=5), iaa.TranslateY(px=5),
        iaa.Sequential([iaa.TranslateY(px=5),iaa.TranslateX(px=5)])]

# Probability 1 => every frame is mirrored horizontally.
aug = iaa.Fliplr(1)

# The video paths live in the "Name" column; no "Video" column is ever created.
for video in df["Name"]:
    video_file = vread(video)
    output = aug.augment_images(video_file)
    # splitext strips only the final extension, so dots elsewhere in the path
    # no longer truncate the output name. The "_filp" suffix is kept as-is so
    # that file names stay compatible with previously generated data.
    vwrite(f'{os.path.splitext(video)[0]}_filp.mp4',output)

# Extracting The Features Using MediaPipe Framework

Extracting all landmarks of the left and right hands, plus only 4 pose landmarks (the left elbow & wrist and the right elbow & wrist).
21 + 21 + 4 = 46 key points per frame

In [6]:
def _arm_keypoints(pose, indices):
    """Return a (2, 3) array with x/y/z of the two pose landmarks at `indices`.

    A landmark whose visibility is not above 0.2 is replaced by zeros; when no
    pose was detected the whole array is zeros.
    """
    if not pose:
        return np.zeros((2,3))
    rows = []
    for idx in indices:
        point = pose.landmark[idx]
        if point.visibility > 0.2:
            rows.append([point.x, point.y, point.z])
        else:
            rows.append([0, 0, 0])
    return np.array(rows)


def _hand_keypoints(hand):
    """Return x/y/z of every landmark of one hand, or (21, 3) zeros if absent."""
    if not hand:
        return np.zeros((21,3))
    return np.array([[point.x, point.y, point.z] for point in hand.landmark])


def extract_keypoints(results):
    """Stack the key points of one frame into a single array.

    Row order: 2 left-arm pose points (indices 13, 15), the left-hand
    landmarks, 2 right-arm pose points (indices 14, 16), the right-hand
    landmarks. Missing parts are filled with zeros.
    """
    pose = results.pose_landmarks
    parts = [
        _arm_keypoints(pose, (13, 15)),
        _hand_keypoints(results.left_hand_landmarks),
        _arm_keypoints(pose, (14, 16)),
        _hand_keypoints(results.right_hand_landmarks),
    ]
    return np.concatenate(parts)

In [None]:
# Table of every .mp4 one level below the working directory; the containing
# folder name is used as the label.
data = {"Name": [], "Label": []}

for folder in os.listdir():
    if not os.path.isdir(os.path.join(os.curdir, folder)):
        continue
    mp4_files = [entry for entry in os.listdir(folder) if entry.endswith(".mp4")]
    data["Name"].extend(os.path.join(folder, entry) for entry in mp4_files)
    data["Label"].extend([folder] * len(mp4_files))

df = pd.DataFrame(data)
df.head()

In [None]:
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

# Run MediaPipe Holistic on every frame of every video and cache the per-frame
# key points next to the video as "<name>.npy".
with mp_holistic.Holistic(min_detection_confidence=0.001, min_tracking_confidence=0.001) as holistic:
    for video in df["Name"]:
        # splitext strips only the final extension, so dots elsewhere in the
        # path do not truncate the output name.
        features_path = os.path.splitext(video)[0] + ".npy"
        if os.path.isfile(features_path):
            # Already extracted on a previous run.
            continue

        reader = FFmpegReader(video)
        try:
            results_arr = [extract_keypoints(holistic.process(frame))
                           for frame in reader.nextFrame()]
        finally:
            # Release the reader even if processing a frame fails.
            reader.close()

        np.save(features_path, np.array(results_arr))

In [None]:
# Load the cached key points of every video into one (samples, 30, 46, 3) array.
samples = []
labels = []

for name, label in zip(df["Name"], df["Label"]):
    # df["Name"] holds the .mp4 paths; the features were saved beside them
    # with a .npy extension, so that is the file to load.
    keypoints = np.load(os.path.splitext(name)[0] + ".npy")
    # Skip videos in which nothing was detected in any frame. The first video
    # is now filtered like all the others.
    if np.all(keypoints == 0):
        continue
    # reshape raises for any video that does not contain exactly 30 frames.
    samples.append(keypoints.reshape((30, 46, 3)))
    labels.append(label)

# Stack once at the end instead of concatenating inside the loop, which copied
# the whole array on every iteration.
X = np.stack(samples) if samples else np.empty((0, 30, 46, 3))
y = np.array(labels)

Encoding The Labels/Classes (Words)

In [None]:
# Learn the word -> integer mapping, then replace the labels by their codes.
encoder = LabelEncoder()
encoder.fit(y)
y = encoder.transform(y)

Saving The Features & Labels

In [None]:
# np.save appends ".npy", producing Features.npy and Target.npy.
for stem, array in (("Features", X), ("Target", y)):
    np.save(stem, array)
# The fitted encoder is stored so the integer codes can be mapped back to words.
joblib.dump(encoder, "encoder.pkl")

# Building The Model For Predicting The Signs

Splitting The Dataset Into Training, Validation, and Testing Sets

In [None]:
# Step 1: hold out 10% of the samples as the test set (stratified by class).
X_rest, X_test, y_rest, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42, stratify=y
)
# Step 2: split the remaining samples 80/20 into training and validation sets.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_rest, y_rest, test_size=0.2, random_state=42, stratify=y_rest
)

Building The Model

In [None]:
# Number of output units = number of distinct encoded labels.
n_classes = len(np.unique(y))

model = Sequential(
    [
        # Per-frame 1D convolutions over the key points of each time step.
        TimeDistributed(Conv1D(64, kernel_size=3, padding="same", activation="relu"),
                        input_shape=X_train.shape[1:]),
        TimeDistributed(MaxPool1D()),
        TimeDistributed(Conv1D(96, kernel_size=3, padding="same", activation="relu")),
        TimeDistributed(MaxPool1D()),
        TimeDistributed(Conv1D(128, kernel_size=3, padding="same", activation="relu")),
        TimeDistributed(GlobalMaxPool1D()),
        # Recurrent layers over the sequence of per-frame feature vectors.
        LSTM(90, dropout=0.4, return_sequences=True),
        LSTM(45, dropout=0.4),
        # Classification head.
        Dense(100, activation="relu"),
        BatchNormalization(),
        Dropout(0.4),
        Dense(50, activation="relu"),
        BatchNormalization(),
        Dropout(0.4),
        Dense(n_classes, activation="softmax"),
    ],
    name="CNNLSTM",
)

# Labels are integer codes, hence the sparse categorical loss.
model.compile(optimizer="nadam", loss="sparse_categorical_crossentropy", metrics=['accuracy'])

Model Training

In [None]:
def logPath():
    """Return a log directory path under ./logs named after the current time."""
    run_name = dt.datetime.now().strftime("run_%Y_%m_%d_%H_%M_%S")
    return os.path.join(os.curdir, 'logs', run_name)

In [None]:
training_callbacks = [
    # Keep only the weights with the best validation accuracy on disk.
    ModelCheckpoint("sadma2.h5", monitor="val_accuracy", save_best_only=True),
    # Stop after 50 epochs without improvement and roll back to the best weights.
    EarlyStopping(monitor="val_accuracy", patience=50, restore_best_weights=True),
    # Each run logs to its own time-stamped directory.
    TensorBoard(log_dir=logPath()),
]

model.fit(
    X_train,
    y_train,
    epochs=1000,
    callbacks=training_callbacks,
    validation_data=(X_valid, y_valid),
)

Loading The Model, Features, and Labels To Evaluate Its Accuracy

In [None]:
# Reload the saved features, encoded labels, label encoder and trained model.
# NOTE(review): the training cell checkpoints to "sadma2.h5" while this loads
# "SignLanguageModel.h5" — confirm the file was renamed and is the intended model.
# NOTE(review): reloading X and y does not recreate X_train/X_valid/X_test; the
# evaluation cells rely on the split cell having been run in this kernel.
X = np.load("Features.npy")
y = np.load("Target.npy")
encoder = joblib.load("encoder.pkl")
model = load_model("SignLanguageModel.h5")

Evaluating The Model

In [None]:
# Evaluate the loaded model on the training split.
model.evaluate(X_train,y_train)

In [None]:
# Evaluate the loaded model on the validation split.
model.evaluate(X_valid,y_valid)

In [None]:
# Evaluate the loaded model on the held-out test split.
model.evaluate(X_test,y_test)