# Numpy Data Prep
If you don't want to export the `.npy` files, you can work with the `merged_data` list built inside `merge_npy_files` instead.

In [None]:
import os

import numpy as np

def merge_npy_files(parent_folder, output_folder, output_file_name):
    """Merge every per-label ``.npy`` file under ``parent_folder`` into one file.

    ``parent_folder`` is expected to contain one sub-folder per label; each
    ``.npy`` file found directly inside a label folder becomes one record of
    the form ``{'data': <loaded array>, 'label': <folder name>,
    'file_name': <file name>}``. Entries of ``parent_folder`` that are not
    directories and files that do not end in ``.npy`` are ignored.

    Args:
        parent_folder: Directory holding the label sub-folders.
        output_folder: Directory the merged file is written to (created if
            it does not exist).
        output_file_name: File name of the merged output inside
            ``output_folder``.

    The records are pickled into an object array by ``np.save``, so the
    result has to be read back with ``np.load(..., allow_pickle=True)``.
    """
    merged_data = []

    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # record order identical across runs and filesystems.
    for label in sorted(os.listdir(parent_folder)):
        label_folder = os.path.join(parent_folder, label)

        # Only label *folders* contribute samples
        if not os.path.isdir(label_folder):
            continue

        for file in sorted(os.listdir(label_folder)):
            if not file.endswith('.npy'):
                continue
            data = np.load(os.path.join(label_folder, file))
            merged_data.append({'data': data, 'label': label, 'file_name': file})

    # Save the merged data, creating the destination folder when needed
    os.makedirs(output_folder, exist_ok=True)
    np.save(os.path.join(output_folder, output_file_name), merged_data)

    print("Merged data saved successfully!")

In [None]:
# Merge the per-label training .npy files into a single train.npy.
# parent_folder holds the label sub-folders; the merged file is written to
# output_folder/output_file_name.
parent_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition/train'
output_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition'
output_file_name = 'train.npy'
merge_npy_files(parent_folder, output_folder, output_file_name)

In [None]:
# Merge the per-label test .npy files into a single test.npy.
# Reuses (overwrites) the parent_folder / output_file_name globals set for
# the training split.
parent_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition/test'
output_folder = '/content/drive/MyDrive/Omdena/sign_language_recognition'
output_file_name = 'test.npy'
merge_npy_files(parent_folder, output_folder, output_file_name)

# Start Here

## Data Preprocessing

In [None]:
import numpy as np

from sklearn.preprocessing import LabelEncoder

import tensorflow as tf
from tensorflow.keras import layers, models

from tensorflow.keras.callbacks import Callback
from tensorflow.keras.optimizers import AdamW

In [None]:
# The merged files store pickled dict records ('data', 'label', ...), which
# is why allow_pickle=True is needed. Only load files you produced yourself:
# unpickling untrusted data can execute arbitrary code.
# NOTE(review): these paths point at MyDrive/, while the merge cells write to
# MyDrive/Omdena/sign_language_recognition/ — confirm the files were copied.
train_data = np.load('/content/drive/MyDrive/train.npy', allow_pickle=True)
# Features (X) and labels (y) straight into NumPy arrays
X_train = np.array([record['data'] for record in train_data])
y_train = np.array([record['label'] for record in train_data])

test_data = np.load('/content/drive/MyDrive/test.npy', allow_pickle=True)
# Same extraction for the test split
X_test = np.array([record['data'] for record in test_data])
y_test = np.array([record['label'] for record in test_data])

In [None]:
# Learn the label -> integer mapping from the training labels only
label_encoder = LabelEncoder().fit(y_train)

# Encode both splits with that same mapping
y_train = label_encoder.transform(y_train)
y_test = label_encoder.transform(y_test)

In [None]:
def smooth_labels(y_true, num_classes, smoothing=0.1):
    """Turn integer class ids into label-smoothed one-hot targets.

    Each one-hot vector is scaled by ``1 - smoothing`` and then
    ``smoothing / num_classes`` is added to every entry.

    Args:
        y_true: Integer class indices, passed to ``tf.one_hot``.
        num_classes: Depth of the one-hot encoding.
        smoothing: Amount of probability mass spread over all classes.

    Returns:
        Tensor of smoothed targets with a trailing axis of size ``num_classes``.
    """
    keep_weight = 1 - smoothing
    uniform_mass = smoothing / num_classes
    one_hot = tf.one_hot(y_true, depth=num_classes)
    return one_hot * keep_weight + uniform_mass

# Pre-compute label-smoothed one-hot targets for both splits (the smoothing is
# applied to the targets here, not inside the loss function).
# NOTE(review): num_classes is hard-coded; presumably it should equal
# len(label_encoder.classes_) — confirm.
num_classes = 30
label_smoothing = 0.1
y_train_smoothed = smooth_labels(y_train, num_classes, smoothing=label_smoothing)
y_test_smoothed = smooth_labels(y_test, num_classes, smoothing=label_smoothing)

In [None]:
def calculate_angle(A, B, C):
    """Return the angle at vertex ``B`` between the rays ``B->A`` and ``B->C``.

    Args:
        A, B, C: Point coordinates as NumPy vectors of equal length.

    Returns:
        The angle in radians, or ``0.0`` when ``A`` or ``C`` coincides
        with ``B`` (the angle is undefined for a zero-length ray).
    """
    to_a, to_c = A - B, C - B
    len_a = np.linalg.norm(to_a)
    len_c = np.linalg.norm(to_c)

    # A zero-length ray would divide by zero below
    if len_a == 0 or len_c == 0:
        return 0.0

    cosine = np.dot(to_a, to_c) / (len_a * len_c)
    # Rounding can push the cosine slightly outside [-1, 1]; clip before arccos
    return np.arccos(np.clip(cosine, -1.0, 1.0))

In [None]:
def get_angles(X):
    """Compute joint angles for every frame of every video.

    The last axis of ``X`` is interpreted as 75 keypoints x 3 coordinates.
    Sixteen angles are produced per frame, in this order: 8 pose angles,
    4 angles for the hand whose keypoints start at index 33, then the same
    4 angles for the hand whose keypoints start at index 54.

    Each angle is taken at the middle keypoint of an ``(i, j, k)`` triplet,
    in radians. If either ray has zero length the angle is ``0.0``.

    Args:
        X: Array of shape ``(n_videos, n_frames, 225)``.

    Returns:
        np.ndarray of shape ``(n_videos, n_frames, 16)``.

    Raises:
        ValueError: If the last axis does not hold 75 * 3 values.
    """
    X = np.asarray(X)
    n_videos, n_frames, n_features = X.shape
    n_keypoints = 75
    if n_features != n_keypoints * 3:
        raise ValueError(
            f"expected {n_keypoints * 3} features per frame "
            f"({n_keypoints} keypoints x 3), got {n_features}"
        )
    # Reshape the data into (n_videos, n_frames, n_keypoints, 3)
    X_reshaped = X.reshape(n_videos, n_frames, n_keypoints, 3)

    # Define keypoints for angle calculation (indices start from 0)
    pose_angle_indices = [
        (12, 14, 16),
        (14, 16, 18),
        (18, 16, 22),
        (14, 12, 24),

        (11, 13, 15),
        (13, 15, 17),
        (17, 15, 21),
        (13, 11, 23),
    ]
    # Hand-local indices, shared by both hands
    hand_angle_indices = [
        (4, 0, 8),
        (8, 0, 16),
        (0, 9, 12),
        (0, 17, 20),
    ]
    left_hand_offset, right_hand_offset = 33, 54

    # One row per angle: (first point, vertex, last point) as absolute indices
    triplets = np.array(
        pose_angle_indices
        + [tuple(left_hand_offset + idx for idx in t) for t in hand_angle_indices]
        + [tuple(right_hand_offset + idx for idx in t) for t in hand_angle_indices]
    )

    # Gather all points at once: each has shape (n_videos, n_frames, n_angles, 3)
    vertex = X_reshaped[:, :, triplets[:, 1], :]
    BA = X_reshaped[:, :, triplets[:, 0], :] - vertex
    BC = X_reshaped[:, :, triplets[:, 2], :] - vertex

    dot_product = np.sum(BA * BC, axis=-1)
    magnitude_BA = np.linalg.norm(BA, axis=-1)
    magnitude_BC = np.linalg.norm(BC, axis=-1)
    denominator = magnitude_BA * magnitude_BC

    # Where a ray has zero length, keep cos = 1 so that arccos yields 0.0
    valid = (magnitude_BA != 0) & (magnitude_BC != 0)
    cos_angle = np.ones_like(denominator)
    np.divide(dot_product, denominator, out=cos_angle, where=valid)

    # Clip to handle rounding errors, then convert to radians
    return np.arccos(np.clip(cos_angle, -1.0, 1.0))

In [None]:
# Build the final model inputs: standardized keypoint coordinates with the
# joint angles appended on the feature axis.
# NOTE: this cell overwrites X_train / X_test / y_train / y_test, so it is not
# safe to re-run without first re-running the loading and encoding cells.

# Get joint angles (computed from the raw, un-standardized coordinates)
X_train_angles = get_angles(X_train)
X_test_angles = get_angles(X_test)

# Compute mean and standard deviation of the training data
# NOTE(review): `mean` and `std` are not used anywhere below in this notebook;
# the Normalization layer computes its own statistics — confirm they can go.
mean = X_train.mean(axis=0)
std = X_train.std(axis=0)

# Flatten the data to (n_videos, n_frames * n_features)
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)

# Create normalization layer (statistics kept per entry of the last axis)
normalization_layer = layers.Normalization(axis=-1)

# Adapt the layer to the flattened training data only, so no test-set
# statistics are used for standardization
normalization_layer.adapt(X_train_flat)

# Standardize the flattened data
X_train_standardized_flat = normalization_layer(X_train_flat).numpy()
X_test_standardized_flat = normalization_layer(X_test_flat).numpy()

# Reshape back to original
X_train_standardized = X_train_standardized_flat.reshape(X_train.shape)
X_test_standardized = X_test_standardized_flat.reshape(X_test.shape)

# Add angles to standardized X (the angles themselves are not standardized)
X_train = np.concatenate([X_train_standardized, X_train_angles],axis=-1)
X_test = np.concatenate([X_test_standardized, X_test_angles],axis=-1)

# Convert to TensorFlow format; the targets are the label-smoothed one-hot
# vectors, replacing the integer-encoded y_train / y_test
X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
X_test = tf.convert_to_tensor(X_test, dtype=tf.float32)
y_train = tf.convert_to_tensor(y_train_smoothed, dtype=tf.float32)
y_test = tf.convert_to_tensor(y_test_smoothed, dtype=tf.float32)

## Modeling

In [None]:
def create_transformer_model(input_shape, num_classes):
    """Build a Transformer-encoder classifier over per-frame feature vectors.

    Architecture: learned positional embedding added to the inputs, four
    encoder blocks (pre-norm multi-head self-attention with a residual
    connection, then a two-layer feed-forward network with a residual
    connection and layer normalization), global average pooling over the
    frame axis, and a softmax classification head.

    Args:
        input_shape: ``(n_frames, n_features)`` of one sample.
        num_classes: Number of output classes.

    Returns:
        An uncompiled ``keras`` ``Model`` mapping inputs to class probabilities.
    """
    n_frames = input_shape[0]
    n_features = input_shape[1]

    inputs = layers.Input(shape=input_shape)

    # Positional Encoding
    # NOTE(review): the Embedding is applied to a concrete tf.range tensor
    # outside the functional graph, so its weights may not be tracked or
    # trained by the returned Model — confirm this is intended.
    positional_encoding = layers.Embedding(input_dim=n_frames, output_dim=n_features)(tf.range(n_frames))
    x = inputs + positional_encoding

    # Transformer Encoder
    for _ in range(4):  # Number of Transformer blocks
        # Normalized before attention, instead of after
        x_norm = layers.LayerNormalization(epsilon=1e-6)(x)
        # Instead of 4 (base), 8 used. Increase key dimensions into 128 from 64
        attention_output = layers.MultiHeadAttention(num_heads=8, key_dim=128)(x_norm, x_norm)
        x = x + attention_output
        # Feed-forward: expand to 512 units, then project back to the feature
        # width so the residual addition below is shape-compatible. The second
        # layer must consume the 512-unit output; feeding it `x` would
        # silently discard the first layer.
        ff_output = layers.Dense(512, activation='relu')(x)
        ff_output = layers.Dense(n_features, activation='relu')(ff_output)
        x = layers.LayerNormalization(epsilon=1e-6)(x + ff_output)

    # Global Average Pooling over the frame axis
    x = layers.GlobalAveragePooling1D()(x)

    # Output Layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return models.Model(inputs, outputs)

In [None]:
input_shape = (113, 241)  # (n_frames, n_keypoints * n_coordinates + n_angles)
num_classes = 30

transformer_model = create_transformer_model(input_shape, num_classes)

# Compile the model; targets are (smoothed) one-hot vectors, hence the
# categorical (not sparse) cross-entropy
transformer_model.compile(
    optimizer=AdamW(learning_rate=0.001, weight_decay=1e-4),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=["accuracy"]
)

# Halve the learning rate when val_loss has not improved for 3 epochs
lr_callback = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6
)
# Stop after 5 epochs without val_loss improvement and roll back to the
# best weights seen
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)
# Combine Callbacks
callbacks = [lr_callback, early_stopping]

# NOTE(review): the test split doubles as validation data, so it also drives
# early stopping and LR scheduling — confirm a separate hold-out is not needed.
# `model` holds the object returned by fit() (its .history dict is read
# later), not the Keras model itself.
model = transformer_model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=32,
    callbacks=callbacks  # already a list; do not wrap it in another list
)

Epoch 1/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 330ms/step - accuracy: 0.2483 - loss: 3.0896 - val_accuracy: 0.3500 - val_loss: 2.3286 - learning_rate: 0.0010
Epoch 2/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 50ms/step - accuracy: 0.6456 - loss: 1.6634 - val_accuracy: 0.4500 - val_loss: 2.2198 - learning_rate: 0.0010
Epoch 3/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 51ms/step - accuracy: 0.7107 - loss: 1.4490 - val_accuracy: 0.6167 - val_loss: 1.7128 - learning_rate: 0.0010
Epoch 4/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 52ms/step - accuracy: 0.8864 - loss: 1.0153 - val_accuracy: 0.7167 - val_loss: 1.5058 - learning_rate: 0.0010
Epoch 5/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 49ms/step - accuracy: 0.9589 - loss: 0.8389 - val_accuracy: 0.7667 - val_loss: 1.3992 - learning_rate: 0.0010
Epoch 6/50
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0

In [None]:
# Get the epoch with the best performance: the (1-indexed) epoch with the
# lowest validation loss, which is the quantity EarlyStopping monitors.
# Reading it from the history also works when training ran all epochs without
# triggering early stopping; early_stopping.stopped_epoch stays 0 in that case,
# so deriving the epoch from it would index the wrong entry.
val_losses = model.history['val_loss']
best_epoch = int(np.argmin(val_losses)) + 1

# Print the metrics of the best epoch
best_val_loss = val_losses[best_epoch - 1]
best_val_accuracy = model.history['val_accuracy'][best_epoch - 1]  # Change to the metric you're tracking

print(f"Best Epoch: {best_epoch}")
print(f"Validation Loss: {best_val_loss}")
print(f"Validation Accuracy: {best_val_accuracy}")

Best Epoch: 14
Validation Loss: 1.1771689653396606
Validation Accuracy: 0.8333333134651184


# Save the model

In [None]:
# Save the trained model to Drive as a single .keras file
transformer_model.save('/content/drive/MyDrive/transformer_model_b.keras')

In [None]:
# Sanity check, whether it saved the best weight: reload the file written by
# the save cell and compare its metrics with the best-epoch values printed above
loaded_model = tf.keras.models.load_model('/content/drive/MyDrive/transformer_model_b.keras')

# Evaluate on validation data (the same X_test / y_test passed as
# validation_data during fit)
val_loss, val_accuracy = loaded_model.evaluate(X_test, y_test)

print(f"Validation Loss: {val_loss}")
print(f"Validation Accuracy: {val_accuracy}")

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 2s/step - accuracy: 0.8368 - loss: 1.1773
Validation Loss: 1.1771689653396606
Validation Accuracy: 0.8333333134651184


In [None]:
!pip install -q dagshub

from dagshub.notebook import save_notebook

save_notebook(repo="Omdena/JakartaIndonesia_SignLanguageTranslation", path="modeling", branch="kenji", commit_message="Current Final Model")