# 0. Install Dependencies

In [None]:
# For facial expression model
!pip install mediapipe opencv-python pandas scikit-learn

In [None]:
# For ASL translation model
!pip install opencv-python mediapipe sklearn matplotlib

In [None]:
# For tts
!pip install request pydub pyAudio

# 1. Import Dependencies

In [203]:
import copy
import io
import os
import pickle
import time

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import requests
from matplotlib import pyplot as plt
from pydub import AudioSegment
from pydub.playback import play

# 2. Setup Mediapipe

In [121]:
# Short aliases for the MediaPipe solution modules used by the cells below
mp_hands = mp.solutions.hands # Hands solution (Hands model, HAND_CONNECTIONS)
mp_face = mp.solutions.face_mesh # Face-mesh solution (FaceMesh model)
mp_drawing = mp.solutions.drawing_utils # Helpers that draw landmarks onto a frame
mp_drawing_styles = mp.solutions.drawing_styles # Default landmark / connection styles

In [122]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a BGR frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)``; in this notebook a
            MediaPipe ``Hands`` or ``FaceMesh`` instance.

    Returns:
        Tuple ``(bgr_image, results)``: the frame converted back to BGR and
        whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The buffer is flagged read-only only for the duration of the model call.
    rgb_frame.flags.writeable = False
    detection = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, detection

In [123]:
def draw_styled_landmarks(image, results):
    """Draw every detected hand onto ``image`` in place using MediaPipe's default styles.

    Does nothing when ``results`` is falsy (e.g. the ``[]`` placeholder used
    before the first detection) or contains no hand landmarks.
    """
    if not results or not results.multi_hand_landmarks:
        return

    for detected_hand in results.multi_hand_landmarks:
        landmark_style = mp_drawing_styles.get_default_hand_landmarks_style()
        connection_style = mp_drawing_styles.get_default_hand_connections_style()
        mp_drawing.draw_landmarks(
            image,
            detected_hand,
            mp_hands.HAND_CONNECTIONS,
            landmark_style,
            connection_style)

In [124]:
# Size (in pixels) every captured frame is resized to
WIDTH = 854
HEIGHT = 480
def rescale_frame(frame):
    """Return ``frame`` resized to WIDTH x HEIGHT using area interpolation."""
    return cv2.resize(frame, (WIDTH, HEIGHT), interpolation=cv2.INTER_AREA)

# 3. Setup Keypoint Processing

In [206]:
_HAND_VECTOR_SIZE = 21 * 3  # 21 landmarks x (x, y, z) per hand


def _normalized_hand_keypoints(hand_landmarks):
    """Flatten one hand's landmarks after shifting each axis so its minimum is 0."""
    coords = np.array([[lm.x, lm.y, lm.z] for lm in hand_landmarks.landmark], dtype=float)
    if coords.size == 0:
        # No landmarks at all: keep the fixed-length, all-zero slot.
        return np.zeros(_HAND_VECTOR_SIZE)
    return (coords - coords.min(axis=0)).flatten()


def extract_keypoints(results):
    """Convert a MediaPipe Hands result into a fixed-length feature vector.

    Each hand's coordinates are translated so that the smallest x, y and z of
    that hand become 0, which makes the features independent of where the hand
    is in the frame.

    Args:
        results: Object with a ``multi_hand_landmarks`` attribute (a list of
            hands, each with a ``landmark`` sequence of objects carrying
            ``x``, ``y`` and ``z``). ``None``, ``[]`` or any object without
            that attribute is treated as "no hands detected". The input is
            not modified.

    Returns:
        1-D ``numpy`` array of length 126. The first 63 values belong to the
        first hand in ``multi_hand_landmarks`` and the last 63 to the second;
        a slot is all zeros when that hand is absent. Slots follow list order,
        not handedness.
    """
    lh = np.zeros(_HAND_VECTOR_SIZE)
    rh = np.zeros(_HAND_VECTOR_SIZE)

    # getattr tolerates the [] placeholder used before the first detection.
    hands = getattr(results, "multi_hand_landmarks", None)
    if hands:
        lh = _normalized_hand_keypoints(hands[0])
        if len(hands) > 1:
            rh = _normalized_hand_keypoints(hands[1])
    return np.concatenate([lh, rh])

# 4. Define Training Folders

In [126]:
# Folder holding the exported keypoint data (numpy arrays)
DATA_PATH = 'MP_Data'
# Folder holding the MS-ASL videos
TRAINING_PATH = 'MS-ASL/MS-ASL/videos'

# Signs the model distinguishes; the position in this array is the class index
actions = np.array([
    'eat', 'fish', 'nice', 'milk', 'teacher', 'finish', 'cousin',
    'orange', 'yes', 'student', 'sister', 'friend', 'yellow',
    'white', 'what', 'water', 'want', 'tired', 'pencil', 'mother',
    'like', 'drink', 'again', 'table', 'school', 'no', 'help',
    'blue', 'spring', 'doctor', 'deaf', 'red', 'father', 'black',
])

# Number of recorded sequences (videos) per action
no_sequences = 30

# Number of frames in every sequence
sequence_length = 30

# Number of the first sequence folder
start_folder = 1

# 5. Import Train Data and Create Labels and Features

In [127]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [128]:
# Map each action name to its integer class index (its position in `actions`)
label_map = dict(zip(actions, range(len(actions))))

In [129]:
# Load every stored frame: one window (list of frames) per recorded sequence,
# labelled with the class index of its action.
sequences, labels = [], []
for action in actions:
    for sequence in range(start_folder, no_sequences + 1):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        window = []
        for frame_num in range(1, sequence_length + 1):
            res = np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            # Report all-zero frames (presumably frames without a detected hand — TODO confirm)
            if not res.any():
                print(action)
                print(sequence)
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [130]:
# Features: the loaded windows stacked into a single array
# (assumed shape: sequences x frames x keypoints — TODO confirm against the data)
X = np.array(sequences)
# Targets: one-hot encoded class indices
y = to_categorical(labels).astype(int)

# Fixed seed so the 95/5 split is identical after every Restart & Run All
SPLIT_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.05, random_state=SPLIT_SEED)

# 6. Build LSTM Neural Network

In [131]:
import tensorflow as tf

# Limit the number of threads TensorFlow uses for intra-op and inter-op parallelism.
# NOTE(review): 6 and 2 look tuned for one specific machine — confirm for yours.
tf.config.threading.set_intra_op_parallelism_threads(6)
tf.config.threading.set_inter_op_parallelism_threads(2)

In [132]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [133]:
# Three stacked LSTM layers followed by a dense head; the final softmax layer
# emits one score per entry in `actions`. The input shape is taken from the
# first loaded sequence.
model = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(np.array(sequences[0]).shape)),
    LSTM(128, return_sequences=True, activation='relu'),
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])

In [134]:
# Adam optimizer with categorical cross-entropy, matching the one-hot labels
model.compile(
    optimizer='Adam',
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)

# 7. Load Trained Model Weights

In [135]:
# Load previously saved weights from 'msasl1.h5' (path relative to the working directory) into `model`
model.load_weights('msasl1.h5')

# 8. Setup Threads

In [136]:
from scipy import stats

In [137]:
# Bar colours, one per class index; the 5-colour pattern is repeated to give 50 entries
colors = [(245,117,16), (117,245,16), (16,117,245), (16,117,245), (16,117,245)] * 10
def prob_viz(res, actions, input_frame, colors):
    """Return a copy of ``input_frame`` with one labelled probability bar per class.

    Args:
        res: Sequence of probabilities, one per class.
        actions: Class names, indexed like ``res``.
        input_frame: Image to annotate; it is copied, not modified.
        colors: Colour tuples, indexed like ``res``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        # Rows are 40 px apart starting at y=60; the bar is 30 px tall and
        # prob*100 px wide, with the label drawn on top of it.
        top = 60 + num * 40
        bar_corner = (int(prob * 100), top + 30)
        cv2.rectangle(output_frame, (0, top), bar_corner, colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, top + 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA)
    return output_frame

In [257]:
# Shared state for sign (word) recognition
predictions = []   # class index (argmax) of every prediction made so far
res = []           # latest class-probability vector; [] while no full window exists
sequence = []      # sliding window of keypoint vectors, filled by the display loop
threshold = 0.85   # minimum probability for a predicted word to be accepted

import time

run_predict_word_thread = True  # set to False to make predictWord() return

def predictWord() :
    """Worker loop: classify the current keypoint window about five times per second.

    While ``run_predict_word_thread`` is true, every 0.2 s the global
    ``sequence`` is inspected. When it holds exactly ``sequence_length``
    frames, the model's probability vector is stored in the global ``res`` and
    its argmax is appended to ``predictions``; otherwise ``res`` is reset to
    ``[]``.
    """
    global res
    while run_predict_word_thread :
        time.sleep(0.2)
        # Copy the window once: the display loop rebinds/clears `sequence`
        # concurrently, so the length check and the prediction must look at
        # the same list.
        window = list(sequence)
        if len(window) == sequence_length:
            # verbose=0 is the documented way to silence the progress bar.
            probabilities = model.predict(np.expand_dims(window, axis=0), verbose=0)[0]
            res = probabilities
            predictions.append(np.argmax(probabilities))
        else:
            res = []

In [258]:
frame = np.zeros(1)   # placeholder until the first camera frame arrives (frame.any() is False)
run_cv_thread = True  # set to False to make runCV() return
from time import sleep
def runCV():
    """Worker loop: keep the global ``frame`` updated with the latest rescaled camera image.

    Reads from capture device 0 until ``run_cv_thread`` becomes false or a
    read fails. Every frame, including the first, is passed through
    ``rescale_frame`` before being published. If the very first read fails,
    ``frame`` keeps its all-zero placeholder instead of becoming ``None``. The
    capture device is always released on exit.
    """
    global frame
    cap = cv2.VideoCapture(0)
    # Alternative source for testing: a recorded video file
    # cap = cv2.VideoCapture("Validation/Cool/1.mkv")
    try:
        while run_cv_thread:
            ret, frame_new = cap.read()
            if not ret:
                break
            frame = rescale_frame(frame_new)
    finally:
        cap.release()

In [259]:
results_hands = []            # latest Hands output; [] until a frame has been processed
run_hand_model_thread = True  # set to False to make runHandModel() return
def runHandModel():
    """Worker loop: run the MediaPipe Hands model on the current global ``frame``.

    Publishes each result in the global ``results_hands``. Frames that are
    entirely zero (the placeholder) are skipped.
    """
    global results_hands
    hand_options = dict(model_complexity=1, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    with mp_hands.Hands(**hand_options) as hand_model:
        while run_hand_model_thread:
            if not frame.any():
                continue
            _, results_hands = mediapipe_detection(frame, hand_model)

In [282]:
results_face = []             # latest FaceMesh output; [] until a frame has been processed
run_face_model_thread = True  # set to False to make runFaceModel() return
def runFaceModel():
    """Worker loop: run the MediaPipe FaceMesh model on the current global ``frame``.

    Publishes each result in the global ``results_face``. Frames that are
    entirely zero (the placeholder) are skipped.
    """
    global results_face
    face_options = dict(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    with mp_face.FaceMesh(**face_options) as face_model:
        while run_face_model_thread:
            if not frame.any():
                continue
            _, results_face = mediapipe_detection(frame, face_model)

In [261]:
# Shared state for facial-expression recognition
face_expression_class = []  # latest predicted label; [] until the first prediction
face_expression_prob = []   # latest class probabilities; [] until the first prediction
# NOTE(review): this rebinds the module-level `threshold` that the sign-recognition
# display loop also reads (it is set to 0.85 in an earlier cell), so once this cell
# has run, words are accepted at 0.6. Nothing in this cell reads the value — confirm
# which threshold is intended and give the two distinct names.
threshold = 0.6

import time

run_predict_expression_thread = True  # set to False to make predictExpression() return

def predictExpression() :
    """Worker loop: classify the facial expression about five times per second.

    While ``run_predict_expression_thread`` is true, every 0.2 s the first face
    in the global ``results_face`` is flattened to one row of
    ``x, y, z, visibility`` values and passed to ``model_face_expression``.
    The predicted label and class probabilities are stored in the globals
    ``face_expression_class`` and ``face_expression_prob``.

    An iteration is skipped when the classifier has not been loaded yet or
    when no face result is available, so the worker can be started before
    either exists.
    """
    global face_expression_class
    global face_expression_prob
    while run_predict_expression_thread :
        time.sleep(0.2)

        # Look the classifier up lazily: it is loaded by a later cell.
        classifier = globals().get('model_face_expression')
        # Take one reference; the face worker rebinds results_face concurrently.
        detection = results_face
        if classifier is None or not detection or not detection.multi_face_landmarks:
            continue

        face = detection.multi_face_landmarks[0].landmark
        row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility]
                             for landmark in face]).flatten())

        # make detections
        x = pd.DataFrame([row])

        predicted_class = classifier.predict(x)[0]
        predicted_prob = classifier.predict_proba(x)[0]
        face_expression_class = predicted_class
        face_expression_prob = predicted_prob


# 9. TTS

In [331]:
import requests
from pydub import AudioSegment
from pydub.playback import play
import io
import os
def runTTS(text, emotion):
    """Synthesize ``text`` with the PlayHT streaming API and play the audio.

    Credentials are read from the ``PLAYHT_API_KEY`` and ``PLAYHT_USER_ID``
    environment variables so they are never stored in the notebook. The key
    that used to be embedded here has been published with the file and should
    be revoked.

    Args:
        text: Sentence to speak. Empty or whitespace-only text is skipped
            without contacting the API.
        emotion: Emotion name; it is sent to the API as ``"female_" + emotion``.

    Raises:
        RuntimeError: if either credential environment variable is missing.
        requests.HTTPError: if the API responds with an error status.
    """
    if not text or not text.strip():
        return

    api_key = os.environ.get("PLAYHT_API_KEY")
    user_id = os.environ.get("PLAYHT_USER_ID")
    if not api_key or not user_id:
        raise RuntimeError(
            "Set the PLAYHT_API_KEY and PLAYHT_USER_ID environment variables before calling runTTS()."
        )

    url = "https://api.play.ht/api/v2/tts/stream"
    payload = {
        "text": text,
        "voice": "s3://voice-cloning-zero-shot/d9ff78ba-d016-47f6-b0ef-dd630f59414e/female-cs/manifest.json",
        "output_format": "mp3",
        "speed": 1,
        "quality": "medium",
        "temperature": 1,
        "emotion": "female_" + emotion,
        "voice_guidance": 3,
        "text_guidance": 2,
        "style_guidance": 25,
        "voice_engine": "PlayHT2.0"
    }
    headers = {
        "accept": "audio/mpeg",
        "content-type": "application/json",
        "AUTHORIZATION": api_key,
        "X-USER-ID": user_id
    }
    # A timeout keeps a stalled connection from blocking the notebook forever.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    # Fail here with the HTTP error instead of trying to decode an error body as MP3.
    response.raise_for_status()
    song = AudioSegment.from_file(io.BytesIO(response.content), format="mp3")
    play(song)



In [343]:
# Manual smoke test: requests speech for "hello" with the "angry" emotion and plays it
# (contacts the remote TTS API and needs working audio output)
runTTS("hello", "angry")

Input #0, wav, from '/tmp/tmpsa2q0ggz.wav':   0KB sq=    0B f=0/0   
  Duration: 00:00:00.98, bitrate: 384 kb/s
  Stream #0:0: Audio: pcm_s16le ([1][0][0][0] / 0x0001), 24000 Hz, 1 channels, s16, 384 kb/s
   0.79 M-A:  0.000 fd=   0 aq=    0KB vq=    0KB sq=    0B f=0/0   




   0.91 M-A:  0.000 fd=   0 aq=    0KB vq=    0KB sq=    0B f=0/0   

# 10. Run Model

In [323]:
# Scratch check: the highest facial-expression probability, rounded to 2 decimals, as a string.
# Only works after predictExpression() has stored a result — face_expression_prob starts as [].
str(round(face_expression_prob[np.argmax(face_expression_prob)],2))

'0.53'



In [351]:
import threading

# Launch the background workers. The handles t1..t5 stay at module level so
# the "Stop Processes" cell can join them.

# Sign classification
t1 = threading.Thread(target=predictWord)
t1.start()

# Camera capture
t2 = threading.Thread(target=runCV)
t2.start()

# Hand landmark detection
t3 = threading.Thread(target=runHandModel)
t3.start()

# Face landmark detection
t4 = threading.Thread(target=runFaceModel)
t4.start()

# Pause before starting the expression worker
# (presumably so a first face result is available — TODO confirm)
sleep(1)

# Facial-expression classification
t5 = threading.Thread(target=predictExpression)
t5.start()



In [353]:
# Load the facial-expression classifier used by predictExpression().
# NOTE(security): pickle.load can execute arbitrary code — only load a
# 'face_expression.pkl' that comes from a trusted source.
with open('face_expression.pkl', 'rb') as f:
    model_face_expression = pickle.load(f)

# Reset the shared recognition state for a fresh run
frames_wo_points = 0   # consecutive frames without any hand keypoints
predictions = []
res = []
sequence = []
sentence = []          # recognised words, most recent last (at most 5 kept)

try:
    while True:
        # Stop when no camera frame is available (the all-zero placeholder).
        current_frame = frame
        if current_frame is None or not current_frame.any():
            break
        image = current_frame.copy()

        # Take one reference to everything the worker threads rebind, so each
        # check below and the use that follows it see the same object.
        hand_results = results_hands
        face_results = results_face
        expression_class = face_expression_class
        expression_prob = face_expression_prob
        res_cpy = res

        # 1. Draw landmarks
        draw_styled_landmarks(image, hand_results)

        if face_results and face_results.multi_face_landmarks:
            face_landmarks = copy.deepcopy(face_results.multi_face_landmarks[0])

            mp_drawing.draw_landmarks(
                image, face_landmarks, mp.solutions.face_mesh.FACEMESH_CONTOURS,
                mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1),
                mp_drawing.DrawingSpec(color=(80, 255, 121), thickness=1, circle_radius=1))

            # Status box — only once the expression worker has produced a result
            # (both values are still [] before the first prediction).
            if isinstance(expression_class, str) and len(expression_prob) > 0:
                cv2.rectangle(image, (600, 410), (850, 470), (245, 117, 16), -1)

                # Display class
                cv2.putText(image, 'CLASS',
                            (600 + 95, 410 + 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
                cv2.putText(image, expression_class.split(' ')[0],
                            (600 + 90, 410 + 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

                # Display probability
                prob_num = str(round(expression_prob[np.argmax(expression_prob)], 2))
                cv2.putText(image, 'PROB',
                            (600 + 15, 410 + 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
                cv2.putText(image, prob_num,
                            (600 + 10, 410 + 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # 2. Prediction logic: maintain the sliding window read by predictWord()
        if hand_results:
            keypoints = extract_keypoints(hand_results)
        else:
            # No hand result yet: same all-zero vector as "no hands detected"
            keypoints = np.zeros(21 * 3 * 2)

        if np.any(keypoints):
            frames_wo_points = 0
            # Rebind to a new list so the worker never sees an over-long window
            sequence = (sequence + [keypoints])[-sequence_length:]
        else:
            frames_wo_points += 1
            # Ten consecutive frames without hands end the current sign
            if frames_wo_points >= 10:
                sequence = []

        # 3. Viz logic
        # Test for presence with len(), not np.any(): class index 0 and an
        # all-index-0 history are valid values.
        recent = predictions[-3:]
        if recent and len(res_cpy) > 0:
            best = int(np.argmax(res_cpy))
            # Accept a word only when the latest predictions all agree with the
            # current one and its probability clears the threshold.
            if all(p == best for p in recent) and res_cpy[best] > threshold:
                word = actions[best]
                if not sentence or word != sentence[-1]:
                    sentence.append(word)

            if len(sentence) > 5:
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res_cpy, actions, image, colors)

        if recent:
            cv2.rectangle(image, (0, 0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Close the preview window even if the loop raised
    cv2.destroyAllWindows()

# Speak the recognised sentence with the detected emotion
sentence_str = ' '.join(sentence)
print(sentence_str)

expression_class = face_expression_class
if isinstance(expression_class, str):
    emotion = expression_class.split(' ')[0].lower()
else:
    emotion = ""   # no expression was ever predicted
# "neutral" (or no expression at all) is spoken with the "happy" voice
if emotion in ("", "neutral"):
    emotion = "happy"
print(emotion)

# Nothing to say when no word was recognised
if sentence_str:
    runTTS(sentence_str, emotion)



yellow eat fish 
sad


Input #0, wav, from '/tmp/tmpowxf1kay.wav':
  Duration: 00:00:01.78, bitrate: 384 kb/s
  Stream #0:0: Audio: pcm_s16le ([1][0][0][0] / 0x0001), 24000 Hz, 1 channels, s16, 384 kb/s
   1.64 M-A:  0.000 fd=   0 aq=    0KB vq=    0KB sq=    0B f=0/0   






# 11. Stop Processes

In [342]:
# Stop the workers one at a time: clear the worker's run flag, wait for its
# loop to exit, then set the flag back to True so the worker can be started
# again later. Requires the thread handles t1..t5 to exist.

# Sign classification worker
run_predict_word_thread = False
t1.join()
run_predict_word_thread = True

# Camera capture worker
run_cv_thread = False
t2.join()
run_cv_thread = True

# Hand landmark worker
run_hand_model_thread = False
t3.join()
run_hand_model_thread = True

# Face landmark worker
run_face_model_thread = False
t4.join()
run_face_model_thread = True

# Facial-expression worker
run_predict_expression_thread = False
t5.join()
run_predict_expression_thread = True

# Close any OpenCV window that is still open
cv2.destroyAllWindows()



In [341]:
# Manual cleanup. runCV() owns its capture object and releases it when its
# loop ends, so a module-level `cap` only exists if one was created by hand
# (e.g. by un-commenting the line below).
# cap = cv2.VideoCapture(0)
if 'cap' in globals():
    cap.release()  # call the method; a bare `cap.release` does nothing
cv2.destroyAllWindows()
        

