## Importing Libraries

In [12]:
import os
import cv2
import json
import time
import numpy as np
import mediapipe as mp
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor

tf.get_logger().setLevel('ERROR')

## MediaPipe Implementation

In [13]:
# Landmark subsets that are kept from each MediaPipe detector, given as
# indices into that detector's landmark list.

# Hands: every landmark (indices 0..20) is kept.
filtered_hand = [*range(21)]

# Pose: six consecutive landmarks (presumably shoulders, elbows and wrists in
# the MediaPipe Pose numbering -- TODO confirm against the landmark map).
filtered_pose = list(range(11, 17))

# Face: hand-picked subset of face-mesh landmarks, in ascending order.
filtered_face = [
    0, 4, 7, 8, 10, 13, 14, 17, 21, 33, 37, 39,
    40, 46, 52, 53, 54, 55, 58, 61, 63, 65, 66, 67,
    70, 78, 80, 81, 82, 84, 87, 88, 91, 93, 95, 103,
    105, 107, 109, 127, 132, 133, 136, 144, 145, 146, 148, 149,
    150, 152, 153, 154, 155, 157, 158, 159, 160, 161, 162, 163,
    172, 173, 176, 178, 181, 185, 191, 234, 246, 249, 251, 263,
    267, 269, 270, 276, 282, 283, 284, 285, 288, 291, 293, 295,
    296, 297, 300, 308, 310, 311, 312, 314, 317, 318, 321, 323,
    324, 332, 334, 336, 338, 356, 361, 362, 365, 373, 374, 375,
    377, 378, 379, 380, 381, 382, 384, 385, 386, 387, 388, 389,
    390, 397, 398, 400, 402, 405, 409, 415, 454, 466, 468, 473,
]

# Number of landmarks kept per part; these size and slice the per-frame array.
HAND_NUM, POSE_NUM, FACE_NUM = map(len, (filtered_hand, filtered_pose, filtered_face))

In [14]:
# One MediaPipe detector per body part, created once at module level and
# reused by get_frame_landmarks() for every frame.
hands = mp.solutions.hands.Hands()
pose = mp.solutions.pose.Pose()
# NOTE(review): filtered_face contains indices 468 and 473; per the MediaPipe
# Face Mesh docs those landmarks only exist with refine_landmarks=True, so this
# flag should not be removed -- confirm before changing.
face_mesh = mp.solutions.face_mesh.FaceMesh(refine_landmarks=True)

def get_frame_landmarks(frame):
    """Run the hand, pose and face detectors on one frame and pack the results.

    The three detectors run concurrently, each in its own worker thread, and
    each writes into its own disjoint slice of a shared output array.

    Parameters
    ----------
    frame
        Image passed unchanged to each detector's ``process`` method (the
        live-feed loop passes an RGB frame).

    Returns
    -------
    numpy.ndarray
        Array of shape ``(HAND_NUM * 2 + POSE_NUM + FACE_NUM, 3)`` holding
        ``(x, y, z)`` per landmark, laid out as
        ``[hand with handedness index 0 | other hand | pose | face]``.
        Rows of any part that was not detected stay zero.

    Raises
    ------
    Exception
        Any exception raised inside a detector worker is re-raised here
        instead of being silently dropped by the thread pool.
    """
    all_landmarks = np.zeros((HAND_NUM * 2 + POSE_NUM + FACE_NUM, 3))

    def get_hands(frame):
        """Fill the two hand slices, routed by MediaPipe's handedness index."""
        results_hands = hands.process(frame)
        if results_hands.multi_hand_landmarks:
            for i, hand_landmarks in enumerate(results_hands.multi_hand_landmarks):
                if results_hands.multi_handedness[i].classification[0].index == 0:
                    all_landmarks[:HAND_NUM, :] = np.array(
                        [(lm.x, lm.y, lm.z) for lm in hand_landmarks.landmark]) # right
                else:
                    all_landmarks[HAND_NUM:HAND_NUM * 2, :] = np.array(
                        [(lm.x, lm.y, lm.z) for lm in hand_landmarks.landmark]) # left

    def get_pose(frame):
        """Fill the pose slice with the landmarks selected by filtered_pose."""
        results_pose = pose.process(frame)
        if results_pose.pose_landmarks:
            all_landmarks[HAND_NUM * 2:HAND_NUM * 2 + POSE_NUM, :] = np.array(
                [(lm.x, lm.y, lm.z) for lm in results_pose.pose_landmarks.landmark])[filtered_pose]

    def get_face(frame):
        """Fill the face slice with the first face's filtered_face landmarks."""
        results_face = face_mesh.process(frame)
        if results_face.multi_face_landmarks:
            all_landmarks[HAND_NUM * 2 + POSE_NUM:, :] = np.array(
                [(lm.x, lm.y, lm.z) for lm in results_face.multi_face_landmarks[0].landmark])[filtered_face]

    # Leaving the `with` block waits for all three workers to finish.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(worker, frame)
                   for worker in (get_hands, get_pose, get_face)]

    # submit() stores a worker's exception on its Future; without result() a
    # failing detector would just leave zeros in the output with no diagnostic.
    for future in futures:
        future.result()

    return all_landmarks

## Load Trained Model

In [15]:
# Directory holding the trained model and its mapping files.
# NOTE: this is a machine-specific absolute path; it is defined once here so
# that relocating the artefacts only requires editing this line.
MODEL_DIR = "D:\\Desktop\\AI\\2024-1-Sprint\\testing model\\1-29(1)"

model_path = os.path.join(MODEL_DIR, "1-29(1).h5")
gloss_mapping_path = os.path.join(MODEL_DIR, "269_gloss_mapping.json")
index_gloss_mapping_path = os.path.join(MODEL_DIR, "269_index_gloss_mapping.json")
index_label_mapping_path = os.path.join(MODEL_DIR, "269_index_label_mapping.json")


def _load_json(path):
    """Parse the JSON file at `path`, closing the file handle afterwards."""
    # A bare json.load(open(...)) leaves the handle open until garbage
    # collection; the context manager closes it deterministically.
    with open(path, "r", encoding="utf-8") as json_file:
        return json.load(json_file)


model = tf.keras.models.load_model(model_path)
gloss_mapping = _load_json(gloss_mapping_path)
index_gloss_mapping = _load_json(index_gloss_mapping_path)
# Looked up with str(class_index) by the live-feed loop to get the label text.
index_label_mapping = _load_json(index_label_mapping_path)

In [16]:
# Display the model's input/output shapes; the live-feed cell sizes its frame
# buffer from model.input_shape[1] and model.input_shape[2].
model.input_shape, model.output_shape

((None, 100, 180, 3), (None, 269))

## Sentence Generation

In [21]:
# Module-level state shared across calls to get_sentence().
timer = time.time()   # time of the last accepted (non-'None') label
sentence = []         # labels collected so far


def get_sentence(label):
    """Fold a newly predicted label into the running sentence and return it.

    Rules, checked in order:
      * 'None'                       -> nothing changes (timer is not refreshed)
      * same as the last word        -> sentence unchanged
      * same as the word before last -> the last word is dropped
      * under 3 s since the timer    -> label is appended
      * otherwise                    -> a new sentence starts with this label

    Every label other than 'None' refreshes the timer.

    Returns the module-level `sentence` list (the same object, not a copy).
    """
    global timer, sentence

    if label == 'None':
        return sentence

    repeats_last = len(sentence) > 0 and sentence[-1] == label
    if not repeats_last:
        if len(sentence) > 1 and sentence[-2] == label:
            # Back at the previous word: discard the word in between.
            sentence.pop()
        elif time.time() - timer < 3:
            sentence.append(label)
        else:
            # Too long since the last accepted label: start over.
            sentence = [label]

    timer = time.time()
    return sentence

## Test Live Feed

In [22]:
# Live webcam test: extract landmarks from each frame, and roughly every
# TIME_PER_STEP seconds resample the collected frames to `step_length` samples,
# push them into the rolling model input window and print a prediction.
# Press 'q' in the preview window to stop.
cap = cv2.VideoCapture(0)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Rolling window of the most recent frames, pre-filled with zeros so the model
# always receives a full-length input.
sequence = deque(maxlen=model.input_shape[1])
for _ in range(model.input_shape[1]):
    sequence.append(np.zeros((model.input_shape[2], 3)))

step_length = 40
# Seconds of video per step (presumably assumes a nominal 30 FPS -- confirm).
TIME_PER_STEP = step_length / 30.0
# Give up after this many consecutive failed reads instead of spinning forever.
MAX_FAILED_READS = 100
step_time = time.time()
frame_time = 0
failed_reads = 0
fps = '0'
label = ''
step = []

# try/finally so the camera and the window are released even when the cell is
# interrupted or a frame raises.
try:
    if not cap.isOpened():
        raise RuntimeError("Could not open webcam (device index 0)")

    while True:
        ret, frame = cap.read()
        if not ret:
            # Tolerate transient grab failures, but not a dead camera: with a
            # bare `continue` the loop never reaches waitKey, so 'q' cannot exit.
            failed_reads += 1
            if failed_reads >= MAX_FAILED_READS:
                print(f'Stopping: {failed_reads} consecutive frame reads failed')
                break
            continue
        failed_reads = 0

        now = time.time()
        elapsed = now - frame_time
        if elapsed > 0:  # equal timestamps would otherwise divide by zero
            fps = str(int(1 / elapsed))
        frame_time = now

        frame = cv2.flip(frame, 1)
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_rgb.flags.writeable = False
        frame_landmarks = get_frame_landmarks(frame_rgb)

        # Landmark x/y are scaled by the frame size to get pixel positions.
        for point in frame_landmarks:
            x = int(point[0] * width)
            y = int(point[1] * height)
            cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)
        cv2.putText(frame, fps, (30,60), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
        cv2.putText(frame, label, (30, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

        step.append(frame_landmarks)

        if time.time() - step_time >= TIME_PER_STEP:
            step = np.array(step)
            # Resample every coordinate track along the time axis to exactly
            # `step_length` samples, however many frames were captured.
            step = np.apply_along_axis(lambda arr: np.interp(np.linspace(0, 1, step_length),
                                                             np.linspace(0, 1, arr.shape[0]), arr),
                                       axis=0, arr=step)

            # The bounded deque drops its oldest frames as the new step is added.
            sequence.extend(step)
            sequence_array = np.expand_dims(np.array(sequence), axis=0)
            prediction = model.predict(sequence_array, verbose=0)
            prediction = prediction.reshape(-1)
            print(f'{prediction.argmax()}: {prediction.max()}')
            prediction = prediction.argmax()

            label = index_label_mapping[str(prediction)]
            print(f'Label: {label}')
            print(f'Sentence: {get_sentence(label)}')

            step_time = time.time()
            step = []

        cv2.imshow("Test", frame)
        cv2.setWindowProperty("Test", cv2.WND_PROP_TOPMOST, 1)
        k = cv2.waitKey(1)
        if k == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()

152: 1.0
Label: None
Sentence: []
141: 0.9996691942214966
Label: cheap
Sentence: ['cheap']
154: 0.9996480941772461
Label: door
Sentence: ['cheap', 'door']
154: 0.9942076206207275
Label: door
Sentence: ['cheap', 'door']
97: 0.5684857964515686
Label: money
Sentence: ['cheap', 'door', 'money']
202: 0.9975554347038269
Label: journey
Sentence: ['cheap', 'door', 'money', 'journey']
256: 0.6409409046173096
Label: travel
Sentence: ['cheap', 'door', 'money', 'journey', 'travel']
161: 0.9994199275970459
Label: forest
Sentence: ['cheap', 'door', 'money', 'journey', 'travel', 'forest']
161: 0.9994136095046997
Label: forest
Sentence: ['cheap', 'door', 'money', 'journey', 'travel', 'forest']
8: 0.8629806041717529
Label: firetruck
Sentence: ['cheap', 'door', 'money', 'journey', 'travel', 'forest', 'firetruck']
85: 0.7050928473472595
Label: cross
Sentence: ['cheap', 'door', 'money', 'journey', 'travel', 'forest', 'firetruck', 'cross']
218: 0.9208775758743286
Label: reach
Sentence: ['cheap', 'door', 'm

In [8]:
# Manual cleanup cell: release the webcam held by `cap` and close all OpenCV
# windows. Requires `cap` to exist already (it is created in the live-feed cell).
cap.release()
cv2.destroyAllWindows()

---