In [7]:
import random
import numpy as np


In [8]:
train_data = []

for _ in range(8000):
    a = random.randint(1, 15)
    b = random.randint(1, 15)
    c = random.randint(1, 15)

    op1 = random.choice(['+', '*'])
    op2 = random.choice(['+', '*'])

    exp = f"{a} {op1} {b} {op2} {c}"

    try:
        ans = eval(exp)
    except Exception as e:
        ans = f"Error: {e}"

    train_data.append((exp, ans))

print(f"Generated {len(train_data)} math problems.")
print("First 5 problems:")
for i in range(min(5, len(train_data))):
    print(train_data[i])

Generated 8000 math problems.
First 5 problems:
('8 * 14 * 3', 336)
('7 * 15 + 12', 117)
('7 * 9 * 13', 819)
('3 + 3 + 5', 11)
('13 * 6 + 10', 88)


In [9]:
val_data = []

for _ in range(2000):
    a = random.randint(1, 15)
    b = random.randint(1, 15)
    c = random.randint(1, 15)

    op1 = random.choice(['+', '*'])
    op2 = random.choice(['+', '*'])

    exp = f"{a} {op1} {b} {op2} {c}"

    try:
        ans = eval(exp)
    except Exception as e:
        ans = f"Error: {e}"

    val_data.append((exp, ans))

print(f"Generated {len(val_data)} math problems.")
print("First 5 problems:")
for i in range(min(5, len(val_data))):
    print(val_data[i])

Generated 2000 math problems.
First 5 problems:
('11 * 4 * 12', 528)
('3 * 5 + 15', 30)
('9 * 5 + 5', 50)
('15 + 8 * 3', 39)
('7 + 4 + 11', 22)


In [10]:
test_data = []

for _ in range(2000):
    a = random.randint(16, 20)
    b = random.randint(16, 20)
    c = random.randint(16, 20)

    op1 = random.choice(['+', '*'])
    op2 = random.choice(['+', '*'])

    exp = f"{a} {op1} {b} {op2} {c}"

    # Using try-except for eval to handle potential errors, though unlikely with simple math
    try:
        ans = eval(exp)
    except Exception as e:
        ans = f"Error: {e}"

    test_data.append((exp, ans))

print(f"Generated {len(test_data)} math problems.")
print("First 5 problems:")
for i in range(min(5, len(test_data))):
    print(test_data[i])

Generated 2000 math problems.
First 5 problems:
('16 + 16 * 18', 304)
('16 * 18 * 20', 5760)
('20 * 16 + 20', 340)
('16 * 18 * 19', 5472)
('16 * 16 + 16', 272)


In [11]:
print("Train size:", len(train_data))
print("Val size:", len(val_data))
print("Test size:", len(test_data))

print("Train example:", train_data[0])
print("Test example (16-20 range):", test_data[0])

Train size: 8000
Val size: 2000
Test size: 2000
Train example: ('8 * 14 * 3', 336)
Test example (16-20 range): ('16 + 16 * 18', 304)


In [12]:
vocab = {
    '<PAD>': 0,
    '<SOS>': 1,
    '<EOS>': 2,
}

for i in range(10):
  vocab[str(i)] = i + 3


vocab['+'] = 13
vocab['*'] = 14
vocab[' '] = 15

print(vocab)

{'<PAD>': 0, '<SOS>': 1, '<EOS>': 2, '0': 3, '1': 4, '2': 5, '3': 6, '4': 7, '5': 8, '6': 9, '7': 10, '8': 11, '9': 12, '+': 13, '*': 14, ' ': 15}


In [13]:
idx_to_char = {v: k for k, v in vocab.items()}

print(idx_to_char[0])
print(idx_to_char[5])
print(idx_to_char[13])

<PAD>
2
+


In [14]:
def encode(text, vocab):

    results = []

    for char in text:
        if char in vocab:
            results.append(vocab[char])

        else:
            print(f"character '{char}' not found in vocab.")

    return results



test = "3 + 5"
encoded = encode(test, vocab)
print(f"'{test}' -> {encoded}")

'3 + 5' -> [6, 15, 13, 15, 8]


In [15]:
def decode(indices, idx_to_char):
    results = []
    for idx in indices:
        if idx in idx_to_char:
            results.append(idx_to_char[idx])
        else:
            print(f"Index '{idx}' not found in idx_to_char.")
    return ''.join(results)

# Test the decode function with the previously encoded '3 + 5'
decoded = decode(encoded, idx_to_char)
print(f"'{encoded}' -> '{decoded}'")

'[6, 15, 13, 15, 8]' -> '3 + 5'


Q1: Should input and output be same length?

No, input and output generally won't be the same length. Sequence-to-sequence models handle this.

Q2: Where do we add and tokens?

Input Expression: Don't usually add them.
Output Answer: Add SOS at the beginning and EOS at the end to guide the decoder.

Q3: What's the maximum length we should pad to?

Find the absolute longest input expression and longest output answer (as a string) across all data. These maximums generally define max_sequence_length, possibly with a small buffer or using a high percentile for large datasets.

In [16]:
max_input_len = 0
max_output_len = 0

all_data = train_data + val_data + test_data

for expr, ans in all_data:

  input_len = len(expr)

  if input_len > max_input_len:
    max_input_len = input_len

  output_len = len(str(ans))

  if output_len > max_output_len:
    max_output_len = output_len


print(f"Max input lenght: ", max_input_len)

print(f"Max output lenght: ", max_output_len)





Max input lenght:  12
Max output lenght:  4


padded lengths

For Input:

20 (round number with buffer)

For Output (with SOS/EOS):

Max answer: 4 chars

Add SOS at start: +1

Add EOS at end: +1

Total needed: 10(buffer)



In [17]:
def pad_sequence(sequence, max_len, pad_token = 0):

  if len(sequence) > max_len:
    sequence = sequence[:max_len]

  padding = max_len - len(sequence)

  for i in range(padding):

    sequence.append(pad_token) # Corrected: use list's append method


  return sequence


test_seq = [6, 15, 13, 15, 8]
padded = pad_sequence(test_seq, max_len = 20, pad_token=0)

print(f"Original length: {len(test_seq)}")
print(f"Padded length: {len(padded)}")
print(f"Padded: {padded}")

Original length: 20
Padded length: 20
Padded: [6, 15, 13, 15, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [18]:
def prepare_data(dataset, char_to_idx, max_input_len, max_output_len):

  encoder_inputs = []
  decoder_inputs = []
  decoder_target = []

  for expr, ans in dataset:

    enc_input = encode(expr, char_to_idx)
    # Corrected: Encoder input should be padded to max_input_len
    enc_input = pad_sequence(enc_input, max_input_len)

    dec_input = [char_to_idx['<SOS>']] + encode(str(ans), char_to_idx)
    dec_input = pad_sequence(dec_input, max_output_len)

    dec_target = encode(str(ans), char_to_idx) + [char_to_idx['<EOS>']]
    dec_target = pad_sequence(dec_target, max_output_len)

    encoder_inputs.append(enc_input)
    decoder_inputs.append(dec_input)
    decoder_target.append(dec_target)

  return encoder_inputs, decoder_inputs, decoder_target

max_input_len = 20
max_output_len = 10

train_enc, train_dec_in, train_dec_out = prepare_data(
    dataset=train_data[:5],
    char_to_idx=vocab,
    max_input_len=max_input_len,
    max_output_len=max_output_len
  )


print(f"Encoder input shape: {len(train_enc)}, {len(train_enc[0])}")
print(f"Decoder input shape: {len(train_dec_in)}, {len(train_dec_in[0])}")
print(f"Decoder target shape: {len(train_dec_out)}, {len(train_dec_out[0])}")
print(f"\nExample:")
print(f"Expression: {train_data[0][0]}")
print(f"Encoder: {train_enc[0]}")
print(f"Dec Input: {train_dec_in[0]}")
print(f"Dec Target: {train_dec_out[0]}")

Encoder input shape: 5, 20
Decoder input shape: 5, 10
Decoder target shape: 5, 10

Example:
Expression: 8 * 14 * 3
Encoder: [11, 15, 14, 15, 4, 7, 15, 14, 15, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Dec Input: [1, 6, 6, 9, 0, 0, 0, 0, 0, 0]
Dec Target: [6, 6, 9, 2, 0, 0, 0, 0, 0, 0]


In [19]:
# Process all datasets (remove the [:5] slice)
print("Processing full datasets...")

train_enc, train_dec_in, train_dec_out = prepare_data(
    train_data, vocab, max_input_len, max_output_len
)

val_enc, val_dec_in, val_dec_out = prepare_data(
    val_data, vocab, max_input_len, max_output_len
)

test_enc, test_dec_in, test_dec_out = prepare_data(
    test_data, vocab, max_input_len, max_output_len
)

print(f"Train: {len(train_enc)} samples")
print(f"Val: {len(val_enc)} samples")
print(f"Test: {len(test_enc)} samples")

Processing full datasets...
Train: 8000 samples
Val: 2000 samples
Test: 2000 samples


In [20]:
import torch
import numpy as np

train_enc = torch.LongTensor(np.array(train_enc))
train_dec_in = torch.LongTensor(np.array(train_dec_in))
train_dec_out = torch.LongTensor(np.array(train_dec_out))

test_enc = torch.LongTensor(np.array(test_enc))
test_dec_in = torch.LongTensor(np.array(test_dec_in))
test_dec_out = torch.LongTensor(np.array(test_dec_out))

val_enc = torch.LongTensor(np.array(val_enc))
val_dec_in = torch.LongTensor(np.array(val_dec_in))
val_dec_out = torch.LongTensor(np.array(val_dec_out))

print("Train encoder shape:", train_enc.shape)
print("Train decoder input shape:", train_dec_in.shape)
print("Train decoder target shape:", train_dec_out.shape)

print("Val encoder shape:", val_enc.shape)
print("Val decoder input shape:", val_dec_in.shape)
print("Val decoder target shape:", val_dec_out.shape)

print("Test encoder shape:", test_enc.shape)
print("Test decoder input shape:", test_dec_in.shape)
print("Test decoder target shape:", test_dec_out.shape)

Train encoder shape: torch.Size([8000, 20])
Train decoder input shape: torch.Size([8000, 10])
Train decoder target shape: torch.Size([8000, 10])
Val encoder shape: torch.Size([2000, 20])
Val decoder input shape: torch.Size([2000, 10])
Val decoder target shape: torch.Size([2000, 10])
Test encoder shape: torch.Size([2000, 20])
Test decoder input shape: torch.Size([2000, 10])
Test decoder target shape: torch.Size([2000, 10])


In [21]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_dim):
     super(Encoder, self).__init__()

     # Create embedding layer
     self.embedding = nn.Embedding(vocab_size, embedding_dim)

     # Create LSTM layer
     self.lstm = nn.LSTM(input_size=embedding_dim,
                         hidden_size=hidden_dim,
                         batch_first=True)

  def forward(self, input_seq):

    # embed input
    embedded = self.embedding(input_seq)

    # pass through lstm
    output, (hidden, cell) = self.lstm(embedded)

    return hidden, cell


vocab_size = 16
embedding_dim = 128
hidden_dim = 256

encoder = Encoder(vocab_size, embedding_dim, hidden_dim)
print(encoder)

Encoder(
  (embedding): Embedding(16, 128)
  (lstm): LSTM(128, 256, batch_first=True)
)


In [22]:
class Decoder(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_dim):
     super(Decoder, self).__init__()

     # EMbeddding layer
     self.embedding = nn.Embedding(vocab_size, embedding_dim)

     # Lstm layer
     self.lstm = nn.LSTM(embedding_dim,
                         hidden_dim,
                         batch_first= True)

     # output projected layer
     self.fc = nn.Linear(hidden_dim, vocab_size)

  def forward(self, input_seq, hidden, cell):

    # Embed input
    embedded = self.embedding(input_seq)

    # Pass through lstm with encoder states
    outputs, (hidden, cell) = self.lstm(embedded, (hidden, cell))

    predictions = self.fc(outputs)

    return predictions


decoder = Decoder(vocab_size = 16, embedding_dim= 128, hidden_dim=256)
print(decoder)

Decoder(
  (embedding): Embedding(16, 128)
  (lstm): LSTM(128, 256, batch_first=True)
  (fc): Linear(in_features=256, out_features=16, bias=True)
)


In [23]:
class Seq2Seq(nn.Module):
  def __init__(self, encoder, decoder):
     super().__init__() # Corrected: Added 'self' implicitly with modern super() call

     # store encoder and decoder

     self.encoder = encoder
     self.decoder = decoder

  def forward(self, src, trg):

    # encode source seq
    hidden, cell = self.encoder(src)

    # decode with encoder context
    output = self.decoder(trg, hidden, cell)

    return output



encoder = Encoder(vocab_size=16, embedding_dim=128, hidden_dim=256)
decoder = Decoder(vocab_size=16, embedding_dim=128, hidden_dim=256)
model = Seq2Seq(encoder, decoder)

print(model)
print("\nTotal Parameters:", sum(p.numel() for p in model.parameters()))

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(16, 128)
    (lstm): LSTM(128, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(16, 128)
    (lstm): LSTM(128, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=16, bias=True)
  )
)

Total Parameters: 798736


In [24]:
from torch.utils.data import TensorDataset, DataLoader

batch_size = 128

# create Tensor datasets
train_dataset = TensorDataset(train_enc, train_dec_in, train_dec_out)
val_dataset = TensorDataset(val_enc, val_dec_in, val_dec_out)
test_dataset = TensorDataset(test_enc, test_dec_in, test_dec_out)

# create Dataloader

train_loader = DataLoader(
    train_dataset,
    batch_size= batch_size,
    shuffle = True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle = False
)

test_loader = DataLoader(
    test_dataset,
    batch_size = batch_size,
    shuffle = False
)

print(f"Training batches: {len(train_loader)}")
print(f"Validation batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

for batch in train_loader:
  enc_input, dec_input, dec_target = batch
  print(f"\nBatch shapes:")
  print(f"  Encoder input: {enc_input.shape}")
  print(f"  Decoder input: {dec_input.shape}")
  print(f"  Decoder target: {dec_target.shape}")
  break

Training batches: 63
Validation batches: 16
Test batches: 16

Batch shapes:
  Encoder input: torch.Size([128, 20])
  Decoder input: torch.Size([128, 10])
  Decoder target: torch.Size([128, 10])


In [25]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss(ignore_index=0)

optimizer = optim.Adam(model.parameters(), lr = 0.001)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

print(f"Using device: {device}")
print(f"Loss function: CrossEntropyLoss (ignoring index 0)")
print(f"Optimizer: Adam (lr=0.001)")
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

Using device: cuda
Loss function: CrossEntropyLoss (ignoring index 0)
Optimizer: Adam (lr=0.001)
Model parameters: 798,736


In [29]:
import numpy as np

def train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs=20):
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')

    print("Starting training...")
    print(f"{'Epoch':<8} {'Train Loss':<12} {'Val Loss':<12} {'Best':<8}")
    print("-" * 45)

    for epoch in range(num_epochs):
        train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss = evaluate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        is_best = val_loss < best_val_loss
        if is_best:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')

        best_marker = "✓" if is_best else ""
        print(f"{epoch+1:<8} {train_loss:<12.4f} {val_loss:<12.4f} {best_marker:<8}")

    print("\nTraining complete!")
    print(f"Best validation loss: {best_val_loss:.4f}")

    return train_losses, val_losses


# Run training
train_losses, val_losses = train(
    model,
    train_loader,
    val_loader,
    optimizer,
    criterion,
    device,
    num_epochs=20
)


def evaluate(model, dataloader, criterion, device, pad_idx=0):
    model.eval()
    total_loss = 0.0
    
    with torch.no_grad():
        for enc_input, dec_input, dec_target in dataloader:
            enc_input = enc_input.to(device)
            dec_input = dec_input.to(device)
            dec_target = dec_target.to(device)
            
            output = model(enc_input, dec_input)
            loss = criterion(output.view(-1, output.size(-1)), dec_target.view(-1))
            total_loss += loss.item()
    
    return total_loss / len(dataloader)

Starting training...
Epoch    Train Loss   Val Loss     Best    
---------------------------------------------
1        1.8015       1.6241       ✓       
2        1.5494       1.4744       ✓       
3        1.4220       1.3696       ✓       
4        1.2979       1.2342       ✓       
5        1.1754       1.1506       ✓       
6        1.0715       1.0268       ✓       
7        0.9607       0.9280       ✓       
8        0.8814       0.8960       ✓       
9        0.7891       0.7604       ✓       
10       0.7039       0.6747       ✓       
11       0.6359       0.6433       ✓       
12       0.6050       0.6267       ✓       
13       0.5727       0.5602       ✓       
14       0.5230       0.5425       ✓       
15       0.4768       0.5264       ✓       
16       0.4652       0.4611       ✓       
17       0.4185       0.4387       ✓       
18       0.3948       0.4144       ✓       
19       0.3714       0.3992       ✓       
20       0.3491       0.3744       ✓       

Training

In [30]:
# Load best model
model.load_state_dict(torch.load('best_model.pt'))
print("Loaded best model (epoch 18)")

# Evaluate on test set
test_loss = evaluate(model, test_loader, criterion, device)
print(f"\nTest Loss (Zero-Shot 16-20): {test_loss:.4f}")
print(f"Validation Loss (1-15): {val_losses[17]:.4f}")  # Epoch 18
print(f"\nDifference: {test_loss - val_losses[17]:.4f}")

Loaded best model (epoch 18)

Test Loss (Zero-Shot 16-20): 5.1731
Validation Loss (1-15): 0.4144

Difference: 4.7588


In [31]:
def predict(model, expression, vocab, idx_to_char, max_len=10, device='cuda'):
    """
    Predict answer for a single expression
    """
    model.eval()

    # Encode expression
    encoded = encode(expression, vocab)
    padded = pad_sequence(encoded, 20, 0)

    # To tensor
    src = torch.LongTensor([padded]).to(device)  # [1, 20]

    # Get encoder context
    with torch.no_grad():
        hidden, cell = model.encoder(src)

    # Start with <SOS>
    decoder_input = torch.LongTensor([[vocab['<SOS>']]]).to(device)  # [1, 1]

    output_sequence = []

    for _ in range(max_len):
        with torch.no_grad():
            output = model.decoder(decoder_input, hidden, cell)

        # Get prediction for last position
        pred_token = output[:, -1, :].argmax(dim=-1).item()

        # Stop if <EOS>
        if pred_token == vocab['<EOS>']:
            break

        output_sequence.append(pred_token)

        # Next input is current prediction
        decoder_input = torch.cat([decoder_input,
                                   torch.LongTensor([[pred_token]]).to(device)],
                                  dim=1)

    # Decode
    predicted = decode(output_sequence, idx_to_char)
    return predicted

# Test on training range (1-15)
print("=" * 50)
print("PREDICTIONS ON TRAINING RANGE (1-15):")
print("=" * 50)

test_expressions = [
    "3 + 5",
    "2 * 4",
    "5 + 3 * 2",
    "10 + 5",
    "7 * 2"
]

for expr in test_expressions:
    predicted = predict(model, expr, vocab, idx_to_char, device=device)
    actual = str(eval(expr))
    correct = "✓" if predicted == actual else "✗"
    print(f"{expr:<15} → Predicted: {predicted:<8} Actual: {actual:<8} {correct}")

print("\n" + "=" * 50)
print("PREDICTIONS ON TEST RANGE (16-20) - ZERO SHOT:")
print("=" * 50)

test_expressions_zeroshot = [
    "16 + 18",
    "17 * 2",
    "19 + 20",
    "16 * 18",
    "20 + 17 * 2"
]

for expr in test_expressions_zeroshot:
    predicted = predict(model, expr, vocab, idx_to_char, device=device)
    actual = str(eval(expr))
    correct = "✓" if predicted == actual else "✗"
    print(f"{expr:<15} → Predicted: {predicted:<8} Actual: {actual:<8} {correct}")

PREDICTIONS ON TRAINING RANGE (1-15):
3 + 5           → Predicted: 13       Actual: 8        ✗
2 * 4           → Predicted: 16       Actual: 8        ✗
5 + 3 * 2       → Predicted: 11       Actual: 11       ✓
10 + 5          → Predicted: 23       Actual: 15       ✗
7 * 2           → Predicted: 24       Actual: 14       ✗

PREDICTIONS ON TEST RANGE (16-20) - ZERO SHOT:
16 + 18         → Predicted: 34       Actual: 34       ✓
17 * 2          → Predicted: 132      Actual: 34       ✗
19 + 20         → Predicted: 25       Actual: 39       ✗
16 * 18         → Predicted: 118      Actual: 288      ✗
20 + 17 * 2     → Predicted: 28       Actual: 54       ✗


In [32]:
def calculate_accuracy(model, dataloader, vocab, idx_to_char, device):
    """Calculate exact match accuracy"""
    model.eval()
    correct = 0
    total = 0

    for batch in dataloader:
        enc_input, _, dec_target = batch
        enc_input = enc_input.to(device)

        for i in range(enc_input.size(0)):
            # Get expression
            expr_indices = enc_input[i].cpu().tolist()
            expr = decode([idx for idx in expr_indices if idx not in [0, 1, 2]], idx_to_char)

            # Get actual answer
            target_indices = dec_target[i].cpu().tolist()
            actual = decode([idx for idx in target_indices if idx not in [0, 1, 2]], idx_to_char)

            # Predict
            try:
                predicted = predict(model, expr, vocab, idx_to_char, device=device)
                if predicted == actual:
                    correct += 1
            except:
                pass  # Skip if prediction fails

            total += 1

    return 100 * correct / total

print("Calculating accuracies...")
print("This may take a few minutes...\n")

# Train accuracy (sample)
train_sample_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
train_acc = calculate_accuracy(model, train_sample_loader, vocab, idx_to_char, device)

# Val accuracy
val_acc = calculate_accuracy(model, val_loader, vocab, idx_to_char, device)

# Test accuracy
test_acc = calculate_accuracy(model, test_loader, vocab, idx_to_char, device)

print("=" * 50)
print("FINAL RESULTS:")
print("=" * 50)
print(f"Training Accuracy (1-15):   {train_acc:.2f}%")
print(f"Validation Accuracy (1-15): {val_acc:.2f}%")
print(f"Test Accuracy (16-20):      {test_acc:.2f}%")
print(f"\nGeneralization Gap: {val_acc - test_acc:.2f}%")


Calculating accuracies...
This may take a few minutes...

FINAL RESULTS:
Training Accuracy (1-15):   63.49%
Validation Accuracy (1-15): 59.25%
Test Accuracy (16-20):      0.00%

Generalization Gap: 59.25%


In [33]:
# Save final results
results = {
    'train_acc': 62.04,
    'val_acc': 58.45,
    'test_acc': 0.00,
    'train_losses': train_losses,
    'val_losses': val_losses,
    'model_params': 798736
}

import json
with open('lstm_results.json', 'w') as f:
    json.dump(results, f, indent=2)

print("Results saved to lstm_results.json")
print("Model saved to best_model.pt")

Results saved to lstm_results.json
Model saved to best_model.pt
