# Image To Latex ML
[IM2LATEX-100K dataset info](https://www.emergentmind.com/topics/im2latex-100k-dataset-ad016d42-2c17-4b9f-a959-bbe2d1d350d9)

[im2markup](https://github.com/harvardnlp/im2markup/blob/master/README.md)

Related Papers:

[Image-to-Markup Generation with Coarse-to-Fine Attention (Harvard)](https://proceedings.mlr.press/v70/deng17a/deng17a.pdf)

[Image To Latex (Stanford)](https://cs231n.stanford.edu/reports/2017/pdfs/815.pdf)

In [None]:
# Standard imports
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('dark_background')
import pandas as pd
from collections import Counter
import io
from PIL import Image
# Standard Pytorch imports (note the aliases).
import torch
from torch.utils.data import Dataset
import torchvision.transforms as T
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import torch.nn as nn

from pylatexenc.latexwalker import LatexWalker, LatexCharsNode, LatexMacroNode, LatexGroupNode

torch.cuda.is_available()

False

In [14]:
print("loading ...")

base_url = "hf://datasets/yuntian-deng/im2latex-100k/"
splits = {
    'train': 'data/train-00000-of-00001-93885635ef7c6898.parquet', 
    'test': 'data/test-00000-of-00001-fce261550cd3f5db.parquet', 
    'val': 'data/val-00000-of-00001-3f88ebb0c1272ccf.parquet'
}

train_df = pd.read_parquet(base_url + splits["train"])
val_df   = pd.read_parquet(base_url + splits["val"])
test_df  = pd.read_parquet(base_url + splits["test"])

print("First 5 records: \n", train_df.head())
print("First 5 records formulas: \n", train_df.head().formula)


loading ...
First 5 records: 
                                              formula        filename  \
0  \widetilde \gamma _ { \mathrm { h o p f } } \s...  66667cee5b.png   
1  ( { \cal L } _ { a } g ) _ { i j } = 0 , \ \ \...  1cbb05a562.png   
2  S _ { s t a t } = 2 \pi \sqrt { N _ { 5 } ^ { ...  ed164cc822.png   
3  \hat { N } _ { 3 } = \sum \sp f _ { j = 1 } a ...  e265f9dc6b.png   
4  \, ^ { * } d \, ^ { * } H = \kappa \, ^ { * } ...  242a58bc3a.png   

                                         image.bytes image.path  
0  b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...       None  
1  b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...       None  
2  b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...       None  
3  b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...       None  
4  b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...       None  
First 5 records formulas: 
 0    \widetilde \gamma _ { \mathrm { h o p f } } \s...
1    ( { \cal L } _ { a } g ) _ { i j } = 0 , \ \ \...
2 

## 1. Pre-processing
- **Normalizzazione**: Poichè le formule hanno dimensioni variabili, è comune raggrupparle in "bucket" di risoluzione simile o ridimensionarle a una dimensione fissa (es. \(480 X 160\) o \(640 X 160\)) mantenendo l'aspect ratio. (sembrerebbe essere già fatta)
- **Tokenizzazione**: La stringa LaTeX viene scomposta in token (es. \frac, {, x, ^, 2, }). Viene aggiunto un vocabolario con token speciali come <sos> (inizio), <eos> (fine) e <pad> (riempimento)
[Latex tokenizer](https://pylatexenc.readthedocs.io/en/latest/latexwalker/)

In [None]:
class Vocabulary:
    def __init__(self):
        # Definiamo i token speciali con ID fissi
        self.itos = {0: "<pad>", 1: "<sos>", 2: "<eos>", 3: "<unk>"}
        self.stoi = {v: k for k, v in self.itos.items()}
        self.threshold = 1 # Frequenza minima per includere un token

    def build_vocabulary(self, sentence_list):
        frequencies = Counter()
        idx = 4
        
        for sentence in sentence_list:
            for word in sentence:
                frequencies[word] += 1
                
        for word, freq in frequencies.items():
            if freq >= self.threshold:
                self.stoi[word] = idx
                self.itos[idx] = word
                idx += 1

    def numericalize(self, text):
        return [self.stoi.get(token, self.stoi["<unk>"]) for token in text]

def parse_nodes(nodes):
        flat_tokens = []
        for node in nodes:
            if node is None: continue

            if node.isNodeType(LatexCharsNode):
                # Caratteri semplici (a, b, =, +, ecc.)
                # Li dividiamo ulteriormente per carattere singolo se necessario
                for c in node.chars:
                    if not c.isspace(): flat_tokens.append(c)
                    
            elif node.isNodeType(LatexMacroNode):
                # Comandi tipo \frac, \alpha, \cal
                m_name = node.macroname
                token = "\\" + m_name if m_name else "\\\\"
                if token.strip() == "\\": continue
                flat_tokens.append(token)
                
            elif node.isNodeType(LatexGroupNode):
                # Gruppi tra { ... }, li apriamo ricorsivamente
                flat_tokens.append("{")
                flat_tokens.extend(parse_nodes(node.nodelist))
                flat_tokens.append("}")
        return flat_tokens


def get_final_tokens(formula):
    walker = LatexWalker(rf"{formula}")
    try:
        (nodes, pos, len_) = walker.get_latex_nodes()
    except:
        return []
    
    return ["<sos>"] + parse_nodes(nodes) + ["<eos>"]


# 1. Istanzia e costruisci il vocabolario sui tuoi token
tokenized = [get_final_tokens(formula) for formula in train_df.formula.values]
vocab = Vocabulary()
vocab.build_vocabulary(tokenized)

# 2. Esempio di conversione della tua formula (record 1)
example_indices = vocab.numericalize(tokenized[0])

print(f"formula originale: {test_df.formula.values[0]}")
print(f"Token originali: {tokenized[0]}")
print(f"Indici numerici: {example_indices}")
print(f"Dimensione del vocabolario: {len(vocab.stoi)}")

formula originale: \alpha _ { 1 } ^ { r } \gamma _ { 1 } + \dots + \alpha _ { N } ^ { r } \gamma _ { N } = 0 \quad ( r = 1 , . . . , R ) \; ,
Token originali: ['<sos>', '\\widetilde', '_', '{', '\\mathrm', '}', '\\simeq', '\\sum', '_', '{', 'n', '>', '0', '}', '\\widetilde', '_', '{', 'n', '}', '{', '\\frac', '}', '<eos>']
Indici numerici: [4, 5, 6, 7, 8, 9, 10, 11, 6, 7, 12, 13, 14, 9, 5, 6, 7, 12, 9, 7, 15, 9, 16]
Dimensione del vocabolario: 442


### Aggiornare il Dataset

In [None]:
class Im2LatexDataset(Dataset):
    def __init__(self, df, vocab, transform = None):
        self.df = df
        self.vocab = vocab
        self.transform = transform
        self.formulas= [get_final_tokens(f) for f in df.formula.values]
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index):
        img_bytes = self.id.iloc[index]['images.bytes']
        image = Image.open(io.BytesIO(img_bytes)).convert("RGB")

        if self.transform:
            image = self.transform(image)

        tokens = self.formulas[index]
        numerical_indices = self.vocab.numericalize(tokens)
        
        return image, torch.tensor(numerical_indices)
    

# Trasformazioni: Resize fisso (importante!) e normalizzazione
transform = T.Compose([
    T.Resize((64, 320)), # Dimensioni immagini del dataset (Height, Width)
    T.ToTensor(),
    T.Normalize((0.5,), (0.5,)) 
])

dataset = Im2LatexDataset(test_df, vocab, transform=transform)

### DataLoader

In [None]:
def collate_fn(batch):
    images, sequences = zip(*batch)

    #unire le immagini in unico tensore
    images = torch.stack(images, dim=0)

    #padding sequenze corte con 0 (<pad>)
    sequences_padded = pad_sequence(sequences, batch_first=True, padding_value=0)

    return images, sequences_padded

dataLoader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

## 2. Feature Extraction (Encoder CNN)
- **BakcBone**: ResNet-18 o una CNN custom (con circa 4-6 strati convoluzionali).
- **Output**: Se l'input è \(480\times 160\), la CNN produrrà una mappa di feature di circa \(30\times 10\) con \(512\) canali. Ogni "pixel" di questa mappa rappresenta una piccola regione della formula

In [None]:
class CnnEncoder(nn.Module):
    def __init__(self, d_model):
        super().__init__()

        # Una CNN semplice (stile ResNet ridotta)
        self.conv = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1), nn.ReLU(), nn.MaxPool2d(2), # -> 32x160
            nn.Conv2d(64, 128, 3, 1, 1), nn.ReLU(), nn.MaxPool2d(2), # -> 16x80
            nn.Conv2d(128, 256, 3, 1, 1), nn.ReLU(), nn.MaxPool2d(2), # -> 8x40
            nn.Conv2d(256, d_model, 3, 1, 1), nn.ReLU(), nn.MaxPool2d((2, 2)) # -> 4x20
        )

    
    def forward(self, x):
        # Output: [Batch, d_model, 4, 20]
        features = self.conv(x)
        # Lo "appiattiamo" per il Transformer: [Batch, 80, d_model]
        # 80 è la lunghezza della sequenza visiva (4*20)
        features = features.flatten(2).permute(0, 2, 1)
        return features

## 3. Dalla griglia alla sequenza
Il Transformer lavora su sequenze 1D, ma le formule sono intrinsecamente 2D.
- **Flattening spaziale**: La mappa \(30\times 10\) viene appiattita in una sequenza di \(300\) vettori.
- **2D Positional Encoding**: Fondamentale per Im2Latex. Poiché una frazione ha elementi sopra e sotto, un encoding posizionale che mantenga coordinate \((x,y)\) aiuta il modello a capire la gerarchia verticale, non solo l'ordine destra-sinistra. 

## 4. Transformer Decoder (Generazione LaTeX)
- **Self-Attention**: Il decoder analizza i token LaTeX già generati (es. se ha scritto \begin{equation}, "sa" che dovrà chiuderlo).
- **Cross-Attention**: Il decoder "guarda" i vettori estratti dalla CNN. Ad esempio, quando deve generare l'esponente, l'attenzione si sposterà sulla regione in alto a destra del simbolo di base nell'immagine. 

In [19]:
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.Parameter(torch.randn(1,1000, d_model)) # Positional enoding

        decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)

        self.fc_out = nn.Linear(d_model, vocab_size)

    
    def forward(self, tgt, memory, tgt_mask):
        # tgt: token Latex [Batch, Seq_Len]
        # memory: feature della CNN [Batch, 80, d_model]

        tgt_emb = self.embedding(tgt) + self.pos_encoding[:, :tgt.size(1), :]

        output = self.transformer_decoder(tgt_emb, memory, tgt_mask=tgt_mask)

        return self.fc_out(output)

In [None]:
class Im2LatexModel(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4):
        super().__init__()
        
        # 1. ENCODER (CNN)
        # Prende l'immagine [B, 3, 64, 320] -> restituisce feature [B, 80, d_model]
        self.encoder_cnn = CnnEncoder(d_model) 
        
        # 2. DECODER (Transformer)
        # Prende i token LaTeX e le feature della CNN -> predice il prossimo token
        self.decoder_transformer = TransformerDecoder(vocab_size, d_model, nhead, num_layers)

    def forward(self, src_img, tgt_tokens):
        # A. Estrazione feature visive (Memory)
        # src_img shape: [Batch, 3, 64, 320]
        memory = self.encoder_cnn(src_img) # Output: [Batch, 80, d_model]
        
        # B. Creazione maschera per il Decoder
        # Impedisce di guardare i token futuri nella sequenza LaTeX
        device = tgt_tokens.device
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_tokens.size(1)).to(device)
        
        # C. Generazione output tramite il Decoder
        # tgt_tokens: [Batch, Seq_Len]
        output = self.decoder_transformer(tgt_tokens, memory, tgt_mask)
        
        return output # [Batch, Seq_Len, Vocab_Size]

## 5. Training e Inferenza
- **Loss**: Si usa la Cross-Entropy calcolata su ogni token della sequenza LaTeX.
- **Decoding**: In fase di test, non si sceglie solo il token più probabile (Greedy Search), ma si usa la Beam Search per esplorare più combinazioni e trovare la formula sintatticamente più corretta. 


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Im2LatexModel(vocab_size=len(vocab.stoi)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index=0) # Ignora il <pad>
train_dataset = Im2LatexDataset(train_df, vocab, transform=transform)
val_dataset   = Im2LatexDataset(val_df, vocab, transform=transform)

train_loader = DataLoader(
    train_dataset, 
    batch_size=32, 
    shuffle=True, 
    collate_fn=collate_fn,
    num_workers=2 # Opzionale: velocizza il caricamento
)

val_loader = DataLoader(
    val_dataset, 
    batch_size=32, 
    shuffle=False, 
    collate_fn=collate_fn
)

def generate_tgt_mask(sz, device):
    mask = torch.triu(torch.ones(sz, sz, device=device) == 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

def train_epoch(model, dataloader, optimizer, criterion):
    model.train()
    total_loss = 0
    
    for imgs, captions in dataloader:
        imgs, captions = imgs.to(device), captions.to(device)
        
        # Prepariamo input e target: 
        # tgt_input: da <sos> a penultimo token
        # tgt_expected: da secondo token a <eos>
        tgt_input = captions[:, :-1]
        tgt_expected = captions[:, 1:]
        
        tgt_mask = generate_tgt_mask(tgt_input.size(1), device)
        
        # Forward
        preds = model(imgs, tgt_input, tgt_mask) # [B, Seq, Vocab]
        
        # Calcolo Loss (Flatten per CrossEntropy)
        loss = criterion(preds.reshape(-1, preds.shape[-1]), tgt_expected.reshape(-1))
        
        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(dataloader)

def evaluate(model, dataloader, criterion):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for imgs, captions in dataloader:
            imgs, captions = imgs.to(device), captions.to(device)
            tgt_input = captions[:, :-1]
            tgt_expected = captions[:, 1:]
            tgt_mask = generate_tgt_mask(tgt_input.size(1), device)
            
            preds = model(imgs, tgt_input, tgt_mask)
            loss = criterion(preds.reshape(-1, preds.shape[-1]), tgt_expected.reshape(-1))
            total_loss += loss.item()
            
    return total_loss / len(dataloader)

def predict(model, img, vocab, max_len=150):
    model.eval()
    device = img.device
    
    with torch.no_grad():
        # 1. Estrai le feature dall'immagine (Memory)
        memory = model.encoder(img) 
        
        # 2. Inizia con il token <sos>
        ys = torch.full((1, 1), vocab.stoi["<sos>"], dtype=torch.long, device=device)
        
        for i in range(max_len):
            # Crea la maschera per i token generati finora
            tgt_mask = generate_tgt_mask(ys.size(1), device)
            
            # Predici il prossimo token
            out = model(img, ys, tgt_mask)
            prob = out[:, -1, :] # Prendi l'ultimo token predetto
            _, next_word = torch.max(prob, dim=1) # Greedy: prendi il più probabile
            
            next_word = next_word.item()
            ys = torch.cat([ys, torch.ones(1, 1, device=device, dtype=torch.long) * next_word], dim=1)
            
            # Se il modello predice <eos>, fermati
            if next_word == vocab.stoi["<eos>"]:
                break
        
        # Converti gli indici in stringhe LaTeX
        decoded_words = [vocab.itos[idx.item()] for idx in ys[0]]
        return " ".join(decoded_words)


for epoch in range(10):
    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    val_loss = evaluate(model, val_loader, criterion)
    print(f"Epoch {epoch} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    example_img, example_tgt = next(iter(val_loader))
    prediction = predict(model, example_img[0].unsqueeze(0).to(device), vocab)
    print(f"Real: {vocab.indices_to_string(example_tgt[0])}")
    print(f"Pred: {prediction}")