In [1]:
# ===================================================================
#               COMPLETE DATA & MODEL SETUP SCRIPT
# ===================================================================

# --- 1. SETUP AND IMPORTS ---
# -------------------------------------------------------------------
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torchaudio
import pandas as pd
import numpy as np
import os
from lxml import etree
from google.colab import drive
import torch.optim as optim

# Mount Google Drive at /content/drive; the dataset paths configured later in
# this notebook all live under that mount point.
print("Mounting Google Drive...")
drive.mount('/content/drive')
print("Drive mounted successfully!")


Mounting Google Drive...
Mounted at /content/drive
Drive mounted successfully!


In [2]:
# List the dataset folder to confirm the split directories and metadata CSVs are present.
!ls /content/drive/MyDrive/capstone_project/CROHME_2023

test		   train		  train_speech	    val_speech
test_metadata.csv  train_metadata.csv	  val
test_speech	   train_metadata.gsheet  val_metadata.csv


In [3]:

# --- 2. CONFIGURATION ---
# -------------------------------------------------------------------
# --- Paths ---
# Dataset root on the mounted Drive; each split has its own metadata CSV.
DRIVE_PROJECT_ROOT = "/content/drive/MyDrive/capstone_project/CROHME_2023"
TRAIN_CSV = os.path.join(DRIVE_PROJECT_ROOT, "train_metadata.csv")
VAL_CSV = os.path.join(DRIVE_PROJECT_ROOT, "val_metadata.csv")
TEST_CSV = os.path.join(DRIVE_PROJECT_ROOT, "test_metadata.csv")

# --- Reproducibility ---
# Seed PyTorch once, up front, so weight initialisation and DataLoader
# shuffling are repeatable after "Restart & Run All".
SEED = 42
torch.manual_seed(SEED)

# --- Hyperparameters ---
BATCH_SIZE = 16
LEARNING_RATE = 0.0001
D_MODEL = 256            # Transformer width shared by both encoders and the decoder
N_HEADS = 8              # Attention heads per layer
NUM_ENCODER_LAYERS = 4
NUM_DECODER_LAYERS = 4

# --- Special Tokens ---
# Indices 0-2 are reserved; character indices start at 3.
PAD_TOKEN = 0
SOS_TOKEN = 1 # Start of Sequence
EOS_TOKEN = 2 # End of Sequence

# --- Device ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
# --- 3. VOCABULARY CREATION ---
# -------------------------------------------------------------------
# Character-level vocabulary: every distinct character appearing in the
# training labels gets an index, starting after the three reserved tokens.
print("\nCreating vocabulary from training data...")
df_train = pd.read_csv(TRAIN_CSV)

all_chars = set()
for latex_str in df_train['latex_ground_truth']:
    all_chars |= set(str(latex_str))

# Sort so the character -> index assignment is stable across runs.
vocab = sorted(all_chars)
char_to_idx = dict(zip(vocab, range(3, 3 + len(vocab))))
char_to_idx.update({'<pad>': PAD_TOKEN, '<sos>': SOS_TOKEN, '<eos>': EOS_TOKEN})

# Reverse lookup used when decoding model output back to text.
idx_to_char = {index: token for token, index in char_to_idx.items()}
VOCAB_SIZE = len(char_to_idx)
print(f"Vocabulary created with {VOCAB_SIZE} unique tokens.")


Creating vocabulary from training data...
Vocabulary created with 84 unique tokens.


In [5]:
# --- 4. DATASET AND DATALOADER DEFINITION ---
# -------------------------------------------------------------------

def parse_inkml(inkml_path):
    """Read an InkML file and return its pen points as a float32 array.

    Each row is ``[x, y, stroke_id]``, where ``stroke_id`` is the position of
    the ``<trace>`` element (in document order) that the point came from.

    Args:
        inkml_path: Path to the ``.inkml`` file.

    Returns:
        ``np.ndarray`` of dtype ``float32`` and shape ``(num_points, 3)``.
        If the file cannot be parsed, or yields no usable points, a single
        all-zero row of shape ``(1, 3)`` is returned so that downstream
        padding and the 3-feature input projection always get a 2-D array.
    """
    ns = {'inkml': 'http://www.w3.org/2003/InkML'}
    placeholder = np.zeros((1, 3), dtype=np.float32)
    try:
        tree = etree.parse(inkml_path)
        all_points = []
        for stroke_id, trace in enumerate(tree.findall('inkml:trace', namespaces=ns)):
            # An empty <trace/> has text None; treat it as a stroke without points.
            for point_str in (trace.text or '').split(','):
                # split() tolerates repeated whitespace between coordinates.
                coords = point_str.split()
                # Accept "x y" as well as longer samples such as "x y t";
                # only the first two fields (x, y) are kept.
                if len(coords) >= 2:
                    all_points.append([float(coords[0]), float(coords[1]), stroke_id])
    except Exception as e:
        # Best-effort loading: report the bad file instead of failing silently.
        print(f"--- WARNING: Failed to parse {os.path.basename(str(inkml_path))}. Error: {e} ---")
        return placeholder

    if not all_points:
        # np.array([]) would have shape (0,), which cannot be padded with (N, 3) samples.
        return placeholder
    return np.array(all_points, dtype=np.float32)

def load_audio_spectrogram(audio_path):
    """Load an audio file and return its 80-bin mel spectrogram, time-major.

    Args:
        audio_path: Path to the audio file (read with ``torchaudio.load``).

    Returns:
        A 2-D tensor of shape ``(frames, 80)``, or ``None`` if the file could
        not be loaded or transformed (a warning is printed in that case).
    """
    try:
        waveform, sample_rate = torchaudio.load(audio_path)
        # Mix multi-channel audio down to mono. Without this, squeeze(0) is a
        # no-op on a (channels, 80, frames) tensor and the result is not 2-D.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        transform = torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate, n_mels=80)
        spectrogram = transform(waveform)  # (1, 80, frames)
        # Drop the channel axis and put time first: (frames, 80).
        return spectrogram.squeeze(0).T
    except Exception as e:
        # Print a warning for the problematic file and return None
        print(f"--- WARNING: Failed to load {os.path.basename(audio_path)}. Error: {e} ---")
        return None

class MathDataset(Dataset):
    """Dataset of (ink strokes, speech spectrogram, LaTeX label) triples.

    Rows come from a metadata CSV with the columns ``inkml_file_path``,
    ``audio_file_path`` and ``latex_ground_truth``; rows containing any
    missing value are dropped.

    Args:
        csv_path: Path to the metadata CSV for one split.
        char_to_idx_map: Mapping from label character to integer index.
    """

    def __init__(self, csv_path, char_to_idx_map):
        self.df = pd.read_csv(csv_path).dropna()
        self.char_to_idx = char_to_idx_map

    def __len__(self):
        """Number of usable rows in the split."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(ink, spectrogram, label)`` tensors, where ``label`` is the
            character indices wrapped in ``SOS_TOKEN`` / ``EOS_TOKEN``; or
            ``(None, None, None)`` when the sample cannot be used (audio failed
            to load, or the label contains a character missing from the
            vocabulary). ``collate_fn`` filters such samples out.
        """
        row = self.df.iloc[idx]

        # Validate the label first: the vocabulary is built from the training
        # split only, so other splits may contain characters it does not know.
        # Skipping the sample avoids a KeyError that would abort the DataLoader.
        label_str = str(row['latex_ground_truth'])
        unknown = sorted({char for char in label_str if char not in self.char_to_idx})
        if unknown:
            print(f"--- WARNING: Skipping sample {idx}: label has characters outside the vocabulary: {unknown} ---")
            return None, None, None

        ink_data = parse_inkml(row['inkml_file_path'])
        spectrogram = load_audio_spectrogram(row['audio_file_path'])

        # If loading failed, return None for all items
        if spectrogram is None:
            return None, None, None

        label = [SOS_TOKEN] + [self.char_to_idx[char] for char in label_str] + [EOS_TOKEN]

        return torch.tensor(ink_data), spectrogram, torch.tensor(label)

def collate_fn(batch):
    """Pad a list of dataset samples into batch tensors.

    Samples whose ink or audio entry is ``None`` (failed loads) are discarded.

    Args:
        batch: List of ``(ink, audio, label)`` tuples from the dataset.

    Returns:
        ``(ink_padded, audio_padded, labels_padded)``, each batch-first and
        padded to the longest sequence in the batch (ink/audio with 0, labels
        with ``PAD_TOKEN``); or ``(None, None, None)`` if no sample survived.
    """
    usable = [sample for sample in batch if not (sample[0] is None or sample[1] is None)]
    if len(usable) == 0:
        return None, None, None

    def _pad(sequences, fill):
        # Batch-first padding up to the longest sequence in `sequences`.
        return pad_sequence(sequences, batch_first=True, padding_value=fill)

    inks, audios, labels = zip(*usable)
    ink_padded = _pad(inks, 0)
    labels_padded = _pad(labels, PAD_TOKEN)
    audio_padded = _pad(audios, 0)
    return ink_padded, audio_padded, labels_padded

print("\nDefining Dataset class and collation function.")



Defining Dataset class and collation function.


In [6]:
# --- 5. MODEL ARCHITECTURE DEFINITION ---
# -------------------------------------------------------------------

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch-first sequence.

    A table of shape ``(max_len, d_model)`` is precomputed and stored in the
    ``pe`` buffer. Even columns hold sines and odd columns hold cosines.

    Args:
        d_model: Feature dimension of the sequences (odd values are supported).
        max_len: Number of positions precomputed in the ``pe`` buffer. Longer
            inputs are still handled; their table is computed on the fly.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.register_buffer('pe', self._build_table(max_len, d_model))

    @staticmethod
    def _build_table(length, d_model):
        """Return the ``(length, d_model)`` sinusoidal table."""
        pe = torch.zeros(length, d_model)
        position = torch.arange(0, length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        return pe

    def forward(self, x):
        """Return ``x`` plus the positional table.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        seq_len = x.size(1)
        if seq_len <= self.pe.size(0):
            table = self.pe[:seq_len, :]
        else:
            # Sequence is longer than the cached table: compute what is needed
            # without touching the buffer, so saved checkpoints keep their shape.
            table = self._build_table(seq_len, self.pe.size(1)).to(device=x.device, dtype=self.pe.dtype)
        return x + table.unsqueeze(0)

class HandwritingEncoder(nn.Module):
    """Transformer encoder that summarises an ink point sequence as one vector.

    Args:
        input_size: Number of features per ink point.
        d_model: Transformer width.
        num_heads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, input_size, d_model, num_heads, num_layers):
        super().__init__()
        self.d_model = d_model
        self.input_proj = nn.Linear(input_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=num_heads, batch_first=True),
            num_layers=num_layers,
        )

    def forward(self, src):
        """Encode ``src`` (batch-first) and mean-pool over the sequence axis."""
        # Project to model width and scale by sqrt(d_model) before adding positions.
        projected = self.input_proj(src) * np.sqrt(self.d_model)
        encoded = self.transformer_encoder(self.pos_encoder(projected))
        return encoded.mean(dim=1)

class AudioEncoder(nn.Module):
    """CNN front-end plus Transformer encoder that summarises a spectrogram.

    Two stride-2 convolutions downsample the spectrogram along both axes; the
    channel and feature axes are then flattened per time step and fed to a
    Transformer encoder.

    Args:
        cnn_output_size: Flattened ``channels * features`` size per time step
            after the convolutions.
        d_model: Transformer width.
        num_heads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, cnn_output_size, d_model, num_heads, num_layers):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
        )
        self.d_model = d_model
        self.input_proj = nn.Linear(cnn_output_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=num_heads, batch_first=True),
            num_layers=num_layers,
        )

    def forward(self, src):
        """Encode ``src`` of shape (batch, time, features); mean-pool over time."""
        # Insert a channel axis for Conv2d: (batch, 1, time, features).
        feature_maps = self.conv(src.unsqueeze(1))
        n_batch, n_channels, n_frames, n_bins = feature_maps.shape
        # Put time before channels, then merge channels and bins per frame.
        tokens = feature_maps.transpose(1, 2).reshape(n_batch, n_frames, n_channels * n_bins)
        tokens = self.input_proj(tokens) * np.sqrt(self.d_model)
        encoded = self.transformer_encoder(self.pos_encoder(tokens))
        return encoded.mean(dim=1)

class TransformerDecoder(nn.Module):
    """Autoregressive Transformer decoder over the label vocabulary.

    Args:
        vocab_size: Number of tokens (output logits per position).
        d_model: Transformer width.
        num_heads: Attention heads per decoder layer.
        num_layers: Number of stacked decoder layers.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers):
        super(TransformerDecoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=num_heads, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, tgt, memory):
        """Compute next-token logits for every position of ``tgt``.

        Args:
            tgt: Long tensor of token indices, shape ``(batch, tgt_len)``.
            memory: Encoder output, shape ``(batch, mem_len, d_model)``.

        Returns:
            Logits of shape ``(batch, tgt_len, vocab_size)``.
        """
        tgt_emb = self.embedding(tgt)
        tgt_pos = self.pos_encoder(tgt_emb)

        # Causal mask: position i may only attend to positions <= i. Without
        # it, teacher-forced training lets each position see the very tokens
        # it is asked to predict, and the model fails at inference time.
        # NOTE: weights trained before this mask was added should be retrained.
        tgt_len = tgt.size(1)
        causal_mask = torch.triu(
            torch.full((tgt_len, tgt_len), float('-inf'), device=tgt.device),
            diagonal=1,
        )

        output = self.transformer_decoder(tgt_pos, memory, tgt_mask=causal_mask)
        return self.fc_out(output)

class MultimodalTransformer(nn.Module):
    """Fuses handwriting and audio encodings and decodes a LaTeX sequence.

    Args:
        handwriting_encoder: Module mapping ink data to a (batch, features) context.
        audio_encoder: Module mapping audio data to a (batch, features) context.
        decoder: Module called as ``decoder(target_latex, memory)``.
        fusion_size: Width of the concatenated contexts.
        d_model: Width of the fused memory handed to the decoder.
    """

    def __init__(self, handwriting_encoder, audio_encoder, decoder, fusion_size, d_model):
        super().__init__()
        self.handwriting_encoder = handwriting_encoder
        self.audio_encoder = audio_encoder
        self.decoder = decoder
        self.fusion_fc = nn.Linear(fusion_size, d_model)

    def forward(self, ink_data, audio_data, target_latex):
        """Return decoder predictions for ``target_latex`` given both modalities."""
        contexts = [
            self.handwriting_encoder(ink_data),
            self.audio_encoder(audio_data),
        ]
        # Concatenate along the feature axis, project to d_model, and add a
        # length-1 sequence axis so the decoder sees a single memory slot.
        fused = self.fusion_fc(torch.cat(contexts, dim=1))
        return self.decoder(target_latex, fused.unsqueeze(1))

print("Model component classes defined.")

Model component classes defined.


In [7]:
# Measure the per-time-step feature size produced by the audio CNN, so that
# CNN_OUTPUT_SIZE can be set to match real data.

# 1. Get one sample from your dataset
train_dataset = MathDataset(TRAIN_CSV, char_to_idx)
sample_ink, sample_audio, _ = train_dataset[0]
if sample_audio is None:
    # The dataset returns None when a sample cannot be loaded.
    raise RuntimeError("Sample 0 could not be loaded; pick another index to measure the CNN output size.")

print(f"Original spectrogram shape: {sample_audio.shape}") # Should be (Time, Features)

# 2. Simulate the CNN pass to find the output shape
# Same layer stack as AudioEncoder.conv, built standalone just for this test
temp_conv = nn.Sequential(
    nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1), nn.ReLU(),
    nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1), nn.ReLU()
)

# Add a batch and channel dimension to the sample
sample_audio = sample_audio.unsqueeze(0).unsqueeze(0)
with torch.no_grad():  # shape probe only; no gradients needed
    conv_output = temp_conv(sample_audio)
print(f"Shape after CNN layers: {conv_output.shape}")

# 3. Calculate the correct size
# The time axis is unpacked into `n_frames` rather than `time`: a global named
# `time` would shadow the `time` module used by the training cell on re-runs.
batch_size, channels, n_frames, features = conv_output.shape
calculated_size = channels * features
print(f"\nACTION: Your calculated CNN_OUTPUT_SIZE is: {calculated_size}")

Original spectrogram shape: torch.Size([490, 80])
Shape after CNN layers: torch.Size([1, 64, 123, 20])

ACTION: Your calculated CNN_OUTPUT_SIZE is: 1280




In [8]:
# Sanity check: confirm that torchaudio can decode one of the .mp3 files and
# that the mel-spectrogram transform yields a (frames, 80) tensor.
import torchaudio

# Paste the full path to one of your .mp3 files here
test_audio_path = "/content/drive/MyDrive/capstone_project/CROHME_2023/train_speech/form_026_E201.mp3" # Example

try:
    waveform, sr = torchaudio.load(test_audio_path)
    print(f"✅ Successfully loaded {test_audio_path}")
    print(f"   Waveform shape: {waveform.shape}")

    # Also test the spectrogram transformation
    # (same settings as load_audio_spectrogram: file's own sample rate, 80 mel bins)
    transform = torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_mels=80)
    spectrogram = transform(waveform)
    # Drop the channel axis and transpose so time comes first.
    print(f"   Spectrogram shape: {spectrogram.squeeze(0).T.shape}")

except Exception as e:
    # Report the failure instead of raising so the reason is visible in the cell output.
    print(f"❌ Failed to load audio file.")
    print(f"   Error: {e}")

✅ Successfully loaded /content/drive/MyDrive/capstone_project/CROHME_2023/train_speech/form_026_E201.mp3
   Waveform shape: torch.Size([1, 97920])
   Spectrogram shape: torch.Size([490, 80])


In [9]:

# --- 6. INSTANTIATION ---
# -------------------------------------------------------------------

# CNN_OUTPUT_SIZE is the flattened (channels * features) width per time step
# coming out of AudioEncoder.conv, and must match the input width of its
# projection layer.
# For an input spectrogram of (Time, 80), after two strides of 2, the feature dim becomes 80 / 4 = 20.
# So, CNN_OUTPUT_SIZE = 64 (channels) * 20 (features) = 1280.
# The shape-probe cell earlier in the notebook reports the same value (1280).
CNN_OUTPUT_SIZE = 1280

# Instantiate model components
# input_size=3 matches the [x, y, stroke_id] rows produced by parse_inkml.
handwriting_enc = HandwritingEncoder(input_size=3, d_model=D_MODEL, num_heads=N_HEADS, num_layers=NUM_ENCODER_LAYERS)
audio_enc = AudioEncoder(cnn_output_size=CNN_OUTPUT_SIZE, d_model=D_MODEL, num_heads=N_HEADS, num_layers=NUM_ENCODER_LAYERS)
decoder = TransformerDecoder(vocab_size=VOCAB_SIZE, d_model=D_MODEL, num_heads=N_HEADS, num_layers=NUM_DECODER_LAYERS)

# Instantiate the main model
# fusion_size is 2 * D_MODEL because the two encoder contexts are concatenated.
model = MultimodalTransformer(
    handwriting_encoder=handwriting_enc,
    audio_encoder=audio_enc,
    decoder=decoder,
    fusion_size=D_MODEL * 2,
    d_model=D_MODEL
).to(device)

print(f"\nModel created and moved to {device}.")

# Instantiate DataLoaders
# collate_fn pads variable-length samples and drops samples that failed to load.
train_dataset = MathDataset(TRAIN_CSV, char_to_idx)
val_dataset = MathDataset(VAL_CSV, char_to_idx)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
print("DataLoaders instantiated.")

# Instantiate Loss and Optimizer
# ignore_index=PAD_TOKEN keeps padded label positions out of the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
print("Loss function and optimizer instantiated.")

print("\n--- SETUP COMPLETE ---")
print("You are now ready to write and run your training loop.")


Model created and moved to cuda.
DataLoaders instantiated.
Loss function and optimizer instantiated.

--- SETUP COMPLETE ---
You are now ready to write and run your training loop.


In [None]:
import time

# --- 7. TRAINING AND VALIDATION LOOP ---
# -------------------------------------------------------------------
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one teacher-forced training pass over ``dataloader``.

    Args:
        model: Module called as ``model(ink, audio, target_input)`` that
            returns logits of shape ``(batch, tgt_len, vocab_size)``.
        dataloader: Iterable of ``(ink, audio, labels)`` batches; a batch whose
            first element is ``None`` (all samples failed to load) is skipped.
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss taking ``(N, vocab_size)`` logits and ``(N,)`` targets.
        device: Device the batch tensors are moved to.

    Returns:
        Mean loss over the batches that were actually processed, or ``nan`` if
        every batch was skipped.
    """
    # Set the model to training mode. This enables layers like Dropout.
    model.train()

    total_loss = 0.0
    # Count processed batches separately: skipped batches must not dilute the mean.
    batches_seen = 0

    # Define how often to print a progress update (e.g., every 100 batches).
    print_every = 100

    for i, (ink_batch, audio_batch, label_batch) in enumerate(dataloader):

        # Skip this batch if data loading failed.
        if ink_batch is None:
            continue

        # Move all data tensors to the selected device (e.g., 'cuda' for GPU).
        ink_batch = ink_batch.to(device)
        audio_batch = audio_batch.to(device)
        label_batch = label_batch.to(device)

        # --- Prepare data for teacher forcing ---
        # Decoder input drops the last position; the expected output drops the
        # first (<sos>), so position t is trained to predict token t+1.
        target_input = label_batch[:, :-1]
        target_expected = label_batch[:, 1:]

        # --- Forward Pass ---
        predictions = model(ink_batch, audio_batch, target_input)

        # --- Loss Calculation ---
        # Flatten to (N, vocab) / (N,). The vocabulary size is read from the
        # logits themselves rather than from a notebook global.
        predictions = predictions.reshape(-1, predictions.size(-1))
        target_expected = target_expected.reshape(-1)
        loss = criterion(predictions, target_expected)

        # --- Backward Pass and Optimization ---
        optimizer.zero_grad()
        loss.backward()

        # Clip gradients to prevent them from exploding, which helps stabilize training.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        total_loss += loss.item()
        batches_seen += 1

        # --- Progress Indicator ---
        if (i + 1) % print_every == 0:
            print(f'  ... Batch {i+1}/{len(dataloader)} processed')

    if batches_seen == 0:
        # Nothing was trained on; nan makes that visible instead of raising.
        return float('nan')
    return total_loss / batches_seen

def evaluate_epoch(model, dataloader, criterion, device):
    """Compute the mean teacher-forced loss over ``dataloader`` without training.

    Args:
        model: Module called as ``model(ink, audio, target_input)`` that
            returns logits of shape ``(batch, tgt_len, vocab_size)``.
        dataloader: Iterable of ``(ink, audio, labels)`` batches; a batch whose
            first element is ``None`` is skipped.
        criterion: Loss taking ``(N, vocab_size)`` logits and ``(N,)`` targets.
        device: Device the batch tensors are moved to.

    Returns:
        Mean loss over the batches that were actually evaluated, or ``nan`` if
        every batch was skipped. ``nan`` never compares lower than a previous
        best, so it cannot trigger a checkpoint save.
    """
    model.eval() # Set model to evaluation mode
    total_loss = 0.0
    # Count evaluated batches separately: skipped batches must not dilute the mean.
    batches_seen = 0
    with torch.no_grad():
        for ink_batch, audio_batch, label_batch in dataloader:
            if ink_batch is None:
                continue

            # Move data to device
            ink_batch = ink_batch.to(device)
            audio_batch = audio_batch.to(device)
            label_batch = label_batch.to(device)

            # Same input/target shift as in training.
            target_input = label_batch[:, :-1]
            target_expected = label_batch[:, 1:]

            # Forward pass
            predictions = model(ink_batch, audio_batch, target_input)

            # Flatten to (N, vocab) / (N,); vocab size is read from the logits.
            predictions = predictions.reshape(-1, predictions.size(-1))
            target_expected = target_expected.reshape(-1)

            # Calculate loss
            loss = criterion(predictions, target_expected)
            total_loss += loss.item()
            batches_seen += 1

    if batches_seen == 0:
        return float('nan')
    return total_loss / batches_seen

# --- Main Training Execution ---
# Trains for NUM_EPOCHS epochs and keeps, on Drive, the weights of the epoch
# with the lowest validation loss seen so far.
NUM_EPOCHS = 25 # You can adjust this value
best_val_loss = float('inf')  # any real validation loss will beat this on the first epoch
model_save_path = os.path.join(DRIVE_PROJECT_ROOT, 'best_model.pth')

print("\n--- STARTING MODEL TRAINING ---")
for epoch in range(NUM_EPOCHS):
    start_time = time.time()

    # One full pass over the training data, then one over the validation data.
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss = evaluate_epoch(model, val_loader, criterion, device)

    end_time = time.time()
    # Split the elapsed wall-clock seconds into whole minutes and remaining seconds.
    epoch_mins, epoch_secs = divmod(end_time - start_time, 60)

    print(f"Epoch {epoch+1:02}/{NUM_EPOCHS} | Time: {epoch_mins:.0f}m {epoch_secs:.0f}s")
    print(f"\tTrain Loss: {train_loss:.4f} | Val. Loss: {val_loss:.4f}")

    # Save the best model based on validation loss
    # (only the state_dict is written; the file is overwritten on each improvement)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), model_save_path)
        print(f"\t-> Model saved to {model_save_path}")

print("\n--- TRAINING FINISHED ---")


--- STARTING MODEL TRAINING ---




  ... Batch 100/775 processed
  ... Batch 200/775 processed
  ... Batch 300/775 processed
  ... Batch 400/775 processed


After your model has finished training, the final phase is to evaluate its performance on the unseen test set and create a function to generate predictions.

This will give you the final, objective results for your project.

## Step 1: Load Your Best Model

First, you need to create an instance of your model and load the saved weights from your best_model.pth file.

In [None]:
# Create a new instance of your model.
# Fresh encoder/decoder modules are built here on purpose: reusing
# handwriting_enc / audio_enc / decoder would make this "new" model share its
# weights with the training `model`, so loading the checkpoint would silently
# overwrite the live training model as well.
best_handwriting_enc = HandwritingEncoder(input_size=3, d_model=D_MODEL, num_heads=N_HEADS, num_layers=NUM_ENCODER_LAYERS)
best_audio_enc = AudioEncoder(cnn_output_size=CNN_OUTPUT_SIZE, d_model=D_MODEL, num_heads=N_HEADS, num_layers=NUM_ENCODER_LAYERS)
best_decoder = TransformerDecoder(vocab_size=VOCAB_SIZE, d_model=D_MODEL, num_heads=N_HEADS, num_layers=NUM_DECODER_LAYERS)

model_instance = MultimodalTransformer(
    handwriting_encoder=best_handwriting_enc,
    audio_encoder=best_audio_enc,
    decoder=best_decoder,
    fusion_size=D_MODEL * 2,
    d_model=D_MODEL
).to(device)

# Load the saved weights
# map_location lets a checkpoint saved on GPU be loaded in a CPU-only session.
model_path = os.path.join(DRIVE_PROJECT_ROOT, 'best_model.pth')
model_instance.load_state_dict(torch.load(model_path, map_location=device))
model_instance.eval()  # inference mode: disables dropout
print("Best model loaded successfully.")

## Step 2: Implement an Inference Function

During training, you used "teacher forcing" (feeding the correct sequence to the decoder). For testing, the model must generate the output on its own. You need an inference function that uses a greedy decoding strategy.

In [None]:
def predict(model, ink_tensor, audio_tensor, max_length=150, strip_special_tokens=False):
    """Greedily decode a LaTeX string for a single (ink, audio) sample.

    Args:
        model: Model exposing ``handwriting_encoder``, ``audio_encoder``,
            ``fusion_fc`` and ``decoder``.
        ink_tensor: Unbatched ink tensor (a batch axis is added here).
        audio_tensor: Unbatched spectrogram tensor (a batch axis is added here).
        max_length: Maximum number of tokens generated after ``<sos>``.
        strip_special_tokens: If True, ``<pad>``/``<sos>``/``<eos>`` tokens are
            left out of the returned string. Defaults to False, in which case
            the string includes the literal ``<sos>`` prefix and, when
            generated, the ``<eos>`` suffix.

    Returns:
        The decoded string.
    """
    model.eval()

    # Run on whatever device the model's weights live on, instead of relying
    # on a notebook-level `device` global that may not match the model.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    with torch.no_grad():
        # Move inputs to device and add a batch dimension
        ink_tensor = ink_tensor.unsqueeze(0).to(run_device)
        audio_tensor = audio_tensor.unsqueeze(0).to(run_device)

        # Encode once; the fused memory is reused for every decoding step.
        ink_context = model.handwriting_encoder(ink_tensor)
        audio_context = model.audio_encoder(audio_tensor)
        fused_context = torch.cat((ink_context, audio_context), dim=1)
        memory = model.fusion_fc(fused_context).unsqueeze(1)

        # Start the output sequence with the <sos> token
        output_sequence = [SOS_TOKEN]

        for _ in range(max_length):
            # Feed everything generated so far back into the decoder.
            target_tensor = torch.tensor(output_sequence, dtype=torch.long, device=run_device).unsqueeze(0)
            predictions = model.decoder(target_tensor, memory)

            # Greedy choice: highest-scoring token at the last position.
            next_token = predictions.argmax(2)[:, -1].item()
            output_sequence.append(next_token)

            # If the end-of-sequence token is predicted, stop
            if next_token == EOS_TOKEN:
                break

    if strip_special_tokens:
        special_ids = {PAD_TOKEN, SOS_TOKEN, EOS_TOKEN}
        output_sequence = [idx for idx in output_sequence if idx not in special_ids]

    # Convert the sequence of indices back to characters
    return "".join([idx_to_char[idx] for idx in output_sequence])

## Step 3: Evaluate on the Test Set

Now, loop through your test_loader, use your predict function on each sample, and compare the model's prediction to the actual ground truth label.

In [None]:
# You may need to install this library for calculating CER
!pip install python-Levenshtein

from Levenshtein import distance as levenshtein_distance

test_dataset = MathDataset(TEST_CSV, char_to_idx)
# Note: Use a smaller batch size for evaluation if needed
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

total_cer = 0
exact_matches = 0
results = []

for ink_sample, audio_sample, label_sample in test_loader:
    # Get the raw tensors for prediction
    ink = ink_sample.squeeze(0)
    audio = audio_sample.squeeze(0)

    # Generate the prediction
    prediction_str = predict(model_instance, ink, audio)

    # Get the ground truth string
    true_label_str = "".join([idx_to_char[idx.item()] for idx in label_sample.squeeze(0) if idx != PAD_TOKEN])

    # Store results for inspection
    results.append({'truth': true_label_str, 'prediction': prediction_str})

    # Calculate metrics
    if prediction_str == true_label_str:
        exact_matches += 1
    total_cer += levenshtein_distance(prediction_str, true_label_str)

print("\n--- FINAL EVALUATION RESULTS ---")
print(f"Exact Match Accuracy: {exact_matches / len(test_dataset):.4f}")
print(f"Average Character Error Rate (CER): {total_cer / len(test_dataset):.4f}")

This final step provides concrete performance metrics for your project, showing how well your model has learned to recognize mathematical expressions on data it has never seen.