In [None]:
import os
import json

import numpy as np

from sklearn.preprocessing import LabelEncoder

import tensorflow as tf
from tensorflow.keras import layers, models

from tensorflow.keras.callbacks import Callback
from tensorflow.keras.optimizers import AdamW

# Landmark Data Merging
Previously, landmark data was stored as individual NumPy files (one per video). <br>
This section merges all individual files into a single NumPy array ("merged_data") to significantly improve loading speed for multiple experiments. <br>
This avoids repeated file I/O, resulting in faster iteration during development. <br>
If you don't need to save the merged data, you can modify `merge_npy_files` to return the "merged_data" list and use it directly.

In [None]:
def merge_npy_files(parent_folder, output_folder, output_file_name):
    """Merges landmark data from individual .npy files into a single .npy file.

    The parent folder is expected to contain one subfolder per label, each
    holding the per-video .npy files for that label. Entries that are not
    directories, and files that do not end in '.npy', are ignored.

    Args:
        parent_folder: Path to the parent directory containing label subfolders.
        output_folder: Path to the directory where the merged data will be saved.
            It is created if it does not exist.
        output_file_name: Name of the output .npy file.

    Returns:
        The merged data as a list of dictionaries with the keys 'data' (the
        loaded NumPy array), 'label' (the name of the subfolder) and
        'file_name' (the name of the original .npy file). The same records are
        written to disk, so callers that do not need the file can use the
        returned list directly.
    """
    merged_data = []

    # os.listdir returns entries in arbitrary order; sorting makes the merged
    # file reproducible across runs and machines.
    for label in sorted(os.listdir(parent_folder)):
        label_folder = os.path.join(parent_folder, label)

        # Only directories are label folders; skip stray files.
        if not os.path.isdir(label_folder):
            continue

        for file in sorted(os.listdir(label_folder)):
            if not file.endswith('.npy'):
                continue
            data = np.load(os.path.join(label_folder, file))
            merged_data.append({'data': data, 'label': label, 'file_name': file})

    # Create the output directory if it doesn't exist.
    os.makedirs(output_folder, exist_ok=True)

    # The records are dictionaries, so they are stored as a 1-D object array
    # (which NumPy pickles). Readers must pass allow_pickle=True to np.load.
    np.save(
        os.path.join(output_folder, output_file_name),
        np.array(merged_data, dtype=object),
    )
    return merged_data

In [None]:
# Merge the per-video training landmark files into a single train.npy.
# NOTE(review): these paths live under /content/drive, so Google Drive must be
# mounted (the drive.mount cell under "Data Loading") before this cell runs.
parent_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition/train'
output_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition'
output_file_name = 'train.npy'
merge_npy_files(parent_folder, output_folder, output_file_name)

In [None]:
# Merge the per-video test landmark files into a single test.npy.
# NOTE(review): these paths live under /content/drive, so Google Drive must be
# mounted (the drive.mount cell under "Data Loading") before this cell runs.
parent_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition/test'
output_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition'
output_file_name = 'test.npy'
merge_npy_files(parent_folder, output_folder, output_file_name)

# Data Preparation

## Data Loading

In [None]:
# Mount Google Drive so the dataset and output paths under /content/drive are accessible.
# NOTE(review): the merge cells earlier in the notebook already read and write
# /content/drive, so on a fresh kernel this mount has to run before them.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load the merged training set. allow_pickle=True is required because the file
# stores an object array of dictionaries (see merge_npy_files).
# NOTE(review): unpickling can execute arbitrary code; only load files you trust.
train_data = np.load('/content/drive/MyDrive/Omdena/sign_language_recognition/train.npy', allow_pickle=True)
# Extract features (X) and labels (y)
X_train = [item['data'] for item in train_data]
y_train = [item['label'] for item in train_data]

# Convert to NumPy arrays
# (assumes every video array has the same shape, so they stack into one array - TODO confirm)
X_train = np.array(X_train)
y_train = np.array(y_train)

# Load the merged test set the same way as the training set.
test_data = np.load('/content/drive/MyDrive/Omdena/sign_language_recognition/test.npy', allow_pickle=True)
# Extract features (X) and labels (y)
X_test = [item['data'] for item in test_data]
y_test = [item['label'] for item in test_data]

# Convert to NumPy arrays
X_test = np.array(X_test)
y_test = np.array(y_test)

## Encoding
Encode the string labels as integers so they can be used as training targets. A reverse mapping (integer → label) is also created for decoding the model's predictions during deployment.


In [None]:
label_encoder = LabelEncoder()

# Fit the encoder on the training labels only, then reuse the same mapping for the
# test labels so both splits share identical integer codes.
y_train = label_encoder.fit_transform(y_train)
y_test = label_encoder.transform(y_test)

# Forward mapping: label string -> integer code.
label_mapping = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))
print("Label Mapping:", label_mapping)

# Reverse mapping: integer code -> label string, cast to plain Python types so it
# can be serialized to JSON later.
reverse_label_mapping = {int(v): str(k) for k, v in label_mapping.items()}
print("Reverse Label Mapping", reverse_label_mapping)

Label Mapping: {'adik': 0, 'anak': 1, 'besar': 2, 'buka': 3, 'buruk': 4, 'dengar': 5, 'gembira': 6, 'guru': 7, 'haus': 8, 'ibu': 9, 'jalan': 10, 'keluarga': 11, 'kertas': 12, 'kucing': 13, 'lapar': 14, 'lihat': 15, 'maaf': 16, 'main': 17, 'makan': 18, 'marah': 19, 'minum': 20, 'nama': 21, 'orang': 22, 'panggil': 23, 'rumah': 24, 'sedikit': 25, 'selamat': 26, 'senyum': 27, 'teman': 28, 'tidur': 29}
Reverse Label Mapping {0: 'adik', 1: 'anak', 2: 'besar', 3: 'buka', 4: 'buruk', 5: 'dengar', 6: 'gembira', 7: 'guru', 8: 'haus', 9: 'ibu', 10: 'jalan', 11: 'keluarga', 12: 'kertas', 13: 'kucing', 14: 'lapar', 15: 'lihat', 16: 'maaf', 17: 'main', 18: 'makan', 19: 'marah', 20: 'minum', 21: 'nama', 22: 'orang', 23: 'panggil', 24: 'rumah', 25: 'sedikit', 26: 'selamat', 27: 'senyum', 28: 'teman', 29: 'tidur'}


## Label Smoothing

### Label Smoothing

Label smoothing is a regularization technique used to prevent overfitting, particularly in classification tasks. Instead of using "hard" targets (e.g., [0, 0, 1, 0]) it uses "soft" targets where a small probability mass is distributed across all classes.

For example, with a smoothing factor of 0.1 and 4 classes, a true label of class 2 would be transformed from [0, 0, 1, 0] to [0.025, 0.025, 0.925, 0.025].

In [None]:
def smooth_labels(y_true, num_classes, smoothing=0.1):
    """One-hot encodes integer labels and applies label smoothing.

    Each one-hot vector is scaled by (1 - smoothing) and a uniform share of
    smoothing / num_classes is added to every class.

    Args:
        y_true: True labels as integer class indices.
        num_classes: Total number of classes.
        smoothing: Smoothing factor (0.0 for no smoothing, 1.0 for maximum smoothing).

    Returns:
        Smoothed one-hot labels with num_classes entries per label.
    """
    keep_weight = 1 - smoothing
    uniform_share = smoothing / num_classes
    return tf.one_hot(y_true, depth=num_classes) * keep_weight + uniform_share

# Apply label smoothing to the training labels.
# NOTE(review): num_classes is hard-coded; it must match the number of classes
# found by the label encoder (30 in the mapping printed above).
num_classes = 30 # Number of classes in dataset
label_smoothing = 0.1 # Smoothing parameter
y_train_smoothed = smooth_labels(y_train, num_classes, smoothing=label_smoothing)

# Convert y_test into one-hot format too, for consistency. We don't smooth the test labels.
# NOTE(review): this overwrites the integer y_test, so re-running this cell without
# re-running the encoding cell would one-hot encode already one-hot labels.
y_test = tf.one_hot(y_test, depth=num_classes)

## Angle Feature Engineering

This section focuses on extracting relevant features from the landmark data in the form of angles between keypoints. These angles capture the relative positions and movements of body parts, particularly hands, wrists, and elbows. This information is crucial for the model to understand gestures and signs.

For example, the angle between the wrist, elbow, and shoulder indicates whether the arm is bent or extended. Similarly, the angles between different hand keypoints can reveal finger movements and their relative positions, such as whether the hand is open or closed.

By including these angle features, the model learns to focus on the key aspects of hand and body posture that are essential for sign language recognition.

In [None]:
def calculate_angle(A, B, C):
    """Calculates the angle at point B formed by the points A, B and C (in radians).

    The angle is measured between the vectors B->A and B->C.

    Args:
        A: First point coordinates (3D numpy array or array-like).
        B: Second point coordinates, the vertex of the angle.
        C: Third point coordinates (3D numpy array or array-like).

    Returns:
        The angle between points A, B, and C in radians, in the range [0, pi].
        Returns 0.0 when A or C coincides with B, because the angle is
        undefined for a zero-length vector.
    """
    # Accept plain sequences as well as arrays.
    A, B, C = np.asarray(A), np.asarray(B), np.asarray(C)

    BA = A - B
    BC = C - B
    # Compute dot product and magnitudes
    dot_product = np.dot(BA, BC)
    magnitude_BA = np.linalg.norm(BA)
    magnitude_BC = np.linalg.norm(BC)

    # Prevent division by zero
    if magnitude_BA == 0 or magnitude_BC == 0:
        return 0.0

    # Clip the cosine so rounding error cannot push it outside arccos' domain.
    cos_angle = dot_product / (magnitude_BA * magnitude_BC)
    cos_angle = np.clip(cos_angle, -1.0, 1.0)

    # Return the angle in radians
    return np.arccos(cos_angle)

In [None]:
def _angles_at_triplets(points, triplets):
    """Computes the angle at the middle keypoint of every (i, j, k) triplet.

    Args:
        points: Array of shape (..., n_keypoints, 3).
        triplets: Integer array of shape (n_angles, 3) holding keypoint indices.

    Returns:
        Array of shape (..., n_angles) with angles in radians. Entries where
        either vector has zero length are 0.0.
    """
    A = points[..., triplets[:, 0], :]
    B = points[..., triplets[:, 1], :]
    C = points[..., triplets[:, 2], :]

    BA = A - B
    BC = C - B
    dot_product = np.sum(BA * BC, axis=-1)
    magnitude_BA = np.linalg.norm(BA, axis=-1)
    magnitude_BC = np.linalg.norm(BC, axis=-1)

    # The angle is undefined when either vector has zero length (for example a
    # keypoint that was not detected and is stored as zeros).
    defined = (magnitude_BA != 0) & (magnitude_BC != 0)

    # Undefined entries divide by zero here; they are overwritten with 0.0 below.
    with np.errstate(divide='ignore', invalid='ignore'):
        cos_angle = dot_product / (magnitude_BA * magnitude_BC)

    # Clip so rounding error cannot push the cosine outside arccos' domain.
    angles = np.arccos(np.clip(cos_angle, -1.0, 1.0))
    return np.where(defined, angles, 0.0)


def get_angles(X):
    """Extracts angles between keypoints for each video and frame.

    Every frame is read as 75 keypoints with 3 coordinates each. Pose angles
    use keypoint indices directly; the hand triplets are applied at offset 33
    (left hand) and offset 54 (right hand). The computation is vectorised over
    all videos and frames and is the array equivalent of calling
    calculate_angle on every triplet.

    Args:
        X: Landmark data (numpy array, shape: (n_videos, n_frames, n_keypoints * 3)).

    Returns:
        Angles for each video and frame (numpy array, shape: (n_videos, n_frames, n_angles)),
        in radians. There are 16 angles per frame: 8 pose angles, 4 left-hand
        angles and 4 right-hand angles, in that order.

    Raises:
        ValueError: If X is not 3-dimensional or its last axis is not 75 * 3 = 225.
    """
    X = np.asarray(X)
    if X.ndim != 3:
        raise ValueError(
            f"X must have shape (n_videos, n_frames, n_features), got {X.shape}"
        )

    # Reshape the data into (n_videos, n_frames, n_keypoints, 3)
    n_videos, n_frames, n_features = X.shape
    n_keypoints = 75
    if n_features != n_keypoints * 3:
        raise ValueError(
            f"Expected {n_keypoints * 3} features per frame "
            f"({n_keypoints} keypoints x 3 coordinates), got {n_features}"
        )
    X_reshaped = X.reshape(n_videos, n_frames, n_keypoints, 3)

    # Define keypoints for angle calculation (indices start from 0)
    pose_angle_indices = [
        (12, 14, 16),
        (14, 16, 18),
        (18, 16, 22),
        (14, 12, 24),

        (11, 13, 15),
        (13, 15, 17),
        (17, 15, 21),
        (13, 11, 23),
    ]
    # For both left and right hands (indices relative to the start of each hand)
    hand_angle_indices = [
        (4, 0, 8),
        (8, 0, 16),
        (0, 9, 12),
        (0, 17, 20),
    ]
    left_hand_offset = 33
    right_hand_offset = 54

    # Absolute keypoint indices for every angle: pose, left hand, right hand.
    triplets = np.array(
        pose_angle_indices
        + [tuple(left_hand_offset + idx for idx in triplet) for triplet in hand_angle_indices]
        + [tuple(right_hand_offset + idx for idx in triplet) for triplet in hand_angle_indices]
    )

    return _angles_at_triplets(X_reshaped, triplets)

In [None]:
# Get joint angles from the raw (not yet standardized) landmark coordinates.
# This must run before the standardization cell, which rebinds X_train / X_test.
X_train_angles = get_angles(X_train)
X_test_angles = get_angles(X_test)

## Standardization

### Standardization

Standardization is applied to the landmark data to improve the performance of the transformer model. Transformers, like many neural networks, tend to perform better when input features have a mean of zero and a standard deviation of one. This process scales the data, preventing features with larger ranges from dominating the learning process.

It's important to note that the angle features are added *after* the standardization of the original landmark data.

In [None]:
# Create normalization layer. With axis=-1 a separate mean and variance is kept for
# each index of the last axis, i.e. per landmark feature (not per frame).
normalization_layer = layers.Normalization(axis=-1)

# Adapt the normalization layer to the training data only to calculate mean and
# variance, so no statistics from the test set are used.
normalization_layer.adapt(X_train)

# Standardize the data
X_train_standardized = normalization_layer(X_train).numpy()
X_test_standardized = normalization_layer(X_test).numpy()

# Add angle features to the standardized landmark data (the angles themselves are
# not standardized).
# NOTE(review): X_train / X_test are rebound here, so this cell is not idempotent;
# re-running it requires re-running the data loading cell first.
X_train = np.concatenate([X_train_standardized, X_train_angles],axis=-1)
X_test = np.concatenate([X_test_standardized, X_test_angles],axis=-1)

# Convert to TensorFlow tensor
# y_train becomes the smoothed one-hot training labels from here on.
X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
X_test = tf.convert_to_tensor(X_test, dtype=tf.float32)
y_train = tf.convert_to_tensor(y_train_smoothed, dtype=tf.float32)
y_test = tf.convert_to_tensor(y_test, dtype=tf.float32)

# Modeling

This section defines and implements the transformer model used for sign language recognition. This specific architecture and its hyperparameters were selected based on previous experiments with various architectures and hyperparameter tuning.

In [None]:
def create_transformer_model(input_shape, num_classes, num_blocks=4, num_heads=8,
                             key_dim=128, ff_dim=512):
    """Creates a transformer model for sign language recognition.

    Args:
        input_shape: Shape of the input data (sequence length, feature dimension).
        num_classes: Number of sign classes.
        num_blocks: Number of transformer encoder blocks.
        num_heads: Number of attention heads per block.
        key_dim: Size of each attention head for query and key.
        ff_dim: Width of the hidden layer of the feed-forward network.

    Returns:
        A Keras Model instance mapping a sequence of shape input_shape to
        softmax probabilities over num_classes.
    """
    sequence_length = input_shape[0]
    feature_dim = input_shape[1]

    inputs = layers.Input(shape=input_shape)

    # Add positional information to the input sequence.
    # NOTE(review): the Embedding layer is called on a concrete tf.range tensor
    # rather than on a symbolic input, so its weights may not be tracked or
    # trained by the returned Model - confirm with model.summary().
    positional_encoding = layers.Embedding(
        input_dim=sequence_length, output_dim=feature_dim
    )(tf.range(sequence_length))
    x = inputs + positional_encoding

    # Transformer Encoder
    for _ in range(num_blocks):
        # Layer Normalization (applied before attention, based on prior experimentation)
        x_norm = layers.LayerNormalization(epsilon=1e-6)(x)
        # Multi-Head Self-Attention with a residual connection
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=key_dim
        )(x_norm, x_norm)
        x = x + attention_output
        # Feed-Forward Network: expand to ff_dim, then project back to the
        # feature dimension so the residual addition below is shape-compatible.
        # The projection takes the hidden layer's output; previously it was fed
        # x directly, which left the ff_dim layer unused.
        ff_output = layers.Dense(ff_dim, activation='relu')(x)
        ff_output = layers.Dense(feature_dim, activation='relu')(ff_output)
        x = layers.LayerNormalization(epsilon=1e-6)(x + ff_output)

    # Global Average Pooling over the sequence axis
    x = layers.GlobalAveragePooling1D()(x)

    # Output Layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return models.Model(inputs, outputs)

### Training Configuration

This section details the configuration used for training the transformer model. The hyperparameters, optimizer, and callbacks were chosen based on previous experiments and exploration.

- **Optimizer**: AdamW optimizer is used with a learning rate of 0.001 and a weight decay of 1e-4. Weight decay helps prevent overfitting by penalizing large weights during training.

**Learning Rate Scheduler (ReduceLROnPlateau):**

This callback monitors the validation loss and reduces the learning rate by a factor of 0.5 if the validation loss remains stagnant for 3 epochs (patience).

**Early Stopping:**

Early stopping terminates training if the validation loss does not improve for 5 epochs (patience). This prevents overfitting by stopping training when the model starts to memorize the training data instead of generalizing to unseen data.

In [None]:
input_shape = (113, 241)  # (n_frames, n_keypoints * n_coordinates + n_angles)
num_classes = 30

transformer_model = create_transformer_model(input_shape, num_classes)

# Compile the model. The model outputs softmax probabilities, hence from_logits=False.
transformer_model.compile(
    optimizer=AdamW(learning_rate=0.001, weight_decay=1e-4),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=["accuracy"]
)

# Add the learning rate scheduler callback
lr_callback = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6
)
# Add early stopping callback
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)
# Combine Callbacks
callbacks = [lr_callback, early_stopping]

# NOTE(review): the test split doubles as the validation set, so early stopping and
# the learning-rate schedule are tuned on it; the reported validation metrics are
# therefore not an unbiased estimate of performance on unseen data.
# `model` holds the History object returned by fit (not the Keras model itself).
model = transformer_model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=32,
    callbacks=callbacks  # already a list; do not wrap it in another list
)

Epoch 1/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 480ms/step - accuracy: 0.2674 - loss: 3.0585 - val_accuracy: 0.4667 - val_loss: 1.6266 - learning_rate: 0.0010
Epoch 2/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 51ms/step - accuracy: 0.6727 - loss: 1.4705 - val_accuracy: 0.5833 - val_loss: 1.3509 - learning_rate: 0.0010
Epoch 3/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 51ms/step - accuracy: 0.8427 - loss: 1.1524 - val_accuracy: 0.6833 - val_loss: 1.1439 - learning_rate: 0.0010
Epoch 4/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 53ms/step - accuracy: 0.9605 - loss: 0.8573 - val_accuracy: 0.7167 - val_loss: 1.0176 - learning_rate: 0.0010
Epoch 5/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 53ms/step - accuracy: 0.9856 - loss: 0.7640 - val_accuracy: 0.6667 - val_loss: 1.2299 - learning_rate: 0.0010
Epoch 6/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0

In [None]:
# Get the epoch with the best performance (lowest validation loss), 1-indexed.
# Reading it from the recorded history also works when early stopping never
# triggered; in that case early_stopping.stopped_epoch stays 0 and the
# stopped_epoch - patience + 1 formula would produce a negative epoch.
val_losses = model.history['val_loss']
best_epoch = int(np.argmin(val_losses)) + 1

# Print the metrics of the best epoch
best_val_loss = val_losses[best_epoch - 1]
best_val_accuracy = model.history['val_accuracy'][best_epoch - 1]  # Change to the metric you're tracking

print(f"Best Epoch: {best_epoch}")
print(f"Validation Loss: {best_val_loss}")
print(f"Validation Accuracy: {best_val_accuracy}")

Best Epoch: 12
Validation Loss: 0.8660455346107483
Validation Accuracy: 0.8333333134651184


# Deployment Preparation

## Exporting the model

In [None]:
# Save the trained model to Google Drive as a single .keras file.
save_path = '/content/drive/MyDrive/Omdena/sign_language_recognition/transformer_model_b.keras'
transformer_model.save(save_path)

In [None]:
# Sanity check: reload the saved model and confirm it reproduces the metrics of the
# best epoch, i.e. that the best weights were the ones saved.
loaded_model = tf.keras.models.load_model(save_path)

# Evaluate on validation data (the same X_test / y_test used during training)
val_loss, val_accuracy = loaded_model.evaluate(X_test, y_test)

print(f"Validation Loss: {val_loss}")
print(f"Validation Accuracy: {val_accuracy}")

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 2s/step - accuracy: 0.8472 - loss: 0.8356
Validation Loss: 0.8660455346107483
Validation Accuracy: 0.8333333134651184


## Exporting Training Statistics for Standardization

In [None]:
# Read the statistics learned by the normalization layer from the training data so
# that the same standardization can be applied at inference time.
mean = normalization_layer.mean.numpy()
variance = normalization_layer.variance.numpy()

# Save mean and variance to a dictionary (converted to lists so they are JSON-serializable)
normalization_stats = {
    "mean": mean.tolist(),
    "variance": variance.tolist()
}

# Specify the Google Drive path
# NOTE(review): save_path is reused here; it previously held the model path.
save_path = "/content/drive/MyDrive/Omdena/sign_language_recognition/normalization_stats.json"

# Save the normalization_stats to Google Drive
with open(save_path, "w") as json_file:
    json.dump(normalization_stats, json_file)

In [None]:
# # Example inference
# load_path = '/content/drive/MyDrive/Omdena/sign_language_recognition/normalization_stats.json'

# # Load mean and variance from JSON
# with open(load_path, "r") as f:
#     normalization_stats = json.load(f)

# mean = tf.convert_to_tensor(normalization_stats["mean"])
# variance = tf.convert_to_tensor(normalization_stats["variance"])

# # Create normalization function
# def normalize_data(data, mean, variance):
#     return (data - mean) / tf.sqrt(variance)

## Exporting Decoder

In [None]:
# Ensure all keys/values are standard Python types (NumPy integers and strings are
# cast to int / str so they can be serialized).
reverse_label_mapping = {int(v): str(k) for k, v in label_mapping.items()}

# Specify the Google Drive path
save_path = "/content/drive/MyDrive/Omdena/sign_language_recognition/reverse_label_mapping.json"

# Save the decoder to Google Drive.
# JSON object keys are always strings, so the integer keys are written as strings
# and must be looked up with str(index) after loading.
with open(save_path, "w") as json_file:
    json.dump(reverse_label_mapping, json_file)

In [None]:
# # Example inference
# load_path = "/content/drive/MyDrive/Omdena/sign_language_recognition/reverse_label_mapping.json"

# with open(load_path, "r") as json_file:
#     reverse_label_mapping = json.load(json_file)


# predicted_index = 2  # This is the model's output (e.g., from `argmax`)
# predicted_label = reverse_label_mapping[str(predicted_index)]  # Convert to string key for JSON compatibility

# print(f"Predicted Label: {predicted_label}")

Predicted Label: besar


In [None]:
# Install the DagsHub client and push this notebook to the project repository.
# NOTE(review): the package version is not pinned and the import sits outside the
# top-level import cell; consider pinning it for reproducibility.
!pip install -q dagshub

from dagshub.notebook import save_notebook

save_notebook(repo="Omdena/JakartaIndonesia_SignLanguageTranslation", path="modeling", branch="kenji_modeling", commit_message="Add Final Landmark Transformer Model notebook")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m255.6/255.6 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.2/203.2 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.3/49.3 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m83.2/83.2 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m74.0/74.0 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h