## Install Required Packages

In [None]:
!pip install opencv-python matplotlib imageio gdown tensorflow

## Import Necessary Libraries

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

## Download and Unzip Dataset

In [None]:
!gdown --id 1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL -O GRID_dataset.zip
!unzip -q GRID_dataset.zip

## Define Vocabulary and Character Mapping Layer

In [None]:
# Every character that may appear in a transcript: lowercase letters,
# apostrophe, '?', '!', the digits 1-9 and the space
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

# Forward lookup (character -> integer id); unknown characters map to the
# empty-string OOV token
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

# Inverse lookup (integer id -> character) built from the forward layer's
# vocabulary so both directions agree
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

# Report the resulting vocabulary and its size
print(
    f"The vocabulary is: {char_to_num.get_vocabulary()}\n"
    f"(Size = {char_to_num.vocabulary_size()})"
)

## Define Video Loading Function

In [None]:
def load_video(path: str) -> tf.Tensor:
    """Load a video and return standardized grayscale crops of its frames.

    Every readable frame is converted to grayscale and cropped to rows
    190:236 and columns 80:220 (46x140 pixels). The clip is then padded (by
    repeating the last frame) or trimmed to exactly 75 frames and standardized
    with the clip's own mean and standard deviation.

    Args:
        path: Path of the video file to decode with OpenCV.

    Returns:
        A float32 tensor of shape (75, 46, 140, 1). If no frame could be
        used, an all-zero tensor of that shape is returned instead.
    """

    # Open video file
    cap = cv2.VideoCapture(path)
    frames = []

    try:
        # Loop through all video frames
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret or frame is None:
                print(f"Skipping invalid frame in {path}")
                continue

            try:

                # Ensure frame has three colour channels
                if frame.ndim != 3 or frame.shape[2] != 3:
                    raise ValueError(f"Expected RGB frame but got shape {frame.shape}")

                # OpenCV decodes frames in BGR order, while rgb_to_grayscale
                # expects RGB; reorder the channels so the luma weights are
                # applied to the right colours.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = tf.convert_to_tensor(frame, dtype=tf.uint8)

                # Check for minimum dimensions before cropping
                if frame.shape[0] < 236 or frame.shape[1] < 220:
                    raise ValueError(f"Invalid frame shape: {frame.shape}")

                # Convert to grayscale and crop region of interest (RoI) (46x140)
                frame = tf.image.rgb_to_grayscale(frame)
                frame = frame[190:236, 80:220, :]

                # Check final frame shape
                if frame.shape != (46, 140, 1):
                    raise ValueError(f"Frame shape mismatch after crop: {frame.shape}")

                frames.append(frame)

            except Exception as e:
                # Best effort: a bad frame is reported and skipped, not fatal
                print(f"Frame error in {path}: {e}")
                continue
    finally:
        # Always free the decoder, even if reading raised unexpectedly
        cap.release()

    # Return zero tensor if no valid frames found
    if not frames:
        print(f"Warning: No frames in {path}")
        return tf.zeros((75, 46, 140, 1), dtype=tf.float32)

    # Pad (repeat last frame) or trim to exactly 75 frames
    if len(frames) < 75:
        frames += [frames[-1]] * (75 - len(frames))
    else:
        frames = frames[:75]

    # Standardize the clip; the epsilon avoids division by zero for a constant clip
    frames = tf.stack(frames)
    frames = tf.cast(frames, tf.float32)
    mean = tf.reduce_mean(frames)
    std = tf.math.reduce_std(frames)
    return (frames - mean) / (std + 1e-6)

## Define Alignment Loading Function

In [None]:
def load_alignments(path: str) -> tf.Tensor:
    """Read an alignment file and encode its transcript as character ids.

    The third whitespace-separated field of each line is taken as a token;
    lines with fewer than three fields and tokens equal to "sil" are skipped.
    The remaining tokens are joined with single spaces and mapped to integer
    ids with `char_to_num`.

    Args:
        path: Path of the alignment file.

    Returns:
        A 1-D tensor of character ids, or an empty int64 tensor when the file
        yields no tokens.
    """
    with open(path, 'r') as handle:
        spoken_tokens = [
            fields[2]
            for fields in (row.strip().split() for row in handle)
            if len(fields) >= 3 and fields[2] != "sil"
        ]

    sentence = " ".join(spoken_tokens)

    # Nothing to encode
    if not sentence:
        return tf.constant([], dtype=tf.int64)

    # Split the sentence into characters and look up their ids
    characters = tf.strings.unicode_split(sentence, input_encoding="UTF-8")
    return char_to_num(tf.reshape(characters, (-1,)))

## Wrap Data Loading for TensorFlow

In [None]:
def load_data(path: tf.Tensor):
    """Load the frames and the encoded transcript belonging to one video path.

    Only the file's base name (without extension) is used; the video is read
    from `data/s1/<name>.mpg` and the transcript from
    `data/alignments/s1/<name>.align`.

    Args:
        path: Scalar string tensor holding a file path (must be eager, since
            `.numpy()` is called on it).

    Returns:
        Tuple `(frames, alignments)` as produced by `load_video` and
        `load_alignments`.
    """
    raw_path = path.numpy().decode('utf-8')
    stem, _extension = os.path.splitext(os.path.basename(raw_path))

    # Video frames for this clip
    frames = load_video(os.path.join('data', 's1', f'{stem}.mpg'))

    # Matching character-id label sequence
    alignments = load_alignments(
        os.path.join('data', 'alignments', 's1', f'{stem}.align')
    )
    return frames, alignments

# Adapter so the Python loader can be used from Dataset.map
def mappable_function(path):
    """Run `load_data` through `tf.py_function`, yielding (float32, int64) tensors."""
    output_types = (tf.float32, tf.int64)
    return tf.py_function(func=load_data, inp=[path], Tout=output_types)

## Build the Data Pipeline

In [None]:
# List the video files in a deterministic order. list_files() shuffles by
# default and draws a new order on every iteration, which would make the
# take/skip split below select different files each epoch and let test clips
# leak into training.
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)

# Shuffle once; reshuffle_each_iteration=False keeps the same order on every
# pass so the train/test split stays fixed
data = data.shuffle(500, reshuffle_each_iteration=False)

# Map video and label loader function
data = data.map(mappable_function)

# Batch and pad inputs to 75×46×140×1 frames and 40 label ids, and prefetch for performance
data = data.padded_batch(batch_size=2, padded_shapes=([75, 46, 140, 1], [40]))
data = data.prefetch(tf.data.AUTOTUNE)

# Split into training and test sets (first 450 batches vs. the remainder)
train = data.take(450)
test = data.skip(450)

## Visualize Sample Video and Alignment

In [None]:
# Load one batch. A single iterator is enough: creating a second one would
# decode another batch of videos only to throw it away.
sample = data.as_numpy_iterator()
val = next(sample)
frames, alignments = val

# Rescale the first video of the batch to 0-255
video = np.squeeze(val[0][0], axis=-1)
vmin, vmax = video.min(), video.max()
value_range = vmax - vmin
if value_range > 0:
    video_norm = (video - vmin) / value_range
else:
    # Constant clip (e.g. the all-zero fallback): avoid dividing by zero
    video_norm = np.zeros_like(video)
video_uint8 = (video_norm * 255).astype(np.uint8)

# Save video as animated GIF
imageio.mimsave('./Animation.gif', list(video_uint8), fps=10)

# Show one specific frame; the data is single-channel, so use a gray colour map
plt.imshow(video_uint8[30], cmap='gray')

# Decode and print alignment for first example (padding id 0 decodes to "")
decoded = tf.strings.reduce_join(num_to_char(val[1][0]))
print(decoded.numpy().decode('utf-8'))

## Design the Deep Neural Network (3D-CNN + BiLSTM Model)

In [None]:
# Import essential model-building layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, Activation, MaxPool3D, TimeDistributed, Flatten, Bidirectional, LSTM, Dropout, Dense
from tensorflow.keras.optimizers import Adam

# Sequential model: three Conv3D stages, a per-frame flatten, two BiLSTM
# stages and a per-frame softmax classifier
model = Sequential()

# Conv3D -> ReLU -> MaxPool3D stages with 128, 256 and 75 filters. Pooling is
# (1, 2, 2), so only the two spatial axes are halved and all 75 frames are kept.
for stage_index, n_filters in enumerate((128, 256, 75)):
    # Only the first layer declares the input shape (frames, height, width, channels)
    first_layer_kwargs = {'input_shape': (75, 46, 140, 1)} if stage_index == 0 else {}
    model.add(Conv3D(n_filters, 3, padding='same', **first_layer_kwargs))
    model.add(Activation('relu'))
    model.add(MaxPool3D((1, 2, 2)))

# Flatten the spatial dimensions of every frame into one feature vector
model.add(TimeDistributed(Flatten()))

# Two bidirectional LSTM stages, each followed by dropout, to learn temporal dependencies
for _ in range(2):
    model.add(Bidirectional(LSTM(128, return_sequences=True, kernel_initializer='Orthogonal')))
    model.add(Dropout(0.5))

# Per-frame softmax over the character vocabulary plus one extra class
model.add(
    Dense(char_to_num.vocabulary_size() + 1, activation='softmax', kernel_initializer='he_normal')
)

In [None]:
# Display the model architecture summary for the network built above
model.summary()

## Set Up Learning Rate Scheduler

In [None]:
from tensorflow.keras.callbacks import LearningRateScheduler

# Step schedule: high learning rate first, lower afterwards
def scheduler(epoch: int):
    """Return the learning rate for `epoch`: 1e-3 before epoch 30, 1e-4 from epoch 30 on."""
    return 1e-3 if epoch < 30 else 1e-4

# Wrap scheduler in a Keras callback; the value returned by `scheduler` for
# the current epoch is applied as the optimizer's learning rate
scheduler_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

## Define the Connectionist Temporal Classification (CTC) Loss Function

In [None]:
def CTCLoss(y_true, y_pred):
    """Compute the per-sample CTC loss for a batch.

    Args:
        y_true: Integer label ids of shape (batch, max_label_len), padded at
            the end with 0 (the id `char_to_num` never assigns to a
            vocabulary character).
        y_pred: Softmax output of the model, shape (batch, time_steps, classes).

    Returns:
        Tensor of shape (batch, 1) with one CTC loss value per sample, as
        returned by `ctc_batch_cost` (it is not reduced to a mean here).
    """

    # Compute batch size and sequence lengths for predictions and labels
    batch_len = tf.cast(tf.shape(y_true)[0], dtype=tf.int64)
    input_len = tf.cast(tf.shape(y_pred)[1], dtype=tf.int64)
    label_len = tf.cast(tf.shape(y_true)[1], dtype=tf.int64)

    # Every sample uses all prediction time steps
    input_length = input_len * tf.ones(shape=(batch_len, 1), dtype=tf.int64)

    # Use each sample's real label length instead of the padded width, so the
    # zero padding is not trained as if it were part of the transcript. The
    # length is the 1-based position of the last non-zero id, which stays
    # correct even if an unknown character (also id 0) occurs mid-sequence.
    positions = tf.range(1, label_len + 1, dtype=tf.int64)
    is_token = tf.not_equal(y_true, 0)
    label_length = tf.reduce_max(
        tf.where(is_token, positions, tf.zeros_like(positions)),
        axis=1,
        keepdims=True,
    )
    # Keep at least one label per sample so an empty transcript does not
    # produce a zero-length target
    label_length = tf.maximum(label_length, tf.ones_like(label_length))

    # Per-sample CTC loss, shape (batch, 1)
    loss = tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss

## Create Callback to Decode and Display Predictions


In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Print reference and predicted transcripts for one batch after each epoch.

    Any error raised while producing the examples is reported and swallowed
    so that it cannot interrupt training.
    """

    # Initialize with dataset (store dataset directly, not iterator)
    def __init__(self, dataset) -> None:
        # Let the Keras base class set up its own state, as the other
        # callbacks in this notebook do
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Decode the first batch of the dataset with the current model and print it."""
        try:

            # Create a fresh iterator and get one batch of data
            data = next(iter(self.dataset))
            # Unpack batch into videos and labels
            videos, labels = data

            # Predict on videos using the current model
            yhat = self.model.predict(videos)

            # Decode predicted sequences using CTC greedy decoding;
            # every sample uses the full number of time steps
            decoded = tf.keras.backend.ctc_decode(
                yhat,
                input_length=[yhat.shape[1]] * yhat.shape[0],
                greedy=True
            )[0][0].numpy()

            # Iterate over first two examples or fewer
            for i in range(min(2, len(yhat))):
                # Convert original labels indices to readable text
                original = tf.strings.reduce_join(num_to_char(labels[i])).numpy().decode("utf-8")
                # Convert decoded predictions indices to readable text
                predicted = tf.strings.reduce_join(num_to_char(decoded[i])).numpy().decode("utf-8")

                # Print the original and predicted texts
                print("Original:", original)
                print("Prediction:", predicted)
                # Print a separator for readability
                print("~" * 100)

        except Exception as e:
            # Deliberate best effort: report the problem without stopping training
            print(f"[Callback Error] Skipping example callback due to: {e}")

# Show predictions after each epoch using test set
example_callback = ProduceExample(test)

## Calculate Character-Level Accuracy

In [None]:
from sklearn.metrics import accuracy_score

class CharacterAccuracyCallback(tf.keras.callbacks.Callback):
    """Track a position-wise character accuracy on a few batches after each epoch.

    For every sample the reference and the greedy CTC prediction are decoded
    to characters, truncated to the shorter of the two, and compared position
    by position. The resulting accuracy is printed and appended to
    `accuracy_per_epoch`.
    """

    def __init__(self, dataset):
        super().__init__()

        # Store dataset to compute accuracy on samples
        self.dataset = dataset
        # One accuracy value per finished epoch
        self.accuracy_per_epoch = []

    def decode_seq(self, seq):
        """Convert a sequence of ids to a list of characters, dropping fill values.

        Only strictly positive ids are kept: 0 is the label padding / OOV id,
        and greedy `ctc_decode` fills unused positions with -1.
        """
        return [num_to_char(c).numpy().decode('utf-8') for c in seq if c > 0]

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate character accuracy on five batches and record the result."""
        y_true_chars = []
        y_pred_chars = []

        # Take only 5 batches for faster evaluation
        for batch in self.dataset.take(5):
            videos, labels = batch
            yhat = self.model.predict(videos)

            # Decode predictions using greedy CTC decoding
            decoded = tf.keras.backend.ctc_decode(
                yhat,
                input_length=[yhat.shape[1]] * yhat.shape[0],
                greedy=True
            )[0][0].numpy()

            # Compare each true and predicted sequence character-wise
            for true_seq, pred_seq in zip(labels.numpy(), decoded):
                true_dec = self.decode_seq(true_seq)
                pred_dec = self.decode_seq(pred_seq)
                min_len = min(len(true_dec), len(pred_dec))
                y_true_chars.extend(true_dec[:min_len])
                y_pred_chars.extend(pred_dec[:min_len])

        # Compute accuracy score and save it. With nothing to compare (e.g. the
        # model still predicts only blanks) report 0.0 instead of scoring
        # empty lists.
        if y_true_chars:
            acc = accuracy_score(y_true_chars, y_pred_chars)
        else:
            acc = 0.0
        self.accuracy_per_epoch.append(acc)
        print(f"\n[Epoch {epoch+1}] Character-Level Accuracy: {acc:.4f}")

# Initialize callback with test dataset
acc_callback = CharacterAccuracyCallback(test)

## Define Model Checkpoint and Compile Model

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
from google.colab import drive

# Make Google Drive available so checkpoints can be written to it
drive.mount('/content/drive')

# Ensure the checkpoint folder exists on Drive
checkpoint_dir = '/content/drive/MyDrive/Labiomancy_checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Save the full model (not just weights) whenever the monitored validation
# loss improves
best_model_path = os.path.join(checkpoint_dir, 'Best_model.keras')
checkpoint_callback = ModelCheckpoint(
    best_model_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)

# Attach the CTC loss and an Adam optimizer to the model
model.compile(loss=CTCLoss, optimizer=Adam(learning_rate=0.0001))

## Save Training History

In [None]:
import pickle

class HistorySaverCallback(tf.keras.callbacks.Callback):
    """Persist per-epoch training metrics to a pickle file after every epoch.

    If the file already exists when the callback is created, its contents are
    loaded first so new epochs are appended to the earlier history.
    """

    def __init__(self, save_path):
        super().__init__()
        self.save_path = save_path
        # Maps metric name -> list of values, one per epoch
        self.history = {}

        # Nothing to resume from
        if not os.path.exists(self.save_path):
            return

        # Resume from the previously saved history.
        # NOTE(review): pickle.load must only be used on files this notebook wrote itself.
        with open(self.save_path, 'rb') as saved:
            self.history = pickle.load(saved)

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics to the history and rewrite the file."""
        for metric_name, metric_value in (logs or {}).items():
            if metric_name not in self.history:
                self.history[metric_name] = []
            self.history[metric_name].append(metric_value)

        # Overwrite the file with the updated history
        with open(self.save_path, 'wb') as target:
            pickle.dump(self.history, target)

        # Confirm that history was saved successfully
        print(f"[Epoch {epoch+1}] History saved to {self.save_path}")

# Create the callback, writing next to the model checkpoints on Drive
history_saver = HistorySaverCallback('/content/drive/MyDrive/Labiomancy_checkpoints/History_live.pkl')

## Train the Model

In [None]:
# Start training with all callbacks: character accuracy, best-model
# checkpointing, learning-rate schedule, example predictions and history saving.
# The learning-rate callback is defined as `scheduler_callback`.
history = model.fit(
    train,
    validation_data=test,
    epochs=70,
    callbacks=[acc_callback, checkpoint_callback, scheduler_callback, example_callback, history_saver]
)