# Lab 5: RNNs for sequence prediction
#### Prof. Forrest Davis, Colgate University

The goal of this notebook is to introduce you to PyTorch and give you hands-on experience with building and using RNNs.

Here are some learning objectives for this lab:

1. Build an intuition for how RNNs work
2. Describe and apply some key aspects of PyTorch
3. Understand how to create a training pipeline for a neural network
4. Describe the merits of different evaluation approaches

In [None]:
import pandas as pd
import torch
import numpy as np
import time 

## Part 1: Load and preprocess data

In [None]:
data = pd.read_csv('addition_data.csv')
train = data[data['split'] == 'train']
valid = data[data['split'] == 'valid']
test = data[data['split'] == 'test']
print(train.head(10))

[**Written Answer**] What is our task (e.g., what is our output showing)? 

[**Code Answer**] Complete `encode_data` and `decode_data`

In [None]:
def get_vocab() -> dict:
    """ Get the relevant vocab """
    mapping = {}
    for i in range(10):
        mapping[str(i)] = i

    # Add special chars
    mapping[' '] = 10
    mapping['+'] = 11
    mapping['='] = 12
    mapping['?'] = 13
    
    return mapping 
    
def encode_data(data: np.array, mapping: dict) -> np.array: 
    """ Format our input into samples of tens ones op tens ones. 
    Args:
        data (np.array): Array of input strings
        mapping (dict): A mapping from characters to ids (i.e. numbers)

    Returns: 
        (np.array): Encoded samples 

    For example, with data as ["76 + 26 = ???"] and mapping the output of get_vocab,
    then the output should be [ 7  6 10 11 10  2  6 10 12 10 13 13 13].
    Notice that there are numbers for each character (including the space character).
    """
    pass

def decode_data(data: np.array, mapping: dict) -> list:
    """ Convert encoded data back into strings
    Args:
        data (np.array): Array of encoded data (i.e. numbers)
        mapping (dict): A mapping from characters to ids (i.e. numbers)

    Returns: 
        (list): Decoded samples 

    For example, with data as [ 7  6 10 11 10  2  6 10 12 10 13 13 13] and mapping the output of get_vocab,
    then the output should be ["76 + 26 = ???"]
    """
    pass

In [None]:
mapping = get_vocab()
X = encode_data(train['input'].to_numpy(), mapping)
print(X)
Y = encode_data(train['output'].to_numpy(), mapping)

# Helpful print statements
#print(train['input'].to_numpy()[0], X[0], decode_data(X, mapping)[0])
#print(train['output'].to_numpy()[0], Y[0], decode_data(Y, mapping)[0])

Consider the following code snippet. 

[**Written Answer**] What does `one_hot` do? What does the line containing `labels_again` do, and how? 

In [None]:
labels = torch.randint(0, 10, (10,))
one_hot = torch.nn.functional.one_hot(labels)
labels_again = torch.argmax(one_hot, dim=1)

[**Written Answer**] Notice that we are using torch [tensors](https://pytorch.org/docs/stable/tensors.html). What are they?
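
As a starting point for exploring, the sketch below builds a small tensor from a NumPy array and inspects a few of its attributes. The variable names and values are illustrative and are not part of the lab code.

```python
import numpy as np
import torch

# Build a tensor from a NumPy array of integer ids (illustrative values)
ids = np.array([[7, 6, 10], [2, 6, 10]])
t = torch.tensor(ids)

print(t.shape)   # dimensions of the tensor
print(t.dtype)   # element type (an integer type here)

# Tensors support elementwise math and conversion back to NumPy
doubled = t * 2
back = doubled.numpy()
print(back)
```

Try changing the values or the nesting of `ids` and see how `shape` responds.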

In [None]:
# make tensors and one hot
def oneHot(data: np.array, mapping: dict) -> torch.tensor:
    d = torch.tensor(data).long()
    return torch.nn.functional.one_hot(d, len(mapping)).float()

def unHot(data: torch.tensor, mapping: dict) -> np.array:
    return torch.argmax(data, dim=-1).numpy()

assert (X == unHot(oneHot(X, mapping), mapping)).all()
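
One detail worth noticing in `oneHot` is that it passes `len(mapping)` as the number of classes. The sketch below, using made-up ids, suggests why: when the class count is omitted, `one_hot` infers the width from the largest id it happens to see.

```python
import torch

ids = torch.tensor([0, 3, 1])  # largest id present is 3

# Without num_classes, the width is inferred as (largest id + 1)
inferred = torch.nn.functional.one_hot(ids)
# With num_classes, the width is fixed regardless of which ids appear
fixed = torch.nn.functional.one_hot(ids, num_classes=14)

print(inferred.shape)  # torch.Size([3, 4])
print(fixed.shape)     # torch.Size([3, 14])
```

Here 14 matches the size of the vocabulary returned by `get_vocab` (ten digits plus four special characters).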

## Part 2: Investigate an RNN

Below is a scaffold for a minimal implementation of an RNN that solves our addition task. Your task in this section is to complete the code. Consider PyTorch's [RNN](https://pytorch.org/docs/stable/generated/torch.nn.RNN.html) and other [documentation](https://pytorch.org/docs/stable/index.html).
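
Before filling in the scaffold, it may help to see the shapes that `torch.nn.RNN` expects and returns on its own. The following is a minimal sketch with arbitrary sizes; it is not the completed `RNNModel` and leaves out the decoder entirely.

```python
import torch

# Illustrative sizes only: 2 sequences, 6 time steps, 4 features per step
rnn = torch.nn.RNN(input_size=4, hidden_size=5, num_layers=2, batch_first=True)
x = torch.rand(2, 6, 4)
h0 = torch.zeros(2, 2, 5)  # (num_layers, batch, hidden_size)

output, hn = rnn(x, h0)
print(output.shape)  # torch.Size([2, 6, 5]): top-layer hidden state at every step
print(hn.shape)      # torch.Size([2, 2, 5]): final hidden state for each layer
```

Note that the initial hidden state keeps the layer dimension first even when `batch_first=True`.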

In [None]:
class RNNModel(torch.nn.Module):

    def __init__(self, nInput, nHidden, nLayers, batchFirst = True):
        super(RNNModel, self).__init__()
        self.nInput = nInput
        self.nHidden = nHidden
        self.nLayers = nLayers
        self.batchFirst = batchFirst
        # Need to set an RNN and a decoder which maps from the last 
        # layer of the RNN to the predictions

    def forward(self, observation, hidden):
        """ Does the forward computation 

        Parameters: 
            observation (torch.Tensor): Tokenized input to the model 
            hidden (torch.Tensor): Prior hidden representations
        Returns: 
            torch.Tensor: logits of the model      
        """
        raise NotImplementedError
        
    def init_hidden(self, batchSize):
        """ Returns an empty hidden representation of size number of layers X batch size X number 
            of hidden units

        Parameters:
            batchSize (int): Batch size
        Returns:
            torch.Tensor: All-zeros hidden representation to use for the first input in a sequence
        """
        return torch.zeros((self.nLayers, batchSize, self.nHidden), dtype=torch.float)


Imagine we have two samples, each with a sequence length of 6 and an input dimensionality of 4. The following code snippet runs a forward pass on such a case with dummy data. 

In [None]:
X = torch.rand((2, 6, 4))
print(X)
print(X.shape)

m = RNNModel(nInput=4, nHidden=5, nLayers=2, batchFirst=True)
hidden = m.init_hidden(2)
output, hidden = m.forward(X, hidden)
print(output)
print(output.shape)

## Part 3: Train and Evaluate

[**Written Answer**] Answer the following questions

1. What types of objects are the model, loss, and cost?
2. Why do we have `@torch.no_grad()` in front of accuracy and eval loss? What does this tell us about the model's default behavior? 
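
To help you reason about the second question, here is a small experiment you can run. It is only a sketch on a toy tensor (the name `w` is made up), not a statement about the lab's model.

```python
import torch

w = torch.ones(3, requires_grad=True)

# By default, operations on tensors that require gradients are recorded
y = (w * 2).sum()
print(y.requires_grad)  # True

# Inside no_grad, the same operation is not recorded
with torch.no_grad():
    z = (w * 2).sum()
print(z.requires_grad)  # False
```

Think about what recording these operations costs, and when that cost is worth paying.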

In [None]:
def train_model(X, Y, model, nEpochs, batchSize, lr):
    model.train()
    
    cost = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, nesterov=True)
    for epoch in range(nEpochs):
        total_loss = 0.
        for idx in range(0, X.size(0), batchSize):
            optimizer.zero_grad()
            
            batch_X = X[idx:idx+batchSize, :, :]
            batch_Y = Y[idx:idx+batchSize,:]

            if batch_X.size(0) != batchSize:
                continue
            
            hidden = model.init_hidden(batchSize)
            logits, hidden = model.forward(batch_X, hidden)
            loss = cost(logits.reshape(-1, model.nInput), batch_Y.flatten().long())
            total_loss += loss.item()
            loss.backward()
            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.25)
            optimizer.step()
        print(f"Epoch {epoch}, Total Loss: {total_loss}")
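
Two steps in `train_model` are easy to skim past: the reshape before the loss and the gradient clipping. The sketch below walks through both on dummy data, using a single linear layer as a stand-in for the RNN; the sizes and the stand-in model are assumptions made purely for illustration.

```python
import torch

# Stand-in model and dummy data (illustrative only, not the lab's RNN)
classes = 14
model = torch.nn.Linear(classes, classes)
inputs = torch.rand(2, 3, classes)             # (batch, steps, features)
targets = torch.randint(0, classes, (2, 3))    # (batch, steps)

cost = torch.nn.CrossEntropyLoss()
logits = model(inputs)                         # (batch, steps, classes)

# Flatten batch and time so each row of logits pairs with one target id
loss = cost(logits.reshape(-1, classes), targets.flatten())
loss.backward()

# Rescale gradients in place so their combined norm is at most 0.25;
# the return value is the norm measured before clipping
norm_before = torch.nn.utils.clip_grad_norm_(model.parameters(), 0.25)
norm_after = torch.sqrt(sum((p.grad ** 2).sum() for p in model.parameters()))
print(float(norm_before), float(norm_after))
```

If the norm before clipping is already below the threshold, the gradients are left unchanged.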

In [None]:
# Set up data
mapping = get_vocab()
X = encode_data(train['input'].to_numpy(), mapping)
print(X)
Y = encode_data(train['output'].to_numpy(), mapping)
encoded_X = oneHot(X, mapping)
encoded_Y = torch.tensor(Y)[:,-3:]

In [None]:
nInput = encoded_X.shape[-1]
nHidden = 50
nLayers = 2
model = RNNModel(nInput, nHidden, nLayers)
train_model(encoded_X, encoded_Y, model, 15, 20, 0.05)

In [None]:
@torch.no_grad()
def evalLoss(X, Y, model):
    model.eval()
    total_loss = 0.
    cost = torch.nn.CrossEntropyLoss()

    for idx in range(X.size(0)):
        hidden = model.init_hidden(1)
        logits, hidden = model.forward(X[idx,:,:].unsqueeze(0), hidden)
        loss = cost(logits.reshape(-1, model.nInput), Y[idx,:].flatten().long())
        total_loss += loss.item()
    return total_loss/X.size(0)

def getPredictions(logits):
    probabilities = torch.nn.functional.softmax(logits, dim=-1)
    return torch.argmax(probabilities, -1)

@torch.no_grad()
def accuracy(X, Y, model):
    model.eval()
    correct = 0
    total = 0
    for idx in range(X.size(0)):
        hidden = model.init_hidden(1)
        logits, hidden = model.forward(X[idx,:,:].unsqueeze(0), hidden) 
        yhat = getPredictions(logits).flatten()
        ytrue = Y[idx, :].flatten()
        if (yhat == ytrue).all():
            #print(decode_data(unHot(X[idx, :, :].unsqueeze(0), mapping), mapping), ytrue)
            correct += 1
        total += 1
        
    return round(100*correct/total, 2)
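
A side note on `getPredictions`: it applies softmax before taking the argmax. The toy example below (with made-up logits) suggests the softmax does not change which class wins, so it matters mainly when you want the probabilities themselves.

```python
import torch

logits = torch.tensor([[2.0, -1.0, 0.5], [0.1, 0.3, 0.2]])
probabilities = torch.nn.functional.softmax(logits, dim=-1)

# softmax preserves the ordering within each row, so the largest logit stays largest
from_probs = torch.argmax(probabilities, -1)
from_logits = torch.argmax(logits, -1)
print(from_probs)   # tensor([0, 1])
print(from_logits)  # tensor([0, 1])
```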

In [None]:
valid_X = oneHot(encode_data(valid['input'].to_numpy(), mapping), mapping)
valid_Y = torch.tensor(encode_data(valid['output'].to_numpy(), mapping))[:, -3:]
accuracy(valid_X, valid_Y, model)

[**Code Answer**] Find better hyperparameters
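
One possible way to organize the search is a small grid over candidate values. The sketch below is only a skeleton: `score` is a hypothetical placeholder that you would replace with code that trains a model and returns its validation accuracy, and the grid values are arbitrary.

```python
from itertools import product

def score(nHidden, lr):
    """Hypothetical placeholder: replace the body with training a model
    and returning its accuracy on the validation set."""
    return -abs(nHidden - 100) - abs(lr - 0.05) * 1000

# Candidate values to try (arbitrary choices for illustration)
grid = {"nHidden": [50, 100, 200], "lr": [0.01, 0.05, 0.1]}

# Evaluate every combination and keep the one with the highest score
best = max(product(grid["nHidden"], grid["lr"]), key=lambda cfg: score(*cfg))
print(best)
```

Compare settings on the validation split rather than the test split, so that the test split still gives an honest estimate at the end.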

The following functions permit you to interact with your model by putting in expressions and seeing the model's predictions. 

[**Code Answer**] Play around with your model and try to find some cases that it gets wrong.

In [None]:
def checkExpression(expression: str) -> bool:
    parts = expression.split(' ')
    if expression == 'n':
        return True
    if len(parts) != 3:
        print("Expression is invalid. Valid expressions look like 01 + 03")
        return False
    # Both operands must be two-digit numbers
    if (len(parts[0]) != 2 or len(parts[2]) != 2):
        print('Both operands must be two-digit numbers, e.g. 01 + 03')
        return False
    if parts[1] != '+':
        print('Need the +')
        return False
    return True

@torch.no_grad()
def interact(model, mapping):
    model.eval()
    while True:
        getInput = True
        while getInput:
            expression = input("Enter an expression (n to stop): ")
            if checkExpression(expression):
                getInput=False
        if expression == 'n':
            break
        X = oneHot(encode_data(np.array([expression+' = ???']), mapping), mapping)
        hidden = model.init_hidden(1)
        logits, _ = model(X, hidden)
        prediction = getPredictions(logits).flatten()
        output = ''.join(map(lambda x: str(int(x)), prediction))
        print(f"The model predicts: {output}")

interact(model, mapping)

## Part 4: Fun with different data splits

Consider the following data and training setup. 

[**Written Answer**] Explain why model performance is so much worse in this setup. Come up with a hypothesis for at least one other data split that might result in worse performance.

In [None]:
# Let's try one with no symmetry, so you only see one of x + y / y + x

first_pair = {}
second_pair = {}
for i in range(0, 100):
    for j in range(0, 100):
        result = str(i + j) 
        str_i = str(i)
        str_j = str(j)
        if len(str_i) < 2:
            str_i = '0'+str_i
        if len(str_j) < 2:
            str_j = '0' + str_j
        result = '0'*(3-len(result))+result
        v1 = f"{str_i} + {str_j} = ???"
        v2 = f"{str_j} + {str_i} = ???"
        if not(v1 in first_pair or v2 in first_pair):
            first_pair[v1] = f"{str_i} + {str_j} = {result}"
        else:
            second_pair[v1] = f"{str_i} + {str_j} = {result}"

for pair in first_pair:
    assert pair not in second_pair

# Put data in pandas dataframe so I can shuffle
train_df = pd.DataFrame.from_dict({'input': list(first_pair.keys()), 
                                'output': list(first_pair.values())}).sample(frac=1).reset_index(drop=True)
test_df = pd.DataFrame.from_dict({'input': list(second_pair.keys()), 
                                'output': list(second_pair.values())}).sample(frac=1).reset_index(drop=True)

# Encode data 
mapping = get_vocab()
X_train = oneHot(encode_data(train_df['input'].to_numpy(), mapping), mapping)
Y_train = torch.tensor(encode_data(train_df['output'].to_numpy(), mapping))[:,-3:]
X_test = oneHot(encode_data(test_df['input'].to_numpy(), mapping), mapping)
Y_test = torch.tensor(encode_data(test_df['output'].to_numpy(), mapping))[:,-3:]

# Train Model

nInput = X_train.shape[-1]
nHidden = 250
nLayers = 3
model = RNNModel(nInput, nHidden, nLayers)
train_model(X_train, Y_train, model, 15, 20, 0.05)
print('Accuracy', accuracy(X_test, Y_test, model))

[**Code Answer**] Time permitting, test your hypothesis!