## Dependencies

In [26]:
import cv2
import numpy as np
import pandas as pd
import os
import time
import mediapipe as mp
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.utils import to_categorical  # covert stuff to one-hot encoding
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.callbacks import TensorBoard
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

## Constants


In [5]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

# Path for exported data, numpy arrays
DATA_PATH = os.path.join(os.getcwd(), "data")
DATASET_PATH = "dataset/"

# Number of frames for each video
sequence_length = 30


## WLASL Data Parsing


In [4]:
def get_videos_ids(json_list):
    """
    Return the ids of the instances whose video file exists locally.

    An instance is kept only when ``{DATASET_PATH}videos/<video_id>.mp4``
    is present on disk; the relative order of the input is preserved.

    input: instance json list (each item is indexed with "video_id")
    output: list of videos_ids

    """
    candidate_ids = (entry["video_id"] for entry in json_list)
    return [
        candidate
        for candidate in candidate_ids
        if os.path.exists(f"{DATASET_PATH}videos/{candidate}.mp4")
    ]


def get_json_features(json_list):
    """
    Return the ids and urls of the instances whose video file exists locally.

    An instance is kept only when ``{DATASET_PATH}videos/<video_id>.mp4``
    is present on disk. Both "video_id" and "url" are read from every
    instance before the file check.

    input: instance json list (each item is indexed with "video_id" and "url")
    output: tuple (videos_ids, videos_urls) of two parallel lists

    """
    available = []
    for entry in json_list:
        id_and_url = (entry["video_id"], entry["url"])
        if os.path.exists(f"{DATASET_PATH}videos/{id_and_url[0]}.mp4"):
            available.append(id_and_url)

    # Split the (id, url) pairs back into two parallel lists.
    kept_ids = [pair[0] for pair in available]
    kept_urls = [pair[1] for pair in available]
    return kept_ids, kept_urls

# Load the WLASL index file.
# NOTE(review): the code below assumes one row per sign with a "gloss" column
# (the word) and an "instances" column (list of video records) -- confirm
# against the JSON file.
wlas_df = pd.read_json(DATASET_PATH + "WLASL_v0.3.json")

# For every sign, keep the ids of the videos that are present on disk.
wlas_df["videos_ids"] = wlas_df["instances"].apply(get_videos_ids)

# Flatten to one (gloss, video_id, url) row per locally available video.
# Columns are read by label rather than by integer position: positional
# Series indexing with integer keys is deprecated in pandas. Rows are
# collected in a list and the frame is built once, instead of calling
# pd.concat on every iteration.
feature_rows = []
for gloss, instances in zip(wlas_df["gloss"], wlas_df["instances"]):
    ids, urls = get_json_features(instances)
    feature_rows.extend((gloss, vid, url) for vid, url in zip(ids, urls))

features_df = pd.DataFrame(feature_rows, columns=["gloss", "video_id", "url"])
features_df.index.name = "index"


## Mediapipe Methods


In [6]:
def mediapipe_detection(image, model):
    """Run *model* on a BGR frame and return ``(bgr_image, results)``.

    The frame is converted from BGR to RGB, handed to ``model.process`` and
    converted back to BGR before being returned together with the results.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The array is flagged read-only only while the model processes it
    # (presumably so the model can use it without copying -- TODO confirm).
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

def draw_landmarks(image, results):
    """Draw the face, hand and pose landmarks of *results* onto *image*.

    Each landmark group is passed to ``mp_drawing.draw_landmarks`` together
    with its connection set and two drawing specs (landmark points first,
    connections second). Nothing is returned; callers display *image*
    afterwards, so the drawing is expected to happen in place.
    """
    point_color = (0, 0, 255)
    line_color = (0, 255, 0)

    # (attribute on `results`, connection set on `mp_holistic`,
    #  options for the landmark spec, options for the connection spec)
    groups = (
        (
            "face_landmarks",
            "FACEMESH_CONTOURS",
            {"thickness": 1, "circle_radius": 1},
            {"thickness": 1, "circle_radius": 1},
        ),
        (
            "left_hand_landmarks",
            "HAND_CONNECTIONS",
            {"thickness": 2, "circle_radius": 2},
            {"thickness": 2},
        ),
        (
            "right_hand_landmarks",
            "HAND_CONNECTIONS",
            {"thickness": 2, "circle_radius": 2},
            {"thickness": 2},
        ),
        (
            "pose_landmarks",
            "POSE_CONNECTIONS",
            {"thickness": 1, "circle_radius": 1},
            {"thickness": 2, "circle_radius": 1},
        ),
    )

    for landmarks_attr, connections_attr, point_opts, line_opts in groups:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            getattr(mp_holistic, connections_attr),
            mp_drawing.DrawingSpec(color=point_color, **point_opts),
            mp_drawing.DrawingSpec(color=line_color, **line_opts),
        )
def extract_keypoints(results):
    """Flatten the landmarks in *results* into one 1-D feature vector.

    The vector is the concatenation, in this order, of:
      * pose:       x, y, z, visibility per landmark (zeros(33 * 4) if absent)
      * face:       x, y, z per landmark             (zeros(468 * 3) if absent)
      * left hand:  x, y, z per landmark             (zeros(21 * 3) if absent)
      * right hand: x, y, z per landmark             (zeros(21 * 3) if absent)

    A group counts as absent when its attribute on *results* is falsy.
    """
    xyz = ("x", "y", "z")
    # (attribute on `results`, fields read per landmark, landmark count used
    #  for the zero placeholder)
    layout = (
        ("pose_landmarks", xyz + ("visibility",), 33),
        ("face_landmarks", xyz, 468),
        ("left_hand_landmarks", xyz, 21),
        ("right_hand_landmarks", xyz, 21),
    )

    parts = []
    for attr, fields, landmark_count in layout:
        group = getattr(results, attr)
        if group:
            rows = [[getattr(point, field) for field in fields] for point in group.landmark]
            parts.append(np.array(rows).flatten())
        else:
            parts.append(np.zeros(landmark_count * len(fields)))
    return np.concatenate(parts)


## Live Data Capturing


In [39]:
# Maximum number of videos (rows of features_df) processed in one run.
limit = 100
# Set when the user presses "q", so the whole run stops instead of only the
# current video.
aborted = False

# Columns are selected by name; positional indexing of a row with integer
# keys is deprecated in pandas.
video_rows = features_df.head(limit)[["gloss", "video_id"]]

for name, video_id in video_rows.itertuples(index=False, name=None):
    path = f"{DATASET_PATH}videos/{video_id}.mp4"
    out_dir = os.path.join(DATA_PATH, name, video_id)

    cap = cv2.VideoCapture(path)
    try:
        # A new Holistic instance is created for every video.
        with mp_holistic.Holistic(
            min_detection_confidence=0.4,
            min_tracking_confidence=0.4,
            model_complexity=1,
        ) as holistic:
            last_frame = None
            for frame_num in range(sequence_length):
                # ret is the success flag, frame is the image
                ret, frame = cap.read()

                if ret:
                    last_frame = frame
                elif last_frame is None:
                    # Not even the first frame could be read: there is
                    # nothing to pad with, so skip this video.
                    print(f"Skipping {path}: no frame could be read")
                    break
                else:
                    # Video shorter than sequence_length: pad with last frame
                    frame = last_frame

                # Make detections
                image, results = mediapipe_detection(frame, holistic)
                draw_landmarks(image, results)
                keypoints = extract_keypoints(results)

                # Save keypoints; the directory is created only once a frame
                # is available, so unreadable videos leave no empty folder.
                if frame_num == 0:
                    os.makedirs(out_dir, exist_ok=True)
                np.save(os.path.join(out_dir, str(frame_num)), keypoints)

                # Display current frames, comment out if needed
                cv2.imshow("Keypoints", image)

                # Stop the whole run with q.
                # NOTE: the current video is then left with fewer than
                # sequence_length saved frames.
                if cv2.waitKey(10) & 0xFF == ord("q"):
                    aborted = True
                    break
    finally:
        # Always release the capture handle, including on errors.
        cap.release()

    if aborted:
        break

cv2.destroyAllWindows()

if aborted:
    print("Data capture interrupted by user.")
else:
    print("Data created successfully!")


KeyboardInterrupt: 

: 

## Data Loading

In [52]:
# Each sub-directory of DATA_PATH is one sign. os.listdir() returns entries
# in arbitrary order, so the directory is listed once and sorted: the label
# ids and the `signs` array are then guaranteed to agree and are reproducible
# between runs. Non-directory entries (stray files) are ignored.
sign_names = sorted(
    entry
    for entry in os.listdir(DATA_PATH)
    if os.path.isdir(os.path.join(DATA_PATH, entry))
)
label_map = {label: num for num, label in enumerate(sign_names)}

# sequences[i] is a list of sequence_length keypoint vectors for one video,
# labels[i] is the integer id of its sign.
sequences, labels = [], []

for sign in sign_names:
    sign_dir = os.path.join(DATA_PATH, sign)
    for sequence in os.listdir(sign_dir):
        sequence_dir = os.path.join(sign_dir, str(sequence))
        if not os.path.isdir(sequence_dir):
            continue
        window = [
            np.load(os.path.join(sequence_dir, f"{frame_num}.npy"))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[sign])

# signs[k] is the name of the sign with label id k.
signs = np.array(sign_names)


In [59]:
# Test: frame-level variant of the loader. Every saved frame becomes its own
# sample (one keypoint vector per sample) instead of being grouped into
# per-video windows.

# os.listdir() returns entries in arbitrary order, so the directory is listed
# once and sorted: the label ids and the `signs` array are then guaranteed to
# agree and are reproducible between runs. Non-directory entries are ignored.
sign_names = sorted(
    entry
    for entry in os.listdir(DATA_PATH)
    if os.path.isdir(os.path.join(DATA_PATH, entry))
)
label_map = {label: num for num, label in enumerate(sign_names)}

# sequences[i] is a single keypoint vector, labels[i] the id of its sign.
sequences, labels = [], []

for sign in sign_names:
    sign_dir = os.path.join(DATA_PATH, sign)
    for sequence in os.listdir(sign_dir):
        sequence_dir = os.path.join(sign_dir, str(sequence))
        if not os.path.isdir(sequence_dir):
            continue
        for frame_num in range(sequence_length):
            sequences.append(np.load(os.path.join(sequence_dir, f"{frame_num}.npy")))
            labels.append(label_map[sign])

# signs[k] is the name of the sign with label id k.
signs = np.array(sign_names)


In [60]:
X = np.array(sequences)
# One-hot targets: the models in this notebook are compiled with
# loss="categorical_crossentropy" and end in a softmax layer with one unit
# per sign, so each label is turned into a vector with one entry per sign.
y = to_categorical(labels).astype(int)

# Fixed seed so the split, and therefore the reported metrics, can be
# reproduced.
# NOTE(review): when `sequences` holds individual frames rather than per-video
# windows, frames of the same video can end up on both sides of this random
# split, which may make the test score optimistic -- consider splitting by
# video.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Directory the TensorBoard callback writes its logs to.
log_dir = "Logs"
tb_callback = TensorBoard(log_dir=log_dir)

In [66]:
X.shape

(2460, 1662)

## Model Training


In [27]:
# Sequence classifier over windows of `sequence_length` frames.
# 1662 values per frame = 33 * 4 (pose) + 468 * 3 (face) + 2 * 21 * 3 (hands),
# i.e. the length of the vector built by extract_keypoints.
# The input shape is declared with an explicit Input layer, matching the other
# model in this notebook, instead of the `input_shape=` argument on the first
# layer.
model = Sequential(
    [
        Input((sequence_length, 1662)),
        LSTM(64, return_sequences=True, activation="relu"),
        Dropout(0.2),
        LSTM(128, return_sequences=True, activation="relu"),
        # return_sequences=False: the next layers are Dense, so only the
        # final output of the sequence is passed on.
        LSTM(64, return_sequences=False, activation="relu"),
        Dropout(0.4),
        Dense(64, activation="relu", kernel_initializer="he_normal"),
        Dense(32, activation="relu", kernel_initializer="he_normal"),
        # One probability per sign; np.argmax over this output gives the
        # predicted label id.
        Dense(signs.shape[0], activation="softmax"),
    ]
)

model.compile(
    optimizer="adam", loss="categorical_crossentropy", metrics=["categorical_accuracy"]
)

model.summary()


Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_9 (LSTM)               (None, 30, 64)            442112    
                                                                 
 dropout_12 (Dropout)        (None, 30, 64)            0         
                                                                 
 lstm_10 (LSTM)              (None, 30, 128)           98816     
                                                                 
 lstm_11 (LSTM)              (None, 64)                49408     
                                                                 
 dropout_13 (Dropout)        (None, 64)                0         
                                                                 
 dense_16 (Dense)            (None, 64)                4160      
                                                                 
 dense_17 (Dense)            (None, 32)               

In [67]:
# Test: small fully connected classifier over a single 1662-value keypoint
# vector, with a softmax output of one unit per entry in `signs`.

model = Sequential(
    layers=[
        Input(shape=(1662,)),
        Dropout(rate=0.2),
        Dense(units=20, activation="relu"),
        Dropout(rate=0.4),
        Dense(units=10, activation="relu"),
        Dense(units=len(signs), activation="softmax"),
    ]
)

# The summary is printed before the model is compiled.
model.summary()
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["categorical_accuracy"],
)

Model: "sequential_9"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dropout_20 (Dropout)        (None, 1662)              0         
                                                                 
 dense_28 (Dense)            (None, 20)                33260     
                                                                 
 dropout_21 (Dropout)        (None, 20)                0         
                                                                 
 dense_29 (Dense)            (None, 10)                210       
                                                                 
 dense_30 (Dense)            (None, 6)                 66        
                                                                 
Total params: 33,536
Trainable params: 33,536
Non-trainable params: 0
_________________________________________________________________


In [68]:
# Train on the training split for 1000 epochs, passing tb_callback to fit().
model.fit(
    x=X_train,
    y=y_train,
    epochs=1000,
    callbacks=[tb_callback],
)

# Print the architecture again, then write the trained model to "model.h5".
model.summary()
model.save("model.h5")

Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 19/1000
Epoch 20/1000
Epoch 21/1000
Epoch 22/1000
Epoch 23/1000
Epoch 24/1000
Epoch 25/1000
Epoch 26/1000
Epoch 27/1000
Epoch 28/1000
Epoch 29/1000
Epoch 30/1000
Epoch 31/1000
Epoch 32/1000
Epoch 33/1000
Epoch 34/1000
Epoch 35/1000
Epoch 36/1000
Epoch 37/1000
Epoch 38/1000
Epoch 39/1000
Epoch 40/1000
Epoch 41/1000
Epoch 42/1000
Epoch 43/1000
Epoch 44/1000
Epoch 45/1000
Epoch 46/1000
Epoch 47/1000
Epoch 48/1000
Epoch 49/1000
Epoch 50/1000
Epoch 51/1000
Epoch 52/1000
Epoch 53/1000
Epoch 54/1000
Epoch 55/1000
Epoch 56/1000
Epoch 57/1000
Epoch 58/1000
Epoch 59/1000
Epoch 60/1000
Epoch 61/1000
Epoch 62/1000
Epoch 63/1000
Epoch 64/1000
Epoch 65/1000
Epoch 66/1000
Epoch 67/1000
Epoch 68/1000
Epoch 69/1000
Epoch 70/1000
Epoch 71/1000
Epoch 72/1000
E

In [69]:
# Predict on the held-out split and reduce the per-class scores and the
# one-hot targets to plain lists of class ids.
yhat = model.predict(X_test).argmax(axis=1).tolist()
ytrue = y_test.argmax(axis=1).tolist()

# Confusion matrices first, then overall accuracy.
print(multilabel_confusion_matrix(y_true=ytrue, y_pred=yhat))

print(accuracy_score(y_true=ytrue, y_pred=yhat))

[[[396   3]
  [ 64  29]]

 [[438   6]
  [ 41   7]]

 [[407   2]
  [ 36  47]]

 [[291 109]
  [ 24  68]]

 [[383  13]
  [ 49  47]]

 [[298 114]
  [ 33  47]]]
0.49796747967479676


## Real Time Visualisation

In [None]:
sequence = []  # keypoint vectors of the most recent frames
sentence = []  # recently recognised signs, shown at the top of the window
predictions = []  # most recent predicted class ids
threshold = 0.6  # minimum probability for a prediction to be accepted
stable_frames = 10  # how many recent predictions must agree

# Select a webcam
cap = cv2.VideoCapture(0)

# NOTE(review): the model receives input of shape (1, sequence_length, 1662),
# so `model` must be one that takes windows of frames -- confirm which model
# is loaded before running this cell.
try:
    # Access mediapipe model
    with mp_holistic.Holistic(
        min_detection_confidence=0.5, min_tracking_confidence=0.5
    ) as holistic:
        while cap.isOpened():
            # ret is the success flag, frame is the image
            ret, frame = cap.read()
            if not ret:
                # No frame was delivered; stop instead of passing None to
                # the detection code.
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)
            draw_landmarks(image, results)

            # Keep only the last sequence_length keypoint vectors.
            sequence.append(extract_keypoints(results))
            sequence = sequence[-sequence_length:]

            if len(sequence) == sequence_length:
                res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                best = int(np.argmax(res))
                print(signs[best])

                # Bounded history: only the last stable_frames ids are needed.
                predictions.append(best)
                predictions = predictions[-stable_frames:]

                # Accept a sign only when all recent predictions agree on it
                # and the model is confident enough. (np.unique(...)[0] would
                # return the smallest class id, not test for agreement.)
                is_stable = all(pred == best for pred in predictions)
                if is_stable and res[best] > threshold:
                    # Do not repeat the sign that was added last.
                    if not sentence or signs[best] != sentence[-1]:
                        sentence.append(signs[best])
                    # Show at most the five latest signs.
                    sentence = sentence[-5:]

            # Banner across the full frame width with the recognised signs.
            cv2.rectangle(image, (0, 0), (image.shape[1], 40), (245, 117, 16), -1)
            cv2.putText(
                image,
                " ".join(sentence),
                (3, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 255, 255),
                2,
                cv2.LINE_AA,
            )
            cv2.imshow("Sign Language Recognition", image)

            # Break using q
            if cv2.waitKey(10) & 0xFF == ord("q"):
                break
finally:
    # Release webcam and close windows, including when an error occurred.
    cap.release()
    cv2.destroyAllWindows()