In [1]:
# Import necessary libraries
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import T5ForConditionalGeneration, T5Tokenizer
from sklearn.metrics import accuracy_score

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the train / validation / test DataFrames back from their pickle files.
# NOTE(review): pandas.read_pickle unpickles arbitrary objects, so these files
# must come from a trusted source.
source_dir = "data"
train_df, val_df, test_df = (
    pd.read_pickle(os.path.join(source_dir, pickle_name))
    for pickle_name in ("train.pkl", "val.pkl", "test.pkl")
)

In [None]:
# Define the dataset class for amplitude data
class AmplitudeDataset(Dataset):
    """Pairs of tokenized amplitudes and squared amplitudes for seq2seq training.

    Each item of ``amplitudes`` / ``squared_amplitudes`` is an iterable of
    string tokens; the tokens are joined with single spaces and then encoded
    with ``tokenizer`` on access.

    Args:
        amplitudes: Indexable collection of token sequences (model inputs).
        squared_amplitudes: Indexable collection of token sequences (targets),
            aligned index-by-index with ``amplitudes``.
        tokenizer: Callable accepting ``(text, max_length=, padding=,
            truncation=, return_tensors=)`` and returning an object exposing
            ``input_ids`` and ``attention_mask``.
        max_length: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, amplitudes, squared_amplitudes, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.amplitudes = amplitudes
        self.squared_amplitudes = squared_amplitudes
        self.max_length = max_length

    def __len__(self):
        """Number of examples, taken from the input (amplitude) collection."""
        return len(self.amplitudes)

    def _encode(self, tokens):
        """Join ``tokens`` with spaces and tokenize to a fixed-length encoding."""
        return self.tokenizer(
            ' '.join(tokens),
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for example ``idx``.

        Padding positions in ``labels`` are left as produced by the tokenizer;
        callers that compute a loss are responsible for masking them.
        """
        input_encoding = self._encode(self.amplitudes[idx])
        target_encoding = self._encode(self.squared_amplitudes[idx])

        # squeeze(0) drops only the leading batch dimension added by
        # return_tensors='pt'. A bare squeeze() would also collapse the
        # sequence dimension when max_length == 1, yielding 0-dim tensors.
        return {
            'input_ids': input_encoding.input_ids.squeeze(0),
            'attention_mask': input_encoding.attention_mask.squeeze(0),
            'labels': target_encoding.input_ids.squeeze(0)
        }

In [3]:
# Initialize tokenizer and model
tokenizer = T5Tokenizer.from_pretrained('t5-small')
model = T5ForConditionalGeneration.from_pretrained('t5-small')

# Add special tokens for mathematical expressions.
# add_tokens() returns how many entries were actually new to the vocabulary;
# tokens the tokenizer already knows are not added a second time, so the
# returned count can be smaller than len(special_tokens).
special_tokens = ['*', '/', '+', '-', '^', '(', ')', '{', '}', '_', 'gamma', 'sigma', 'e^2']
num_added_tokens = tokenizer.add_tokens(special_tokens)

# Keep the embedding matrix in sync with the (possibly grown) vocabulary.
model.resize_token_embeddings(len(tokenizer))

# Display tokenizer information
print(f"Vocabulary size after adding special tokens: {len(tokenizer)}")
print(f"Special tokens requested: {special_tokens}")
print(f"Tokens newly added to the vocabulary: {num_added_tokens}")



Vocabulary size after adding special tokens: 32106
Special tokens added: ['*', '/', '+', '-', '^', '(', ')', '{', '}', '_', 'gamma', 'sigma', 'e^2']


In [6]:
# Inspect the tokenizer's begin-/end-of-sequence ids as a (bos, eos) tuple.
(tokenizer.bos_token_id, tokenizer.eos_token_id)

(None, 1)

In [None]:
# Create datasets: one AmplitudeDataset per split, built from the tokenized
# amplitude (input) and tokenized squared-amplitude (target) columns.
train_dataset, val_dataset, test_dataset = (
    AmplitudeDataset(
        split_df['tokenized_amplitude'].tolist(),
        split_df['tokenized_squared_amplitude'].tolist(),
        tokenizer
    )
    for split_df in (train_df, val_df, test_df)
)

# Create data loaders: only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader, test_loader = (
    DataLoader(eval_dataset, batch_size=8)
    for eval_dataset in (val_dataset, test_dataset)
)

print(f"Number of training batches: {len(train_loader)}")
print(f"Number of validation batches: {len(val_loader)}")
print(f"Number of test batches: {len(test_loader)}")


Number of training batches: 1556
Number of validation batches: 195
Number of test batches: 195


In [None]:
# Pull one batch from the training loader and sanity-check its tensor shapes.
sample_batch = next(iter(train_loader))
print(f"Input shape: {sample_batch['input_ids'].shape}")
print(f"Attention mask shape: {sample_batch['attention_mask'].shape}")
print(f"Labels shape: {sample_batch['labels'].shape}")

# Decode the first example of the batch back to text (input and target).
first_input_ids = sample_batch['input_ids'][0]
first_label_ids = sample_batch['labels'][0]
sample_input = tokenizer.decode(first_input_ids, skip_special_tokens=True)
sample_output = tokenizer.decode(first_label_ids, skip_special_tokens=True)

# Show at most the first 100 characters of each decoded string.
print("\nSample input:")
if len(sample_input) > 100:
    print(sample_input[:100] + "...")
else:
    print(sample_input)

print("\nSample output:")
if len(sample_output) > 100:
    print(sample_output[:100] + "...")
else:
    print(sample_output)


Input shape: torch.Size([8, 512])
Attention mask shape: torch.Size([8, 512])
Labels shape: torch.Size([8, 512])

Sample input:
2/9 * i * e ^ 2 * gamma _ { +%nu_1,%eta_1,%eps_1 } * gamma _ { %nu_1,%gam_1,%eta_2 } * c _ { k_19330...

Sample output:
4/81 * e ^ 4 * ( 16 * m_c ^ 2 * m_d ^ 2 + 8 * m_d ^ 2 * s_12 + 8 * s_14 * s_23 + 8 * s_13 * s_24 + 8...


In [None]:
# Training function
def train_model(model, train_loader, val_loader, epochs=3, lr=5e-5):
    """Fine-tune ``model`` with AdamW and track mean train/validation loss.

    Args:
        model: Module called as ``model(input_ids=, attention_mask=, labels=)``
            whose output exposes a ``.loss`` tensor.
        train_loader: Iterable (with ``len``) of dict batches containing
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.
        val_loader: Same batch format, used for the per-epoch validation pass.
        epochs: Number of full passes over ``train_loader``.
        lr: Learning rate for ``torch.optim.AdamW``.

    Returns:
        ``(model, history)`` where ``history`` is a dict with the per-epoch
        mean losses under ``'train_loss'`` and ``'val_loss'``. A loader with
        zero batches yields ``nan`` for that epoch instead of raising.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    model.to(device)

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    # Labels arrive padded to a fixed length with the pad token. Left as-is,
    # every padding position counts as a prediction target and dominates the
    # loss. Positions set to -100 are ignored by the cross-entropy loss.
    pad_token_id = getattr(getattr(model, 'config', None), 'pad_token_id', None)

    def _forward(batch):
        """Move one batch to ``device``, mask label padding, run the model."""
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        if pad_token_id is not None:
            # masked_fill returns a new tensor, so the caller's batch is untouched.
            labels = labels.masked_fill(labels == pad_token_id, -100)
        return model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )

    def _mean(total, count):
        """Average ``total`` over ``count`` batches; ``nan`` when there are none."""
        return total / count if count else float('nan')

    # Training history
    history = {
        'train_loss': [],
        'val_loss': []
    }

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0

        for batch_idx, batch in enumerate(train_loader):
            optimizer.zero_grad()

            loss = _forward(batch).loss
            train_loss += loss.item()

            loss.backward()
            optimizer.step()

            # Print progress every 10 batches
            if (batch_idx + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs} | Batch {batch_idx+1}/{len(train_loader)} | Loss: {loss.item():.4f}")

        avg_train_loss = _mean(train_loss, len(train_loader))
        history['train_loss'].append(avg_train_loss)

        # Validation phase (no gradient tracking, dropout etc. disabled)
        model.eval()
        val_loss = 0

        with torch.no_grad():
            for batch in val_loader:
                val_loss += _forward(batch).loss.item()

        avg_val_loss = _mean(val_loss, len(val_loader))
        history['val_loss'].append(avg_val_loss)

        print(f"Epoch {epoch+1}/{epochs}")
        print(f"Train loss: {avg_train_loss:.4f}")
        print(f"Validation loss: {avg_val_loss:.4f}")
        print("-" * 50)

    return model, history

In [None]:
# Visualize training history
def plot_training_history(history):
    """Draw the per-epoch training and validation loss curves on one figure.

    Args:
        history: Mapping with ``'train_loss'`` and ``'val_loss'`` sequences,
            one value per epoch.
    """
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(10, 6))
    for history_key, curve_label in (
        ('train_loss', 'Training Loss'),
        ('val_loss', 'Validation Loss'),
    ):
        ax.plot(history[history_key], label=curve_label)

    ax.set_title('Training and Validation Loss')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.legend()
    ax.grid(True)
    plt.show()

In [None]:
# Evaluate sequence accuracy
def evaluate_sequence_accuracy(model, data_loader, tokenizer=None, max_length=512):
    """Compute exact-match accuracy between generated and target sequences.

    Args:
        model: Module exposing ``generate(input_ids=, attention_mask=,
            max_length=)`` that returns an iterable of token-id sequences.
        data_loader: Iterable (with ``len``) of dict batches containing
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.
        tokenizer: Object with ``decode(ids, skip_special_tokens=True)``.
            When omitted, the notebook-level ``tokenizer`` variable is used,
            which keeps existing two-argument calls working.
        max_length: Maximum generation length passed to ``model.generate``.

    Returns:
        ``(sequence_accuracy, first_predictions, first_targets)`` where the
        accuracy is the fraction of decoded predictions equal to their decoded
        target, and the two lists hold up to five decoded examples each.
        An empty ``data_loader`` yields ``(0.0, [], [])``.

    Raises:
        NameError: If no tokenizer is passed and none is defined globally.
    """
    if tokenizer is None:
        # Fall back to the module/notebook-level tokenizer (previous behavior).
        tokenizer = globals().get('tokenizer')
        if tokenizer is None:
            raise NameError("name 'tokenizer' is not defined; pass tokenizer=...")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    pad_token_id = getattr(tokenizer, 'pad_token_id', None)

    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for batch_idx, batch in enumerate(data_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Labels masked with -100 for the loss are not valid vocabulary
            # ids; map them back to the pad token so decoding skips them.
            if pad_token_id is not None:
                labels = labels.masked_fill(labels == -100, pad_token_id)

            # Generate predictions
            outputs = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=max_length
            )

            # Decode predictions and targets
            all_predictions.extend(
                tokenizer.decode(pred, skip_special_tokens=True) for pred in outputs
            )
            all_targets.extend(
                tokenizer.decode(label, skip_special_tokens=True) for label in labels
            )

            # Print progress
            if (batch_idx + 1) % 5 == 0:
                print(f"Evaluated {batch_idx+1}/{len(data_loader)} batches")

    # Calculate sequence accuracy (guard against an empty loader)
    exact_matches = sum(pred == target for pred, target in zip(all_predictions, all_targets))
    sequence_accuracy = exact_matches / len(all_targets) if all_targets else 0.0

    # Return accuracy and some examples for inspection
    return sequence_accuracy, all_predictions[:5], all_targets[:5]


In [None]:
# Run the fine-tuning loop.
# The epoch count is kept small for a first smoke test; raise it for better results.
trained_model, history = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=2,
    lr=5e-5,
)

# Show the loss curves collected during training.
plot_training_history(history=history)

Using device: cpu


KeyboardInterrupt: 

In [None]:
# Score the trained model on the held-out test split.
evaluation_result = evaluate_sequence_accuracy(trained_model, test_loader)
test_accuracy, sample_predictions, sample_targets = evaluation_result
print(f"Test sequence accuracy: {test_accuracy:.4f}")

# Show the returned example predictions next to their targets,
# truncating anything longer than 100 characters.
print("\nSample predictions vs targets:")
for i, (pred, target) in enumerate(zip(sample_predictions, sample_targets)):
    print(f"\nExample {i+1}:")

    if len(pred) > 100:
        print(f"Prediction: {pred[:100]}...")
    else:
        print(f"Prediction: {pred}")

    if len(target) > 100:
        print(f"Target: {target[:100]}...")
    else:
        print(f"Target: {target}")

    print(f"Correct: {pred == target}")


In [None]:
# Persist the fine-tuned weights and the extended tokenizer.
# Only the state dict is written, not the model configuration.
# NOTE(review): to reload, the model presumably has to be rebuilt and its
# embeddings resized to the saved tokenizer's length before load_state_dict;
# confirm when writing the loading code.
model_weights = trained_model.state_dict()
torch.save(model_weights, 'transformer_model.pt')
tokenizer.save_pretrained('amplitude_tokenizer')

print("Model and tokenizer saved successfully!")