In [2]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import pandas as pd
import time
import urllib.request
from pytube import YouTube
import ffmpeg
import pyttsx3




In [3]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [4]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [5]:
def draw_styled_landmarks(image, results):
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

In [6]:
#get image ids in dataset
import json
DATASET_PATH = os.path.join('dataset', "WLASL_v0.3.json") 

with open(DATASET_PATH, 'r') as data:
    dt = json.load(data)
    min = 90000000
    vid_locs = {}
    for i in range(3,6):
        instance_ids = []
        instances_json = dt[i]['instances']
        for instance in instances_json:
            instance_ids.append([instance['video_id'], instance['url']])
            vid_locs[dt[i]['gloss']] = instance_ids

In [7]:
vid_locs

{'before': [['05724', 'https://www.youtube.com/watch?v=pDqITge_TGs'],
  ['70348', 'https://www.youtube.com/watch?v=bq-HmgjGzmw'],
  ['68007', 'https://www.youtube.com/watch?v=IYH-gBXXl8I'],
  ['05744', 'https://www.youtube.com/watch?v=MvtVsr3P098'],
  ['05746', 'https://www.youtube.com/watch?v=PeO4kaKvoVM'],
  ['05728',
   'https://signstock.blob.core.windows.net/signschool/videos/db_uploads/SignSchool%20Before%202-02xxXtC3G8c.mp4'],
  ['05747', 'https://www.youtube.com/watch?v=TQMe2WDz9QE'],
  ['05748', 'http://www.aslpro.com/main/b/before.swf'],
  ['05749', 'http://www.aslsearch.com/signs/videos/before3.mp4'],
  ['05750', 'http://www.aslsearch.com/signs/videos/before.mp4'],
  ['05729',
   'https://signstock.blob.core.windows.net/signschool/videos/db_uploads/SignSchool%20Before-kM8O_piLegw.mp4'],
  ['05730',
   'https://signstock.blob.core.windows.net/signschool/videos/db_uploads/SignSchool%20Before-Se78WI3j-mI.mp4'],
  ['65167',
   'https://aslsignbank.haskins.yale.edu/dictionary/pro

In [8]:
def download_video(url, download_path):
    # Check if the URL is for a YouTube video
    if 'youtube.com' in url or 'youtu.be' in url:
        try:
            yt = YouTube(url)
            stream = yt.streams.get_highest_resolution()
            file_name = stream.default_filename
            downloaded_video_path = os.path.join(download_path, file_name)
            stream.download(download_path)
            print(f"YouTube video downloaded: {downloaded_video_path}")
            return downloaded_video_path
        except Exception as e:
            print(f"Error downloading YouTube video: {e}")
            return None

    # Check if the URL points to an MP4 file
    elif url.endswith('.mp4'):
        try:
            file_name = url.split('/')[-1]
            downloaded_video_path = os.path.join(download_path, file_name)
            urllib.request.urlretrieve(url, downloaded_video_path)
            print(f"MP4 video downloaded: {downloaded_video_path}")
            return downloaded_video_path
        except Exception as e:
            print(f"Error downloading MP4 video: {e}")
            return None
            
    elif url.endswith('.swf'):
        try:
            file_name = os.path.basename(url)
            downloaded_video_path = os.path.join(download_path, file_name)
            ffmpeg.input(url).output(downloaded_video_path).run()
            print(f"SWF video converted to MP4: {downloaded_video_path}")
            return downloaded_video_path
        except Exception as e:
            print(f"Error converting SWF to MP4: {e}")
            return None
    else:
        return None


In [5]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    # face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    # return np.concatenate([pose, face, lh, rh])
    return np.concatenate([pose, lh, rh])

In [142]:
for gloss, ids in vid_locs.items(): 
    try: 
        os.makedirs(os.path.join('images', gloss))
    except:
        pass

In [9]:
no_of_frames = 30

In [144]:
# Set mediapipe model 
for gloss, instances in vid_locs.items():
    for instance in instances:
        VID_PATH = os.path.join('dataset', 'videos', f"{instance[0]}.mp4")
        cap = cv2.VideoCapture(VID_PATH)
        frame_cnt = 0
        print(id)
        if not cap.isOpened():
            url_vid_path = download_video(instance[1], os.path.join("downloaded_images"))
            if(url_vid_path is not None):
                cap = cv2.VideoCapture(url_vid_path)

        length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if length < no_of_frames:
            continue
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened() and frame_cnt < no_of_frames:
        
                # Read feed
                ret, frame = cap.read()
                if not ret:
                    print("Done")
                    break
        
                # Make detections
                image, results = mediapipe_detection(frame, holistic)
                print(results)
                
                # Draw landmarks
                draw_styled_landmarks(image, results)
        
                # Show to screen
                cv2.imshow('OpenCV Feed', image)

                keypoints = extract_keypoints(results)
                try: 
                    os.makedirs(os.path.join('images', gloss, str(instance[0])))
                except:
                    pass
                npy_path = os.path.join('images', gloss, str(instance[0]), str(frame_cnt))
                np.save(npy_path, keypoints)
                frame_cnt += 1
                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
cap.release()
cv2.destroyAllWindows()

70173
YouTube video downloaded: downloaded_images\b.mp4
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solutio

In [6]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [10]:
label_map = {}
# for gloss in os.listdir(os.path.join('images')):
#     label_map[gloss] = len(label_map)
for gloss in os.listdir(os.path.join('custom_images_data')):
    label_map[gloss] = len(label_map)
label_map

{'demon': 0,
 'eleven': 1,
 'hello': 2,
 'i love you': 3,
 'one': 4,
 'stay': 5,
 'stop': 6}

In [14]:
sequences, labels = [], []
# WLASL dataset
# for gloss in os.listdir(os.path.join('images')):
#     for id in os.listdir(os.path.join('images', gloss)):
#         window = []
#         for frame_num in range(no_of_frames):
#             res = np.load(os.path.join('images', gloss, str(id), f'{frame_num}.npy'))
#             window.append(res)
#         if len(window) == no_of_frames:
#             sequences.append(window)
#             labels.append(label_map[gloss])

# custom dataset
for gloss in os.listdir(os.path.join('custom_images_data')):
    for id in os.listdir(os.path.join('custom_images_data', gloss)):
        window = []
        for frame_num in range(no_of_frames):
            print(os.path.join('custom_images_data', gloss, str(id), f'{frame_num}.npy'))
            res = np.load(os.path.join('custom_images_data', gloss, str(id), f'{frame_num}.npy'))
            window.append(res)
        if len(window) == no_of_frames:
            sequences.append(window)
            labels.append(label_map[gloss])
            
# for gloss, instances in vid_locs.items():
#     for instance in instances:
#         window = []
#         for dirpath, _, filenames in os.walk(os.path.join('images', gloss, str(instance[0]))):
#             for filename in filenames:
#                 res = np.load(os.path.join('images', gloss, str(instance[0]), filename))
#                 window.append(res)
#             print(dirpath, len(filenames))
#         if len(window) == no_of_frames:
#             sequences.append(window)
#             labels.append(label_map[gloss])
# for gloss in os.listdir(os.path.join('custom_images_data')):
#     label_map[gloss] = len(label_map)
        # res = np.load(os.path.join(dirpath, filename)

custom_images_data\hello\0\0.npy
custom_images_data\hello\0\1.npy
custom_images_data\hello\0\2.npy
custom_images_data\hello\0\3.npy
custom_images_data\hello\0\4.npy
custom_images_data\hello\0\5.npy
custom_images_data\hello\0\6.npy
custom_images_data\hello\0\7.npy
custom_images_data\hello\0\8.npy
custom_images_data\hello\0\9.npy
custom_images_data\hello\0\10.npy
custom_images_data\hello\0\11.npy
custom_images_data\hello\0\12.npy
custom_images_data\hello\0\13.npy
custom_images_data\hello\0\14.npy
custom_images_data\hello\0\15.npy
custom_images_data\hello\0\16.npy
custom_images_data\hello\0\17.npy
custom_images_data\hello\0\18.npy
custom_images_data\hello\0\19.npy
custom_images_data\hello\0\20.npy
custom_images_data\hello\0\21.npy
custom_images_data\hello\0\22.npy
custom_images_data\hello\0\23.npy
custom_images_data\hello\0\24.npy
custom_images_data\hello\0\25.npy
custom_images_data\hello\0\26.npy
custom_images_data\hello\0\27.npy
custom_images_data\hello\0\28.npy
custom_images_data\hello

In [11]:
label_map

{'demon': 0,
 'eleven': 1,
 'hello': 2,
 'i love you': 3,
 'one': 4,
 'stay': 5,
 'stop': 6}

In [36]:
for s in sequences:
    print(len(s))

30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30
30


In [37]:
np.array(sequences).shape

(120, 30, 258)

In [38]:
np.array(labels).shape

(120,)

In [39]:
X = np.array(sequences)

In [40]:
y = to_categorical(labels).astype(int)

In [41]:
y

array([[1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [1, 0, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1, 0, 0],
       [0, 1,

In [42]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

In [43]:
y_test.shape

(12, 4)

In [44]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [45]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [46]:
np_gloss = np.array(list(label_map.keys()))
np_gloss

array(['hello', 'i love you', 'stay', 'stop'], dtype='<U10')

In [47]:
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(no_of_frames,258)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(np_gloss.shape[0], activation='softmax'))

In [48]:
import tensorflow as tf
# model.compile(optimizer=tf.compat.v1.train.AdamOptimizer(), loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.compile(optimizer=tf.keras.optimizers.Adam(), loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [49]:
model.fit(X_train, y_train, epochs=100, callbacks=[tb_callback])

Epoch 1/100


Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 

<keras.src.callbacks.History at 0x1cdf0d0ea50>

In [50]:
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_6 (LSTM)               (None, 30, 64)            82688     
                                                                 
 lstm_7 (LSTM)               (None, 30, 128)           98816     
                                                                 
 lstm_8 (LSTM)               (None, 64)                49408     
                                                                 
 dense_6 (Dense)             (None, 64)                4160      
                                                                 
 dense_7 (Dense)             (None, 32)                2080      
                                                                 
 dense_8 (Dense)             (None, 4)                 132       
                                                                 
Total params: 237284 (926.89 KB)
Trainable params: 237

In [51]:
res = model.predict(X_test)



In [52]:
list(vid_locs.keys())[np.argmax(res[0])]

'go'

In [53]:
list(vid_locs.keys())[np.argmax(y_test[0])]

'go'

In [54]:
model.save('motion.h5')

  saving_api.save_model(


In [16]:
import tensorflow as tf
model = tf.keras.models.load_model('motion.h5')

In [56]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [57]:
yhat = model.predict(X_test)



In [58]:
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [59]:
multilabel_confusion_matrix(ytrue, yhat)

array([[[10,  0],
        [ 0,  2]],

       [[ 7,  1],
        [ 0,  4]],

       [[ 7,  0],
        [ 1,  4]],

       [[11,  0],
        [ 0,  1]]], dtype=int64)

In [60]:
accuracy_score(ytrue, yhat)

0.9166666666666666

In [61]:
from scipy import stats

In [17]:
import random
colors = [(random.randint(0, 255),random.randint(0, 255),random.randint(0, 255)) for _ in label_map]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

In [18]:
engine = pyttsx3.init()

In [19]:
def speak(text):
    engine.say(text)
    engine.runAndWait()

In [None]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5
actions = list(label_map.keys())
cap = cv2.VideoCapture(1)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-30:]
        
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0]==np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                            speak(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])
                        speak(actions[np.argmax(res)])

            if len(sentence) > 5: 
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res, actions, image, colors)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti