# Imports

In [15]:
import os
import random
import keras

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

from keras.callbacks import EarlyStopping
from keras.models import load_model
from tensorflow.keras.preprocessing import image
from tensorflow.keras.utils import Sequence
from tensorflow.keras.utils import to_categorical
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# File paths handling

In [None]:
# Change if data location is different.
# Root directory of the KArSL-100 dataset, given relative to the notebook's
# working directory. The label workbook and the 'train' / 'test' folders used
# below are all resolved against this path.

base_path = "../../KArSL-100/"

In [None]:
def load_label_map(excel_path):
    """Build the two lookup tables between sign IDs and class indices.

    The workbook's 'SignID' column is read as text and left-padded with
    zeros to four characters; each row's position becomes its class index.

    Returns:
        (label_map, signid_map): padded sign ID -> index, and
        index -> padded sign ID.
    """
    table = pd.read_excel(excel_path)
    padded_ids = table['SignID'].astype(str).str.zfill(4)

    # Index -> ID first, then invert it; both follow the workbook's row order.
    index_to_id = dict(enumerate(padded_ids))
    id_to_index = {padded: position for position, padded in index_to_id.items()}
    return id_to_index, index_to_id


label_map, signid_map = load_label_map(
    os.path.join(base_path, "KARSL-100_Labels.xlsx")
)


In [None]:
def extract_sign_id(folder_name):
    """Return the third underscore-separated field of a sequence folder name.

    For a folder such as "03_03_1234_(15_12_16_16_11_06)_c" that field is
    "1234", the 4-digit sign ID.
    """
    # Only the first three fields matter, so stop splitting after them.
    fields = folder_name.split('_', 3)
    return fields[2]

def load_sequence(sequence_path, target_size=(128, 128)):
    """Load every .jpg frame of one sequence folder as a preprocessed array.

    Args:
        sequence_path: Directory holding the frames of a single sequence.
        target_size: (height, width) each frame is resized to while loading.

    Returns:
        A NumPy array with one entry per frame, in sorted filename order.
        The array is empty when the directory contains no .jpg files.
    """
    # `preprocess_input` was never imported at file level, so this function
    # used to raise NameError on the first frame. MobileNet is the backbone
    # built in this notebook, so its preprocessing function is used here.
    from tensorflow.keras.applications.mobilenet import preprocess_input

    frame_names = sorted(
        name for name in os.listdir(sequence_path) if name.endswith('.jpg')
    )

    sequence = []
    for name in frame_names:
        img = image.load_img(
            os.path.join(sequence_path, name), target_size=target_size
        )
        sequence.append(preprocess_input(image.img_to_array(img)))
    return np.array(sequence)


In [None]:
class SignLanguageGenerator(Sequence):
    """Batch generator turning frame folders into zero-padded clips and labels.

    Each sample is a ``(sign_id, folder_path)`` pair. A batch is a pair
    ``(clips, labels)``: ``clips`` is a float32 array of shape
    ``(batch, max_frames, height, width, 3)`` whose unused trailing frames are
    zeros, and ``labels`` is the one-hot encoding of ``label_map[sign_id]``
    over ``len(label_map)`` classes.

    NOTE(review): pixels are scaled to [0, 1] here, while Keras' MobileNet
    ``preprocess_input`` scales to [-1, 1] — confirm which range the model is
    meant to be trained on.
    """

    def __init__(self, samples, label_map, batch_size=6, max_frames=20,
                 input_size=(128, 128), **kwargs):
        """Store the configuration and shuffle the sample order once.

        Args:
            samples: Iterable of ``(sign_id, folder_path)`` pairs.
            label_map: Mapping from sign ID to integer class index.
            batch_size: Number of clips per batch.
            max_frames: Frames kept per clip; shorter clips are zero-padded.
            input_size: ``(height, width)`` every frame is resized to.
            **kwargs: Forwarded to the Keras ``Sequence`` base class.
        """
        # The Keras 3 base class expects its own initializer to run.
        super().__init__(**kwargs)
        # Shuffle a private copy so the caller's list keeps its order.
        self.samples = list(samples)
        self.label_map = label_map
        self.batch_size = batch_size
        self.max_frames = max_frames
        self.input_size = input_size
        random.shuffle(self.samples)

    def _load_samples(self, data_dir=None):
        """Collect ``(sign_id, repetition_folder_path)`` pairs under a directory.

        Args:
            data_dir: Directory with one sub-folder per sign ID. Falls back to
                ``self.data_dir`` when omitted.

        Raises:
            ValueError: If no directory is given and ``self.data_dir`` is unset.
        """
        # `self.data_dir` is never assigned by __init__, so the original
        # could not run; accept the directory explicitly instead.
        if data_dir is None:
            data_dir = getattr(self, 'data_dir', None)
        if data_dir is None:
            raise ValueError("data_dir must be provided to _load_samples")

        samples = []
        # Sorted so the result does not depend on filesystem listing order.
        for sign_id in sorted(os.listdir(data_dir)):
            sign_path = os.path.join(data_dir, sign_id)
            if not os.path.isdir(sign_path):
                continue
            for repetition_folder in sorted(os.listdir(sign_path)):
                repetition_path = os.path.join(sign_path, repetition_folder)
                if os.path.isdir(repetition_path):
                    samples.append((sign_id, repetition_path))
        return samples

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        return int(np.ceil(len(self.samples) / self.batch_size))

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``(clips, one_hot_labels)``."""
        start = idx * self.batch_size
        batch_samples = self.samples[start:start + self.batch_size]

        # Pre-allocating zeros gives "post" padding for free and keeps the
        # batch shape fixed even when a folder contains no frames at all.
        height, width = self.input_size
        batch_frames = np.zeros(
            (len(batch_samples), self.max_frames, height, width, 3),
            dtype='float32',
        )
        label_ids = []
        for row, (sign_id, folder_path) in enumerate(batch_samples):
            frames = self._load_frames(folder_path)
            if len(frames):
                batch_frames[row, :len(frames)] = frames
            label_ids.append(self.label_map[sign_id])

        batch_labels = to_categorical(
            label_ids, num_classes=len(self.label_map)
        )
        return batch_frames, np.array(batch_labels)

    def _load_frames(self, folder_path):
        """Load up to ``max_frames`` .jpg/.png frames, scaled to [0, 1].

        Frames are taken in sorted filename order and resized to
        ``input_size``.
        """
        frame_files = sorted(
            name for name in os.listdir(folder_path)
            if name.endswith(('.jpg', '.png'))
        )[:self.max_frames]

        frames = []
        for name in frame_files:
            img = image.load_img(
                os.path.join(folder_path, name), target_size=self.input_size
            )
            frames.append(image.img_to_array(img) / 255.0)
        return np.array(frames)

In [None]:
# Helper function for getting file paths

def get_samples(path):
    """Collect ``(sign_id, repetition_folder_path)`` pairs from a dataset split.

    Args:
        path: Directory containing one sub-folder per sign ID, each holding
            one sub-folder per recorded repetition.

    Returns:
        A list of ``(sign_id, folder_path)`` tuples ordered by sign ID and
        then by repetition folder name. Plain files at either level are
        skipped.
    """
    all_samples = []
    # os.listdir returns entries in arbitrary order. Sorting makes the sample
    # list, and therefore any seeded split of it, reproducible.
    for sign_id in sorted(os.listdir(path)):
        sign_path = os.path.join(path, sign_id)
        if not os.path.isdir(sign_path):
            continue
        for repetition_folder in sorted(os.listdir(sign_path)):
            repetition_path = os.path.join(sign_path, repetition_folder)
            if os.path.isdir(repetition_path):
                all_samples.append((sign_id, repetition_path))
    return all_samples


# Locate the two dataset splits under the base directory.
train_dir, test_dir = (
    os.path.join(base_path, split) for split in ('train', 'test')
)

# Hold out 20% of the training samples for validation (fixed seed).
train_samples, valid_samples = train_test_split(
    get_samples(train_dir), test_size=0.2, random_state=42
)
test_samples = get_samples(test_dir)

In [None]:
# One generator per split, all keeping up to 30 frames per clip.
# Built in train, valid, test order.
train_gen, valid_gen, test_gen = (
    SignLanguageGenerator(split_samples, label_map, max_frames=30)
    for split_samples in (train_samples, valid_samples, test_samples)
)

In [None]:
# Fetch the first training batch by index. The previous
# `next(enumerate(...))` form built an iterator and rebound `_` only to
# reach the same batch.
X, y = train_gen[0]
# Show the sign ID of the second clip in the batch and its frame at index 29.
print(signid_map[np.argmax(y[1])])
plt.imshow(X[1][29])

In [None]:
# Display the shape of the batch fetched above; presumably
# (batch, frames, height, width, channels) — confirm against the output.
X.shape

In [None]:
# NOTE: `model` is only created in the "Model Initialization" cell further
# down, so that cell has to be run before this one.
# Print the predicted class index for each clip in the batch, then the true
# class index taken from the one-hot labels.
print(np.argmax(model.predict(X), axis=1))
print(np.argmax(y, axis=1))

# Model Initialization

In [None]:
from tensorflow.keras.applications import MobileNet
from tensorflow.keras.models import Model
from tensorflow.keras.layers import TimeDistributed, GRU, Dense, Input, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam

# Clip geometry as (frames, height, width, channels). Change it here when the
# generators' max_frames or input_size change; the per-frame shape passed to
# MobileNet is derived from it below.
input_shape = (30, 128, 128, 3)
# The generators one-hot encode with len(label_map) classes, so the output
# layer is sized from the same map (loaded earlier in this notebook).
num_classes = len(label_map)

inputs = Input(shape=input_shape)

# Load MobileNet without its classifier head, as a per-frame feature extractor
mobilenet_base = MobileNet(
    weights='imagenet', include_top=False, input_shape=input_shape[1:]
)

# Freeze mobilenet layers (to fine-tune later if you want)
for layer in mobilenet_base.layers:
    layer.trainable = False

# Apply MobileNet to each frame individually using TimeDistributed
x = TimeDistributed(mobilenet_base)(inputs)
x = TimeDistributed(GlobalAveragePooling2D())(x)  # (batch, timesteps, features)

# RNN layer (you can choose LSTM, GRU, etc.)
x = GRU(128, return_sequences=False)(x)  # (batch, 128)

# Output layer
outputs = Dense(num_classes, activation='softmax')(x)

# Build the model
model = Model(inputs, outputs)

# Compile
model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Summary
model.summary()


# Training Cell

In [None]:
# Stop training once validation loss has gone 5 epochs without improving,
# then restore the weights from the best epoch.
es = EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)

In [None]:
# Train for at most 10 epochs, validating each epoch, with early stopping.
history = model.fit(
    train_gen,
    validation_data=valid_gen,
    epochs=10,
    verbose=1,
    callbacks=[es],
)
history

In [None]:
# Evaluate the trained model on the test generator. With the compile settings
# used above this is presumably [loss, accuracy] — confirm against the output.
scores = model.evaluate(test_gen)
scores

In [None]:
# Keep the filename in one place so the save and load paths cannot drift
# apart. The name encodes label count, frame size and clip length.
model_path = "100_labels_model_128x128_30_frames.keras"
model.save(model_path)
# Reload from disk to check that the saved file is usable.
loaded_model = load_model(model_path)

In [None]:
# Fetch the first test batch by index. The previous `next(enumerate(...))`
# form built an iterator and rebound `_` only to reach the same batch.
X, y = test_gen[0]
print(X.shape)

In [None]:
# Run the reloaded model on the test batch and print the true class indices
# (argmax of the one-hot labels) next to the predicted ones.
prediction = loaded_model.predict(X)
print(f"True: {np.argmax(y, axis=1)}\nPred: {np.argmax(prediction, axis=1)}")