In [1]:
import os, json, cv2, numpy as np, tensorflow as tf
from pathlib import Path
from tqdm import tqdm
# Dataset root. Defaults to the original local checkout; set the WLASL_ROOT
# environment variable to run the notebook on another machine without edits.
DATA_ROOT = os.environ.get("WLASL_ROOT", r"D:\Samvad_Setu_final\datasets\WLASL")
VIDEO_PATH = os.path.join(DATA_ROOT, "videos")         # holds the <video_id>.mp4 files
JSON_PATH = os.path.join(DATA_ROOT, "nslt_2000.json")  # per-video metadata

# Decoded clips are cached as .npz files, one directory per subset.
CACHE_ROOT = os.path.join(DATA_ROOT, "cache_final")
CACHE_TRAIN = os.path.join(CACHE_ROOT, "train")
CACHE_VAL = os.path.join(CACHE_ROOT, "val")
CACHE_TEST = os.path.join(CACHE_ROOT, "test")

# exist_ok=True keeps this cell idempotent across re-runs.
for _cache_dir in (CACHE_TRAIN, CACHE_VAL, CACHE_TEST):
    os.makedirs(_cache_dir, exist_ok=True)

In [2]:
# Read the metadata file. The encoding is pinned to UTF-8 so parsing does not
# depend on the platform's default text encoding.
with open(JSON_PATH, "r", encoding="utf-8") as f:
    meta = json.load(f)


print("Total JSON entries:", len(meta))

Total JSON entries: 21095


In [3]:
# Collect (video id, class id, subset) for every metadata entry whose .mp4
# file is actually present under VIDEO_PATH; entries without a file are skipped.
pairs = [
    (clip_id, details["action"][0], details["subset"])
    for clip_id, details in (
        # Ids are left-padded with zeros to five characters to match file names.
        (raw_id.zfill(5), entry) for raw_id, entry in meta.items()
    )
    if os.path.exists(os.path.join(VIDEO_PATH, clip_id + ".mp4"))
]


print("Matched videos:", len(pairs))

Matched videos: 9659


In [4]:
# Bucket the (video id, class id) tuples by their subset tag in a single pass.
# Entries whose tag is not one of the three known subsets are dropped.
_pairs_by_subset = {"train": [], "val": [], "test": []}
for clip_id, class_id, subset_name in pairs:
    if subset_name in _pairs_by_subset:
        _pairs_by_subset[subset_name].append((clip_id, class_id))

train_pairs = _pairs_by_subset["train"]
val_pairs = _pairs_by_subset["val"]
test_pairs = _pairs_by_subset["test"]


print("Train:", len(train_pairs))
print("Val :", len(val_pairs))
print("Test :", len(test_pairs))

Train: 6761
Val : 1784
Test : 1114


In [5]:
def load_frames(video_path, max_frames=24):
    """Decode up to ``max_frames`` RGB frames, resized to 160x160, from a video.

    When the container reports more than ``max_frames`` frames, sample ``i`` is
    taken at the start of the ``i``-th of ``max_frames`` equal segments, so the
    samples span the whole clip. Otherwise frames are read consecutively from
    the start until the decoder stops delivering frames.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        max_frames: Upper bound on the number of frames returned.

    Returns:
        A ``uint8`` array of shape ``(n, 160, 160, 3)`` with ``n <= max_frames``.
        If nothing could be decoded the array is empty (``len(result) == 0``).
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        if cap.isOpened():
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            if total > max_frames:
                # Integer segment starts: evenly spread, strictly increasing and
                # always < total, so the reported last frame is never required.
                positions = [(i * total) // max_frames for i in range(max_frames)]
            else:
                # Short clip or unknown frame count: read from the beginning and
                # let a failed read end the loop.
                positions = range(max_frames)

            for pos in positions:
                cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
                ok, frame = cap.read()
                if not ok:
                    break
                frame = cv2.resize(frame, (160, 160))
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the decoder even if resizing or colour conversion raises.
        cap.release()

    return np.array(frames, dtype=np.uint8)

In [6]:
def cache_video(vid, label, subset):
    """Decode one video and store its frames and label as a compressed ``.npz``.

    Does nothing when the cache file already exists or when no frame could be
    decoded. The archive is written to a temporary file and then renamed, so an
    interrupted run never leaves a truncated ``.npz`` that later runs would
    mistake for a finished cache entry.

    Args:
        vid: Video id; ``<vid>.mp4`` is read from ``VIDEO_PATH``.
        label: Class id stored under the ``label`` key.
        subset: ``"train"`` or ``"val"``; any other value selects the test cache.
    """
    if subset == "train":
        cache_dir = CACHE_TRAIN
    elif subset == "val":
        cache_dir = CACHE_VAL
    else:
        cache_dir = CACHE_TEST
    out = os.path.join(cache_dir, vid + ".npz")

    if os.path.exists(out):
        return

    frames = load_frames(os.path.join(VIDEO_PATH, vid + ".mp4"))
    if len(frames) == 0:
        return

    tmp = out + ".tmp"
    try:
        # A file object is passed because numpy appends ".npz" to string paths
        # that lack it, which would change the temporary file's name.
        with open(tmp, "wb") as fh:
            np.savez_compressed(fh, frames=frames, label=label)
        os.replace(tmp, out)
    finally:
        # Only present here if the write or rename did not complete.
        if os.path.exists(tmp):
            os.remove(tmp)

In [None]:
# Populate the on-disk cache; each entry is a (video id, class id, subset) tuple.
for entry in tqdm(pairs, desc="Caching videos"):
    cache_video(*entry)


Caching videos:  41%|█████████████████████████▋                                    | 4004/9659 [01:03<39:05,  2.41it/s]

In [None]:
# Number of clips per batch produced by the tf.data pipeline below.
# (numpy is already imported as np in the first cell, so it is not re-imported here.)
BATCH_SIZE = 4


def decode_npz(path):
    """Load one cached clip from disk (called through ``tf.py_function``).

    Args:
        path: Object whose ``.numpy()`` returns the encoded file path as bytes
            (a scalar string tensor when called via ``tf.py_function``).

    Returns:
        ``(frames, label)``: ``frames`` is the stored ``frames`` array divided
        by 255 as ``float32``; ``label`` is the stored ``label`` as ``int64``.
    """
    # The context manager closes the underlying archive; without it every
    # decoded sample would leave a file handle open.
    with np.load(path.numpy().decode()) as data:
        # Explicit dtypes match the Tout declared by the caller instead of
        # relying on an implicit float64 -> float32 / int32 -> int64 cast.
        frames = data["frames"].astype(np.float32) / 255.0
        label = data["label"].astype(np.int64)
    return frames, label


def load_npz(path):
    """Graph-side wrapper that runs ``decode_npz`` on one path tensor.

    ``tf.py_function`` discards static shape information, so the shapes are
    re-attached here: a variable-length stack of 160x160x3 frames and a scalar
    label.
    """
    clip, target = tf.py_function(
        func=decode_npz,
        inp=[path],
        Tout=[tf.float32, tf.int64],
    )
    clip.set_shape([None, 160, 160, 3])
    target.set_shape([])
    return clip, target


def make_ds(paths, shuffle=False):
    """Build a batched ``tf.data`` pipeline over cached ``.npz`` clip files.

    Args:
        paths: Sequence of ``.npz`` file paths.
        shuffle: When true, shuffle the paths with a buffer of at most 2048
            elements before decoding.

    Returns:
        A dataset yielding ``(frames, label)`` batches of ``BATCH_SIZE`` clips,
        with the frame axis padded to the longest clip in each batch.
    """
    ds = tf.data.Dataset.from_tensor_slices(paths)
    if shuffle:
        ds = ds.shuffle(min(2048, len(paths)))
    ds = ds.map(load_npz, num_parallel_calls=tf.data.AUTOTUNE)
    # Clips may have different frame counts, hence padded_batch rather than batch.
    ds = ds.padded_batch(BATCH_SIZE, padded_shapes=([None,160,160,3], []))
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds


def _cached_paths(cache_dir):
    """Return the sorted ``.npz`` file paths found directly inside ``cache_dir``."""
    return sorted(str(p) for p in Path(cache_dir).glob("*.npz"))


# The path lists were previously never defined in the notebook; derive them from
# what the caching step wrote to disk so the cell works on a fresh kernel.
train_cached = _cached_paths(CACHE_TRAIN)
val_cached = _cached_paths(CACHE_VAL)
test_cached = _cached_paths(CACHE_TEST)

train_ds = make_ds(train_cached, True)
val_ds = make_ds(val_cached)
test_ds = make_ds(test_cached)

In [None]:
from tensorflow.keras import layers, models


# Width of the softmax output layer created in build_model().
# NOTE(review): presumably the number of classes in nslt_2000.json — confirm
# that every cached label lies in [0, NUM_CLASSES).
NUM_CLASSES = 2000


def build_model():
    """Assemble the per-frame CNN + GRU classifier.

    Each frame of a variable-length clip of 160x160x3 images passes through
    three 3x3 conv layers (32, 64, 128 filters; 2x2 max-pooling after the first
    two) and global average pooling. The resulting per-frame vectors feed a
    128-unit GRU, a 256-unit ReLU layer with 0.4 dropout, and a softmax over
    ``NUM_CLASSES``.

    Returns:
        An uncompiled Keras ``Model``.
    """
    def per_frame(layer):
        # Apply `layer` independently to every time step.
        return layers.TimeDistributed(layer)

    clip = layers.Input(shape=(None, 160, 160, 3))

    feats = clip
    for n_filters in (32, 64):
        feats = per_frame(layers.Conv2D(n_filters, 3, padding='same', activation='relu'))(feats)
        feats = per_frame(layers.MaxPooling2D(2))(feats)
    feats = per_frame(layers.Conv2D(128, 3, padding='same', activation='relu'))(feats)
    feats = per_frame(layers.GlobalAveragePooling2D())(feats)

    hidden = layers.GRU(128)(feats)
    hidden = layers.Dense(256, activation='relu')(hidden)
    hidden = layers.Dropout(0.4)(hidden)
    probs = layers.Dense(NUM_CLASSES, activation='softmax')(hidden)

    return models.Model(inputs=clip, outputs=probs)


# Instantiate the network and configure training: Adam at 1e-4 with a sparse
# cross-entropy loss, which takes integer class ids rather than one-hot targets.
model = build_model()
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    metrics=['accuracy'],
)
model.summary()

In [None]:
# Number of full passes over train_ds.
EPOCHS = 10

# Train on the training batches and report validation metrics after each epoch.
history = model.fit(x=train_ds, validation_data=val_ds, epochs=EPOCHS)

In [None]:
# Score the trained model on the test batches; the bare last expression keeps
# the returned metrics visible as the cell output.
test_metrics = model.evaluate(test_ds)
test_metrics

In [None]:
# Persist the trained model next to the dataset (HDF5 format, per the .h5 suffix).
MODEL_PATH = os.path.join(DATA_ROOT, 'nslt2000_video_model.h5')
model.save(MODEL_PATH)

In [None]:
import collections


def predict_live(model, seq_len=24, frame_size=(160, 160), camera_index=0):
    """Run the classifier on a live camera feed and overlay the predicted class.

    Frames are resized, converted to RGB and pushed into a sliding window of
    the last ``seq_len`` frames. Once the window is full, every new frame
    triggers a prediction whose class index is drawn onto the preview. Press
    ESC to stop; the loop also ends when the camera stops delivering frames.

    Args:
        model: Object with a Keras-style ``predict(batch, verbose=0)`` method.
        seq_len: Number of most recent frames fed to the model per prediction.
        frame_size: ``(width, height)`` each frame is resized to.
        camera_index: Device index passed to ``cv2.VideoCapture``.
    """
    cap = cv2.VideoCapture(camera_index)
    buffer = collections.deque(maxlen=seq_len)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.resize(frame, frame_size)
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            buffer.append(rgb)

            if len(buffer) == seq_len:
                # Scale to [0, 1] and add the batch axis expected by predict().
                inp = np.expand_dims(np.array(buffer) / 255.0, axis=0)
                pred = model.predict(inp, verbose=0)
                cls = np.argmax(pred)
                cv2.putText(frame, f"Pred: {cls}", (20,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0),2)

            cv2.imshow("Live Sign Recognition", frame)
            # 27 is the key code of ESC.
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        # Free the camera and close the preview window even if prediction raises.
        cap.release()
        cv2.destroyAllWindows()