In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
from PIL import Image
import torch
from torchvision import models, transforms
from torch.utils.data import Dataset
import re


# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory




# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
train_code = pd.read_csv("/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/train.csv")
train_code.head()

In [None]:
train_code.iloc[0, 1]

In [None]:
train_code.formula.values

In [None]:
class LatexTokenizer:
    def __init__(self, latex_strings_corpus, max_seq_len=100):
        self.max_seq_len = max_seq_len
        self.vocab = self.build_vocab(latex_strings_corpus)
        self.token_to_idx = self.vocab
        self.idx_to_token = self.create_inverse_vocab(self.vocab)

    def tokenize(self, latex_string):
        token_pattern = r"\\[a-zA-Z]+|[{}]|[0-9]+|[^\s]"
        tokens = re.findall(token_pattern, latex_string)
        return tokens

    def create_inverse_vocab(self, vocab):
        inv_dict = {}
        for key, value in vocab.items():
            inv_dict[value] = key
        return inv_dict
    
    def build_vocab(self, latex_strings):
        all_tokens = []
        for latex_string in latex_strings:
            tokens = self.tokenize(latex_string)
            all_tokens.extend(tokens)
            
        all_tokens.extend(["<|SOS|>", "<|EOS|>", "<|PAD|>"])
        all_tokens_set = set(all_tokens)
        vocab = {}
        for idx, item in enumerate(all_tokens_set):
            vocab[item] = idx

        return vocab

    def get_vocab_size(self):
        return len(self.vocab)

    def encode(self, latex_string):
        tokens = self.tokenize(latex_string)
        encoded_tokens = [self.token_to_idx["<|SOS|>"]] + [self.token_to_idx.get(token, self.token_to_idx["<|PAD|>"]) for token in tokens] + [self.token_to_idx["<|EOS|>"]]

        if len(encoded_tokens) < self.max_seq_len:
            encoded_tokens += [self.token_to_idx["<|PAD|>"]] * (self.max_seq_len - len(encoded_tokens))
        else:
            encoded_tokens = encoded_tokens[:self.max_seq_len]
            
        return encoded_tokens

    def decode(self, encoded_sequence):
        tokens = [self.idx_to_token[idx] for idx in encoded_sequence if idx != self.token_to_idx["<|PAD|>"]]

        if "<|SOS|>" in tokens: tokens.remove("<|SOS|>")
        if "<|EOS|>" in tokens: tokens.remove("<|EOS|>")
        sequence = " ".join(tokens)
        sequence = re.sub(r'\s+([,.?/!;:"()_\']|--)', r'\1', sequence)
        return sequence

In [None]:
# Sample dataset
latex_strings = ["\\frac{a}{b} + c", "\\sqrt{x^2 + y^2}"]

# Initialize tokenizer
tokenizer = LatexTokenizer(latex_strings, max_seq_len=30)

# Build vocabulary from dataset
# tokenizer.build_vocab(latex_strings)
print("Vocabulary:", tokenizer.token_to_idx)

# Encode a single LaTeX string
encoded_sequence = tokenizer.encode("\\frac{a}{b} + c")
print("Encoded sequence:", encoded_sequence)

# Decode the sequence back into a string
decoded_string = tokenizer.decode(encoded_sequence)
print("Decoded string:", decoded_string)

In [None]:
class EquationsDataset(Dataset):
    def __init__(self, csv_file, image_folder, transform=None, max_seq_len=100):
        self.data = pd.read_csv(csv_file)
        self.image_folder = image_folder
        self.transform = transform
        self.max_seq_len = max_seq_len
        self.latex_tokenizer = LatexTokenizer(latex_strings_corpus=self.data.formula.values, max_seq_len=self.max_seq_len)
        self.vocab_size = self.latex_tokenizer.get_vocab_size()
        self.vocab = self.latex_tokenizer.vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_name = self.data.iloc[idx, 0]
        formula = self.data.iloc[idx, 1]

        image_path = os.path.join(self.image_folder, image_name)
        image = Image.open(image_path).convert("L")

        if self.transform:
            image = self.transform(image)

        encoded_formula = self.latex_tokenizer.encode(formula)

        return image, torch.tensor(encoded_formula)

In [None]:
csv_file = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/train.csv"
image_folder = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/images"
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])
dataset = EquationsDataset(csv_file=csv_file, image_folder=image_folder,
                              transform=image_transform, max_seq_len=60)
image_tensor, label_tensor = dataset[0]
print("Image shape:", image_tensor.shape)       # Example: [3, 224, 224]
print("Label tensor:", label_tensor.shape)

In [None]:
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

# Assuming `dataset` is your custom dataset instance
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

# Get a single batch of data
images, labels = next(iter(dataloader))

# Plot the images and labels
fig, axes = plt.subplots(1, 4, figsize=(35, 5))  # 1 row, 4 columns
for i in range(4):
    ax = axes[i]
    image = images[i].permute(1, 2, 0).numpy()  # Convert from (C, H, W) to (H, W, C)
    label = labels[i]

    ax.imshow(image)
    ax.set_title(f"Label: {label}")
    ax.axis("off")
plt.tight_layout()
plt.show()

In [None]:
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.cnn = models.resnet50(pretrained=True)
        self.cnn = nn.Sequential(*list(self.cnn.children())[:-2])  # Remove FC layers

    def forward(self, x):
        return self.cnn(x)  # Output: feature map

# class Decoder(nn.Module):
#     def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
#         super(Decoder, self).__init__()
#         self.embedding = nn.Embedding(vocab_size, embed_dim)
#         self.lstm = nn.LSTM(embed_dim + hidden_dim, hidden_dim)
#         self.fc = nn.Linear(hidden_dim, vocab_size)
#         self.attention_layer = nn.Linear(hidden_dim + 2048, hidden_dim)  # Combine feature map and decoder state
#         self.attention_softmax = nn.Softmax(dim=1)

#     def forward(self, prev_token_idx, encoder_features, prev_hidden_state):
#         embedded_token = self.embedding(prev_token_idx)
#         combined_features = torch.cat([encoder_features.flatten(1), prev_hidden_state], dim =-1)
#         attention_weights = self.attention_softmax(self.attention_layer(combined_features))
#         context_vector = torch.sum(attention_weights.unsqueeze(-1) * encoder_features.flatten(2), dim=1)

#         lstm_input = torch.cat([embedded_token.unsqueeze(1), context_vector.unsqueeze(1)], dim=-1)
#         lstm_output, new_hidden_state = self.lstm(lstm_input)

#         token_logits = self.fc(lstm_output.squeeze(1))

#         return token_logits, new_hidden_state

class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim + 2048, hidden_dim)  # Input is embedding + context vector
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.attention = nn.Linear(2048 + hidden_dim, 1)  # Attention scoring function

    def forward(self, prev_token_idx, encoder_features, hidden_state, cell_state):
        # Get token embeddings
        # Shape: [batch_size, embed_dim]
        embedded = self.embedding(prev_token_idx)
        batch_size = prev_token_idx.size(0)
        
        # Reshape encoder features
        # Shape: [batch_size, num_pixels, encoder_dim]
        encoder_dim = encoder_features.size(1)
        num_pixels = encoder_features.size(2) * encoder_features.size(3)
        encoder_features = encoder_features.permute(0, 2, 3, 1).view(batch_size, num_pixels, encoder_dim)
        
        # Expand hidden state for attention calculation
        # Shape: [batch_size, num_pixels, hidden_dim]
        hidden_expanded = hidden_state.unsqueeze(1).repeat(1, num_pixels, 1)
        
        # Calculate attention scores
        # Shape: [batch_size, num_pixels, 1]
        attn_inputs = torch.cat([hidden_expanded, encoder_features], dim=2)
        attn_scores = self.attention(attn_inputs)
        attn_weights = torch.softmax(attn_scores, dim=1)
        
        # Calculate context vector using attention weights
        # Shape: [batch_size, encoder_dim]
        context_vector = (encoder_features * attn_weights).sum(dim=1)
        
        # Combine embedding and context for LSTM input
        # Shape: [1, batch_size, embed_dim + encoder_dim]
        lstm_input = torch.cat([embedded, context_vector], dim=1).unsqueeze(0)
        
        # LSTM forward pass
        # hidden_state shape: [1, batch_size, hidden_dim]
        # cell_state shape: [1, batch_size, hidden_dim]
        lstm_output, (new_hidden_state, new_cell_state) = self.lstm(
            lstm_input, (hidden_state.unsqueeze(0), cell_state.unsqueeze(0))
        )
        
        # Predict next token
        # Shape: [batch_size, vocab_size]
        output = self.fc(lstm_output.squeeze(0))
        
        return output, new_hidden_state.squeeze(0), new_cell_state.squeeze(0)

In [None]:
# class ImageToLatexModel(nn.Module):
#     def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
#         super(ImageToLatexModel, self).__init__()
#         self.encoder = Encoder()  # Use the Encoder class
#         self.decoder = Decoder(vocab_size, embed_dim, hidden_dim)  # Use the Decoder class

#     def forward(self, image, target_sequence):
#         """
#         Args:
#             image: Input image tensor (shape: [batch_size, channels, height, width]).
#             target_sequence: Ground truth token indices (shape: [batch_size, seq_len]).

#         Returns:
#             logits: Predicted logits for each token in the sequence.
#         """
#         batch_size = image.size(0)
#         seq_len = target_sequence.size(1)

#         # Encode the image
#         encoder_features = self.encoder(image)  # Shape: [batch_size, channels=2048, height=7, width=7]

#         # Initialize decoder hidden state
#         hidden_state = torch.zeros(1, batch_size, 512).to(image.device)  # LSTM hidden state
#         cell_state = torch.zeros(1, batch_size, 512).to(image.device)    # LSTM cell state

#         # Initialize token predictions
#         logits = []

#         # Iterate through each timestep in the sequence
#         for t in range(seq_len):
#             prev_token_idx = target_sequence[:, t]  # Get token index at timestep t

#             # Decode one step
#             token_logits, (hidden_state, cell_state) = self.decoder(
#                 prev_token_idx,
#                 encoder_features,
#                 hidden_state.squeeze(0)  # Pass hidden state from previous timestep
#             )

#             logits.append(token_logits)

#         # Stack logits across timesteps
#         logits = torch.stack(logits, dim=1)  # Shape: [batch_size, seq_len, vocab_size]
#         return logits


class ImageToLatexModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
        super(ImageToLatexModel, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder(vocab_size, embed_dim, hidden_dim)
        self.vocab_size = vocab_size

    def forward(self, images, target_sequences=None, max_len=100, teacher_forcing_ratio=0.5):
        """
        Args:
            images: Input images (shape: [batch_size, channels, height, width])
            target_sequences: Ground truth sequences (shape: [batch_size, seq_len])
                             None during inference
            max_len: Maximum sequence length for inference
            teacher_forcing_ratio: Probability of using teacher forcing
        
        Returns:
            outputs: Predicted logits for each token
        """
        batch_size = images.size(0)
        
        # Encode images
        encoder_features = self.encoder(images)
        
        # Initialize decoder states
        hidden_state = torch.zeros(batch_size, 512).to(images.device)
        cell_state = torch.zeros(batch_size, 512).to(images.device)
        
        # Determine sequence length
        if target_sequences is not None:
            max_len = target_sequences.size(1) - 1  # Exclude <EOS>
            
        # Initialize first input token as <SOS>
        current_token_idx = torch.ones(batch_size, dtype=torch.long).to(images.device) * dataset.vocab["<|SOS|>"]
        
        # Placeholder for outputs
        outputs = torch.zeros(batch_size, max_len, self.vocab_size).to(images.device)
        
        # Generate sequence
        for t in range(max_len):
            # Decode one step
            output, hidden_state, cell_state = self.decoder(
                current_token_idx, encoder_features, hidden_state, cell_state
            )
            
            # Store output
            outputs[:, t, :] = output
            
            # Determine next input token (teacher forcing or predicted)
            if target_sequences is not None and t < max_len - 1:
                # During training
                teacher_force = torch.rand(1).item() < teacher_forcing_ratio
                if teacher_force:
                    current_token_idx = target_sequences[:, t+1]  # Next token from target
                else:
                    current_token_idx = output.argmax(dim=1)  # Predicted token
            else:
                # During inference
                current_token_idx = output.argmax(dim=1)  # Predicted token
        
        return outputs

In [None]:

vocab_size = dataset.vocab_size
vocab = dataset.vocab
model = ImageToLatexModel(vocab_size=vocab_size)
print(model)
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<|PAD|>"])  # Ignore padding tokens in loss calculation
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# import torch
# import numpy as np

# # Define hyperparameters
# num_epochs = 10
# patience = 3  # Number of epochs to wait before early stopping
# delta = 0.001  # Minimum change to qualify as improvement

# # Move model to device (GPU/CPU)
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)

# # Initialize tracking variables for early stopping
# best_loss = float('inf')
# early_stopping_counter = 0
# best_epoch = 0

# # Training loop
# for epoch in range(num_epochs):
#     model.train()
#     epoch_loss = 0.0
    
#     for batch_idx, (images, target_sequences) in enumerate(dataloader):
#         images = images.to(device)
#         target_sequences = target_sequences.to(device)
        
#         # Forward pass
#         optimizer.zero_grad()
#         outputs = model(images, target_sequences)
        
#         # Calculate loss (reshape to [batch_size*seq_len, vocab_size])
#         loss = criterion(
#             outputs.reshape(-1, vocab_size),
#             target_sequences[:, 1:].reshape(-1)  # Exclude < SOS > from targets
#         )
        
#         # Backward pass and optimize
#         loss.backward()
#         optimizer.step()
        
#         epoch_loss += loss.item()
#         if (batch_idx + 1) % 10 == 0:
#             print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(dataloader)}], Loss: {loss.item():.4f}")
    
#     # Calculate average loss for the epoch
#     avg_epoch_loss = epoch_loss / len(dataloader)
#     print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_epoch_loss:.4f}")
    
#     # Save checkpoint after each epoch
#     checkpoint = {
#         'epoch': epoch,
#         'model_state_dict': model.state_dict(),
#         'optimizer_state_dict': optimizer.state_dict(),
#         'loss': avg_epoch_loss,
#         'vocab': dataset.vocab
#     }
#     torch.save(checkpoint, f'checkpoint_epoch_{epoch+1}.pth')
    
#     # Check if this is the best model so far
#     if avg_epoch_loss < best_loss - delta:
#         print(f"Loss improved from {best_loss:.4f} to {avg_epoch_loss:.4f}. Saving best model...")
#         best_loss = avg_epoch_loss
#         best_epoch = epoch + 1
#         torch.save(checkpoint, 'best_model.pth')
#         early_stopping_counter = 0  # Reset counter
#     else:
#         early_stopping_counter += 1
#         print(f"Loss did not improve. Early stopping counter: {early_stopping_counter}/{patience}")
        
#     # Check if early stopping criteria is met
#     if early_stopping_counter >= patience:
#         print(f"Early stopping triggered after epoch {epoch+1}. Best loss: {best_loss:.4f} at epoch {best_epoch}")
#         break

# print(f"Training complete. Best loss: {best_loss:.4f} at epoch {best_epoch}")

In [None]:
# import torch
# import numpy as np
# from torch.utils.data import DataLoader, random_split

# # Create train/validation split
# train_size = int(0.8 * len(dataset))  # 80% for training
# val_size = len(dataset) - train_size  # 20% for validation

# train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# # Create data loaders
# train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# # Define hyperparameters
# num_epochs = 10
# patience = 3  # Number of epochs to wait before early stopping
# delta = 0.001  # Minimum change to qualify as improvement

# # Move model to device (GPU/CPU)
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)

# # Initialize tracking variables for early stopping
# best_val_loss = float('inf')
# early_stopping_counter = 0
# best_epoch = 0

# # Training loop
# for epoch in range(num_epochs):
#     # Training phase
#     model.train()
#     train_loss = 0.0
    
#     for batch_idx, (images, target_sequences) in enumerate(train_loader):
#         images = images.to(device)
#         target_sequences = target_sequences.to(device)
        
#         # Forward pass
#         optimizer.zero_grad()
#         outputs = model(images, target_sequences)
        
#         # Calculate loss
#         loss = criterion(
#             outputs.reshape(-1, vocab_size),
#             target_sequences[:, 1:].reshape(-1)  # Exclude <SOS> from targets
#         )
        
#         # Backward pass and optimize
#         loss.backward()
#         optimizer.step()
        
#         train_loss += loss.item()
#         # if (batch_idx + 1) % 10 == 0:
#         #     print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")
    
#     avg_train_loss = train_loss / len(train_loader)
    
#     # Validation phase
#     model.eval()
#     val_loss = 0.0
    
#     with torch.no_grad():  # No gradients needed for validation
#         for images, target_sequences in val_loader:
#             images = images.to(device)
#             target_sequences = target_sequences.to(device)
            
#             # Forward pass
#             outputs = model(images, target_sequences)
            
#             # Calculate loss
#             loss = criterion(
#                 outputs.reshape(-1, vocab_size),
#                 target_sequences[:, 1:].reshape(-1)
#             )
            
#             val_loss += loss.item()
    
#     avg_val_loss = val_loss / len(val_loader)
    
#     # Print epoch statistics
#     print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
    
#     # Save checkpoint after each epoch
#     checkpoint = {
#         'epoch': epoch,
#         'model_state_dict': model.state_dict(),
#         'optimizer_state_dict': optimizer.state_dict(),
#         'train_loss': avg_train_loss,
#         'val_loss': avg_val_loss,
#         'vocab': dataset.vocab
#     }
#     torch.save(checkpoint, f'checkpoint_epoch_{epoch+1}.pth')
    
#     # Check if validation loss improved
#     if avg_val_loss < best_val_loss - delta:
#         print(f"Validation loss improved from {best_val_loss:.4f} to {avg_val_loss:.4f}. Saving best model...")
#         best_val_loss = avg_val_loss
#         best_epoch = epoch + 1
#         torch.save(checkpoint, 'best_model.pth')
#         early_stopping_counter = 0  # Reset counter
#     else:
#         early_stopping_counter += 1
#         print(f"Validation loss did not improve. Early stopping counter: {early_stopping_counter}/{patience}")
        
#     # Check if early stopping criteria is met
#     if early_stopping_counter >= patience:
#         print(f"Early stopping triggered after epoch {epoch+1}. Best validation loss: {best_val_loss:.4f} at epoch {best_epoch}")
#         break

# print(f"Training complete. Best validation loss: {best_val_loss:.4f} at epoch {best_epoch}")


import torch
import numpy as np
import os
from torch.utils.data import DataLoader, random_split

# Create train/validation split
train_size = int(0.8 * len(dataset))  # 80% for training
val_size = len(dataset) - train_size  # 20% for validation
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Define hyperparameters
num_epochs = 10
patience = 3  # Number of epochs to wait before early stopping
delta = 0.001  # Minimum change to qualify as improvement

# Move model to device (GPU/CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Initialize tracking variables for early stopping
best_val_loss = float('inf')
early_stopping_counter = 0
best_epoch = 0
start_epoch = 0

# Check if best model checkpoint exists and load it
best_model_path = 'best_model.pth'
if os.path.exists(best_model_path):
    print(f"Found existing checkpoint at {best_model_path}. Loading...")
    checkpoint = torch.load(best_model_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch'] + 1  # Start from next epoch
    best_val_loss = checkpoint['val_loss']
    best_epoch = checkpoint['epoch'] + 1
    print(f"Resuming training from epoch {start_epoch}, best validation loss: {best_val_loss:.4f}")

# Training loop
for epoch in range(start_epoch, start_epoch + num_epochs):
    # Training phase
    model.train()
    train_loss = 0.0
    
    for batch_idx, (images, target_sequences) in enumerate(train_loader):
        images = images.to(device)
        target_sequences = target_sequences.to(device)
        
        # Forward pass
        optimizer.zero_grad()
        outputs = model(images, target_sequences)
        
        # Calculate loss
        loss = criterion(
            outputs.reshape(-1, vocab_size),
            target_sequences[:, 1:].reshape(-1)  # Exclude < SOS > from targets
        )
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
    
    avg_train_loss = train_loss / len(train_loader)
    
    # Validation phase
    model.eval()
    val_loss = 0.0
    
    with torch.no_grad():  # No gradients needed for validation
        for images, target_sequences in val_loader:
            images = images.to(device)
            target_sequences = target_sequences.to(device)
            
            # Forward pass
            outputs = model(images, target_sequences)
            
            # Calculate loss
            loss = criterion(
                outputs.reshape(-1, vocab_size),
                target_sequences[:, 1:].reshape(-1)
            )
            
            val_loss += loss.item()
    
    avg_val_loss = val_loss / len(val_loader)
    
    # Print epoch statistics
    print(f"Epoch [{epoch+1}/{start_epoch + num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
    
    # Save checkpoint after each epoch
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_loss': avg_train_loss,
        'val_loss': avg_val_loss,
        'vocab': dataset.vocab
    }
    torch.save(checkpoint, f'checkpoint_epoch_{epoch+1}.pth')
    
    # Check if validation loss improved
    if avg_val_loss < best_val_loss - delta:
        print(f"Validation loss improved from {best_val_loss:.4f} to {avg_val_loss:.4f}. Saving best model...")
        best_val_loss = avg_val_loss
        best_epoch = epoch + 1
        torch.save(checkpoint, 'best_model.pth')
        early_stopping_counter = 0  # Reset counter
    else:
        early_stopping_counter += 1
        print(f"Validation loss did not improve. Early stopping counter: {early_stopping_counter}/{patience}")
        
    # Check if early stopping criteria is met
    if early_stopping_counter >= patience:
        print(f"Early stopping triggered after epoch {epoch+1}. Best validation loss: {best_val_loss:.4f} at epoch {best_epoch}")
        break

print(f"Training complete. Best validation loss: {best_val_loss:.4f} at epoch {best_epoch}")

In [None]:
# # Save the entire model (architecture + parameters)
# torch.save(model, 'image_to_latex_full_model.pth')

# # Or, save just the model parameters (recommended approach)
# torch.save(model.state_dict(), 'image_to_latex_model.pth')

In [None]:
# # Load the entire model
# loaded_model = torch.load('image_to_latex_full_model.pth')
# loaded_model.eval()  # Set to evaluation mode

# # Or load just the parameters into a model instance
# model = ImageToLatexModel(vocab_size)  # Create model instance
# model.load_state_dict(torch.load('image_to_latex_model.pth'))
# model.eval()

# Or load a checkpoint with additional info
checkpoint = torch.load('best_model.pth')
model = ImageToLatexModel(vocab_size)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch'] + 1
vocab = checkpoint['vocab']

In [None]:
import torch

# Example: Save model weights
torch.save(model.state_dict(), '/kaggle/working/handwritten_to_latex_model.pth')

In [None]:
model = ImageToLatexModel(vocab_size)  # Replace with your actual model class
model.load_state_dict(torch.load('/kaggle/working/handwritten_to_latex_model.pth'))

In [None]:
def predict_latex(image_path, model, tokenizer, device):
    # Prepare image
    image = Image.open(image_path).convert('L')
    image = image_transform(image).unsqueeze(0).to(device)
    
    # Set model to evaluation mode
    model.eval()
    
    # Generate prediction
    with torch.no_grad():
        outputs = model(image)
        predicted_indices = outputs.argmax(dim=2).squeeze(0)
    
    # Decode prediction
    predicted_latex = tokenizer.decode(predicted_indices.cpu().numpy())
    return predicted_latex

In [None]:
def predict_latex(model, image_path, tokenizer, device, max_length=60):
    """
    Generate LaTeX code for a given image using the trained model
    
    Args:
        model: Trained ImageToLatexModel
        image_path: Path to the image file
        tokenizer: LaTeX tokenizer with encode/decode methods
        device: Device to run inference on (CPU/GPU)
        max_length: Maximum output sequence length
        
    Returns:
        predicted_latex: String containing the predicted LaTeX code
    """
    # Prepare image
    image_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Grayscale(num_output_channels=3),  # Convert to 3-channel grayscale
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    
    image = Image.open(image_path).convert('L')
    image = image_transform(image).unsqueeze(0).to(device)
    
    # Set model to evaluation mode
    model.eval()
    
    # Generate prediction
    with torch.no_grad():
        # Initialize first token as <SOS>
        current_token_idx = torch.ones(1, dtype=torch.long).to(device) * tokenizer.token_to_idx["<|SOS|>"]
        
        # Initialize decoder states
        encoder_features = model.encoder(image)
        hidden_state = torch.zeros(1, 512).to(device)
        cell_state = torch.zeros(1, 512).to(device)
        
        # Store generated tokens
        generated_tokens = [current_token_idx.item()]
        
        # Generate sequence token by token
        for _ in range(max_length):
            # Get next token prediction
            output, hidden_state, cell_state = model.decoder(
                current_token_idx, encoder_features, hidden_state, cell_state
            )
            
            # Get most probable token
            current_token_idx = output.argmax(dim=1)
            
            # Add to generated tokens
            generated_tokens.append(current_token_idx.item())
            
            # Stop if <EOS> token is generated
            if current_token_idx.item() == tokenizer.token_to_idx["<|EOS|>"]:
                break
        
        # Decode the generated tokens
        predicted_latex = tokenizer.decode(generated_tokens)
        
    return predicted_latex

# Load the best model
def load_model(model_path, vocab_size):
    """
    Load a saved model from checkpoint
    
    Args:
        model_path: Path to the model checkpoint
        vocab_size: Size of the vocabulary
        
    Returns:
        model: Loaded model
        tokenizer: Tokenizer with vocabulary from checkpoint
    """
    checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
    
    model = ImageToLatexModel(vocab_size)
    model.load_state_dict(checkpoint['model_state_dict'])
    
    # Create tokenizer with saved vocabulary
    vocab = checkpoint['vocab']
    tokenizer = LatexTokenizer([], max_seq_len=60)  # Create empty tokenizer
    tokenizer.vocab = vocab  # Set vocabulary from checkpoint
    tokenizer.token_to_idx = vocab
    tokenizer.idx_to_token = tokenizer.create_inverse_vocab(vocab)
    
    return model, tokenizer

# Example usage
if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Load model and tokenizer
    model_path = 'best_model.pth'
    model, tokenizer = load_model(model_path, vocab_size)
    model.to(device)
    
    # Make prediction on test image
    test_image_path = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/images/10016fd166.png"
    predicted_latex = predict_latex(model, test_image_path, tokenizer, device)
    
    print(f"Predicted LaTeX: {predicted_latex}")

In [22]:
import torch
import numpy as np
import pandas as pd
from PIL import Image
import torchvision.transforms as transforms
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import re

# Load the test data
test_csv = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/test.csv"
test_data = pd.read_csv(test_csv)
test_image_folder = "/kaggle/input/converting-handwritten-equations-to-latex-code/col_774_A4_2023/SyntheticData/images"

# Define image transformation
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Load model and tokenizer
def load_model(model_path, vocab_size):
    checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
    
    model = ImageToLatexModel(vocab_size)
    model.load_state_dict(checkpoint['model_state_dict'])
    
    # Create tokenizer with saved vocabulary
    vocab = checkpoint['vocab']
    tokenizer = LatexTokenizer([], max_seq_len=100)
    tokenizer.vocab = vocab
    tokenizer.token_to_idx = vocab
    tokenizer.idx_to_token = tokenizer.create_inverse_vocab(vocab)
    
    return model, tokenizer, vocab

# Predict LaTeX for an image
def predict_latex(model, image_path, tokenizer, device, max_length=60):
    # Prepare image
    image = Image.open(image_path).convert('L')
    image = image_transform(image).unsqueeze(0).to(device)
    
    # Set model to evaluation mode
    model.eval()
    
    # Generate prediction
    with torch.no_grad():
        # Initialize first token as <SOS>
        current_token_idx = torch.ones(1, dtype=torch.long).to(device) * tokenizer.token_to_idx["<|SOS|>"]
        
        # Initialize decoder states
        encoder_features = model.encoder(image)
        hidden_state = torch.zeros(1, 512).to(device)
        cell_state = torch.zeros(1, 512).to(device)
        
        # Store generated tokens
        generated_tokens = [current_token_idx.item()]
        
        # Generate sequence token by token
        for _ in range(max_length):
            # Get next token prediction
            output, hidden_state, cell_state = model.decoder(
                current_token_idx, encoder_features, hidden_state, cell_state
            )
            
            # Get most probable token
            current_token_idx = output.argmax(dim=1)
            
            # Add to generated tokens
            generated_tokens.append(current_token_idx.item())
            
            # Stop if <EOS> token is generated
            if current_token_idx.item() == tokenizer.token_to_idx["<|EOS|>"]:
                break
        
        # Decode the generated tokens
        predicted_latex = tokenizer.decode(generated_tokens)
        
    return predicted_latex

# Tokenize LaTeX for BLEU score calculation
def tokenize_latex(latex_string):
    token_pattern = r"\\[a-zA-Z]+|[{}]|[0-9]+|[^\s]"
    tokens = re.findall(token_pattern, latex_string)
    return tokens

# Calculate BLEU scores
def calculate_bleu(reference, candidate):
    # Tokenize reference and candidate
    reference_tokens = tokenize_latex(reference)
    candidate_tokens = tokenize_latex(candidate)
    
    # Calculate BLEU score with smoothing
    smoothie = SmoothingFunction().method1
    bleu1 = sentence_bleu([reference_tokens], candidate_tokens, weights=(1, 0, 0, 0), smoothing_function=smoothie)
    bleu2 = sentence_bleu([reference_tokens], candidate_tokens, weights=(0.5, 0.5, 0, 0), smoothing_function=smoothie)
    bleu3 = sentence_bleu([reference_tokens], candidate_tokens, weights=(0.33, 0.33, 0.33, 0), smoothing_function=smoothie)
    bleu4 = sentence_bleu([reference_tokens], candidate_tokens, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothie)
    
    return {
        'BLEU-1': bleu1,
        'BLEU-2': bleu2,
        'BLEU-3': bleu3,
        'BLEU-4': bleu4
    }

# Main execution
if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Load model and tokenizer
    model_path = '/kaggle/working/best_model.pth'
    model, tokenizer, vocab = load_model(model_path, len(vocab))
    model.to(device)
    
    # Keep track of all BLEU scores
    all_bleu_scores = []
    
    # Number of test samples to evaluate (set to a small number for testing, then increase)
    num_samples = 20
    
    # Calculate BLEU score for each test sample
    for i in range(min(num_samples, len(test_data))):
        image_name = test_data.iloc[i, 0]
        ground_truth = test_data.iloc[i, 1]
        
        image_path = os.path.join(test_image_folder, image_name)
        
        # Get prediction
        predicted_latex = predict_latex(model, image_path, tokenizer, device)
        
        # Calculate BLEU scores
        bleu_scores = calculate_bleu(ground_truth, predicted_latex)
        all_bleu_scores.append(bleu_scores)
        
        print(f"Sample {i+1}:")
        print(f"Image: {image_name}")
        print(f"Ground Truth: {ground_truth}")
        print(f"Prediction: {predicted_latex}")
        print(f"BLEU Scores: {bleu_scores}")
        print("-" * 50)
    
    # Calculate average BLEU scores
    avg_bleu1 = np.mean([score['BLEU-1'] for score in all_bleu_scores])
    avg_bleu2 = np.mean([score['BLEU-2'] for score in all_bleu_scores])
    avg_bleu3 = np.mean([score['BLEU-3'] for score in all_bleu_scores])
    avg_bleu4 = np.mean([score['BLEU-4'] for score in all_bleu_scores])
    
    print("\nAverage BLEU Scores:")
    print(f"BLEU-1: {avg_bleu1:.4f}")
    print(f"BLEU-2: {avg_bleu2:.4f}")
    print(f"BLEU-3: {avg_bleu3:.4f}")
    print(f"BLEU-4: {avg_bleu4:.4f}")

  checkpoint = torch.load(model_path, map_location=torch.device('cpu'))


Sample 1:
Image: 1cb0b785da.png
Ground Truth: $ \mathcal { F } _ { \mathrm { i n } } ^ { ( 0 ) } = - S [ T _ { + } ^ { 2 } - C ^ { a } C ^ { a } ] $
Prediction: $ { \cal F }_ { \mathrm { T } } ^ {( 2) } = - [ T ^ { a } T ^ { a } - S ^ { 2 } T ^ { a } ] $
BLEU Scores: {'BLEU-1': 0.8220130616218343, 'BLEU-2': 0.697038298428997, 'BLEU-3': 0.5801888111947915, 'BLEU-4': 0.47385478523101565}
--------------------------------------------------
Sample 2:
Image: 6f2229183a.png
Ground Truth: $ d ( l _ { 0 } + 1 , k _ { 1 } ; l _ { 0 } , k _ { 1 } ) c ( l _ { 0 } , k _ { 1 } ; l _ { 0 } + 1 , k _ { 1 } ) = \frac { ( k _ { 1 } + k _ { 0 } ) a - ( k _ { 1 } - k _ { 0 } ) b } { 2 k _ { 1 } ( k _ { 1 } - l _ { 0 } ) ( k _ { 1 } + l _ { 0 } + 1 ) } , $
Prediction: $ d(( k_ { 0 }, k_ { 1 }, k_ { 1 }, k_ { 1 }, k_ { 1 }, k_ { 1 }, k_ { 1 }, k_ { 1 }, k_ { 1 }, k_
BLEU Scores: {'BLEU-1': 0.30049438676849755, 'BLEU-2': 0.27761370272271946, 'BLEU-3': 0.24944306838886127, 'BLEU-4': 0.22014615105130464}
-----