In [1]:
# Cell 1: Imports, Parameters & Directory Setup (Improved)

import os
import pandas as pd

# ——— PARAMETERS ———
DATA_ROOT     = r"D:\Projects\data"
METADATA_CSV  = os.path.join(DATA_ROOT, "metadata.csv")

# Your original mandatory list; we'll prune it to the top 30 that actually exist
CLASS_LIST = [
    "hello","goodbye","please","thank","sorry","go","again","because","but","blue",
    "family","come","want","like","need","help","eat","drink","see","look","know",
    "learn","read","write","talk","say","speak","understand","can","cannot","do",
    "does","did","will","shall","make","give","get","find","show","ask","tell",
    "work","walk","run","sit","stand","open","close","stop","start","begin","love",
    "good","child","enjoy","more","far","big","before","man","how","boy","angry",
    "black","fine","bad","late","bathroom"
]
NUM_CLASSES   = 30

# Directories for new pipeline outputs
AUG_DIR       = os.path.join(DATA_ROOT, "augment1")
LANDMARK_DIR  = os.path.join(DATA_ROOT, "landmark1")

# Create dirs if they don't exist (won't overwrite existing data)
os.makedirs(AUG_DIR,      exist_ok=True)
os.makedirs(LANDMARK_DIR, exist_ok=True)

# ——— LOAD & PRUNE METADATA ———
assert os.path.exists(METADATA_CSV), f"Missing metadata: {METADATA_CSV}"
df_full      = pd.read_csv(METADATA_CSV)
labels_exist = set(df_full['Label'].unique())

# Keep only the first 30 of your mandatory signs that actually exist in the data
TOP30        = [w for w in CLASS_LIST if w in labels_exist][:NUM_CLASSES]
print(f"Pruned to {len(TOP30)} labels:", TOP30)

# Filter and reset index
df           = df_full[df_full['Label'].isin(TOP30)].copy().reset_index(drop=True)

# Expand file paths to full absolute paths
df['Filepath'] = df['Filepath'].apply(lambda p: os.path.join(DATA_ROOT, p))

# Map each label to an integer index
label_to_idx   = {lab: i for i, lab in enumerate(TOP30)}
df['LabelIdx'] = df['Label'].map(label_to_idx)

# Save filtered metadata for downstream cells
filtered_csv = os.path.join(DATA_ROOT, 'filtered_top30.csv')
df.to_csv(filtered_csv, index=False)
print(f"Filtered {len(df)} samples across {len(TOP30)} classes → saved to {filtered_csv}")


Pruned to 30 labels: ['hello', 'goodbye', 'go', 'again', 'because', 'but', 'blue', 'family', 'come', 'like', 'help', 'eat', 'drink', 'know', 'learn', 'can', 'cannot', 'make', 'give', 'get', 'find', 'ask', 'close', 'love', 'good', 'child', 'enjoy', 'more', 'far', 'big']
Filtered 444 samples across 30 classes → saved to D:\Projects\data\filtered_top30.csv


In [2]:
# Cell 2: Video Augmentation for TOP-30 (New Pipeline)
import os
import time
import cv2
import numpy as np
import pandas as pd
from pathlib import Path

# ——— PARAMETERS from Cell 1 ———
# DATA_ROOT, TOP30, AUG_DIR were already defined in Cell 1
# filtered_csv was set to os.path.join(DATA_ROOT, 'filtered_top30.csv')
NUM_AUG = 3   # how many augmentations per original

# Load filtered CSV
df_aug = pd.read_csv(os.path.join(DATA_ROOT, 'filtered_top30.csv'))

# Simple augmentation: flip + small rotation
def simple_augment(frames, num_aug):
    h, w = frames[0].shape[:2]
    aug_sets = []
    for _ in range(num_aug):
        out = []
        for f in frames:
            # horizontal flip 50%
            if np.random.rand() < 0.5:
                f2 = cv2.flip(f, 1)
            else:
                f2 = f.copy()
            # random rotation ±10°
            ang = np.random.uniform(-10, 10)
            M   = cv2.getRotationMatrix2D((w/2, h/2), ang, 1)
            f2  = cv2.warpAffine(f2, M, (w, h))
            out.append(f2)
        aug_sets.append(out)
    return aug_sets

# Make sure AUG_DIR exists
os.makedirs(AUG_DIR, exist_ok=True)

t0 = time.time()
count = 0

for _, row in df_aug.iterrows():
    src = row['Filepath']
    lbl = row['Label']
    if not os.path.exists(src):
        print(f"Missing source video: {src}")
        continue

    # Read all frames into a list
    cap    = cv2.VideoCapture(src)
    frames = []
    while True:
        ret, fr = cap.read()
        if not ret: break
        frames.append(fr)
    cap.release()
    if not frames:
        continue

    # Prepare per-label folder under AUG_DIR
    out_dir = os.path.join(AUG_DIR, lbl)
    os.makedirs(out_dir, exist_ok=True)
    base = Path(src).stem

    # Write the original (renamed) into augment1/Label/
    fourcc    = cv2.VideoWriter_fourcc(*'mp4v')
    fps       = 30  # or cap.get(cv2.CAP_PROP_FPS)
    h, w      = frames[0].shape[:2]
    orig_out  = os.path.join(out_dir, f"{base}_orig.mp4")
    writer = cv2.VideoWriter(orig_out, fourcc, fps, (w, h))
    for f in frames:
        writer.write(f)
    writer.release()

    # Create & save augmented clips
    aug_lists = simple_augment(frames, NUM_AUG)
    for i, aug in enumerate(aug_lists):
        aug_path = os.path.join(out_dir, f"{base}_aug{i}.mp4")
        writer   = cv2.VideoWriter(aug_path, fourcc, fps, (w, h))
        for fr in aug:
            writer.write(fr)
        writer.release()

    count += 1
    print(f"[{count:03d}] Augmented '{base}' → {out_dir}")

print(f"Done: {count}/{len(df_aug)} videos in {time.time()-t0:.2f}s")


[001] Augmented '02485' → D:\Projects\data\augment1\again
[002] Augmented '02487' → D:\Projects\data\augment1\again
[003] Augmented '02490' → D:\Projects\data\augment1\again
[004] Augmented '02518' → D:\Projects\data\augment1\again
[005] Augmented '02519' → D:\Projects\data\augment1\again
[006] Augmented '02521' → D:\Projects\data\augment1\again
[007] Augmented '02525' → D:\Projects\data\augment1\again
[008] Augmented '02540' → D:\Projects\data\augment1\again
[009] Augmented '02541' → D:\Projects\data\augment1\again
[010] Augmented '02542' → D:\Projects\data\augment1\again
[011] Augmented '02544' → D:\Projects\data\augment1\again
[012] Augmented '02545' → D:\Projects\data\augment1\again
[013] Augmented '02548' → D:\Projects\data\augment1\again
[014] Augmented '06552' → D:\Projects\data\augment1\ask
[015] Augmented '06553' → D:\Projects\data\augment1\ask
[016] Augmented '06554' → D:\Projects\data\augment1\ask
[017] Augmented '06555' → D:\Projects\data\augment1\ask
[018] Augmented '06558

In [3]:
# Cell 3: Extract & Save Hand+Pose Landmarks for All Augmented Videos
import os
import glob
import time
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp

# ——— CONSTANTS for Frame & Feature Dimensions ———
NUM_FRAMES        = 16                             # fixed timesteps per clip
POSE_IDX          = [11, 12, 13, 14]               # shoulders & elbows
HAND_LM_COUNT     = 21                             # MediaPipe hand keypoints
H_FEATS_PER_HAND  = HAND_LM_COUNT * 3              # x,y,z per hand point
P_FEATS           = len(POSE_IDX) * 3              # x,y,z per pose point
FEATURE_PER_FRAME = H_FEATS_PER_HAND * 2 + P_FEATS  # features in one frame
TOTAL_FEATURES    = NUM_FRAMES * FEATURE_PER_FRAME  # total input dims

print(f"Sampling {NUM_FRAMES} frames → {FEATURE_PER_FRAME} features/frame → {TOTAL_FEATURES} total")

# ——— PARAMS & PATHS from Cell 1 & 2 ———
filtered_csv = os.path.join(DATA_ROOT, 'filtered_top30.csv')
df           = pd.read_csv(filtered_csv)
SEQ_LEN      = NUM_FRAMES
AUG_DIR      = os.path.join(DATA_ROOT, 'augment1')
LANDMARK_DIR = os.path.join(DATA_ROOT, 'landmark1')
TOP30        = df['Label'].unique().tolist()

# ——— MediaPipe Setup ———
mp_hands = mp.solutions.hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)
mp_pose  = mp.solutions.pose.Pose(
    static_image_mode=False,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

def normalize_hand_landmarks(arr21x3):
    """Subtract wrist, scale so max distance=1, then flatten."""
    wrist = arr21x3[0]
    rel   = arr21x3 - wrist
    norm  = np.linalg.norm(rel, axis=1).max() or 1.0
    return (rel / norm).reshape(-1)

def extract_landmarks(video_fp, out_root, label):
    cap   = cv2.VideoCapture(video_fp)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total <= 0:
        cap.release()
        return False

    # uniformly sample NUM_FRAMES indices
    idxs = np.linspace(0, total-1, SEQ_LEN, dtype=int)
    seq  = []

    for i in idxs:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(i))
        ret, frame = cap.read()
        if not ret:
            continue

        img       = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        feats_row = []

        # — Hands —
        res_h = mp_hands.process(img)
        hand_feats = []
        if res_h.multi_hand_landmarks:
            hands_handed = list(zip(res_h.multi_hand_landmarks, res_h.multi_handedness))
            hands_handed.sort(key=lambda x: x[1].classification[0].label)
            for lm, _ in hands_handed[:2]:
                arr = np.array([[p.x, p.y, p.z] for p in lm.landmark], dtype=np.float32)
                hand_feats.extend(normalize_hand_landmarks(arr))
        # pad to always have two hands
        while len(hand_feats) < H_FEATS_PER_HAND * 2:
            hand_feats.extend([0.0] * H_FEATS_PER_HAND)
        feats_row.extend(hand_feats)

        # — Pose —
        res_p = mp_pose.process(img)
        pose_feats = []
        if res_p.pose_landmarks:
            for pi in POSE_IDX:
                p = res_p.pose_landmarks.landmark[pi]
                pose_feats.extend([p.x, p.y, p.z])
        # pad pose features
        while len(pose_feats) < P_FEATS:
            pose_feats.extend([0.0, 0.0, 0.0])
        feats_row.extend(pose_feats)

        seq.append(feats_row)

    cap.release()

    # pad sequence if too short
    if len(seq) < SEQ_LEN:
        pad = seq[-1] if seq else [0.0] * FEATURE_PER_FRAME
        seq.extend([pad] * (SEQ_LEN - len(seq)))

    arr = np.array(seq, dtype=np.float32)  # shape: (NUM_FRAMES, FEATURE_PER_FRAME)

    # save under landmark1/<label>/
    out_dir = os.path.join(LANDMARK_DIR, label)
    os.makedirs(out_dir, exist_ok=True)
    fname = os.path.splitext(os.path.basename(video_fp))[0] + ".npy"
    np.save(os.path.join(out_dir, fname), arr)
    return True

# ——— Run extraction over all augmented videos ———
t0, cnt = time.time(), 0
for label in TOP30:
    for vid_fp in glob.glob(os.path.join(AUG_DIR, label, "*.mp4")):
        if extract_landmarks(vid_fp, LANDMARK_DIR, label):
            cnt += 1

mp_hands.close()
mp_pose.close()
print(f"✅ Extracted {cnt} sequences in {time.time()-t0:.2f}s → {LANDMARK_DIR}")


Sampling 16 frames → 138 features/frame → 2208 total
✅ Extracted 1704 sequences in 2043.95s → D:\Projects\data\landmark1


In [7]:
# ─── Cell 4: Feature Extraction → X (N,16,69), y (N,) ─────────────────

import os
import glob
import numpy as np

# ——— PARAMETERS ———
DATA_ROOT    = r"D:\Projects\data"               # from Cell 1
LANDMARK_DIR = os.path.join(DATA_ROOT, 'landmark1')
NUM_FRAMES   = 16

# finger chains for joint‐angle calculations
FINGERS = [
    [1,2,3,4],    # thumb
    [5,6,7,8],    # index
    [9,10,11,12], # middle
    [13,14,15,16],# ring
    [17,18,19,20] # pinky
]

# ── Helpers ──
def preprocess_sequence(lm_seq):
    # Drop all-zero frames, then downsample or pad to NUM_FRAMES
    mask = ~np.all(np.isclose(lm_seq, 0, atol=1e-6), axis=1)
    seq  = lm_seq[mask]
    if len(seq) > NUM_FRAMES:
        idxs = np.linspace(0, len(seq)-1, NUM_FRAMES, dtype=int)
        seq  = seq[idxs]
    if len(seq) < NUM_FRAMES:
        pad = np.zeros((NUM_FRAMES - len(seq), lm_seq.shape[1]), dtype=np.float32)
        seq = np.vstack([pad, seq])
    return seq  # (NUM_FRAMES, 138)

def compute_frame_features(frame):
    # frame: (138,) = 2 hands×21×3 + 4 pose pts×3
    hands = frame[:126].reshape(2,21,3)
    pose  = frame[126:].reshape(4,3)
    l_sh, r_sh, l_el, r_el = pose
    angles = []
    # 1) hand joint‐angles
    for hand in hands:
        pw     = np.linalg.norm(hand[5] - hand[17]) or 1.0
        coords = (hand - hand[0]) / pw
        for chain in FINGERS:
            for i,j,k in zip(chain, chain[1:], chain[2:]):
                v1   = coords[i] - coords[j]
                v2   = coords[k] - coords[j]
                cosθ = np.dot(v1,v2)/(np.linalg.norm(v1)*np.linalg.norm(v2)+1e-6)
                angles.append(np.arccos(np.clip(cosθ,-1,1)))
    # 2) elbow angles
    def angle(a,b,c):
        v1   = a - b; v2 = c - b
        cosθ = np.dot(v1,v2)/(np.linalg.norm(v1)*np.linalg.norm(v2)+1e-6)
        return np.arccos(np.clip(cosθ,-1,1))
    lw, rw = hands[0,0], hands[1,0]
    e_left  = angle(l_sh, l_el, lw)
    e_right = angle(r_sh, r_el, rw)
    # 3) inter-wrist distance
    dist_w  = np.linalg.norm(lw - rw)

    return np.array(angles + [e_left, e_right, dist_w], dtype=np.float32)  # (23,)

def interpolate_missing(seq):
    # fill tiny zero‐gaps per dimension
    for d in range(seq.shape[1]):
        col   = seq[:,d]
        miss  = np.isclose(col, 0, atol=1e-6)
        valid = ~miss
        if miss.any() and valid.any():
            seq[miss,d] = np.interp(np.where(miss)[0], np.where(valid)[0], col[valid])
    return seq

def compute_dynamic_features(lm_seq):
    # preprocess → static 23 dims/frame + vel + acc → 69 dims/frame
    seq    = preprocess_sequence(lm_seq)                             # (16,138)
    static = np.stack([compute_frame_features(f) for f in seq], axis=0)  # (16,23)
    static = interpolate_missing(static)
    vel    = np.diff(static, axis=0, prepend=static[0:1])            # (16,23)
    acc    = np.diff(vel,    axis=0, prepend=vel[0:1])               # (16,23)
    return np.concatenate([static, vel, acc], axis=1)                # (16,69)

# ─── Build X, y ──────────────────────────────────────────────────────────
classes = sorted(d for d in os.listdir(LANDMARK_DIR)
                 if os.path.isdir(os.path.join(LANDMARK_DIR, d)))
label_map = {lab:i for i, lab in enumerate(classes)}

X, y = [], []
for lab in classes:
    for fp in glob.glob(os.path.join(LANDMARK_DIR, lab, '*.npy')):
        seq   = np.load(fp)                    # (T,138)
        feats = compute_dynamic_features(seq)  # (16,69)
        X.append(feats)                        # keep 2D
        y.append(label_map[lab])

X = np.stack(X, axis=0)                       # → (N,16,69)
y = np.array(y, dtype='int32')                # → (N,)

print(f"✅ Built dataset → X.shape = {X.shape}, y.shape = {y.shape}")


✅ Built dataset → X.shape = (2981, 16, 69), y.shape = (2981,)


In [8]:
# ─── Cell 5: TCN → Bi-LSTM → Attention Model Training ─────────────────

import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model, Input
from tensorflow.keras.layers import (
    Conv1D, BatchNormalization, Dropout, Masking, Add,
    Bidirectional, LSTM, Dense, LayerNormalization, Layer
)
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

# Paths
MODEL_DIR        = "data/models"
KERAS_MODEL_PATH = os.path.join(MODEL_DIR, "asl_tcn_lstm.keras")
TFLITE_PATH      = os.path.join(MODEL_DIR, "asl_tcn_lstm.tflite")

# ─── Split ─────────────────────────────────────────────────────────────
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# ─── Attention Layer ───────────────────────────────────────────────────
class SimpleAttention(Layer):
    def build(self, input_shape):
        T, D = input_shape[1], input_shape[2]
        self.W = self.add_weight((D,1), initializer="glorot_uniform")
        self.b = self.add_weight((T,1), initializer="zeros")
        super().build(input_shape)
    def call(self, inputs):
        e     = tf.matmul(inputs, self.W) + self.b
        alpha = tf.nn.softmax(tf.nn.tanh(e), axis=1)
        return tf.reduce_sum(inputs * alpha, axis=1)

# ─── Model ─────────────────────────────────────────────────────────────
NUM_FRAMES, FEATURE_DIM = X_train.shape[1], X_train.shape[2]
NUM_CLASSES = len(np.unique(y))

inp = Input(shape=(NUM_FRAMES, FEATURE_DIM), name="input_seq")

# TCN stem
x = Masking(mask_value=0.0)(inp)
x = Conv1D(64, 3, padding="causal", activation="relu",
           kernel_regularizer=l2(1e-4))(x)
x = BatchNormalization()(x)
x = Dropout(0.3)(x)
for dil in [2,4]:
    res = x
    x   = Conv1D(64, 3, padding="causal", dilation_rate=dil,
                 activation="relu", kernel_regularizer=l2(1e-4))(x)
    x   = BatchNormalization()(x)
    x   = Dropout(0.3)(x)
    x   = Add()([res, x])

# Bi-LSTM stack
x = Bidirectional(LSTM(128, return_sequences=True,
                       kernel_regularizer=l2(1e-4),
                       dropout=0.3, recurrent_dropout=0.3))(x)
x = Dropout(0.3)(x)
x = Bidirectional(LSTM(64, return_sequences=True,
                       kernel_regularizer=l2(1e-4),
                       dropout=0.3, recurrent_dropout=0.3))(x)
x = Dropout(0.3)(x)

# Attention + head
x = SimpleAttention()(x)
x = Dense(64, activation="relu", kernel_regularizer=l2(1e-4))(x)
x = LayerNormalization()(x)
x = Dropout(0.4)(x)
out = Dense(NUM_CLASSES, activation="softmax")(x)

model = Model(inp, out, name="ASL_TCN_LSTM")
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)
model.summary()

# ─── Class Weights & Callbacks ─────────────────────────────────────────
cw = compute_class_weight("balanced", classes=np.unique(y_train), y=y_train)
class_weights = dict(enumerate(cw))
es = EarlyStopping("val_loss", patience=15, restore_best_weights=True)
rlr= ReduceLROnPlateau("val_loss", factor=0.5, patience=5)

# ─── Train ──────────────────────────────────────────────────────────────
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=64,
    callbacks=[es, rlr],
    class_weight=class_weights,
    verbose=2
)

# ─── Evaluate & Save ───────────────────────────────────────────────────
loss, acc = model.evaluate(X_val, y_val, verbose=0)
print(f"🔹 Val accuracy: {acc*100:.2f}%")

os.makedirs(MODEL_DIR, exist_ok=True)
model.save(KERAS_MODEL_PATH)
print("Saved Keras model →", KERAS_MODEL_PATH)

converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS
]
tflite_model = converter.convert()
open(TFLITE_PATH, "wb").write(tflite_model)
print("Saved TFLite model →", TFLITE_PATH)







Epoch 1/100




38/38 - 16s - 417ms/step - accuracy: 0.0398 - loss: 4.0334 - val_accuracy: 0.0251 - val_loss: 3.8826 - learning_rate: 1.0000e-03
Epoch 2/100
38/38 - 2s - 62ms/step - accuracy: 0.0587 - loss: 3.6285 - val_accuracy: 0.0251 - val_loss: 3.7395 - learning_rate: 1.0000e-03
Epoch 3/100
38/38 - 2s - 63ms/step - accuracy: 0.0721 - loss: 3.4579 - val_accuracy: 0.0335 - val_loss: 3.6505 - learning_rate: 1.0000e-03
Epoch 4/100
38/38 - 2s - 66ms/step - accuracy: 0.0826 - loss: 3.3292 - val_accuracy: 0.0452 - val_loss: 3.4442 - learning_rate: 1.0000e-03
Epoch 5/100
38/38 - 2s - 61ms/step - accuracy: 0.0826 - loss: 3.2861 - val_accuracy: 0.0938 - val_loss: 3.1313 - learning_rate: 1.0000e-03
Epoch 6/100
38/38 - 2s - 62ms/step - accuracy: 0.1204 - loss: 3.1531 - val_accuracy: 0.1441 - val_loss: 3.0211 - learning_rate: 1.0000e-03
Epoch 7/100
38/38 - 2s - 60ms/step - accuracy: 0.1242 - loss: 3.0768 - val_accuracy: 0.1943 - val_loss: 2.9177 - learning_rate: 1.0000e-03
Epoch 8/100
38/38 - 2s - 62ms/step - 

INFO:tensorflow:Assets written to: C:\Users\lenovo\AppData\Local\Temp\tmpz_q137nu\assets


Saved artifact at 'C:\Users\lenovo\AppData\Local\Temp\tmpz_q137nu'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 16, 69), dtype=tf.float32, name='input_seq')
Output Type:
  TensorSpec(shape=(None, 29), dtype=tf.float32, name=None)
Captures:
  1448130734736: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448130733856: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131146848: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131147200: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131146320: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131146496: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131179792: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131179616: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131199392: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1448131199568: TensorSpec(shape=(), dtype=tf.resource, name=None)
  14481311815