# Import Libraries

In [1]:
import math
import random
import sympy
from sympy import symbols, series
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


In [2]:
# Define symbol and functions
x = symbols('x')
functions = [
    sympy.sin(x),
    sympy.cos(x),
    sympy.exp(x),
    sympy.log(1+x),
    sympy.tan(x),
    sympy.sinh(x),
    sympy.cosh(x),
    sympy.sqrt(1+x)
]

def get_taylor_series(func, order=5):
    # Returns Taylor series (up to x^(order-1)) as a string
    return str(func.series(x, 0, order).removeO())

# Create dataset: list of (function_str, taylor_series_str) pairs
data_pairs = []
for f in functions:
    func_str = str(f)
    taylor_str = get_taylor_series(f, order=5)
    data_pairs.append((func_str, taylor_str))

In [3]:
# Special tokens and vocabulary creation
special_tokens = ['<pad>', '<sos>', '<eos>']
all_chars = set()
for inp, out in data_pairs:
    all_chars.update(list(inp))
    all_chars.update(list(out))
all_chars = sorted(list(all_chars))
vocab = special_tokens + all_chars

# Mapping from char to index and vice versa
char2idx = {ch: idx for idx, ch in enumerate(vocab)}
idx2char = {idx: ch for ch, idx in char2idx.items()}

# Utility functions to encode/decode strings
def encode_string(s, add_sos_eos=False):
    tokens = []
    if add_sos_eos:
        tokens.append(char2idx['<sos>'])
    tokens.extend([char2idx[ch] for ch in s])
    if add_sos_eos:
        tokens.append(char2idx['<eos>'])
    return tokens

def decode_tokens(tokens):
    s = ""
    for t in tokens:
        ch = idx2char[t]
        if ch == '<eos>':
            break
        if ch not in special_tokens:
            s += ch
    return s

# Determine maximum lengths for source and target (including <sos>/<eos>)
max_len_src = max(len(encode_string(inp, add_sos_eos=True)) for inp, _ in data_pairs)
max_len_tgt = max(len(encode_string(tgt, add_sos_eos=True)) for _, tgt in data_pairs)

In [4]:
class TaylorDataset(Dataset):
    def __init__(self, data_pairs, max_len_src, max_len_tgt):
        self.data_pairs = data_pairs
        self.max_len_src = max_len_src
        self.max_len_tgt = max_len_tgt

    def __len__(self):
        return len(self.data_pairs)

    def pad_sequence(self, seq, max_len):
        return seq + [char2idx['<pad>']] * (max_len - len(seq))

    def __getitem__(self, idx):
        inp_str, tgt_str = self.data_pairs[idx]
        src_seq = encode_string(inp_str, add_sos_eos=True)
        tgt_seq = encode_string(tgt_str, add_sos_eos=True)
        src_seq = self.pad_sequence(src_seq, self.max_len_src)
        tgt_seq = self.pad_sequence(tgt_seq, self.max_len_tgt)
        return torch.tensor(src_seq, dtype=torch.long), torch.tensor(tgt_seq, dtype=torch.long)

dataset = TaylorDataset(data_pairs, max_len_src, max_len_tgt)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Define lstm , Transformer

In [5]:
# --- LSTM-based Seq2Seq Model ---

class EncoderLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)

    def forward(self, src):
        # src: (batch, seq_len)
        embedded = self.embedding(src)  # (batch, seq_len, embed_dim)
        outputs, (hidden, cell) = self.lstm(embedded)
        return hidden, cell

class DecoderLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input, hidden, cell):
        # input: (batch,) -> we unsqueeze to (batch, 1)
        input = input.unsqueeze(1)
        embedded = self.embedding(input)  # (batch, 1, embed_dim)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(1))  # (batch, vocab_size)
        return prediction, hidden, cell

class Seq2SeqLSTM(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        tgt_len = tgt.shape[1]
        vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(batch_size, tgt_len, vocab_size).to(self.device)

        hidden, cell = self.encoder(src)
        # First input to decoder is <sos>
        input = tgt[:, 0]
        for t in range(1, tgt_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = tgt[:, t] if teacher_force else top1
        return outputs

In [6]:
# --- Transformer-based Seq2Seq Model ---

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=500):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        if d_model % 2 == 1:
            pe[:, 1::2] = torch.cos(position * div_term[:pe[:, 1::2].shape[1]])
        else:
            pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(1)  # (max_len, 1, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model)
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=128, nhead=8, num_encoder_layers=3,
                 num_decoder_layers=3, dim_feedforward=256, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.src_embedding = nn.Embedding(vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers,
                                          num_decoder_layers, dim_feedforward, dropout)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt):
        # src, tgt: (batch, seq_len)
        src = src.transpose(0, 1)  # (seq_len, batch)
        tgt = tgt.transpose(0, 1)

        src_emb = self.src_embedding(src) * math.sqrt(self.d_model)
        tgt_emb = self.tgt_embedding(tgt) * math.sqrt(self.d_model)
        src_emb = self.pos_encoder(src_emb)
        tgt_emb = self.pos_encoder(tgt_emb)

        # Generate mask for target (for autoregressive decoding)
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_emb.size(0)).to(tgt_emb.device)

        output = self.transformer(src_emb, tgt_emb, tgt_mask=tgt_mask)
        output = self.fc_out(output)
        return output.transpose(0, 1)  # (batch, seq_len, vocab_size)

In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = len(vocab)
num_epochs = 50
example_function = "exp(x)"


# Task 2

In [8]:
embed_dim = 128
hidden_dim = 256
num_layers = 2
encoder = EncoderLSTM(vocab_size, embed_dim, hidden_dim, num_layers).to(device)
decoder = DecoderLSTM(vocab_size, embed_dim, hidden_dim, num_layers).to(device)
model = Seq2SeqLSTM(encoder, decoder, device).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=char2idx['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=0.001)


print("Training LSTM model...")
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)
        optimizer.zero_grad()
        output = model(src, tgt, teacher_forcing_ratio=0.5)
        # output: (batch, seq_len, vocab_size); shift by one for target
        loss = criterion(output[:,1:].reshape(-1, vocab_size), tgt[:,1:].reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"[LSTM] Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}")


def predict_lstm(model, src_str, max_len=50):
    model.eval()
    src_seq = encode_string(src_str, add_sos_eos=True)
    src_seq = src_seq + [char2idx['<pad>']] * (max_len_src - len(src_seq))
    src_tensor = torch.tensor([src_seq], dtype=torch.long).to(device)
    hidden, cell = model.encoder(src_tensor)
    input_token = torch.tensor([char2idx['<sos>']], dtype=torch.long).to(device)
    output_seq = [char2idx['<sos>']]
    for i in range(max_len):
        output, hidden, cell = model.decoder(input_token, hidden, cell)
        next_token = output.argmax(1).item()
        output_seq.append(next_token)
        if next_token == char2idx['<eos>']:
            break
        input_token = torch.tensor([next_token], dtype=torch.long).to(device)
    return decode_tokens(output_seq)


predicted = predict_lstm(model, example_function)

Training LSTM model...
[LSTM] Epoch 1/50, Loss: 3.3835
[LSTM] Epoch 2/50, Loss: 2.7973
[LSTM] Epoch 3/50, Loss: 2.3043
[LSTM] Epoch 4/50, Loss: 2.0834
[LSTM] Epoch 5/50, Loss: 1.9885
[LSTM] Epoch 6/50, Loss: 1.8943
[LSTM] Epoch 7/50, Loss: 1.7338
[LSTM] Epoch 8/50, Loss: 1.6143
[LSTM] Epoch 9/50, Loss: 1.5717
[LSTM] Epoch 10/50, Loss: 1.6027
[LSTM] Epoch 11/50, Loss: 1.4203
[LSTM] Epoch 12/50, Loss: 1.5582
[LSTM] Epoch 13/50, Loss: 1.3243
[LSTM] Epoch 14/50, Loss: 1.2213
[LSTM] Epoch 15/50, Loss: 1.1464
[LSTM] Epoch 16/50, Loss: 1.1726
[LSTM] Epoch 17/50, Loss: 1.1727
[LSTM] Epoch 18/50, Loss: 1.0233
[LSTM] Epoch 19/50, Loss: 1.0587
[LSTM] Epoch 20/50, Loss: 0.9706
[LSTM] Epoch 21/50, Loss: 0.9614
[LSTM] Epoch 22/50, Loss: 0.9484
[LSTM] Epoch 23/50, Loss: 0.8828
[LSTM] Epoch 24/50, Loss: 0.8742
[LSTM] Epoch 25/50, Loss: 0.8378
[LSTM] Epoch 26/50, Loss: 0.8174
[LSTM] Epoch 27/50, Loss: 0.7124
[LSTM] Epoch 28/50, Loss: 0.7174
[LSTM] Epoch 29/50, Loss: 0.5650
[LSTM] Epoch 30/50, Loss: 0.6

# Task 3

In [9]:
model = TransformerModel(vocab_size).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=char2idx['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("Training Transformer model...")
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)
        optimizer.zero_grad()
        # For Transformer, feed the target sequence without the last token
        output = model(src, tgt[:,:-1])
        loss = criterion(output.reshape(-1, vocab_size), tgt[:,1:].reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"[Transformer] Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}")



def predict_transformer(model, src_str, max_len=50):
    model.eval()
    src_seq = encode_string(src_str, add_sos_eos=True)
    src_seq = src_seq + [char2idx['<pad>']] * (max_len_src - len(src_seq))
    src_tensor = torch.tensor([src_seq], dtype=torch.long).to(device)
    tgt_seq = [char2idx['<sos>']]
    for i in range(max_len):
        tgt_tensor = torch.tensor([tgt_seq], dtype=torch.long).to(device)
        with torch.no_grad():
            output = model(src_tensor, tgt_tensor)
        next_token = output[0, -1].argmax().item()
        tgt_seq.append(next_token)
        if next_token == char2idx['<eos>']:
            break
    return decode_tokens(tgt_seq)


predicted = predict_transformer(model, example_function)

Training Transformer model...




[Transformer] Epoch 1/50, Loss: 2.8935
[Transformer] Epoch 2/50, Loss: 2.3734
[Transformer] Epoch 3/50, Loss: 1.9930
[Transformer] Epoch 4/50, Loss: 1.5652
[Transformer] Epoch 5/50, Loss: 1.2854
[Transformer] Epoch 6/50, Loss: 1.1796
[Transformer] Epoch 7/50, Loss: 1.0491
[Transformer] Epoch 8/50, Loss: 0.9328
[Transformer] Epoch 9/50, Loss: 0.9101
[Transformer] Epoch 10/50, Loss: 0.8805
[Transformer] Epoch 11/50, Loss: 0.8725
[Transformer] Epoch 12/50, Loss: 0.7806
[Transformer] Epoch 13/50, Loss: 0.7737
[Transformer] Epoch 14/50, Loss: 0.6951
[Transformer] Epoch 15/50, Loss: 0.6877
[Transformer] Epoch 16/50, Loss: 0.6432
[Transformer] Epoch 17/50, Loss: 0.6488
[Transformer] Epoch 18/50, Loss: 0.6777
[Transformer] Epoch 19/50, Loss: 0.5730
[Transformer] Epoch 20/50, Loss: 0.5659
[Transformer] Epoch 21/50, Loss: 0.5614
[Transformer] Epoch 22/50, Loss: 0.5877
[Transformer] Epoch 23/50, Loss: 0.5296
[Transformer] Epoch 24/50, Loss: 0.6700
[Transformer] Epoch 25/50, Loss: 0.5685
[Transfor