<a href="https://www.kaggle.com/code/lonnieqin/isolated-sign-language-recognition-with-convlstm1d?scriptVersionId=120840060" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Isolated Sign Language Recognition with ConvLSTM1D

In this notebook, I will create a Sign Language Recognition model using ConvLSTM1D. To build an efficient training pipeline, I will use the TFRecord dataset from https://www.kaggle.com/datasets/lonnieqin/islr-12-time-steps-tfrecords, which was created by the notebook https://www.kaggle.com/code/lonnieqin/islr-create-tfrecord, for training.

It will take about 1 hour to finish running this notebook using a GPU.

## Configuration

In [None]:
class CFG:
    """Notebook-wide settings: input locations and per-sample geometry."""

    # Directory that holds train.csv and sign_to_prediction_index_map.json.
    data_path: str = "../input/asl-signs/"
    # Directory that holds one "<participant_id>.tfrecords" file per participant.
    tf_record_path: str = "../input/islr-12-time-steps-tfrecords/"
    # Number of time steps in every stored sample.
    sequence_length: int = 12
    # Number of landmark rows per frame.
    rows_per_frame: int = 543

## Import Libraries

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tqdm import tqdm
import json
import os
from kaggle_datasets import KaggleDatasets

## Utilities

In [None]:
ROWS_PER_FRAME = 543  # number of landmarks per frame

def load_relevant_data_subset_with_imputation(pq_path):
    """Load the x/y/z columns of a parquet file per frame, with NaNs zeroed.

    Args:
        pq_path: Path of the parquet file to read.

    Returns:
        A ``float16`` array of shape ``(n_frames, ROWS_PER_FRAME, 3)``.
    """
    coordinate_columns = ['x', 'y', 'z']
    # Replace every NaN entry with 0 before reshaping.
    frame_table = pd.read_parquet(pq_path, columns=coordinate_columns).replace(np.nan, 0)
    # The file stores ROWS_PER_FRAME consecutive rows for each frame.
    frame_count = len(frame_table) // ROWS_PER_FRAME
    stacked = frame_table.values.reshape(
        frame_count, ROWS_PER_FRAME, len(coordinate_columns)
    )
    return stacked.astype(np.float16)

def load_relevant_data_subset(pq_path):
    """Load the x/y/z columns of a parquet file as a per-frame array.

    NaN entries are left untouched.

    Args:
        pq_path: Path of the parquet file to read.

    Returns:
        A ``float32`` array of shape ``(n_frames, ROWS_PER_FRAME, 3)``.
    """
    coordinate_columns = ['x', 'y', 'z']
    frame_table = pd.read_parquet(pq_path, columns=coordinate_columns)
    # The file stores ROWS_PER_FRAME consecutive rows for each frame.
    frame_count = len(frame_table) // ROWS_PER_FRAME
    stacked = frame_table.values.reshape(
        frame_count, ROWS_PER_FRAME, len(coordinate_columns)
    )
    return stacked.astype(np.float32)

def read_dict(file_path):
    """Read a JSON file and return the decoded object.

    Args:
        file_path: Path of the JSON file; a leading ``~`` is expanded.

    Returns:
        The value produced by ``json.load`` (a dict for a JSON object).
    """
    path = os.path.expanduser(file_path)
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

## Load data

In [None]:
# Load train.csv from the configured data directory and preview the first rows.
train = pd.read_csv(f"{CFG.data_path}train.csv")
train.head()

There are 21 participants. Each of them created about 3000 to 5000 training records.

In [None]:
# Number of distinct participant ids in the training table.
train.participant_id.nunique()

In [None]:
# Bar chart of the number of rows contributed by each participant.
train.participant_id.value_counts().plot(kind="bar")

There are 94477 training samples in total.

In [None]:
# Total number of rows in the training table.
len(train)

There are 250 distinct signs that we need to predict.

In [None]:
# Sign name -> integer class index, read from the competition's JSON map.
label_index = read_dict(f"{CFG.data_path}sign_to_prediction_index_map.json")
# Inverse map (class index -> sign name), used to decode predictions.
index_label = {index: sign for sign, index in label_index.items()}
print(label_index)
# Attach the integer class index of each row's sign; an unknown sign raises
# KeyError rather than silently becoming NaN.
train["label"] = train["sign"].map(lambda sign: label_index[sign])
train.head()

## Create Tensorflow Dataset

In [None]:
def decode_function(record_bytes):
    """Parse one serialized example into a dict of ``feature`` and ``label``.

    Args:
        record_bytes: One serialized record read from a TFRecord file.

    Returns:
        A dict with ``"feature"`` (flat float32 vector holding
        ``sequence_length * rows_per_frame * 3`` values) and ``"label"``
        (scalar int64).
    """
    flat_feature_size = CFG.sequence_length * CFG.rows_per_frame * 3
    schema = {
        "feature": tf.io.FixedLenFeature([flat_feature_size], dtype=tf.float32),
        "label": tf.io.FixedLenFeature([], dtype=tf.int64),
    }
    return tf.io.parse_single_example(record_bytes, schema)
def preprocess(item):
    """Turn a parsed example into a ``(features, label)`` pair.

    Args:
        item: Dict with a flat ``"feature"`` vector and a ``"label"`` entry.

    Returns:
        Tuple of the features reshaped to
        ``(sequence_length, rows_per_frame, 3)`` and the unchanged label.
    """
    # Take the landmark count from CFG, exactly as the parsing schema does, so
    # the flat feature length and this reshape cannot drift apart.
    features = tf.reshape(
        item["feature"], (CFG.sequence_length, CFG.rows_per_frame, 3)
    )
    return features, item["label"]
def make_dataset(file_paths, batch_size=128, mode="train"):
    """Build a batched ``tf.data`` pipeline over TFRecord files.

    Args:
        file_paths: Paths of the TFRecord files to read.
        batch_size: Number of examples per batch.
        mode: ``"train"`` enables shuffling and non-deterministic ordering;
            any other value keeps the records in file order.

    Returns:
        A ``tf.data.Dataset`` yielding ``(features, label)`` batches. The final
        partial batch is dropped in every mode (``drop_remainder=True``).
    """
    ds = tf.data.TFRecordDataset(file_paths)
    ds = ds.map(decode_function)
    ds = ds.map(preprocess)
    # Cache the decoded examples BEFORE shuffling and batching. Caching after
    # them would freeze the first epoch's order and batch composition and
    # replay it on every later epoch.
    ds = ds.cache()
    options = tf.data.Options()
    if mode == "train":
        ds = ds.shuffle(1024)
        # `deterministic` is the current name of the deprecated
        # `experimental_deterministic` option.
        options.deterministic = False
    # NOTE(review): drop_remainder=True also discards the last partial batch of
    # the validation split; confirm that this is intended.
    ds = ds.batch(batch_size, drop_remainder=True)
    ds = ds.with_options(options)
    return ds.prefetch(tf.data.AUTOTUNE)

In [None]:
# Split by participant: the four highest participant ids form the validation
# set, all remaining ids form the training set.
unique_ids = np.array(sorted(train.participant_id.unique()))
train_ids, valid_ids = unique_ids[:-4], unique_ids[-4:]
train_ds = make_dataset(
    [f"{CFG.tf_record_path}{identifier}.tfrecords" for identifier in train_ids]
)
valid_ds = make_dataset(
    [f"{CFG.tf_record_path}{identifier}.tfrecords" for identifier in valid_ids],
    mode="valid",
)

## Modeling

In [None]:
def conv1d_lstm_block(inputs, filters):
    """Encode one landmark group with a ConvLSTM1D layer and a Conv1D stack.

    Args:
        inputs: Tensor passed to the ``ConvLSTM1D`` layer.
        filters: Iterable of filter counts; one ``Conv1D`` (kernel size 8)
            followed by one ``MaxPooling1D`` is added per entry, in order.

    Returns:
        The tensor produced by the trailing ``Dropout(0.3)`` layer.
    """
    keras_layers = tf.keras.layers
    encoded = keras_layers.ConvLSTM1D(filters=32, kernel_size=8)(inputs)
    for filter_count in filters:
        encoded = keras_layers.Conv1D(filters=filter_count, kernel_size=8)(encoded)
        encoded = keras_layers.MaxPooling1D()(encoded)
    return keras_layers.Dropout(0.3)(encoded)

def get_model():
    """Build and compile the four-branch sign classifier.

    The input of shape ``(sequence_length, 543, 3)`` is split along the
    landmark axis into face, left-hand, pose and right-hand groups. Each group
    goes through its own ``conv1d_lstm_block``; the branch outputs are
    concatenated along axis 1, flattened and fed to a 250-way softmax layer.

    Returns:
        A ``tf.keras.Model`` compiled with sparse categorical cross-entropy
        and an accuracy metric. No optimizer is passed to ``compile``, so the
        Keras default is used.
    """
    inputs = tf.keras.Input((CFG.sequence_length, 543, 3), dtype=tf.float32)

    # Landmark-axis ranges of the four groups inside a frame.
    face_inputs = inputs[:, :, 0:468, :]
    left_hand_inputs = inputs[:, :, 468:489, :]
    pose_inputs = inputs[:, :, 489:522, :]
    right_hand_inputs = inputs[:, :, 522:, :]

    # (branch input, Conv1D filter counts), listed in concatenation order.
    branch_specs = [
        (face_inputs, [32, 64]),
        (left_hand_inputs, [64]),
        (right_hand_inputs, [64]),
        (pose_inputs, [64]),
    ]
    branch_vectors = [
        conv1d_lstm_block(part, widths) for part, widths in branch_specs
    ]

    merged = tf.keras.layers.Concatenate(axis=1)(branch_vectors)
    flattened = tf.keras.layers.Flatten()(merged)
    probabilities = tf.keras.layers.Dense(250, activation="softmax")(flattened)

    model = tf.keras.Model(inputs=inputs, outputs=probabilities)
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=["accuracy"],
    )
    return model

In [None]:
# Build the training model, print its layer summary and render its graph.
model = get_model()
model.summary()
tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
file_name = "model.h5"
callbacks = [
    # Write a checkpoint only when validation accuracy improves, so the file
    # always holds the best model seen so far.
    tf.keras.callbacks.ModelCheckpoint(
        file_name,
        save_best_only=True,
        monitor="val_accuracy",
        mode="max",
    ),
    # Stop after 5 epochs without improvement. `restore_best_weights` is an
    # EarlyStopping option (ModelCheckpoint does not define it), so it is
    # passed here.
    tf.keras.callbacks.EarlyStopping(
        patience=5,
        restore_best_weights=True,
        monitor="val_accuracy",
        mode="max",
    ),
]
model.fit(train_ds, epochs=30, validation_data=valid_ds, callbacks=callbacks)
# Reload the best checkpoint written by ModelCheckpoint.
model = tf.keras.models.load_model(file_name)

## Create Model for inference

In [None]:
def get_inference_model(model):
    """Wrap a trained model so it accepts one raw, variable-length sample.

    The wrapper takes landmarks of shape ``(n_frames, rows_per_frame, 3)``
    (the frame axis occupies the Keras batch dimension), replaces NaNs with
    zeros, resizes the frame axis to ``CFG.sequence_length``, adds a batch axis
    of size 1 and runs ``model``.

    Args:
        model: Trained model taking
            ``(batch, sequence_length, rows_per_frame, 3)`` inputs.

    Returns:
        A compiled ``tf.keras.Model`` whose input is named ``"inputs"`` and
        whose output is named ``"outputs"``.
    """
    inputs = tf.keras.Input(
        (CFG.rows_per_frame, 3), dtype=tf.float32, name="inputs"
    )
    # Zero out NaNs BEFORE resizing. The resize interpolates between frames, so
    # a NaN left in place would turn the interpolated neighbouring values into
    # NaN as well, and valid coordinates would then be zeroed.
    # NOTE(review): this assumes the TFRecords were built by imputing first and
    # resizing afterwards; confirm against the TFRecord-creation notebook.
    vector = tf.where(tf.math.is_nan(inputs), tf.zeros_like(inputs), inputs)
    vector = tf.image.resize(vector, (CFG.sequence_length, CFG.rows_per_frame))
    vector = tf.expand_dims(vector, axis=0)
    vector = model(vector)
    # Identity layer whose only purpose is to give the output a stable name.
    output = tf.keras.layers.Activation(activation="linear", name="outputs")(vector)
    inference_model = tf.keras.Model(inputs=inputs, outputs=output)
    inference_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=["accuracy"],
    )
    return inference_model

In [None]:
# Wrap the trained model for single-sample inference and inspect the result.
inference_model = get_inference_model(model)
inference_model.summary()
tf.keras.utils.plot_model(inference_model, show_shapes=True)

## Create submission file

In [None]:
# Convert the Keras inference model to a TFLite model.
converter = tf.lite.TFLiteConverter.from_keras_model(inference_model)
tflite_model = converter.convert()
model_path = "model.tflite"
# Save the model.
with open(model_path, 'wb') as f:
    f.write(tflite_model)
# Package the model file into submission.zip (IPython shell escape).
!zip submission.zip $model_path

## Making Predictions

In [None]:
# Install the standalone TFLite runtime package (IPython shell escape).
!pip install tflite-runtime

In [None]:
import tflite_runtime.interpreter as tflite
# Load the exported TFLite model and fetch its default serving signature.
interpreter = tflite.Interpreter(model_path)
found_signatures = list(interpreter.get_signature_list().keys())
prediction_fn = interpreter.get_signature_runner("serving_default")
# Smoke-test on the first 100 training rows (fewer if the table is shorter).
# Files are read through CFG.data_path, like every other cell in the notebook.
for row in train.head(100).itertuples(index=False):
    frames = load_relevant_data_subset(f"{CFG.data_path}{row.path}")
    output = prediction_fn(inputs=frames)
    sign = np.argmax(output["outputs"])
    print(f"Predicted label: {index_label[sign]}, Actual Label: {row.sign}")