In [1]:
import sympy as sp
x = sp.symbols('x')
from sympy import symbols, sin, cos, exp, ln, log, tan, asin, atan,cot
import pandas as pd
import numpy as np
import nltk
from nltk.tokenize import word_tokenize
from collections import Counter
import random
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

nltk.download('punkt')

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
class GenerateFunction:
    def __init__(self, vocab=None, max_depth=3):
        if vocab is None:
            self.vocab = np.array([
                ['add', 6, 2], ['sub', 3, 2], ['mul', 6, 2], ['div', 3, 2],
                ['pow', 3, 2], ['sq', 2, 1], ['sqrt', 2, 1], ['cb', 2, 1],
                ['cbrt', 2, 1], ['exp', 2, 1], ['ln', 2, 1], ['sin', 2, 1],
                ['cos', 2, 1], ['tan', 2, 1], ['asin', 2, 1], ['acos', 2, 1],
                ['atan', 2, 1], ['sinh', 2, 1], ['cosh', 2, 1], ['tanh', 2, 1],
                ['asinh', 2, 1], ['acosh', 2, 1], ['atanh', 2, 1], ['x', 10, 0]
            ])
        else:
            self.vocab = vocab
        self.max_depth = max_depth
        self.x = sp.symbols('x')

    def generate_expression(self, max_depth=None, depth=0, vocab=None):
        if vocab is None:
            vocab = self.vocab
        if max_depth is None:
            max_depth = self.max_depth
            
        if depth >= max_depth:
            return ['x']
        
        weights = vocab[:, 1].astype('float32')
        probs = weights / np.sum(weights)
        N = len(vocab)
        expr = []
        rand_idx = np.random.choice(N, p=probs)
        cur_token = vocab[rand_idx, 0]
        cur_arity = int(vocab[rand_idx, 2])
        expr.append(cur_token)
        
        if cur_arity == 0:
            return expr
        else:
            token_families = [
                ['sin', 'cos', 'tan', 'asin', 'acos', 'atan'],
                ['sinh', 'cosh', 'tanh', 'asinh', 'acosh', 'atanh'],
                ['exp', 'ln'], ['sq', 'sqrt'], ['cb', 'cbrt']
            ]
            token_family = next((f for f in token_families if cur_token in f), None)
            new_vocab = vocab
            if token_family:
                mask = np.isin(vocab[:, 0], token_family, invert=True)
                new_vocab = vocab[mask]
            
            if cur_arity == 1:
                child = self.generate_expression(max_depth, depth+1, new_vocab)
                return expr + child
            elif cur_arity == 2:
                child1 = self.generate_expression(max_depth, depth+1, new_vocab)
                child2 = self.generate_expression(max_depth, depth+1, new_vocab)
                return expr + child1 + child2

    def sequence_to_sympy(self, expr):
        cur_token = expr[0]
        try:
            return float(cur_token)
        except ValueError:
            pass
        
        cur_idx = np.where(self.vocab[:, 0] == cur_token)[0][0]
        cur_arity = int(self.vocab[cur_idx, 2])
        
        if cur_arity == 0:
            return self.x
        elif cur_arity == 1:
            operand = self.sequence_to_sympy(expr[1:])
            return self._handle_unary(cur_token, operand)
        elif cur_arity == 2:
            left_tokens, right_tokens = self._split_expression(expr)
            left_expr = self.sequence_to_sympy(left_tokens)
            right_expr = self.sequence_to_sympy(right_tokens)
            return self._handle_binary(cur_token, left_expr, right_expr)

    def _handle_unary(self, operator, operand):
        operators = {
            'sin': sp.sin, 'cos': sp.cos, 'tan': sp.tan,
            'asin': sp.asin, 'acos': sp.acos, 'atan': sp.atan,
            'sinh': sp.sinh, 'cosh': sp.cosh, 'tanh': sp.tanh,
            'asinh': sp.asinh, 'acosh': sp.acosh, 'atanh': sp.atanh,
            'exp': sp.exp, 'ln': sp.log,
            'sq': lambda x: x**2, 'sqrt': sp.sqrt,
            'cb': lambda x: x**3, 'cbrt': sp.cbrt
        }
        return operators[operator](operand)

    def _handle_binary(self, operator, left, right):
        operators = {
            'add': lambda a,b: a+b, 'sub': lambda a,b: a-b,
            'mul': lambda a,b: a*b, 'div': lambda a,b: a/b,
            'pow': lambda a,b: a**b
        }
        return operators[operator](left, right)

    def _split_expression(self, expr):
        arity_count = 1
        idx_split = 1
        for token in expr[1:]:
            try:
                float(token)
                arity_count -= 1
            except:
                idx = np.where(self.vocab[:, 0] == token)[0][0]
                arity_count += int(self.vocab[idx, 2]) - 1
            if arity_count == 0:
                break
            idx_split += 1
        return expr[1:idx_split], expr[idx_split:]

    def generate_functions(self, num_samples):
        functions = []
        while len(functions) < num_samples:
            try:
                expr = self.generate_expression()
                func = self.sequence_to_sympy(expr)
                functions.append(func)
            except:
                continue
        return functions[:num_samples]

In [3]:
gf = GenerateFunction(max_depth=3)
functions = gf.generate_functions(100)

In [4]:
# Print the first 5 generated functions
for i, func in enumerate(functions[:5]):
    print(f"Function {i+1}: {func}")

Function 1: x
Function 2: x
Function 3: x
Function 4: x
Function 5: exp(asinh(cos(x)))


# Dataset Generation and Tokenization

In [5]:
# Dataset Preprocessing
class TaylorDataset:
    def __init__(self, order, functions=None):
        self.order = order
        self.x = symbols('x')  # Centralize symbol definition
        self.functions = functions if functions else self.default_functions()
        self.vocab_to_int = None
        self.int_to_vocab = None

    @staticmethod
    def default_functions():
        #x = symbols('x')
        return [
            sin(x), cos(x), exp(x), ln(1 + x), log(1 + x, 10),
            1 / (1 + x), x ** 2 + x + 1, tan(x), asin(x), atan(x),exp(sin(x)),exp(tan(x)), cot(x),
            1/(1+x**2), exp(x)*(1+x), exp(x)*(1-x), 1/(1+x)**2, 1/(1-x)**2, 1/(1-x)**3, 1/(1-x**2), log(3+4*x),
            1/(1+x), 1/(1+x)**2, 1/(1-x), -ln(1-x)
        ]

    def generate(self):
        x = symbols('x')
        data = []
        for func in self.functions:
            try:
                expansion = func.series(x, 0, self.order + 1).removeO()
                # Only add if expansion is different from original function
                if str(func) != str(expansion):
                    data.append({
                        "function": str(func),
                        "expansion": str(expansion),
                        # Optional: include the SymPy objects for debugging
                        "func_obj": func,
                        "expansion_obj": expansion
                    })
            except (NotImplementedError, ValueError, TypeError) as e:
                # Skip functions that can't be expanded
                print(f"Skipping function {func}: {str(e)}")
                continue
            except Exception as e:
                # Catch any other unexpected errors
                print(f"Unexpected error with function {func}: {str(e)}")
                continue
        
        return pd.DataFrame(data).sample(frac=1, random_state=42, ignore_index=True)

    def tokenize(self, df):
        # Tokenize both function and expansion strings.
        # For Taylor expansion tokens, add <SOS> at start and <EOS> at end.
        tokens = []
        for _, row in df.iterrows():
            tokens.extend(word_tokenize(row['function']))
            # add <SOS> and <EOS> for expansions
            exp_tokens = ['<SOS>'] + word_tokenize(row['expansion']) + ['<EOS>']
            tokens.extend(exp_tokens)

        counter = Counter(tokens)
        vocab = sorted(counter, key=counter.get, reverse=True)
        # Ensure special tokens exist:
        for special in ['<SOS>', '<EOS>', '<UNK>']:
            if special not in vocab:
                vocab.append(special)

        self.vocab_to_int = {token: i for i, token in enumerate(vocab, 1)}
        self.int_to_vocab = {i: token for token, i in self.vocab_to_int.items()}

        tokenized_data = {"function_tokens": [], "expansion_tokens": []}

        for _, row in df.iterrows():
            func_tokens = [self.vocab_to_int.get(token, self.vocab_to_int["<UNK>"]) 
                           for token in word_tokenize(row["function"])]
            exp_tokens = (['<SOS>'] + word_tokenize(row["expansion"]) + ['<EOS>'])
            exp_tokens = [self.vocab_to_int.get(token, self.vocab_to_int["<UNK>"]) for token in exp_tokens]
            tokenized_data["function_tokens"].append(func_tokens)
            tokenized_data["expansion_tokens"].append(exp_tokens)
        
        return pd.DataFrame(tokenized_data)
    
    def get_token_dicts(self):
        return self.vocab_to_int, self.int_to_vocab

In [6]:
# Initialize Dataset
order = 4
taylor_dataset = TaylorDataset(order,functions)
df = taylor_dataset.generate()
df

Skipping function acos(cosh(x)**2): Could not calculate 5 terms for acos(cosh(_x)**2)
Unexpected error with function atanh(log(x**(1/3))): 
Asymptotic expansion of atanh around [-oo] is not implemented.
Unexpected error with function cosh(log(tan(x))): 
Asymptotic expansion of cosh around [-oo] is not implemented.


Unnamed: 0,function,expansion,func_obj,expansion_obj
0,exp(x),x**4/24 + x**3/6 + x**2/2 + x + 1,exp(x),x**4/24 + x**3/6 + x**2/2 + x + 1
1,log(atanh(x)),13*x**4/90 + x**2/3 + log(x),log(atanh(x)),13*x**4/90 + x**2/3 + log(x)
2,acosh(x),-I*x**3/6 - I*x + I*pi/2,acosh(x),-I*x**3/6 - I*x + I*pi/2
3,asin(x),x**3/6 + x,asin(x),x**3/6 + x
4,asinh(x),-x**3/6 + x,asinh(x),-x**3/6 + x
5,cosh(acos(x))**3,x**4*(11/8 + 7*sinh(pi/2)**2/(2*cosh(pi/2)**2)...,cosh(acos(x))**3,x**4*(11/8 + 7*sinh(pi/2)**2/(2*cosh(pi/2)**2)...
6,sin(x),-x**3/6 + x,sin(x),-x**3/6 + x
7,cos(x),x**4/24 - x**2/2 + 1,cos(x),x**4/24 - x**2/2 + 1
8,tan(x**3)**2,0,tan(x**3)**2,0
9,sqrt(tanh(exp(x))),x**4*(-3*tanh(1)**3/4 - 13*tanh(1)**2/24 - 11/...,sqrt(tanh(exp(x))),x**4*(-3*tanh(1)**3/4 - 13*tanh(1)**2/24 - 11/...


In [7]:
tokenized_df = taylor_dataset.tokenize(df)
tokenized_df

Unnamed: 0,function_tokens,expansion_tokens
0,"[27, 1, 5, 2]","[7, 17, 3, 48, 3, 18, 3, 5, 3, 4, 8]"
1,"[22, 1, 34, 1, 5, 2, 2]","[7, 66, 3, 67, 3, 22, 1, 5, 2, 8]"
2,"[31, 1, 5, 2]","[7, 68, 6, 69, 3, 35, 8]"
3,"[55, 1, 5, 2]","[7, 48, 3, 5, 8]"
4,"[23, 1, 5, 2]","[7, 36, 3, 5, 8]"
5,"[28, 1, 37, 1, 5, 2, 2, 15]","[7, 38, 1, 70, 3, 71, 1, 10, 2, 72, 1, 73, 1, ..."
6,"[20, 1, 5, 2]","[7, 36, 3, 5, 8]"
7,"[13, 1, 5, 2]","[7, 17, 6, 18, 3, 4, 8]"
8,"[56, 1, 50, 2, 11]","[7, 80, 8]"
9,"[14, 1, 9, 1, 27, 1, 5, 2, 2, 2]","[7, 38, 1, 81, 1, 4, 2, 82, 6, 83, 1, 4, 2, 84..."


# PyTorch Dataset and Collate Function

In [8]:
# PyTorch Dataset
class TrainDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        data = self.dataset.iloc[idx]
        # For seq2seq, we need both an encoder input and a decoder target.
        # Here, function_tokens are encoder input; expansion_tokens are decoder target.
        function_tensor = torch.tensor(data['function_tokens'], dtype=torch.long)
        expansion_tensor = torch.tensor(data['expansion_tokens'], dtype=torch.long)
        return function_tensor, expansion_tensor

In [9]:
# Collate function to pad sequences
def collate_fn(batch):
    # batch: list of (src, trg) pairs
    src_seqs, trg_seqs = zip(*batch)
    src_lengths = [len(s) for s in src_seqs]
    trg_lengths = [len(t) for t in trg_seqs]
    src_padded = nn.utils.rnn.pad_sequence(src_seqs, batch_first=True, padding_value=0)
    trg_padded = nn.utils.rnn.pad_sequence(trg_seqs, batch_first=True, padding_value=0)
    return src_padded, trg_padded

# LSTM Encoder-Decoder based model

In [10]:
# Encoder
class Encoder(nn.Module):
    def __init__(self, input_dim, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embed_size, padding_idx=0)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
    
    def forward(self, src):
        # src: [batch, src_len]
        embedded = self.embedding(src)  # [batch, src_len, embed_size]
        outputs, (hidden, cell) = self.lstm(embedded)
        return hidden, cell

# Decoder
class Decoder(nn.Module):
    def __init__(self, output_dim, embed_size, hidden_size, num_layers):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, embed_size, padding_idx=0)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_dim)
        self.log_softmax = nn.LogSoftmax(dim=1)
    
    def forward(self, input, hidden, cell):
        # input: [batch] -> we want [batch, 1]
        input = input.unsqueeze(1)
        embedded = self.embedding(input)  # [batch, 1, embed_size]
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))  # output: [batch, 1, hidden_size]
        prediction = self.log_softmax(self.fc(output.squeeze(1)))  # [batch, output_dim]
        return prediction, hidden, cell

# Seq2Seq Model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src: [batch, src_len]
        # trg: [batch, trg_len]
        batch_size = src.size(0)
        trg_len = trg.size(1)
        output_dim = self.decoder.output_dim
        
        outputs = torch.zeros(batch_size, trg_len, output_dim).to(self.device)
        hidden, cell = self.encoder(src)
        # first input to decoder is the <SOS> token
        input = trg[:, 0]  # [batch]
        
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)  # [batch]
            input = trg[:, t] if teacher_force else top1
        return outputs

# Training Class for LSTM

In [11]:
# Training Class
class Train:
    def __init__(self, epoch, batch_size, input_dim, embed_size, hidden_size, num_layers, output_dim):
        self.epoch = epoch
        self.batch_size = batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        encoder = Encoder(input_dim, embed_size, hidden_size, num_layers)
        decoder = Decoder(output_dim, embed_size, hidden_size, num_layers)
        self.model = Seq2Seq(encoder, decoder, self.device).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.criterion = nn.NLLLoss(ignore_index=0)
    
    def run(self, dataloader, trg_pad_idx=0):
        for epoch in range(self.epoch):
            self.model.train()
            epoch_loss = 0
            for src, trg in dataloader:
                src, trg = src.to(self.device), trg.to(self.device)
                self.optimizer.zero_grad()
                output = self.model(src, trg)
                # output: [batch, trg_len, output_dim]
                # trg: [batch, trg_len]
                # flatten both for loss computation:
                output = output[:, 1:].reshape(-1, output.shape[-1])  # skip first token (<SOS>) prediction
                trg = trg[:, 1:].reshape(-1)
                loss = self.criterion(output, trg)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()
            if (epoch + 1) % 100 == 0 or epoch == 0:
                print(f'Epoch {epoch+1} - Loss: {epoch_loss:.4f}')
    
    def get_model(self):
        return self.model.to("cpu")

# Initialize Dataset and Training Setup

In [12]:
# PyTorch Training Setup
vocab_to_int, int_to_vocab = taylor_dataset.get_token_dicts()
dataset = TrainDataset(tokenized_df)
train_loader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)

# Hyperparameters
epoch = 500
batch_size = 1
embed_size = 32
hidden_size = 64
num_layers = 2
input_dim = len(vocab_to_int) + 1  # +1 for padding idx=0
output_dim = len(vocab_to_int) + 1

# Train Model
trainer = Train(epoch, batch_size, input_dim, embed_size, hidden_size, num_layers, output_dim)
trainer.run(train_loader)

Epoch 1 - Loss: 200.2745
Epoch 100 - Loss: 11.0487
Epoch 200 - Loss: 2.6364
Epoch 300 - Loss: 0.7637
Epoch 400 - Loss: 0.3181
Epoch 500 - Loss: 0.2666


# Prediction function

In [13]:
def predict_sample(model, src_tensor, vocab_to_int, int_to_vocab, max_len=30):
    """
    Predicts the output sequence for a given input sequence (src_tensor) using the trained model.
    
    Args:
        model: The trained Seq2Seq model.
        src_tensor: Tensor containing the tokenized input sequence (1D tensor).
        vocab_to_int: Dictionary mapping tokens to indices.
        int_to_vocab: Dictionary mapping indices to tokens.
        max_len: Maximum number of tokens to generate.
    
    Returns:
        List of tokens representing the predicted expansion (without <SOS> token).
    """
    model.eval()
    device = next(model.parameters()).device
    
    # Add batch dimension and send to device
    src_tensor = src_tensor.unsqueeze(0).to(device)
    
    # Encode the input sequence
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)
    
    # First decoder input is <SOS>
    sos_token = vocab_to_int["<SOS>"]
    eos_token = vocab_to_int["<EOS>"]
    input_token = torch.tensor([sos_token], device=device)
    
    predicted_tokens = []
    
    # Decode one token at a time
    for _ in range(max_len):
        with torch.no_grad():
            output, hidden, cell = model.decoder(input_token, hidden, cell)
        top1 = output.argmax(1).item()
        if top1 == eos_token:
            break
        predicted_tokens.append(top1)
        input_token = torch.tensor([top1], device=device)
    
    # Convert token indices to words
    predicted_words = [int_to_vocab[token] for token in predicted_tokens]
    return predicted_words

# Assume 'trainer' is your training instance from the previous code and has been trained.
trained_model = trainer.get_model()
trained_model.eval()

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(170, 32, padding_idx=0)
    (lstm): LSTM(32, 64, num_layers=2, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(170, 32, padding_idx=0)
    (lstm): LSTM(32, 64, num_layers=2, batch_first=True)
    (fc): Linear(in_features=64, out_features=170, bias=True)
    (log_softmax): LogSoftmax(dim=1)
  )
)

In [14]:
# Pick a sample input from the dataset (e.g., first sample)
sample_input, sample_target = dataset[0]

# Predict the expansion using the sample function tokens
predicted_expansion = predict_sample(trained_model, sample_input, vocab_to_int, int_to_vocab)
print("Function Tokens (input):", [int_to_vocab[token] for token in sample_input.tolist()])
print("Predicted Expansion:", " ".join(predicted_expansion))

Function Tokens (input): ['exp', '(', 'x', ')']
Predicted Expansion: x**4/24 + x**3/6 + x**2/2 + x + 1


In [15]:
# Pick a sample input from the dataset (e.g., first sample)
sample_input, sample_target = dataset[1]

# Predict the expansion using the sample function tokens
predicted_expansion = predict_sample(trained_model, sample_input, vocab_to_int, int_to_vocab)
print("Function Tokens (input):", [int_to_vocab[token] for token in sample_input.tolist()])
print("Predicted Expansion:", " ".join(predicted_expansion))

Function Tokens (input): ['log', '(', 'atanh', '(', 'x', ')', ')']
Predicted Expansion: 13*x**4/90 + x**2/3 + log ( x )
