In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
from typing import Tuple

In [2]:
# Vocabulary setup: the character set plus the two lookup layers that
# translate characters to integer ids and back.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")
char_to_num = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    oov_token="",
)
# The inverse layer is built from char_to_num's own vocabulary so both
# directions stay in agreement.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

In [3]:
# Numpy function to load video
def load_video_numpy(path: str) -> np.ndarray:
    """Decode a video into a fixed-length stack of cropped grayscale frames.

    Each frame is converted to grayscale, cropped to rows 190:236 and
    columns 80:220, and divided by 255. The clip is then truncated or
    zero-padded along the frame axis to exactly 50 frames.

    Args:
        path: Path of the video file to read.

    Returns:
        A float32 array whose leading axis has length 50 and whose trailing
        axis is a channel axis of size 1.

    Raises:
        ValueError: If no frame could be read from ``path``.
    """
    target_frames = 50
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        # Stop as soon as enough frames are collected; anything beyond the
        # target would be thrown away by the truncation anyway.
        while len(frames) < target_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            frame = frame[190:236, 80:220]  # Crop
            frames.append(frame / 255.0)    # Normalize
    finally:
        # Release the capture handle even if decoding raises.
        cap.release()

    if not frames:
        # Without this guard an unreadable/empty video reaches np.pad as a
        # shape-(0,) array and fails with an opaque broadcasting error.
        raise ValueError(f"No frames could be read from video: {path!r}")

    frames = np.array(frames)
    pad_width = target_frames - frames.shape[0]
    if pad_width > 0:
        frames = np.pad(frames, ((0, pad_width), (0, 0), (0, 0)), mode='constant')
    return np.expand_dims(frames, axis=-1).astype(np.float32)

In [4]:
# Numpy function to load alignments
def load_alignments_numpy(path: str) -> np.ndarray:
    """Read an alignment file and encode its transcript as character ids.

    Every line that splits into exactly three whitespace-separated fields
    contributes its third field as a word; ``'sil'`` entries are skipped.
    The words are joined with single spaces so that word boundaries survive
    in the label, and each character is then mapped through ``char_to_num``.

    Args:
        path: Path of the alignment file.

    Returns:
        The character ids of the transcript, as returned by ``char_to_num``
        and converted to a NumPy array.
    """
    with open(path, 'r') as f:
        words = [
            parts[2]
            for parts in (line.split() for line in f)
            if len(parts) == 3 and parts[2] != 'sil'
        ]
    # ' ' is part of the vocabulary; joining on it keeps the words apart.
    # Concatenating the words directly would collapse the whole sentence into
    # one unbroken string and the model could never learn word boundaries.
    transcript = ' '.join(words)
    chars = tf.strings.unicode_split(transcript, input_encoding='UTF-8')
    return char_to_num(chars).numpy()

In [5]:
# Combined load function
def load_data_numpy(file_path_tensor: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    """Load the video frames and the label ids that belong to one file.

    Only the file stem of the incoming path is used: the video and alignment
    locations are rebuilt from it under ``data/s1`` and
    ``data/alignments/s1`` respectively.

    Args:
        file_path_tensor: Scalar string tensor holding a file path.

    Returns:
        ``(video, labels)`` exactly as produced by ``load_video_numpy`` and
        ``load_alignments_numpy``.
    """
    raw_path = file_path_tensor.numpy().decode("utf-8")
    stem, _extension = os.path.splitext(os.path.basename(raw_path))
    frames = load_video_numpy(os.path.join('data', 's1', f'{stem}.mpg'))
    label_ids = load_alignments_numpy(
        os.path.join('data', 'alignments', 's1', f'{stem}.align')
    )
    return frames, label_ids

In [6]:
# TensorFlow wrapper
def tf_data_loader(file_path):
    """Run ``load_data_numpy`` via ``tf.py_function`` and pin static shapes.

    Args:
        file_path: Scalar string tensor with the path of one file.

    Returns:
        ``(video, labels)`` where ``video`` is declared as float32 with shape
        ``(50, 46, 140, 1)`` and ``labels`` as int64 with one unknown-length
        axis.
    """
    loaded = tf.py_function(
        load_data_numpy,
        [file_path],
        (tf.float32, tf.int64),
    )
    video, labels = loaded
    # py_function outputs carry no static shape, so declare them here.
    labels.set_shape([None])
    video.set_shape((50, 46, 140, 1))
    return video, labels

In [12]:
# Dataset
# List the files in a fixed order and shuffle exactly once. With the default
# per-iteration reshuffle, take()/skip() below would select different clips on
# every epoch, so test clips would leak into training.
data = tf.data.Dataset.list_files('data/s1/*.mpg', shuffle=False)
data = data.shuffle(data.cardinality(), seed=42, reshuffle_each_iteration=False)
data = data.map(tf_data_loader, num_parallel_calls=tf.data.AUTOTUNE)
batch_size = 12  # or 4/16 depending on your memory

# Videos already have a fixed shape; labels are padded to the longest
# transcript in each batch.
data = data.padded_batch(batch_size, padded_shapes=([50, 46, 140, 1], [None]))

# Cache each split on its own: a cache is only kept once the cached dataset
# has been read to the end, which never happens for a cache placed upstream
# of take().
train_batches = 200 // batch_size
train = data.take(train_batches).cache().prefetch(tf.data.AUTOTUNE)
test = data.skip(train_batches).cache().prefetch(tf.data.AUTOTUNE)


In [13]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPool3D, TimeDistributed, Flatten, Bidirectional, LSTM, Dropout, Dense

def build_model(input_shape=(50, 46, 140, 1)):
    """Build the Conv3D + bidirectional-LSTM network used for CTC training.

    Three Conv3D/MaxPool3D stages (pooling only over the two spatial axes)
    are followed by a per-frame flatten, two bidirectional LSTM layers with
    dropout, and a softmax over ``char_to_num.vocabulary_size() + 1`` classes.

    Args:
        input_shape: Shape of one input clip without the batch axis. The
            default matches the ``(50, 46, 140, 1)`` clips declared by
            ``tf_data_loader``; the previous hard-coded 75-frame shape did not
            match the dataset and made Keras reject the batches.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    model = Sequential()
    model.add(Conv3D(128, kernel_size=(3, 3, 3), activation='relu', padding='same', input_shape=input_shape))
    model.add(MaxPool3D(pool_size=(1, 2, 2)))
    model.add(Conv3D(256, kernel_size=(3, 3, 3), activation='relu', padding='same'))
    model.add(MaxPool3D(pool_size=(1, 2, 2)))
    model.add(Conv3D(75, kernel_size=(3, 3, 3), activation='relu', padding='same'))
    model.add(MaxPool3D(pool_size=(1, 2, 2)))
    # Flatten each frame separately so the frame axis is preserved for the LSTMs.
    model.add(TimeDistributed(Flatten()))
    model.add(Bidirectional(LSTM(128, return_sequences=True)))
    model.add(Dropout(0.5))
    model.add(Bidirectional(LSTM(128, return_sequences=True)))
    model.add(Dropout(0.5))
    # One extra output class beyond the lookup vocabulary.
    model.add(Dense(char_to_num.vocabulary_size() + 1, activation='softmax'))
    return model

model = build_model()


In [14]:
def CTCLoss(y_true, y_pred):
    """CTC loss that does not score the zero padding appended to the labels.

    Labels are batched with ``padded_batch``, which pads shorter transcripts
    with 0. Passing the padded width as every sample's label length makes CTC
    treat that padding as real targets, and a long run of identical padding
    symbols can make a short transcript impossible to align within the
    available time steps. The per-sample length is therefore taken as the
    position of the last non-zero id.

    Args:
        y_true: Padded label ids, indexed as (batch, label position).
        y_pred: Per-step class probabilities, indexed as (batch, step, class).

    Returns:
        The per-sample loss returned by ``tf.keras.backend.ctc_batch_cost``.
    """
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_len = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    max_label_len = tf.cast(tf.shape(y_true)[1], dtype="int64")
    input_len = input_len * tf.ones(shape=(batch_len, 1), dtype="int64")

    # 1-based position of every label slot; padding slots contribute 0, so the
    # row-wise maximum is the index of the last real token plus one.
    positions = tf.range(1, max_label_len + 1, dtype="int64")
    is_token = tf.not_equal(y_true, 0)
    label_len = tf.reduce_max(
        tf.where(is_token, positions, tf.zeros_like(positions)),
        axis=1,
        keepdims=True,
    )
    # Reducing over an empty label axis yields the dtype minimum; clamp to 0.
    label_len = tf.maximum(label_len, tf.zeros_like(label_len))
    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_len, label_len)

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss=CTCLoss)


In [15]:
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

# Create the directory the checkpoint path below points into; exist_ok makes
# re-running this cell harmless.
os.makedirs("models", exist_ok=True)

def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Learning rate currently in use.

    Returns:
        ``lr`` unchanged while ``epoch < 30``; afterwards ``lr * exp(-0.1)``.
        The result is always a plain Python float, because newer Keras
        versions reject a tensor returned from a schedule function.
    """
    if epoch < 30:
        return float(lr)
    return float(lr * np.exp(-0.1))

# Checkpoint the weights only (not the full model) into the models/ directory.
# NOTE(review): `monitor` presumably only matters together with
# save_best_only, which is not set here - confirm the intended behaviour.
checkpoint_callback = ModelCheckpoint(
    filepath='models/checkpoint.weights.h5',
    monitor='loss',
    save_weights_only=True,
)

# Let `scheduler` decide the learning rate at the start of each epoch.
schedule_callback = LearningRateScheduler(scheduler)

model.fit(
    train,
    epochs=10,
    validation_data=test,
    callbacks=[checkpoint_callback, schedule_callback],
)


Epoch 1/10


KeyboardInterrupt: 

In [None]:

# Run the model on one batch of the test split and print each ground-truth
# transcript next to its greedy CTC decoding.
sample = next(iter(test))
X_sample, y_sample = sample
y_pred = model.predict(X_sample)

# Every sequence in the batch uses the full number of prediction steps.
num_sequences = tf.shape(y_pred)[0]
num_steps = tf.shape(y_pred)[1]
decoded, _ = tf.keras.backend.ctc_decode(
    y_pred,
    input_length=tf.fill([num_sequences], num_steps),
    greedy=True,
)
best_paths = decoded[0]  # decoded[0] contains the actual predictions


def _ids_to_text(ids):
    """Map one sequence of ids back to characters and join them into a str."""
    chars = num_to_char(tf.expand_dims(ids, axis=0))
    joined = tf.strings.reduce_join(chars, axis=-1)
    return joined.numpy()[0].decode('utf-8')


for i in range(len(best_paths)):
    print("Original:", _ids_to_text(y_sample[i]))
    print("Predicted:", _ids_to_text(best_paths[i]))
    print('-' * 100)


NameError: name 'data' is not defined