# Addition using Recurrent Neural Networks
This notebook demonstrates how to perform a simple arithmetic operation, specifically addition, using a Recurrent Neural Network (RNN). The goal is to train an RNN model that can take two integers as inputs and produce their sum as output. 

As the training loss and test accuracy below show, the model as written does not learn to add. Can you fix it?

In [1]:
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch

# Data
In this section, we generate synthetic data for training the RNN. Each data point consists of two integers $A$ and $B$, where $X = (A, B)$ is the input and $Y = A + B$ is the target output.

We define a function `generate_data` that draws random integer pairs. Each pair is stored twice, once as $(A, B)$ and once as $(B, A)$, as the printed sample shows.

In [2]:
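def generate_data(n_samples, n_min = 1, n_max = 100, seed=42):
    # Fixed seed: repeated calls with the same arguments return the same pairs
    np.random.seed(seed)
    X = np.zeros((n_samples, 2), dtype=int)
    cont = 0
    while cont < n_samples:
        # Draw A and B uniformly from [n_min, n_max)
        A = np.random.randint(n_min, n_max)
        B = np.random.randint(n_min, n_max)
        X[cont, 0] = A
        X[cont, 1] = B
        cont += 1
        # Also store the swapped pair (B, A), unless the array is already full
        # (this guard avoids an IndexError when n_samples is odd)
        if cont < n_samples:
            X[cont, 1] = A
            X[cont, 0] = B
            cont += 1
    return X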

n_min = 0
n_max = 100
X_int_train = generate_data(10000, n_min = n_min, n_max = n_max)
# No validation for this simple example
print(X_int_train[0:6, :])

[[51 92]
 [92 51]
 [14 71]
 [71 14]
 [60 20]
 [20 60]]


## Vocabulary

To process the input data, we need to map each character or symbol in the input to a numerical representation. We create a vocabulary containing digits and special characters used in this task:

1. **Digits (0-9)**: Each digit is represented by its corresponding index.
2. **Special Symbols**: The addition symbol `+` is mapped to its own index, a padding token `<PAD>` fills shorter sequences up to a common length, and an `<EOS>` token marks the end of a sequence.

In [3]:
int_to_idx = {str(i) : i for i in range(10)}
int_to_idx['+'] = 10
int_to_idx['<PAD>'] = 11
int_to_idx['<EOS>'] = 12

idx_to_int = {i : str(i) for i in range(10)}
idx_to_int[10] = '+'
idx_to_int[11] = "<PAD>"
idx_to_int[12] = "<EOS>"
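
As a quick check of the mapping, the sketch below encodes an expression string into indices and decodes it back. The helpers `encode` and `decode` are illustrative names, not part of the notebook, and the vocabulary is rebuilt here so the block runs on its own.

```python
# Rebuild the vocabulary so this block is self-contained.
int_to_idx = {str(i): i for i in range(10)}
int_to_idx.update({'+': 10, '<PAD>': 11, '<EOS>': 12})
idx_to_int = {i: s for s, i in int_to_idx.items()}

def encode(expression):
    """Hypothetical helper: map each character to its index and append <EOS>."""
    return [int_to_idx[c] for c in expression] + [int_to_idx['<EOS>']]

def decode(indices):
    """Hypothetical helper: map indices back to their symbols."""
    return [idx_to_int[i] for i in indices]

tokens = encode("12+34")
print(tokens)          # [1, 2, 10, 3, 4, 12]
print(decode(tokens))  # ['1', '2', '+', '3', '4', '<EOS>']
```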

## Dataset and Dataloaders

Before training, we encode each input and target as a sequence of indices using the vocabulary defined earlier. The two operands are left-padded with zeros to the same number of digits, and sequences within a batch are right-padded with `<PAD>` so that every batch has a uniform length.

In [4]:
class IntDataset(Dataset):
    def __init__(self, X, int_to_idx):
        self.num_data = X
        self.vocab_size  = len(int_to_idx)
        self.map = int_to_idx

    def __len__(self):
        return self.num_data.shape[0]

    def __getitem__(self, idx):
        '''
        Input:  index of a pair (A, B)
        Output: encoded "A+B<EOS>" and encoded "<sum><EOS>" as index tensors
        '''
        A, B = self.num_data[idx, :]
        A = str(A)
        B = str(B)

        # Left-pad both operands with zeros to the same number of digits
        length_max = max(len(A), len(B))
        A = A.zfill(length_max)
        B = B.zfill(length_max)

        # Input sequence: digits of A, "+", digits of B, <EOS>
        X = [self.map[c] for c in A]
        X.append(self.map["+"])
        X.extend(self.map[c] for c in B)
        X.append(self.map["<EOS>"])

        # Target sequence: digits of A + B, <EOS>
        C = str(int(A) + int(B))
        y = [self.map[c] for c in C]
        y.append(self.map["<EOS>"])

        return torch.tensor(X), torch.tensor(y)

# Dataset for the training
dataset_train = IntDataset(X_int_train, int_to_idx)
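
To see what a single sample looks like, the sketch below mirrors the encoding logic of `__getitem__` but returns the symbols themselves instead of index tensors. The function `encode_pair` is a hypothetical helper written for this illustration only.

```python
def encode_pair(a, b):
    """Illustrative re-implementation of the sample layout, using symbols instead of indices."""
    width = max(len(str(a)), len(str(b)))
    sa, sb = str(a).zfill(width), str(b).zfill(width)   # zero-pad to equal width
    x = list(sa) + ['+'] + list(sb) + ['<EOS>']          # input:  A + B <EOS>
    y = list(str(a + b)) + ['<EOS>']                     # target: sum <EOS>
    return x, y

for a, b in [(5, 92), (3, 4), (60, 70)]:
    x, y = encode_pair(a, b)
    print(x, '->', y, f'(lengths {len(x)} and {len(y)})')
```

In each of these cases the input is longer than the target, which is why the training loop later pads `y` with `<PAD>` up to the length of `X`.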

In [5]:
def collate_fn(batch, pad_value):
    data, targets = zip(*batch)

    padded_data = nn.utils.rnn.pad_sequence(data, batch_first=True,
                                          padding_value=pad_value)
    padded_targets = nn.utils.rnn.pad_sequence(targets, batch_first=True,
                                             padding_value=pad_value)
    
    return padded_data, padded_targets

# Dataloader
batch_size = 8
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, collate_fn=lambda b: collate_fn(b, int_to_idx["<PAD>"]),  shuffle=True)
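
To make the effect of `collate_fn` concrete, here is a plain-Python sketch of right-padding a batch to its longest sequence, which is the behaviour of `pad_sequence` with `batch_first=True`. The function `pad_batch` is a hypothetical stand-in that works on lists rather than tensors.

```python
PAD = 11  # index of <PAD> in the notebook's vocabulary

def pad_batch(sequences, pad_value=PAD):
    """Right-pad every sequence with pad_value up to the longest one in the batch."""
    max_len = max(len(s) for s in sequences)
    return [list(s) + [pad_value] * (max_len - len(s)) for s in sequences]

# "3+4<EOS>" and "05+92<EOS>" encoded with the vocabulary above
batch = [[3, 10, 4, 12], [0, 5, 10, 9, 2, 12]]
for row in pad_batch(batch):
    print(row)
```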

## Model

In this part, we define the architecture of the RNN. The model consists of:

1. **Embedding Layer**: Transforms the input indices into dense vector representations.
2. **RNN Layer**: Processes the sequence to capture dependencies between the input values.
3. **Output Layer**: Maps the RNN's hidden state at each time step to logits over the vocabulary.
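
The three layers can be traced shape by shape. The following is a minimal sketch with illustrative sizes (a hidden size of 16 and a batch of 4 are assumptions chosen for readability, not the notebook's settings), using a batched input with `batch_first=True`.

```python
import torch
import torch.nn as nn

vocab_size, emb_dim, hidden_size, n_layers = 13, 8, 16, 1  # illustrative sizes
batch, seq_len = 4, 6

embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=11)
rnn = nn.RNN(emb_dim, hidden_size, num_layers=n_layers, batch_first=True)
fc = nn.Linear(hidden_size, vocab_size)

x = torch.randint(0, vocab_size, (batch, seq_len))  # token indices
h0 = torch.zeros(n_layers, batch, hidden_size)      # initial hidden state

emb = embedding(x)       # (batch, seq_len, emb_dim)
out, hn = rnn(emb, h0)   # out: (batch, seq_len, hidden_size), hn: (n_layers, batch, hidden_size)
logits = fc(out)         # (batch, seq_len, vocab_size)

print(emb.shape, out.shape, hn.shape, logits.shape)
```

Note that the network emits one vector of logits per input position, so the output sequence has the same length as the input sequence.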

In [6]:
class Model(nn.Module):
    def __init__(self, int_to_idx, idx_to_int, hidden_size, emb_dim=8, n_layers=1):
        super(Model, self).__init__()

        self.vocab_size  = len(int_to_idx)
        self.hidden_size = hidden_size
        self.emb_dim     = emb_dim
        self.n_layers    = n_layers

        self.embedding = nn.Embedding(
            num_embeddings=self.vocab_size,
            embedding_dim =self.emb_dim,
            padding_idx=int_to_idx["<PAD>"])

        self.rnn = nn.RNN(input_size=self.emb_dim,
                          hidden_size=self.hidden_size,
                          num_layers =self.n_layers,
                          batch_first=True, nonlinearity='relu')


        self.fc = nn.Linear(
            in_features =self.hidden_size,
            out_features=self.vocab_size)

    def forward(self, x, prev_state):
        embed = self.embedding(x)
        yhat, state = self.rnn(embed, prev_state)
        out = self.fc(yhat)
        return out, state

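    def init_state(self, batch_size=2):
        # Batched input expects h_0 of shape (n_layers, batch, hidden_size);
        # batch_size=1 returns the 2-D state used with unbatched (1-D) input
        if batch_size > 1:
            return torch.zeros(self.n_layers, batch_size, self.hidden_size)
        else:
            return torch.zeros(self.n_layers, self.hidden_size)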

## Train

In [7]:
model = Model(int_to_idx, idx_to_int, hidden_size = 128, n_layers=2)
epochs = 100
lr = 0.001
criterion = nn.CrossEntropyLoss(ignore_index=int_to_idx["<PAD>"])
optimizer = torch.optim.SGD(model.parameters(), lr = lr, momentum = 0.99)

# Select the device: CUDA if available, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
elif torch.backends.mps.is_available():
    DEVICE = torch.device('mps')
else:
    DEVICE = torch.device('cpu')
model = model.to(DEVICE)


In [8]:
model.train()

for epoch in range(epochs):
    running_loss = 0
    for X, y in dataloader_train:
        # Pad y until the length of X with <PAD> token 
        diff_shape = X.shape[1] - y.shape[1]
        
        y_pad = torch.full((y.shape[0], diff_shape), int_to_idx["<PAD>"])
        
        # Concatenate 
        y = torch.cat((y, y_pad), 1)
        
        optimizer.zero_grad()
        
        
        # Initialize the state h_0 with the size of the current batch
        prev_state = model.init_state(batch_size=X.shape[0])
        prev_state = prev_state.to(DEVICE)
            
        # Forward
        X = X.to(DEVICE)
        y_pred, h = model(X, prev_state)
        
        

        # Calculate loss
        y = y.to(DEVICE)
        loss = criterion(y_pred.transpose(1, 2), y)  
        running_loss += loss.item()


        # Calculate gradients and update parameters
        loss.backward()
        
        # Clip the gradient 
        nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()
    
    if (epoch + 1) % 10 == 0:
        print("Epoch {}, Loss = {}".format(epoch + 1, running_loss / len(dataloader_train)))

Epoch 10, Loss = 1.5284101007461548
Epoch 20, Loss = 1.520075101661682
Epoch 30, Loss = 1.5144786883354187
Epoch 40, Loss = 1.5099656485557555
Epoch 50, Loss = 1.507754513168335
Epoch 60, Loss = 1.506077953338623
Epoch 70, Loss = 1.5053257856369018
Epoch 80, Loss = 1.5032473198890686
Epoch 90, Loss = 1.5036709668159485
Epoch 100, Loss = 1.5020613679885864
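
The `transpose(1, 2)` in the loss call is needed because `nn.CrossEntropyLoss` expects the class dimension second, that is, logits of shape `(batch, vocab_size, seq_len)` and targets of shape `(batch, seq_len)`. The sketch below uses random logits and hand-written targets (both assumptions for illustration) to show that positions marked `<PAD>` do not contribute to the loss.

```python
import torch
import torch.nn as nn

PAD = 11
criterion = nn.CrossEntropyLoss(ignore_index=PAD)

torch.manual_seed(0)
logits = torch.randn(2, 6, 13)  # (batch, seq_len, vocab_size), as returned by the model
targets = torch.tensor([[9, 7, 12, PAD, PAD, PAD],
                        [1, 3, 0, 12, PAD, PAD]])

# Move the class dimension to position 1: (batch, vocab_size, seq_len)
loss = criterion(logits.transpose(1, 2), targets)

# Same value computed by hand over the non-PAD positions only
mask = targets != PAD
manual = nn.functional.cross_entropy(logits[mask], targets[mask])

print(loss.item(), manual.item())
```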


## Test set

In [9]:
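model.to("cpu")
model.eval()
test_size = 1000
# Note: generate_data uses seed=42 by default, so these pairs repeat the first
# 1000 training pairs; pass a different seed to evaluate on newly drawn pairs
X_int_test = generate_data(test_size, n_min = n_min, n_max = n_max)
correct = 0
dataset_test = IntDataset(X_int_test, int_to_idx)
with torch.no_grad():
    for i, (X, y) in enumerate(dataset_test):
        # Initialize the state h_0 (no batch)
        prev_state = model.init_state(batch_size=1)

        # Forward: y_pred has shape (seq_len, vocab_size)
        y_pred, h = model(X, prev_state)

        # Most likely token at each position
        predicted = y_pred.argmax(dim=1)

        # Read the predicted symbols up to the first <EOS>
        out = ""
        for x in predicted:
            if x == int_to_idx["<EOS>"]:
                break
            out += str(idx_to_int[int(x)])

        # An empty or non-numeric prediction counts as wrong
        try:
            out = int(out)
        except ValueError:
            continue

        if out == X_int_test[i, 0] + X_int_test[i, 1]:
            correct += 1

print("Accuracy:", 100 * correct / test_size, "%")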

Accuracy: 0.5 %
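
The evaluation turns predicted indices into an integer by reading symbols up to the first `<EOS>`. Below is a small sketch of that decoding step which also handles predictions that do not form a number (an empty output, or a stray `+` or `<PAD>`). The helper `decode_prediction` is hypothetical and returns `None` in those cases.

```python
EOS = 12
idx_to_int = {i: str(i) for i in range(10)}
idx_to_int.update({10: '+', 11: '<PAD>', 12: '<EOS>'})

def decode_prediction(indices):
    """Hypothetical helper: read symbols up to <EOS>; return an int, or None if not a number."""
    text = ""
    for i in indices:
        if i == EOS:
            break
        text += idx_to_int[i]
    return int(text) if text.isdigit() else None

print(decode_prediction([9, 7, 12, 11]))   # 97
print(decode_prediction([12, 3, 4]))       # None (empty prediction)
print(decode_prediction([4, 10, 2, 12]))   # None ("4+2" is not a number)
```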


## Custom numbers

In this section you can try the model on integers of your choice. By default, `A` and `B` are drawn at random; replace them with your own values.

In [10]:
A = np.random.randint(n_min, n_max)
B = np.random.randint(n_min, n_max)
C = A + B 

X = []
y = []
A = str(A)
B = str(B)

# Take the maximum length
length_max = max(len(A), len(B))

# Pad both to the maximal length
while len(A) < length_max:
    A = '0' + A
while len(B) < length_max:
    B = '0' + B
    
for c in A:
    X.append(int_to_idx[c])

X.append(int_to_idx["+"])

for c in B:
    X.append(int_to_idx[c])

X.append(int_to_idx["<EOS>"])

# Initialize the state h_0 (no batch)
prev_state = model.init_state(batch_size=1)

# Forward
y_pred, h = model(torch.tensor(X), prev_state)

_, predicted = torch.max(y_pred.data, 1)

out = A + " + " +  B + " = "
for x in predicted:
    if x == int_to_idx["<EOS>"]:
        break
    out += str(idx_to_int[int(x)])
    
print(" Predicted:", out, ". Actual: ", C)

 Predicted: 33 + 07 = 12 . Actual:  40
