# Simbolic Calculation

In [2]:
import json
import random
import re
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import matplotlib.pyplot as plt
from sympy import symbols, sin, cos, exp, series, simplify, expand


## Dataset Preprocessing

As stated in the paper FASEROH : The key to FASEROH is generating a sufficiently diverse set of functions that span the space of functions
used in particle physics and cosmology. Therefore, the first step is to create an inventory of the functions
most commonly used in these fields, noting their typical parameterizations. We call this set the set of base
functions, B. A tentative algorithm for generating functions is given in Algorithm 2.

First of all to manage simbolic expression I am going to use sympy. The next step is to expand the randomic function created using algorithm 2 using taylor's expansion up to the fourth order.

In [None]:
# Symbol for Sympy expansions
x = symbols('x', real=True)

BASE_FUNCTIONS = [x, x**2, x**3, sin(x), cos(x), exp(x)]
OPERATORS = ['+', '-', '*', '/']

def generate_random_function():
    """
    Generates a random function by combining base functions explicitly.
    """
    num_terms = random.randint(1, 3)  
    f = random.choice(BASE_FUNCTIONS)
    
    for _ in range(num_terms - 1):
        operator = random.choice(OPERATORS)
        next_function = random.choice(BASE_FUNCTIONS)
        
        if operator == '/':
            # Ensure we are not dividing by an expression that can become zero
            if next_function != 0 and next_function != x - x:  
                f = f / next_function
        elif operator == '+':
            f = f + next_function
        elif operator == '-':
            f = f - next_function
        elif operator == '*':
            f = f * next_function

    f = simplify(expand(f))
    
    # Avoid returning 0
    if f == 0:
        return generate_random_function()  # Ricrea una funzione se otteniamo zero
    
    return f

def compute_taylor_expansion(f, x, order=4):
    """
    Computes and simplifies the Taylor's series expansion of a function around x=0.
    Default up to order=4.
    """
    try:
        expansion = series(f, x, 0, order+1).removeO()
        return simplify(expansion)
    except Exception:
        return None

def tokenize(expression_str):
    """
    Tokenizes a mathematical expression into a list of tokens
    """
    expression_str = str(expression_str).replace("**", "^")  
    
    expression_str = re.sub(r'([+\-*/^()])', r' \1 ', expression_str)
    
    expression_str = re.sub(r'(\s)-(\d+(\.\d*)?)', r' \1\2', expression_str)

    expression_str = re.sub(r'\b(sin|cos|exp|log|tan|sqrt)\s*\(', r' \1 ( ', expression_str)
    
    tokens = expression_str.strip().split()
    
    return tokens

In [None]:
def generate_dataset(num_samples=10000, filename="dataset.json", taylor_order=6):
    """
    Generates (function, taylor_expansion) pairs, each tokenized.
    Saves them into a JSON file: [ { "function": [...], "expansion": [...] }, ... ]
    This is usefull because we are going to use it later to train the LSTM
    """
    data = []
    for _ in range(num_samples):
        f = generate_random_function()
        expansion = compute_taylor_expansion(f, x, order=taylor_order)
        
        f_str = str(f)
        expansion_str = str(expansion)
        
        data.append({
            "function": tokenize(f_str),
            "expansion": tokenize(expansion_str)
        })
    
    with open(filename, "w") as fp:
        json.dump(data, fp, indent=4)
    


# generate_dataset(num_samples=200000, filename="dataset.json", taylor_order=4)


Now we can see how our helper functions builds the dataset

In [5]:
# Randomly creating a function
for i in range(10):
   print(generate_random_function())

x**2*(x - 1)
x*exp(x)
x
x**2 + sin(2*x)/2
x*(x**2 + cos(x))
(x**2 + cos(x))*exp(-x)
exp(x)
1
exp(x)
x**2/cos(x)


In [6]:
# Creating the taylor expansion
for i in range(10):
   print(compute_taylor_expansion(generate_random_function(),x))

x
x*(x**3 + x**2 + 6*x + 6)/6
x**3 - x
x**3
x**4/24 + x**3/6 + x**2/2 + x + 1
x**4/12 + x**3/6 + x + 2
x*(-x**2/6 + x + 1)
-x**3 + x
x
-x**4/6 + x**2


In [7]:
# How the dataset is tokenized
for i in range(10):
   print(tokenize(str(compute_taylor_expansion(generate_random_function(),x))))

['x', '^', '3', '/', '6']
['x', '*', '(', 'x', '-', '1', ')']
['7', '*', 'x', '^', '4', '/', '360', '+', 'x', '^', '2', '/', '6', '+', '1']
['x', '^', '4', '/', '24', '+', 'x', '^', '3', '/', '6', '+', 'x', '^', '2', '/', '2', '+', 'x', '+', '1']
['-', 'x', '^', '4', '/', '24', '-', 'x', '^', '3', '/', '3', '-', 'x', '^', '2', '/', '2', '-', '1']
['x', '^', '4', '/', '120', '+', 'x', '^', '3', '/', '24', '+', 'x', '^', '2', '/', '6', '+', 'x', '/', '2', '+', '1', '+', '1', '/', 'x']
['-', 'x', '^', '4', '/', '24', '+', '5', '*', 'x', '^', '3', '/', '6', '-', 'x', '^', '2', '/', '2', '-', 'x', '-', '1']
['x', '^', '3']
['x', '^', '4', '/', '24', '+', 'x', '^', '3', '/', '6', '+', 'x', '^', '2', '/', '2', '+', 'x', '+', '1']
['x']


Now we need a vocab, with unique entries to tokenize the dataset, knowing that we are going to need in a index format for training the LSTM(added also the token for SOS , EOS and PAD)

In [None]:
# Load the dataset
with open("dataset.json", "r") as fp:
    dataset = json.load(fp)


all_tokens = set()
for entry in dataset:
    for t in entry["function"]:
        all_tokens.add(t)
    for t in entry["expansion"]:
        all_tokens.add(t)


special_tokens = ["<PAD>", "<SOS>", "<EOS>", "<UNK>"]


vocab = {}
for i, st in enumerate(special_tokens):
    vocab[st] = i

offset = len(special_tokens)  
for idx, token in enumerate(sorted(all_tokens)):
    
    if token not in vocab:
        vocab[token] = idx + offset

# Build the reverse mapping
idx_to_token = {v: k for k, v in vocab.items()}


PAD_TOKEN = vocab["<PAD>"]
SOS_TOKEN = vocab["<SOS>"]
EOS_TOKEN = vocab["<EOS>"]
UNK_TOKEN = vocab["<UNK>"]

vocab_size = len(vocab)
print("Vocabulary size:", vocab_size)
print("Sample vocab items:", list(vocab.items())[:20])


Vocabulary size: 161
Sample vocab items: [('<PAD>', 0), ('<SOS>', 1), ('<EOS>', 2), ('<UNK>', 3), ('(', 4), (')', 5), ('*', 6), ('+', 7), ('-', 8), ('/', 9), ('1', 10), ('10', 11), ('100', 12), ('10080', 13), ('103', 14), ('105', 15), ('1080', 16), ('11', 17), ('112', 18), ('114', 19)]


In [None]:
def tokens_to_ids(tokens, vocab, unk_token=UNK_TOKEN):
    return [vocab[token] if token in vocab else unk_token for token in tokens]

numerical_data = []
for entry in dataset:
    func_ids = tokens_to_ids(entry["function"], vocab)
    exp_ids  = tokens_to_ids(entry["expansion"], vocab)
    numerical_data.append({
        "function": func_ids,
        "expansion": exp_ids
    })

with open("numerical_dataset.json", "w") as fp:
    json.dump(numerical_data, fp, indent=4)

print(f"Saved numerical dataset with {len(numerical_data)} samples.")


Saved numerical dataset with 137732 samples.


So basically now we have a dataset containing pair of fucntion and their taylor expansion. We already created the numerical dataset used to train the LSTM model.

## LSTM model
 
Now the goal is to use the dataset created to train a type of RNN called LSTM. Briefly recall of what this architecture are : A Recurrent Neural Network (RNN) is a type of artificial neural network designed to handle sequential data, such as time series, text, speech, and videos. Unlike feedforward neural networks, which process inputs independently, RNNs have a built-in memory mechanism that allows them to maintain contextual information across different time steps.

Key Characteristics of RNNs

Sequential Processing – Unlike traditional neural networks that treat each input independently, RNNs process sequences step by step, maintaining a state (hidden representation) that captures information from previous time steps.
Shared Weights – The same weights (parameters) are used at each time step, reducing the number of parameters compared to deep feedforward networks and making RNNs more efficient for sequential tasks.
Hidden State – RNNs maintain a hidden state 
h
t
h 
t
​	
  that acts as a memory, capturing past information.
Backpropagation Through Time (BPTT) – The training of RNNs involves unrolling the network through time and applying backpropagation to update weights.
Challenges of Standard RNNs


Key Components of an LSTM Cell

Forget Gate: Decides what portion of past information should be discarded.
Input Gate: Determines how much new information should be stored in the memory.
Cell State: Acts as a long-term memory that carries relevant information through time.
Output Gate: Controls what information from the cell state should be output at the current step.
By controlling the flow of information explicitly, LSTMs mitigate the vanishing gradient problem and improve the handling of long sequences.

The pipeline that we are going to apply for training the LSTM  : 
1. Creating our dataloader taylored for our purpose
2. Defining the model with the parameters
3. Training Loop
4. Evaluation test

In [None]:
class TaylorDataset(Dataset):
    """
    Loads the numerical data in memory. Each item is:
    {
      "function": [int, int, int, ...],
      "expansion": [int, int, int, ...]
    }
    We ll make the expansion the encoder input, and function the decoder target.
    """
    def __init__(self, json_path):
        super().__init__()
        with open(json_path, "r") as fp:
            self.data = json.load(fp)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        
        exp_seq = item["expansion"]
        func_seq = item["function"]
        
        func_seq_in  = [SOS_TOKEN] + func_seq
        func_seq_out = func_seq + [EOS_TOKEN]
        
        return {
            "expansion": torch.tensor(exp_seq, dtype=torch.long),
            "function_in": torch.tensor(func_seq_in, dtype=torch.long),
            "function_out": torch.tensor(func_seq_out, dtype=torch.long)
        }

def collate_fn(batch):
    """
    We need to pad the sequence given the architecture.
    Given a list of items from TaylorDataset.getitem(), produce:
      exps_padded, exp_lengths, funcs_in_padded, funcs_out_padded, func_lengths
    """
    exps      = [b["expansion"]    for b in batch]
    funcs_in  = [b["function_in"]  for b in batch]
    funcs_out = [b["function_out"] for b in batch]
    
    exp_lengths  = torch.tensor([len(seq) for seq in exps],      dtype=torch.long)
    func_lengths = torch.tensor([len(seq) for seq in funcs_in],  dtype=torch.long)
    
    exps_padded      = pad_sequence(exps,      batch_first=True, padding_value=PAD_TOKEN)
    funcs_in_padded  = pad_sequence(funcs_in,  batch_first=True, padding_value=PAD_TOKEN)
    funcs_out_padded = pad_sequence(funcs_out, batch_first=True, padding_value=PAD_TOKEN)
    
    return (
        exps_padded,      # [B, max_len_exp]
        exp_lengths,      # [B]
        funcs_in_padded,  # [B, max_len_func]
        funcs_out_padded, # [B, max_len_func]
        func_lengths      # [B]
    )


dataset_obj = TaylorDataset("numerical_dataset.json")
train_loader = DataLoader(
    dataset_obj,
    batch_size=256,
    shuffle=True,
    collate_fn=collate_fn
)


In [None]:
# This is how a batch is retrieved
for batch in train_loader:
   exps_padded, exp_lengths, funcs_in_padded, funcs_out_padded, func_lengths = batch
   
   print("\n--- Example Batch ---")
   print(f"Expansions (padded): \n{exps_padded}")
   print(f"Expansions lengths: {exp_lengths}")
   print(f"Function Inputs (padded): \n{funcs_in_padded}")
   print(f"Function Outputs (padded): \n{funcs_out_padded}")
   print(f"Function lengths: {func_lengths}")
   break  


--- Example Batch ---
Expansions (padded): 
tensor([[160, 153,  92,  ...,   0,   0,   0],
        [160,   6,   4,  ...,   0,   0,   0],
        [  8, 160, 153,  ...,   0,   0,   0],
        ...,
        [  4, 160, 153,  ...,   0,   0,   0],
        [160, 153,  92,  ...,   0,   0,   0],
        [160, 153,  92,  ...,   0,   0,   0]])
Expansions lengths: tensor([13, 14,  8, 13,  9, 21,  8, 13, 21,  9,  8, 15, 14, 19,  8, 21, 38, 21,
         8, 15, 21, 21, 21, 10, 15, 21, 13,  9, 13, 21, 13, 10, 11, 21, 27, 13,
        23, 13, 20, 23, 13, 13, 24,  3, 25, 25,  9, 21, 16, 19, 19, 49, 17, 10,
        16, 21, 13, 10, 17, 12,  9, 23, 10, 15, 11, 21, 16, 23, 13, 23, 13, 11,
        22, 14, 10, 14, 25,  3, 21,  3, 45, 15, 21, 15,  9, 15, 20, 13, 14,  8,
        13, 13, 15, 15,  9, 15, 21,  9,  9, 14, 24, 13, 27,  8, 16, 13, 10,  9,
         5,  8, 21,  3, 10, 21, 27,  3,  8,  3, 15, 25, 22,  8, 15, 31, 14, 14,
         8,  9,  5, 15, 13, 37, 21,  8,  8, 21, 13,  7, 11, 15,  8, 10, 13, 17,
     

To solve this task, we can use a basic sequence-to-sequence (seq2seq) model with LSTMs. This consists of:
An Encoder: Converts the Taylor expansion into a context vector,
A Decoder: Uses the context vector to generate the original function.

In [24]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, pad_idx=0):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=pad_idx
        )
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout = 0.4
        )
    
    def forward(self, input_seqs, seq_lengths):
        # input_seqs: [B, T]
        embedded = self.embedding(input_seqs)  # [B, T, embed_dim]
        packed_embedded = pack_padded_sequence(
            embedded,
            seq_lengths.cpu(),  
            batch_first=True,
            enforce_sorted=False
        )
        _, (hidden, cell) = self.lstm(packed_embedded)
        return hidden, cell


class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, pad_idx=0):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=pad_idx
        )
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.4
        )
        self.fc = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, input_seqs, hidden, cell):
        # input_seqs: [B, T_dec]
        embedded = self.embedding(input_seqs)  # [B, T_dec, embed_dim]
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        logits = self.fc(output)  # [B, T_dec, vocab_size]
        return logits, hidden, cell

# Simple wrapper to batch everything
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
    
    def forward(self, src, src_lengths, trg):
        """
        src: [B, T_enc]
        src_lengths: [B]
        trg: [B, T_dec], where T_dec includes <SOS> at the beginning
        """
        hidden, cell = self.encoder(src, src_lengths)
        outputs, _, _ = self.decoder(trg, hidden, cell)
        return outputs


In [25]:
# Hyperparam section
VOCAB_SIZE = vocab_size
EMBED_DIM = 150
HIDDEN_DIM = 512
NUM_LAYERS = 2
LR = 1e-3
NUM_EPOCHS = 10

# MPS check(I trained it on my laptop:) )
if torch.backends.mps.is_available():
    device = torch.device("mps")
    print("Using Apple Silicon MPS backend.")


# Model definition

encoder = Encoder(
    vocab_size=VOCAB_SIZE,
    embed_dim=EMBED_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    pad_idx=PAD_TOKEN
)
decoder = Decoder(
    vocab_size=VOCAB_SIZE,
    embed_dim=EMBED_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    pad_idx=PAD_TOKEN
)
model = Seq2Seq(encoder, decoder).to(device)
#-----
# Optimization setup
criterion = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)
optimizer = optim.Adam(model.parameters(), lr=LR)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2)

#Training Loop
for epoch in range(NUM_EPOCHS):
    # break
    model.train()
    total_loss = 0
    
    for batch in train_loader:
        exps_padded, exp_lengths, funcs_in_padded, funcs_out_padded, func_lengths = [
            b.to(device) for b in batch
        ]
        
        # Forward pass
        logits = model(exps_padded, exp_lengths, funcs_in_padded)
        B, T_dec, V = logits.shape
        #Compute loss
        loss = criterion(logits.view(B*T_dec, V), funcs_out_padded.view(B*T_dec))
        
        #Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    print(f"[Epoch {epoch+1}/{NUM_EPOCHS}] loss = {avg_loss:.4f}")
    scheduler.step(avg_loss)


Using Apple Silicon MPS backend.
[Epoch 1/10] loss = 0.4032
[Epoch 2/10] loss = 0.0739
[Epoch 3/10] loss = 0.0412
[Epoch 4/10] loss = 0.0329
[Epoch 5/10] loss = 0.0317
[Epoch 6/10] loss = 0.0313
[Epoch 7/10] loss = 0.0281
[Epoch 8/10] loss = 0.0288
[Epoch 9/10] loss = 0.0331
[Epoch 10/10] loss = 0.0275


In [None]:
MODEL_SAVE_PATH = "seq2seq_model.pth"

torch.save({
    
    'model_state_dict': model.state_dict(),
    
}, MODEL_SAVE_PATH)

In [None]:
def greedy_decode(model, expansion_token_ids, max_length=50):
    """
    Decode a single example (list of token IDs) using the trained model.
    Return a list of predicted token IDs.
    """
    model.eval()
    with torch.no_grad():
        
        src_tensor = torch.tensor([expansion_token_ids], dtype=torch.long, device=device)
        src_length = torch.tensor([len(expansion_token_ids)], dtype=torch.long, device=device)
        
        hidden, cell = model.encoder(src_tensor, src_length)
        
        
        dec_input = torch.tensor([[SOS_TOKEN]], dtype=torch.long, device=device)
        decoded_ids = []
        
        for _ in range(max_length):
            logits, hidden, cell = model.decoder(dec_input, hidden, cell)
            
            next_token_id = torch.argmax(logits[:, -1, :], dim=-1)  # shape: [1]
            ntid = next_token_id.item()
            if ntid == EOS_TOKEN:
                break
            decoded_ids.append(ntid)
            
            
            dec_input = next_token_id.unsqueeze(0)  # Now shape is [1,1]
            
    return decoded_ids


def evaluate_model(model):
    """
    Generates a random function, computes its Taylor expansion, tokenizes it, 
    performs greedy decoding, and prints results
    """
   
    f = generate_random_function()
    
    #print(f"Target fucntion (Symbolic Representation):  {f}")

    expansion = compute_taylor_expansion(f, x, order=4)
    
    expansion_tokens = tokenize(expansion)
    expansion_ids = [vocab[t] if t in vocab else UNK_TOKEN for t in expansion_tokens]

    predicted_token_ids = greedy_decode(model, expansion_ids, max_length=50)

    #print(f"Predicted Token IDs using the model: {predicted_token_ids}")

    predicted_function_tokens = []
    for token_id in predicted_token_ids:
        if token_id in idx_to_token:
            predicted_function_tokens.append(idx_to_token[token_id])
        else:
            predicted_function_tokens.append("[UNK]")  
   
    #print(f"Predicted Tokens:{predicted_function_tokens}")
    
    predicted_function_str = " ".join(predicted_function_tokens)

    true_function_str = " ".join(tokenize(str(f)))

    print(f"Predicted Function:  {predicted_function_str}")
    print(f"True Function:  {true_function_str}")
    print("#####")


for i in range(5):  
    evaluate_model(model)


Predicted Function:  exp ( x ) * sin ( x )
True Function:  exp ( x ) * sin ( x )
#####
Predicted Function:  x * ( 1 - x ) * exp ( x )
True Function:  x * ( 1 - x ) * exp ( x )
#####
Predicted Function:  - x + exp ( x )
True Function:  - x + exp ( x )
#####
Predicted Function:  - x ^ 3 + sqrt ( 2 ) * sin ( x + pi / 4 )
True Function:  - x ^ 3 + sqrt ( 2 ) * sin ( x + pi / 4 )
#####
Predicted Function:  x ^ 3 * sin ( x )
True Function:  x ^ 3 * sin ( x )
#####


After few tries we obtain good result using the LSTM. Sometimes we have allucination, this can be resolved improving the dataset and also by adjusting the distribution of the function in the dataset.

Created by Tommaso Vaccari