# Testing

This notebook is meant for testing the trained encoder-decoder models.

# Dependencies

In [97]:
%%capture
!pip install openai
!pip install pandas
!pip install pyarrow
!pip install tensorflow
!pip install protobuf==3.20.*
!pip install mediapipe==0.9.0.1

In [98]:
import os
import json
import shutil
import random
import difflib
import matplotlib
from itertools import chain
from collections import deque
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from collections import Counter
from matplotlib import animation, rc
from IPython.display import display, Image

import cv2
import openai
import numpy as np
import pandas as pd
import mediapipe as mp
import tensorflow as tf
from tensorflow import keras
import pyarrow.parquet as pq
from tensorflow.keras import layers
from mediapipe.framework.formats import landmark_pb2

In [3]:
!python --version
print("TensorFlow v" + tf.__version__)
print("Mediapipe v" + mp.__version__)

Python 3.9.13
TensorFlow v2.14.0
Mediapipe v0.9.0.1


In [4]:
seed = 42
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
cv2.setRNGSeed(seed)

# Fetch from TfRecords

To acquire the tfrecords you might want to run the data_handling notebook first!

In [5]:
PATH_KAGGLE_DS = "kaggle_dataset"
dataset_df = pd.read_csv(os.path.join(PATH_KAGGLE_DS, "supplemental_metadata.csv"))
PATH_TFRECORD_DS = os.path.join(PATH_KAGGLE_DS, "test_tfrecords")
tf_records = dataset_df.file_id.map(lambda x: os.path.join(PATH_TFRECORD_DS, f"{x}.tfrecord")).unique()
print(f"List of {len(tf_records)} TFRecord files.")

List of 53 TFRecord files.


In [6]:
with open(os.path.join(PATH_TFRECORD_DS, "feature_columns.json"), 'r') as f:
    json_str = f.read()
FEATURE_COLUMNS = json.loads(json_str)
FEATURE_COLUMNS[:10]

['x_right_hand_0',
 'x_right_hand_1',
 'x_right_hand_2',
 'x_right_hand_3',
 'x_right_hand_4',
 'x_right_hand_5',
 'x_right_hand_6',
 'x_right_hand_7',
 'x_right_hand_8',
 'x_right_hand_9']

In [7]:
# # These points represent the hands, elbows, and shoulders.
# LPOSE = [13, 15, 17, 19, 21]
# RPOSE = [14, 16, 18, 20, 22]

# # Facial information isn't necessary, but the nose will serve as a midpoint for normalizing the data, as it is usually located in the middle of the frame.
# FPOSE = [0] # Nose as midpoint

# # Collecting the indices of certain important/distinct sets of features.
# # This can be beneficial during the preprocessing step.
# RHAND_IDX = [i for i, col in enumerate(FEATURE_COLUMNS)  if "right" in col]
# LHAND_IDX = [i for i, col in enumerate(FEATURE_COLUMNS)  if  "left" in col]
# RPOSE_IDX = [i for i, col in enumerate(FEATURE_COLUMNS)  if  "pose" in col and int(col.split("_")[-1]) in RPOSE]
# LPOSE_IDX = [i for i, col in enumerate(FEATURE_COLUMNS)  if  "pose" in col and int(col.split("_")[-1]) in LPOSE]
# MID_POINT_IDX = [i for i, col in enumerate(FEATURE_COLUMNS)  if  "pose" in col and int(col.split("_")[-1]) == 0] # Nose

In [8]:
def decode_fn(record_bytes):
    schema = {COL: tf.io.VarLenFeature(dtype=tf.float32) for COL in FEATURE_COLUMNS}
    schema["phrase"] = tf.io.FixedLenFeature([], dtype=tf.string)
    features = tf.io.parse_single_example(record_bytes, schema)
    phrase = features["phrase"]
    landmarks = ([tf.sparse.to_dense(features[COL]) for COL in FEATURE_COLUMNS])
    # Transpose to maintain the original shape of landmarks data.
    landmarks = tf.transpose(landmarks)
    
    return landmarks, phrase

In [9]:
# The default mapping that came with the dataset was changed:
# padding is represented with the number 0
# start_token is 60
# end_token is 61
with open (os.path.join(PATH_KAGGLE_DS, "character_to_prediction_index.json"), "r") as f:
    char_to_num = json.load(f)
    
char_to_num = {c:char_to_num[c]+1 for c in char_to_num}

# Add pad_token, start pointer and end pointer to the dict
pad_token = 'P'
pad_token_idx = 0
char_to_num[pad_token] = pad_token_idx

start_token = '<'
start_token_idx = 60
char_to_num[start_token] = start_token_idx

end_token = '>'
end_token_idx = 61
char_to_num[end_token] = end_token_idx

num_to_char = {j:i for i,j in char_to_num.items()}

## Preprocess phrase

In [10]:
table = tf.lookup.StaticHashTable(
    initializer=tf.lookup.KeyValueTensorInitializer(
        keys=list(char_to_num.keys()),
        values=list(char_to_num.values()),
    ),
    default_value=tf.constant(-1),
    name="tf_char_to_num"
)

# Function to decode the characters and pad the phrases
MAX_PHRASE_LEN = 31 + 2 # The start and end token take space as well
def preprocess_phrase(phrase):
    phrase = start_token + phrase + end_token
    phrase = tf.strings.bytes_split(phrase)
    phrase = table.lookup(phrase)
    
    max_len_plus = MAX_PHRASE_LEN + 1
    amount_to_pad = max_len_plus - tf.shape(phrase)[0]
    
    if amount_to_pad > 0:
        phrase = tf.pad(phrase, paddings=[[0, amount_to_pad]], mode = 'CONSTANT', constant_values = pad_token_idx)
    else:
        phrase = phrase[:max_len_plus]

    return phrase

Notice that landmarks don't need to be preprocessed, the saved model should contain it's own

In [11]:
def preprocess(landmark, phrase):
    phrase = preprocess_phrase(phrase)
    return (landmark, phrase[:-1]), phrase[1:] # Shifted phrase for encoder-decoder architecture

## Create TFDataset

In [12]:
def get_dataset(tfrecords, batch_size=1, repeat=False, shuffle=False, drop_remainder=False, cache=False):
    ds = tf.data.TFRecordDataset(tf_records)
    ds = ds.map(decode_fn, tf.data.AUTOTUNE)
    # Note: preprocessing can happen before and after the batching (if you can preprocess the whole batch at once to save computation time)
    ds = ds.map(preprocess, tf.data.AUTOTUNE)
    
    if repeat: 
        ds = ds.repeat()
    
    if shuffle:
        ds = ds.shuffle(shuffle)
        options = tf.data.Options()
        options.experimental_deterministic = (False)
        ds = ds.with_options(options)

    if batch_size >= 1:
        # There's also a padded_batch version of this function
        ds = ds.batch(batch_size, drop_remainder=drop_remainder)
        
    ds = ds.prefetch(tf.data.AUTOTUNE)

    # If the system doesn't have enough RAM caching might slow down the process
    if cache:
        ds = ds.cache()
    
    return ds

test_ds = get_dataset(tf_records, batch_size=1, cache=True)

In [13]:
lm_shape = None
phrase_shape = None

# Create an iterator for the train and valid datasets
test_iterator = iter(test_ds)

# Print data points from the training dataset
print("Training Data:\n")
(landmarks, context), phrase = next(test_iterator)

# Save shapes
lm_shape = landmarks.shape[2]
phrase_shape = phrase.shape[0]
print("Saved shapes:")
print(f"lm_shape: {lm_shape}")
print(f"phrase_shape: {phrase_shape}")
print("-" * 40)
print()

print("Encoder input - first in batch (Landmarks:)")
print(type(landmarks))
print(landmarks.shape)
print(landmarks[0])
print("-" * 40)
print()

print("Decoder input (Context):")
print(context.shape)
print(context[0])
print("-" * 40)
print()

print("Model target output (Phrase):")
print(phrase.shape)
print(phrase[0])
print("-" * 40)

Training Data:

Saved shapes:
lm_shape: 159
phrase_shape: 1
----------------------------------------

Encoder input - first in batch (Landmarks:)
<class 'tensorflow.python.framework.ops.EagerTensor'>
(1, 178, 159)
tf.Tensor(
[[ 0.12627919  0.22162548  0.3116899  ... -2.7103877  -2.565694
  -2.1914308 ]
 [        nan         nan         nan ... -2.791268   -2.6042914
  -2.3839378 ]
 [        nan         nan         nan ... -2.7606401  -2.5794535
  -2.3470478 ]
 ...
 [        nan         nan         nan ... -2.8067975  -2.6485422
  -1.3479928 ]
 [ 0.13762568  0.20138527  0.25962168 ... -2.979717   -2.6853716
  -1.9384248 ]
 [ 0.10258354  0.1709002   0.24561381 ... -3.1564484  -2.9503818
  -2.201033  ]], shape=(178, 159), dtype=float32)
----------------------------------------

Decoder input (Context):
(1, 33)
tf.Tensor(
[60 41 46 52 37 50 37 51 52 41 46 39  1 47 34 51 37 50 54 33 52 41 47 46
  1 55 33 51  1 45 33 36 37], shape=(33,), dtype=int32)
----------------------------------------


# Load Model

Load the model from saved model format.

In [14]:
loaded_model = tf.saved_model.load("GRU_local_test")

- The models preprocess the input inside with the method that was used during training.
- The return types are tensorflow specific, so one needs to call .numpy() on them to get the underlying data.

In [21]:
%%time
loaded_model.predict(np.zeros((10, len(FEATURE_COLUMNS))), "abc")

CPU times: total: 93.8 ms
Wall time: 30.3 ms


{'confidence': <tf.Tensor: shape=(), dtype=float32, numpy=0.55950594>,
 'result': <tf.Tensor: shape=(), dtype=string, numpy=b'h'>}

Each model stores the landmarks that are needed for it to run.

In [20]:
loaded_model.info()

<tf.Tensor: shape=(159,), dtype=string, numpy=
array([b'x_right_hand_0', b'x_right_hand_1', b'x_right_hand_2',
       b'x_right_hand_3', b'x_right_hand_4', b'x_right_hand_5',
       b'x_right_hand_6', b'x_right_hand_7', b'x_right_hand_8',
       b'x_right_hand_9', b'x_right_hand_10', b'x_right_hand_11',
       b'x_right_hand_12', b'x_right_hand_13', b'x_right_hand_14',
       b'x_right_hand_15', b'x_right_hand_16', b'x_right_hand_17',
       b'x_right_hand_18', b'x_right_hand_19', b'x_right_hand_20',
       b'x_left_hand_0', b'x_left_hand_1', b'x_left_hand_2',
       b'x_left_hand_3', b'x_left_hand_4', b'x_left_hand_5',
       b'x_left_hand_6', b'x_left_hand_7', b'x_left_hand_8',
       b'x_left_hand_9', b'x_left_hand_10', b'x_left_hand_11',
       b'x_left_hand_12', b'x_left_hand_13', b'x_left_hand_14',
       b'x_left_hand_15', b'x_left_hand_16', b'x_left_hand_17',
       b'x_left_hand_18', b'x_left_hand_19', b'x_left_hand_20',
       b'x_pose_13', b'x_pose_15', b'x_pose_17', b'x_pos

# Testing

## Metrics on test dataset

In [25]:
def generate(fs_model, inp, max_len):
    ctx = str(num_to_char[start_token_idx])
    for i in range(max_len):
        res = fs_model.predict(inp, ctx)
        res_char = res["result"].numpy().decode("utf-8")
        ctx += res_char

        if res_char == num_to_char[end_token_idx]:
            break
    return ctx

def generate_teacher_forcing(fs_model, inp, expected):
    pred = str(num_to_char[start_token_idx])
    ctx = str(num_to_char[start_token_idx])
    for e in expected:
        if e == 'P':
            break
        res = fs_model.predict(inp, ctx)
        res_char = res["result"].numpy().decode("utf-8")
        pred += res_char
        ctx += e
    return pred

In [32]:
def levenshtein_distance(a, b):
    return sum(1 for _ in difflib.ndiff(a, b) if '+' in _ or '-' in _)

def word_error_rate(ref, hyp):
    ref_words = ref.split()
    hyp_words = hyp.split()
    distance = levenshtein_distance(ref_words, hyp_words)
    return distance / max(len(ref_words), 1)

def character_error_rate(ref, hyp):
    distance = levenshtein_distance(ref, hyp)
    return distance / max(len(ref), 1)

In [35]:
data = []

for (inp_batch, _ctx), expected_batch in test_ds:
    for seq, expected in zip(inp_batch, expected_batch):
        expected = "".join([num_to_char[num.numpy()] for num in expected])

        gen_on_own = generate(loaded_model, seq, MAX_PHRASE_LEN)
        gen_teacher_forcing = generate_teacher_forcing(loaded_model, seq, expected)

        data.append([levenshtein_distance(expected, gen_on_own),
                     levenshtein_distance(expected, gen_teacher_forcing),
                     word_error_rate(expected, gen_on_own),
                     word_error_rate(expected, gen_teacher_forcing),
                     character_error_rate(expected, gen_on_own),
                     character_error_rate(expected, gen_teacher_forcing)])
    
        # print("Expected: " + expected)
        # print("Gen on own: " + generate(loaded_model, seq, MAX_PHRASE_LEN))
        # print("Gen teacher forcing: " + generate_teacher_forcing(loaded_model, seq, expected))
        # print('\n~~~\n')

    break
columns = ["edit_dist_gen_on_own",
           "edit_dist_tf",
           "wer_gen_on_own",
           "wer_tf",
           "cer_gen_on_own",
           "cer_tf"]
stat_df = pd.DataFrame(data, columns=columns)
stat_df

Unnamed: 0,edit_dist_gen_on_own,edit_dist_tf,wer_gen_on_own,wer_tf,cer_gen_on_own,cer_tf
0,30,51,1.25,1.25,0.909091,1.545455


## Real world testing

In [38]:
mp_pose = mp.solutions.pose
mp_hands = mp.solutions.hands
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

def draw_landmarks_on_image(image, results):
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    mp_drawing.draw_landmarks(
        image,
        results.face_landmarks,
        mp_holistic.FACEMESH_CONTOURS,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp_drawing_styles
        .get_default_face_mesh_contours_style())
    mp_drawing.draw_landmarks(
        image,
        results.pose_landmarks,
        mp_holistic.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles
        .get_default_pose_landmarks_style())
    mp_drawing.draw_landmarks(
        image,
        results.left_hand_landmarks,
        mp_holistic.HAND_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_hand_landmarks_style()
    )
    mp_drawing.draw_landmarks(
        image,
        results.right_hand_landmarks,
        mp_holistic.HAND_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_hand_landmarks_style()
    )
    return image

In [47]:
WEBCAM = 0

def video_loop(source, process_result_func):
    video = cv2.VideoCapture(source)
    display_handle=display(None, display_id=True)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5,min_tracking_confidence=0.5) as holistic:
            while True:
                _, frame = video.read()
    
                if frame is None:
                    break
    
                #image = cv2.resize(frame, (360, 240))
                image=frame
    
                # To improve performance, optionally mark the image as not writeable to pass by reference.
                image.flags.writeable = False
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = holistic.process(image)
                process_result_func(results)
    
                # Draw landmark annotation on the image.
                image = draw_landmarks_on_image(image, results)
    
                image = cv2.flip(image, 1)
                _, image = cv2.imencode('.jpeg', image)
                display_handle.update(Image(data=image.tobytes()))
    except KeyboardInterrupt:
        pass
    finally:
        video.release()
        display_handle.update(None)

### Signing detector

In [42]:
# Pose coordinates for hand movement.
LPOSE = [13, 15, 17, 19, 21]
RPOSE = [14, 16, 18, 20, 22]
POSE = LPOSE + RPOSE

def extract_for_signing_detection(res):
    # Extract specific pose landmarks if available
    px = []
    py = []
    pz = []
    if res.pose_landmarks:
        for i in POSE:
            lm = res.pose_landmarks.landmark[i]
            px.append(lm.x)
            py.append(lm.y)
            pz.append(lm.z)
    else:
        px = [0.0]*len(POSE)
        py = [0.0]*len(POSE)
        pz = [0.0]*len(POSE)

    # Extract left hand landmarks if available
    lx = []
    ly = []
    lz = []
    if res.left_hand_landmarks:
        for lm in res.left_hand_landmarks.landmark:
            lx.append(lm.x)
            ly.append(lm.y)
            lz.append(lm.z)
    else:
        lx = [0.0]*21
        ly = [0.0]*21
        lz = [0.0]*21

    # Extract right hand landmarks if available
    rx = []
    ry = []
    rz = []
    if res.right_hand_landmarks:
        for lm in res.right_hand_landmarks.landmark:
            rx.append(lm.x)
            ry.append(lm.y)
            rz.append(lm.z)
    else:
        rx = [0.0]*21
        ry = [0.0]*21
        rz = [0.0]*21

    return list(chain(rx, lx, px, ry, ly, py, rz, lz, pz))

# Only load once
signing_detection_model = tf.saved_model.load("signing_detection_model")

class SigningDetectionModel:
    def __init__(self):
        self.signing_detection_model_input = list(np.zeros((15, 156)))

    def is_signing(self, mp_holistic_result):
        inp_lm = extract_for_signing_detection(mp_holistic_result)
        self.signing_detection_model_input.pop(0)
        self.signing_detection_model_input.append(inp_lm)
        return signing_detection_model.predict(self.signing_detection_model_input)["result"].numpy() == 1

class BufferedSigningDetectionModel:
    def __init__(self, buffer_len=5, confidence_number=3):
        self.signing_detection_model_input = list(np.zeros((15, 156)))
        self.signing_detector_buffer = deque(maxlen=buffer_len)
        self.confidence_number = confidence_number 

    def is_signing(self, mp_holistic_result):
        inp_lm = extract_for_signing_detection(mp_holistic_result)
        self.signing_detection_model_input.pop(0)
        self.signing_detection_model_input.append(inp_lm)
        pred = signing_detection_model.predict(self.signing_detection_model_input)["result"].numpy()
        self.signing_detector_buffer.append(pred)
        buffered_pred, count = Counter(self.signing_detector_buffer).most_common(1)[0]
        if count >= self.confidence_number:
            return buffered_pred == 1
        else:
            return False

### Fingerspelling models

In [55]:
# Note that currently the formatter only supports pose and hand landmarks not face landmarks
class ModelInputFormatter:
    def __init__(self, model):
        self.required_landmarks = [bytes.decode("utf-8") for bytes in model.info().numpy()]

    def get_model_input(self, mp_holistic_result):
        (rx, ry, rz), (lx, ly, lz), (px, py, pz) = self._extract_from_result(mp_holistic_result)

        mapped_list = []
        for item in self.required_landmarks:
            parts = item.split('_')
            idx = int(parts[-1]) # Extract the index
        
            if parts[0] == 'x':
                if 'right_hand' in item:
                    mapped_list.append(rx[idx])
                elif 'left_hand' in item:
                    mapped_list.append(lx[idx])
                elif 'pose' in item:
                    mapped_list.append(px[idx])
        
            elif parts[0] == 'y':
                if 'right_hand' in item:
                    mapped_list.append(ry[idx])
                elif 'left_hand' in item:
                    mapped_list.append(ly[idx])
                elif 'pose' in item:
                    mapped_list.append(py[idx])
        
            elif parts[0] == 'z':
                if 'right_hand' in item:
                    mapped_list.append(rz[idx])
                elif 'left_hand' in item:
                    mapped_list.append(lz[idx])
                elif 'pose' in item:
                    mapped_list.append(pz[idx])

        return mapped_list

    def _extract_from_result(self, res):
        # Extract specific pose landmarks if available
        px = []
        py = []
        pz = []
        if res.pose_landmarks:
            for lm in res.pose_landmarks.landmark:
                px.append(lm.x)
                py.append(lm.y)
                pz.append(lm.z)
        else:
            px = [0.0]*len(POSE)
            py = [0.0]*len(POSE)
            pz = [0.0]*len(POSE)
    
        # Extract left hand landmarks if available
        lx = []
        ly = []
        lz = []
        if res.left_hand_landmarks:
            for lm in res.left_hand_landmarks.landmark:
                lx.append(lm.x)
                ly.append(lm.y)
                lz.append(lm.z)
        else:
            lx = [0.0]*21
            ly = [0.0]*21
            lz = [0.0]*21
    
        # Extract right hand landmarks if available
        rx = []
        ry = []
        rz = []
        if res.right_hand_landmarks:
            for lm in res.right_hand_landmarks.landmark:
                rx.append(lm.x)
                ry.append(lm.y)
                rz.append(lm.z)
        else:
            rx = [0.0]*21
            ry = [0.0]*21
            rz = [0.0]*21
    
        return (rx, ry, rz), (lx, ly, lz), (px, py, pz)

#### Continuous model

Works well for isolated sequences. Can't handle sudden pauses, and stops.
Extremely sensitive to window size. Also, the training data was from professional signers. For beginners who sign slower the same window size isn't suitable.

In [69]:
class ContinuousRecognitionModel:
    def __init__(self, model, confidence_threshold=0.2, inp_buf_len=30, out_buf_len=10, out_majority_threshold=7):
        self.model = model
        self.formatter = ModelInputFormatter(self.model)

        # Collect a maximum of inp_buf_len frames for inference
        self.input = deque(maxlen=inp_buf_len)
        self.inp_len = inp_buf_len

        # The output is also buffered
        self.inner_fifo = deque(maxlen=out_buf_len)
        # Only predictions with a higher confidence make it inside the buffer
        self.trust_confidence = confidence_threshold
        # Need a confidence_number majority in the buffer to be returned as output
        self.confidence_number = out_majority_threshold
        # Previous predictions for the model
        self.context = str(num_to_char[start_token_idx])

    def process_frame(self, mp_holistic_result):
        selected_landmarks_for_model = formatter.get_model_input(mp_holistic_result)
        self.input.append(selected_landmarks_for_model)
    
        res = self.model.predict(self.input, self.context)
        pred = res["result"].numpy().decode("utf-8")
        prob = res["confidence"].numpy()

        if prob < self.trust_confidence:
            return None

        self.inner_fifo.append(pred)
        pred_char, count = Counter(self.inner_fifo).most_common(1)[0]
        if count >= self.confidence_number:
            if self.context[-1] != pred_char:
                self.context += pred_char
                print(pred_char, end="")
                
                # Predicted the end
                # if pred_char == '>':
                #     # restart the detection
                #     self.context = str(num_to_char[start_token_idx])
                #     self.inner_fifo.clear()
                #     self.input.clear()

                return pred_char
        return None

In [84]:
fs_model = ContinuousRecognitionModel(loaded_model, confidence_threshold=0.2, inp_buf_len=30, out_buf_len=10, out_majority_threshold=7)
video_loop(os.path.join("test_videos", "bear.mp4"), lambda data: fs_model.process_frame(data))

None

bear> 

#### Translate in long chunks

This model performs well on single words that fit into the buffer. But for longer text it fails to translate well, presumable because the signs are cut off at the wrong positions. For longer text pause detection/signing detection is needed.

In [82]:
class NonContinuousRecognitionModel:
    def __init__(self, model, max_out_length=31, confidence_threshold=0.2):
        self.model = model
        self.formatter = ModelInputFormatter(self.model)

        self.max_out_length = max_out_length
        # Only predictions with a higher confidence count as a predicted character
        self.confidence_threshold = confidence_threshold
        self.input = []

    def reset_buffer(self):
        self.input.clear()

    def translate_buffer(self, reset_buffer=False):
        res = None
        if len(self.input) > 0:
            res = self._generate_with_confidence()
            
        if reset_buffer:
            self.reset_buffer()
            
        return res

    def process_frame(self, mp_holistic_result):
        selected_landmarks_for_model = formatter.get_model_input(mp_holistic_result)
        self.input.append(selected_landmarks_for_model)

    def _generate_with_confidence(self):
        ctx = str(num_to_char[start_token_idx])
        for i in range(self.max_out_length):
            res = self.model.predict(self.input, ctx)
            res_char = res["result"].numpy().decode("utf-8")
            prob = res["confidence"].numpy()
            if prob > self.confidence_threshold:
                ctx += res_char
                if res_char == num_to_char[end_token_idx]:
                    break
        return ctx

In [83]:
fs_model = NonContinuousRecognitionModel(loaded_model, max_out_length=31, confidence_threshold=0.0)
video_loop(os.path.join("test_videos", "alligator.mp4"), lambda data: fs_model.process_frame(data))
fs_model.translate_buffer()

None

'<aliga trove>'

In [89]:
fs_model = NonContinuousRecognitionModel(loaded_model, max_out_length=MAX_PHRASE_LEN, confidence_threshold=0.2)
sign_detector = SigningDetectionModel()
#sign_detector = BufferedSigningDetectionModel()

def process_data(data):
    if sign_detector.is_signing(data):
        fs_model.process_frame(data)

video_loop(os.path.join("test_videos", "monkey.mp4"), process_data)
fs_model.translate_buffer()

None

'<mookeye>'

In [96]:
fs_model = NonContinuousRecognitionModel(loaded_model, max_out_length=MAX_PHRASE_LEN, confidence_threshold=0.2)
sign_detector = SigningDetectionModel()
sign_detector = BufferedSigningDetectionModel(buffer_len=10, confidence_number=7)

def process_data(data):
    if sign_detector.is_signing(data):
        fs_model.process_frame(data)
    else:
        res = fs_model.translate_buffer(reset_buffer=True)
        if res and res != "<":
            print(res)

video_loop(os.path.join("test_videos", "fingerspelling_animals.mp4"), process_data)
fs_model.translate_buffer()

None

<era lu>
<tegue pere>
<terba riber>
<hat
<jean andrad>


'<'

### Correct with llms

In [99]:
key = os.environ.get('OPEN_AI_API_KEY')
if key is not None:
    openai.api_key = key
else:
    print("Error: Please set a valid api key!")

In [100]:
def correct_output(pred):
    completion = openai.ChatCompletion.create(
      model="gpt-3.5-turbo", 
       messages=[
        {"role": "system", "content": "You are a machine that tries to correct the output of a fingerspelling recognition model. Some letters might be missing, but it's also possible that the given text has extra characters. Only reply the corrected text."},
        {"role": "user", "content": "angaro angaro"},
        {"role": "system", "content": "kangaroo"},
        {"role": "user", "content": "beark"},
        {"role": "system", "content": "bear"},
        {"role": "user", "content": "6 halee hale"},
        {"role": "system", "content": "whale"},
        {"role": "user", "content": pred},
      ]
    )
    
    return completion["choices"][0]["message"]["content"]

In [101]:
correct_output("earkh/tiger/tiger angar key ligator alligator h horse gro")

'tiger, tiger, kangaroo, alligator, horse'

In [102]:
fs_model = NonContinuousRecognitionModel(loaded_model, max_out_length=MAX_PHRASE_LEN, confidence_threshold=0.2)
sign_detector = SigningDetectionModel()
sign_detector = BufferedSigningDetectionModel(buffer_len=10, confidence_number=7)

def process_data(data):
    if sign_detector.is_signing(data):
        fs_model.process_frame(data)
    else:
        res = fs_model.translate_buffer(reset_buffer=True)
        if res and res != "<":
            print(correct_output(res[1:]))

video_loop(os.path.join("test_videos", "fingerspelling_animals.mp4"), process_data)
fs_model.translate_buffer()

None

era lu
tug of war
riverbank


'<hat'