In [1]:
from tensorflow.keras import layers
from tensorflow.keras.callbacks import TensorBoard
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import cv2
import os
import time
import json


In [19]:
# Number of positions in the learned positional-embedding table.
MAX_SEQ_LENGTH = 128
# Per-frame feature width: 33 pose landmarks * 4 values + 2 hands * 21 landmarks * 3 values.
NUM_FEATURES = 258

EPOCHS = 200

DATA_PATH = "transformer_datanoface_msasl100"

# Read the class names inside a context manager so the file handle is closed
# deterministically instead of being left open until garbage collection.
with open(os.path.join("MSASL", "MSASL_classes.json")) as classes_file:
    actions = json.load(classes_file)
# Sorted in place; the model's output index is used to look names up in this list.
actions.sort()

In [None]:
# Load the four pre-computed arrays (features and labels for each split) from DATA_PATH.
train_data = np.load(os.path.join(DATA_PATH, "train_data_30fps.npy"))
train_labels = np.load(os.path.join(DATA_PATH, "train_labels_30fps.npy"))
test_data = np.load(os.path.join(DATA_PATH, "test_data_30fps.npy"))
test_labels = np.load(os.path.join(DATA_PATH, "test_labels_30fps.npy"))

print(f"Frame features in train set: {train_data.shape}")

In [24]:
# Fail fast if any loaded array contains NaN values.
assert not np.isnan(train_data).any()
assert not np.isnan(train_labels).any()
assert not np.isnan(test_data).any()
assert not np.isnan(test_labels).any()

## Build Transformer

In [25]:
class PositionalEmbedding(layers.Layer):
    """Add a learned embedding of each frame's position to the frame features.

    Args:
        sequence_length: Number of rows in the position-embedding table.
        output_dim: Width of each position embedding; it is added to the
            inputs, so it has to be broadcast-compatible with their last axis.
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def call(self, inputs):
        """Return `inputs` plus the embedding of every frame index."""
        # Axis 1 is the frame axis: `(batch_size, frames, num_features)`.
        num_frames = tf.shape(inputs)[1]
        frame_indices = tf.range(num_frames)
        return inputs + self.position_embeddings(frame_indices)

    def compute_mask(self, inputs, mask=None):
        """Mark a frame as valid when at least one of its features is non-zero.

        The incoming `mask` argument is not consulted.
        """
        feature_is_set = tf.cast(inputs, "bool")
        return tf.reduce_any(feature_is_set, axis=-1)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "sequence_length": self.sequence_length,
            "output_dim": self.output_dim,
        }

In [26]:
class TransformerEncoder(layers.Layer):
    """Single encoder block: self-attention and a two-layer projection, each
    followed by a residual connection and layer normalisation.

    Args:
        embed_dim: Used as the attention `key_dim` and as the output width of
            the projection.
        dense_dim: Width of the hidden (GELU) layer of the projection.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=tf.nn.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply the encoder block to `inputs`, using `mask` (if given) as the attention mask."""
        # A new axis is inserted at position 1 so the mask can be passed as `attention_mask`.
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]

        attended = self.attention(inputs, inputs, attention_mask=attention_mask)
        normed = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed)
        return self.layernorm_2(normed + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        }

In [27]:
def get_compiled_model():
    """Build and compile the classifier.

    The network is: positional embedding -> one transformer encoder block ->
    global max pooling -> dropout -> softmax over `len(actions)` classes.
    It is compiled with Adam, sparse categorical cross-entropy and accuracy.

    Returns:
        The compiled `keras.Model`.
    """
    feature_dim = NUM_FEATURES
    hidden_units = 8
    attention_heads = 6

    frame_features = keras.Input(shape=(None, None))
    embedded = PositionalEmbedding(
        MAX_SEQ_LENGTH, feature_dim, name="frame_position_embedding"
    )(frame_features)
    encoded = TransformerEncoder(
        feature_dim, hidden_units, attention_heads, name="transformer_layer"
    )(embedded)
    pooled = layers.GlobalMaxPooling1D()(encoded)
    pooled = layers.Dropout(0.5)(pooled)
    class_probabilities = layers.Dense(len(actions), activation="softmax")(pooled)

    model = keras.Model(frame_features, class_probabilities)
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


def run_experiment(filepath):
    """Train a fresh model, reload its best saved weights and report test accuracy.

    Args:
        filepath: Path prefix the checkpoint callback writes the weights to.
            The same path is also used as the TensorBoard log directory.

    Returns:
        The trained model with the checkpointed weights loaded.
    """
    # The weights must actually be written during training: with only the
    # TensorBoard callback registered, nothing is saved at `filepath` and the
    # `load_weights` call below fails once training finishes.
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )
    tensorboard = TensorBoard(log_dir=filepath)

    model = get_compiled_model()
    history = model.fit(
        train_data,
        train_labels,
        validation_split=0.15,
        epochs=EPOCHS,
        callbacks=[checkpoint, tensorboard],
    )

    # Restore the weights kept by `save_best_only` rather than the last epoch's.
    model.load_weights(filepath)
    _, accuracy = model.evaluate(test_data, test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return model

def load_model(filepath):
    """Rebuild the model, load weights from `filepath` and print its test accuracy.

    Args:
        filepath: Location of previously saved weights.

    Returns:
        The compiled model with the weights loaded.
    """
    restored_model = get_compiled_model()
    restored_model.load_weights(filepath)

    _test_loss, test_accuracy = restored_model.evaluate(test_data, test_labels)
    accuracy_percent = round(test_accuracy * 100, 2)
    print(f"Test accuracy: {accuracy_percent}%")

    return restored_model


## Load or Train Model

In [29]:
# Train a new model; `trained_model` is what the real-time cells further down
# call `.predict` on. `load_model(<same path>)` can be used instead of
# retraining when weights already exist at that path.
# NOTE(review): the saved output of this cell ends in KeyboardInterrupt, so the
# recorded run did not reach the evaluation step.
trained_model = run_experiment("transformer_model_noface_msasl100/model") # arg: save path

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
 1/27 [>.............................] - ETA: 0s - loss: 0.1767 - accuracy: 0.9375

KeyboardInterrupt: 

## Run it real time

In [29]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

In [30]:
# Short aliases for the MediaPipe solution modules used by the cells below.
mp_hands, mp_pose = mp.solutions.hands, mp.solutions.pose
# Drawing helpers and their default landmark/connection styles.
mp_drawing, mp_drawing_styles = mp.solutions.drawing_utils, mp.solutions.drawing_styles

In [31]:
def extract_keypoints(results):
    """Flatten pose, face and both hands from one `results` object into a 1-D array.

    Each group that is missing (falsy) is replaced by zeros: 33*4 for the pose,
    468*3 for the face and 21*3 for each hand. The groups are concatenated in
    the order pose, face, left hand, right hand.

    NOTE(review): the next cell defines another `extract_keypoints` with a
    different signature, which replaces this one once that cell has run.
    """
    def _flatten(landmark_group, fields, expected_count):
        # Zeros stand in for a group that was not detected.
        if not landmark_group:
            return np.zeros(expected_count * len(fields))
        rows = [
            [getattr(point, field) for field in fields]
            for point in landmark_group.landmark
        ]
        return np.array(rows).flatten()

    xyz = ("x", "y", "z")
    pose = _flatten(results.pose_landmarks, xyz + ("visibility",), 33)
    face = _flatten(results.face_landmarks, xyz, 468)
    lh = _flatten(results.left_hand_landmarks, xyz, 21)
    rh = _flatten(results.right_hand_landmarks, xyz, 21)
    return np.concatenate([pose, face, lh, rh])

In [32]:
def extract_keypoints(image,hands,pose):
    """Run the hand and pose detectors on one frame and flatten their landmarks.

    Args:
        image: Frame handed unchanged to both `process` calls.
        hands: Object whose `process(image)` result exposes `multi_hand_landmarks`.
        pose: Object whose `process(image)` result exposes `pose_landmarks`.

    Returns:
        1-D array laid out as: pose values (x, y, z, visibility per landmark,
        or 33*4 zeros when no pose was found), then two hand slots (x, y, z per
        landmark, or 21*3 zeros for a slot without a detection).
    """
    hand_results = hands.process(image)
    pose_results = pose.process(image)

    hand_size = 21*3
    detected_hands = [
        np.array([[pt.x, pt.y, pt.z] for pt in hand.landmark]).flatten() if hand else np.zeros(hand_size)
        for hand in (hand_results.multi_hand_landmarks or [])
    ]

    # Slots are filled in detection order (first detection -> `lh`, second -> `rh`);
    # only the first two detections are used and missing ones are zero-padded.
    slots = detected_hands[:2]
    slots += [np.zeros(hand_size) for _ in range(2 - len(slots))]
    lh, rh = slots

    if pose_results.pose_landmarks:
        ps = np.array(
            [[pt.x, pt.y, pt.z, pt.visibility] for pt in pose_results.pose_landmarks.landmark]
        ).flatten()
    else:
        ps = np.zeros(33*4)

    return np.concatenate([ps, lh, rh])

In [33]:
colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    """Draw one probability bar and class label per entry of `res` on a copy of the frame.

    Args:
        res: Iterable of per-class probabilities; entry `i` belongs to `actions[i]`.
        actions: Class names, indexed like `res`.
        input_frame: Frame to annotate; it is copied, not modified.
        colors: Non-empty sequence of fill colours, cycled through when there
            are more classes than colours.

    Returns:
        The annotated copy of `input_frame`.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        # Cycle over however many colours were supplied instead of assuming
        # exactly three, which raised IndexError for shorter palettes.
        bar_color = colors[num % len(colors)]
        # Rows are 40 px apart, starting 60 px from the top; bar width is prob * 100 px.
        row_top = 60 + num*40
        cv2.rectangle(output_frame, (0, row_top), (int(prob*100), row_top + 30), bar_color, -1)
        cv2.putText(output_frame, actions[num], (0, row_top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame

### Continuously Translate Signs

In [None]:
# 1. New detection variables
sequence = []      # per-frame keypoint vectors collected for the next prediction
sentence = []      # recognised signs shown in the banner at the top of the window
predictions = []   # argmax class index of every prediction made so far
threshold = 0.7    # minimum probability for a sign to be appended to `sentence`

cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_hands.Hands(min_detection_confidence=0.5,min_tracking_confidence=0.5) as hands:
        with mp_pose.Pose(min_detection_confidence=0.5,min_tracking_confidence=0.5) as pose:
            while cap.isOpened():

                # Read feed
                ret, image = cap.read()
                if not ret:
                    # No frame was delivered, so there is nothing to process;
                    # stop instead of failing on `image.flags` below.
                    break

                # To improve performance, optionally mark the image as not writeable to
                # pass by reference.
                image.flags.writeable = False
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = hands.process(image)
                results_pose = pose.process(image)

                # Draw the hand annotations on the image.
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                hand_np = []
                if results.multi_hand_landmarks:
                    for i in range(len(results.multi_hand_landmarks)):
                        mp_drawing.draw_landmarks(image,results.multi_hand_landmarks[i],mp_hands.HAND_CONNECTIONS,mp_drawing_styles.get_default_hand_landmarks_style(),mp_drawing_styles.get_default_hand_connections_style())
                        hand_np.append(np.array([[res.x, res.y, res.z] for res in results.multi_hand_landmarks[i].landmark]).flatten() if results.multi_hand_landmarks[i] else np.zeros(21*3))

                # Hands fill the two slots in detection order; missing hands are zeros.
                if len(hand_np) == 0:
                    lh = np.zeros(21*3)
                    rh = np.zeros(21*3)
                elif len(hand_np) == 1:
                    lh = hand_np[0]
                    rh = np.zeros(21*3)
                else:
                    lh = hand_np[0]
                    rh = hand_np[1]

                # Draw pose annotations
                mp_drawing.draw_landmarks(image,results_pose.pose_landmarks,mp_pose.POSE_CONNECTIONS,landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())

                ps = np.array([[res.x, res.y, res.z, res.visibility] for res in results_pose.pose_landmarks.landmark]).flatten() if results_pose.pose_landmarks else np.zeros(33*4)

                keypoints = np.concatenate([ps, lh, rh])
                sequence.append(keypoints)
                # The window length is the module-level MAX_SEQ_LENGTH; the name
                # `sequence_length` only exists inside `get_compiled_model`.
                sequence = sequence[-MAX_SEQ_LENGTH:]

                if len(sequence) == MAX_SEQ_LENGTH:
                    res = trained_model.predict(np.expand_dims(sequence, axis=0))[0]
                    predicted = np.argmax(res)
                    predictions.append(predicted)

                    #3. Viz logic
                    # NOTE(review): `np.unique(...)[0]` is the smallest class index among
                    # the last 10 predictions, not a check that they all agree — confirm
                    # this is the intended stability test.
                    if np.unique(predictions[-10:])[0]==predicted:
                        if res[predicted] > threshold:

                            if len(sentence) > 0:
                                # Skip a sign that repeats the last one shown.
                                if actions[predicted] != sentence[-1]:
                                    sentence.append(actions[predicted])
                            else:
                                sentence.append(actions[predicted])

                    # Only the four most recent signs are displayed.
                    if len(sentence) > 4:
                        sentence = sentence[-4:]

                    # Viz probabilities
                    image = prob_viz(res, actions, image, colors)

                    # Start a fresh window and wait up to 3 s before continuing.
                    sequence = []
                    cv2.waitKey(3000)

                cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
                cv2.putText(image, ' '.join(sentence), (3,30),
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

                # Show to screen
                cv2.imshow('OpenCV Feed', image)

                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
finally:
    # Release the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()

### Translate one-by-one

In [None]:
images = []        # raw frames of the clip currently being recorded
count = 0          # frames read since the last reset
num_of_vid = 0     # number of clips recorded so far
threshold = 0.7

cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        # Read feed
        ret, image = cap.read()
        if not ret:
            # No frame was delivered; stop instead of drawing on a missing image.
            break

        # Store an unannotated copy of every frame. Without this the clip stays
        # empty and the model is asked to predict on a zero-length sequence.
        # Copying first keeps the banner drawn below out of the model input.
        images.append(image.copy())

        if count == 0:
            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            # Joining a string with ' ' puts a space between its letters ("S T A R T").
            cv2.putText(image, ' '.join("START"), (3,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            cv2.waitKey(1000)

        count+=1

        # After 121 frames the clip is complete: announce it on the frame that
        # is about to be shown, then run the prediction further down.
        if count == 121:
            num_of_vid+=1
            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join("THINKING"), (3,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow('OpenCV Feed', image)
        if cv2.waitKey(5) & 0xFF == ord('q'):
            break

        if count == 121:
            sequence = []
            tic = time.perf_counter()
            # New detector instances are created for every clip.
            with mp_hands.Hands(min_detection_confidence=0.3,min_tracking_confidence=0.5) as hands:
                with mp_pose.Pose(min_detection_confidence=0.5,min_tracking_confidence=0.5) as pose:
                    for frame in images:
                        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        frame.flags.writeable = False

                        # Make detections
                        keypoints = extract_keypoints(frame, hands, pose)

                        sequence.append(keypoints)

            res = trained_model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            toc = time.perf_counter()
            print(f"Predictions took {toc-tic} seconds")
            # Reset so the next iteration starts recording a new clip.
            count = 0
            images = []
finally:
    # Release the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()