# Assignment 3: Text processing with LSTM in PyTorch

*Author:* Thomas Adler

*Copyright statement:* This material, no matter whether in printed or electronic form, may be used for personal and non-commercial educational use only. Any reproduction of this manuscript, no matter whether as a whole or in parts, no matter whether in printed or in electronic form, requires explicit prior acceptance of the authors.

In this assignment you will train an LSTM to generate text. To be able to feed text into (recurrent) neural networks we first have to choose a good representation. There are several options, ranging from simple character embeddings to more sophisticated approaches like [word embeddings](https://towardsdatascience.com/introduction-to-word-embedding-and-word2vec-652d0c2060fa) or [token embeddings](https://medium.com/@_init_/why-bert-has-3-embedding-layers-and-their-implementation-details-9c261108e28a). We will use a character embedding in this assignment.

Character embeddings work as follows. First we define an alphabet, a set of characters that we want to be able to represent. To feed a character into our network we use a one-hot vector. The dimension of this vector is equal to the size of our alphabet and the "hot" position indicates the character we want to represent. While this is logically a decent representation (all characters have the same norm, are orthogonal to one another, etc.) it is inefficient in terms of memory because we have to store a lot of zeros. In the first layer of our network we will multiply our one-hot vector with a weight matrix, i.e. we compute the preactivation by a matrix-vector product of the form $We_i$, where $e_i$ is the $i$-th canonical basis vector. This operation corresponds to selecting the $i$-th column of $W$. So an efficient implementation is to perform a simple lookup operation in $W$. This is how embedding layers work also for word or token embeddings. They are learnable lookup tables. 
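
The equivalence between the product $We_i$ and a column lookup can be checked with a small framework-free sketch. The matrix `W` and the helper names below are illustrative assumptions, not part of the assignment. Note that `torch.nn.Embedding` stores one embedding per row of its weight, so there the lookup selects a row of the transposed matrix.

```python
# Toy weight matrix W: 3 rows (embedding dimension), 4 columns (alphabet size).
W = [
    [0.1, 0.2, 0.3, 0.4],
    [1.0, 2.0, 3.0, 4.0],
    [-1.0, -2.0, -3.0, -4.0],
]

def one_hot(i, size):
    """Return the i-th canonical basis vector of the given size."""
    return [1.0 if j == i else 0.0 for j in range(size)]

def matvec(matrix, vector):
    """Plain matrix-vector product."""
    return [sum(w * v for w, v in zip(row, vector)) for row in matrix]

def lookup(matrix, i):
    """Select the i-th column without any multiplication."""
    return [row[i] for row in matrix]

i = 2
product = matvec(W, one_hot(i, 4))
column = lookup(W, i)
print(product == column)  # both are the third column of W
```

The lookup touches only one column, whereas the product multiplies every entry of `W` by a value that is almost always zero.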

## Exercise 1: Encoding characters

Write a class `Encoder` that implements the methods `__init__` and `__call__`. The method `__init__` takes a string as argument that serves as the alphabet. The method `__call__` takes one argument. If it is a string, then it should return a sequence of integers as a `torch.Tensor` representing the input string. Each integer should represent a character of the alphabet. The alphabet consists of the characters matched by the regex `[a-z0-9 .!?]`. If the input text contains characters that are not in the alphabet, then `__call__` should either remove them or map them to a corresponding character that belongs to the alphabet. If the argument is a `torch.Tensor`, then the method should return a string representation of the input, i.e. it should function as a decoder.
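
The two ways of handling characters outside the alphabet can be sketched with the standard library alone. The following is one possible reading, not the required solution: uppercase letters are mapped to lowercase and everything else outside `[a-z0-9 .!?]` is removed. The names `ALPHABET`, `normalize`, `encode`, and `decode` are assumptions made for this illustration, and plain lists stand in for tensors.

```python
import re

ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789 .!?"

def normalize(text):
    """Lowercase the text, then drop every character outside [a-z0-9 .!?]."""
    return re.sub(r"[^a-z0-9 .!?]", "", text.lower())

# Two lookup tables: character -> integer and integer -> character.
char_to_int = {c: i for i, c in enumerate(ALPHABET)}
int_to_char = {i: c for c, i in char_to_int.items()}

def encode(text):
    """Return the integer code of every character that survives normalization."""
    return [char_to_int[c] for c in normalize(text)]

def decode(indices):
    """Map integer codes back to a string."""
    return "".join(int_to_char[i] for i in indices)

codes = encode("Hello, World!")
print(codes)
print(decode(codes))
```

Decoding after encoding returns the normalized text, not the original one, because the comma and the capitalization are lost.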

In [6]:
import re
import torch
import string


########## YOUR SOLUTION HERE ##########
class Encoder:
    def __init__(self, alphabet):
        # Sorted, de-duplicated characters define the integer code of each symbol.
        self.char = sorted(set(alphabet))
        self.char_dictionary = {char: ind for ind, char in enumerate(self.char)}
        self.char_int = {ind: char for ind, char in enumerate(self.char)}

    def __call__(self, text):
        if isinstance(text, str):
            # Encode: map uppercase to lowercase, drop characters outside the alphabet.
            encode = [self.char_dictionary[l] for l in text.lower()
                      if l in self.char_dictionary]
            # Character indices are integers, so return a long tensor.
            return torch.tensor(encode, dtype=torch.long)
        # Decode: map each integer back to its character, skipping unknown indices.
        decoded = [self.char_int[int(l)] for l in text if int(l) in self.char_int]
        return ''.join(decoded)

    def __len__(self):
        return len(self.char)

## Exercise 2: PyTorch Dataset

Write a class `TextDataset` that derives from `torch.utils.data.Dataset`. It should wrap a text file and utilize it for training with PyTorch. Implement the methods `__init__`, `__len__`, `__getitem__`. The method `__init__` should take a path to a text file as string and an integer `l` specifying the length of one sample sequence. The method `__len__` takes no arguments and should return the size of the dataset, i.e. the number of sample sequences in the dataset. The method `__getitem__` should take an integer indexing a sample sequence and should return that sequence as a `torch.Tensor`. The input file can be viewed as one long sequence. The first sample sequence consists of the characters at positions `0..l-1` in the input file. The second sequence consists of the characters at positions `l..2*l-1` and so on. That is, the samples of our dataset are non-overlapping sequences. The last incomplete sequence may be dropped.
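
The indexing scheme described above can be illustrated on a plain string. This is a minimal sketch, assuming the incomplete tail is dropped; the function name `split_into_samples` is hypothetical.

```python
def split_into_samples(sequence, l):
    """Cut a sequence into non-overlapping samples of length l.

    Sample k covers positions k*l .. (k+1)*l - 1; an incomplete tail is dropped.
    """
    n_samples = len(sequence) // l
    return [sequence[k * l:(k + 1) * l] for k in range(n_samples)]

text = "the quick brown fox"  # 19 characters
samples = split_into_samples(text, 5)
print(samples)  # ['the q', 'uick ', 'brown']
```

The last four characters (`" fox"`) do not fill a complete sample and are discarded, so every sample has the same length and can be stacked into a batch.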

In [11]:
import torch
from torch.utils.data import Dataset

########## YOUR SOLUTION HERE ##########
class TextDataset(Dataset):
    def __init__(self, path, length, alphabet):
        with open(path, encoding="utf8") as p:
            sample = p.read().lower()
        self.sample_sequence = sample.replace("\n", "")
        self.length = length
        self.encoder = Encoder(alphabet)
        encoded_text = self.encoder(self.sample_sequence)

        # Cut the text into non-overlapping sequences and drop the incomplete tail.
        # Padding the tail with the index len(alphabet) would be out of range
        # for an embedding layer with len(alphabet) entries.
        n = len(encoded_text) // length
        self.slice = [encoded_text[k * length:(k + 1) * length] for k in range(n)]

    def __len__(self):
        return len(self.slice)

    def __getitem__(self, i):
        return self.slice[i]

## Exercise 3: The Model

Write a class `NextCharLSTM` that derives from `torch.nn.Module` and takes `alphabet_size`, the `embedding_dim`, and the `hidden_dim` as arguments. It should consist of a `torch.nn.Embedding` layer that maps the alphabet to embeddings, a `torch.nn.LSTM` that takes the embeddings as inputs and maps them to hidden states, and a `torch.nn.Linear` output layer that maps the hidden states of the LSTM back to the alphabet. Implement the methods `__init__` that sets up the module and `forward` that takes an input sequence and returns the logits (i.e. no activation function on the output layer) of the model prediction at every time step. 

In [15]:
import torch.nn as nn
import torch.nn.functional as F


########## YOUR SOLUTION HERE ##########
class NextCharLSTM(nn.Module):
    def __init__(self, alphabet_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(alphabet_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim, alphabet_size)
        self.hidden_dim = hidden_dim

    def forward(self, x, prev_state=None):
        # x: (batch, seq_len) integer character indices
        embedded = self.embedding(x.long())
        # With prev_state=None the LSTM starts from zero hidden and cell states.
        output, state = self.lstm(embedded, prev_state)
        # logits: (batch, seq_len, alphabet_size), no activation applied
        logits = self.linear(output)

        return logits, state

    def zero_state(self, batch_size):
        # Create the initial states on the same device as the model parameters.
        device = self.linear.weight.device
        return (torch.zeros(1, batch_size, self.hidden_dim, device=device),
                torch.zeros(1, batch_size, self.hidden_dim, device=device))

## Exercise 4: Training/Validation Epoch

Write a function `epoch` that takes a `torch.utils.data.DataLoader`, a `NextCharLSTM`, and a `torch.optim.Optimizer` as arguments, where the last one might be `None`. If the optimizer is `None`, then the function should validate the model. Otherwise it should train the model for next-character prediction in the many-to-many setting. That is, given a sequence `x` of length `l`, the input sequence is `x[:l-1]` and the corresponding target sequence is `x[1:]`. The function should perform one epoch of training/validation and return the loss values of each mini batch as a numpy array. Use the cross-entropy loss function for both training and validation. 
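
The many-to-many setup and the loss of a single prediction can be made concrete without a framework. The sketch below is illustrative only: `shifted_pairs` and `cross_entropy` are hypothetical helper names, and the cross-entropy is written out for one time step as the negative log-softmax of the target class.

```python
import math

def shifted_pairs(x):
    """Many-to-many next-character prediction: inputs x[:l-1], targets x[1:]."""
    return x[:-1], x[1:]

def cross_entropy(logits, target):
    """Cross-entropy of one prediction: -log softmax(logits)[target]."""
    m = max(logits)  # subtract the maximum for numerical stability
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

inputs, targets = shifted_pairs("hello")
for seen, expected in zip(inputs, targets):
    print(f"after {seen!r} the model should predict {expected!r}")

# Uniform logits over 4 classes give a loss of log(4), whatever the target is.
print(cross_entropy([0.0, 0.0, 0.0, 0.0], 1))
```

Every position of the input contributes one prediction, so a sample of length `l` yields `l-1` loss terms that are averaged.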

In [19]:
from torch.utils.data import DataLoader
import numpy as np
from tqdm import tqdm

device = torch.device("cpu")

criterion = nn.CrossEntropyLoss()

########## YOUR SOLUTION HERE ##########

def epoch(loader, network, criterion, optimizer=None):
    # Train if an optimizer is given, otherwise validate.
    training = optimizer is not None
    network.train(training)
    losses = []

    # Gradients are only needed during training.
    with torch.set_grad_enabled(training):
        for data in tqdm(loader):
            data = data.to(device).long()

            # Many-to-many: predict character t+1 from the characters up to t.
            x = data[:, :-1]
            y = data[:, 1:]
            state = (torch.zeros(1, data.shape[0], network.hidden_dim, device=device),
                     torch.zeros(1, data.shape[0], network.hidden_dim, device=device))

            logits, _ = network(x, state)
            # CrossEntropyLoss expects logits of shape (batch, classes, seq_len).
            loss = criterion(logits.transpose(1, 2), y)

            if training:
                optimizer.zero_grad()
                # Perform back-propagation
                loss.backward()
                # Update the network's parameters
                optimizer.step()

            losses.append(loss.item())

    # One loss value per mini batch
    return np.array(losses)

## Exercise 5: Model Selection

Usually, we would now train and validate our model with different hyperparameters to see which setting performs best. However, this is pretty expensive in terms of compute, so we will provide you with a setting that should work quite well. Train your model for 30 epochs using `torch.optim.Adam`. Validate your model after every epoch and persist the model that performs best on the validation set using `torch.save`. Visualize and discuss the training and validation progress.
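
The selection rule itself is simple: remember the lowest validation loss seen so far and persist the model whenever it improves. A minimal sketch of that bookkeeping follows. The function name `select_best` is hypothetical and the loss values are made up purely for illustration; they are not results of this assignment.

```python
def select_best(validation_losses):
    """Return (epoch index, loss) of the lowest validation loss."""
    best_epoch, best_loss = None, float("inf")
    for epoch_index, loss in enumerate(validation_losses):
        if loss < best_loss:
            # In a training loop this is the point where the model would be saved.
            best_epoch, best_loss = epoch_index, loss
    return best_epoch, best_loss

# Made-up validation losses: improving at first, then getting worse again.
illustrative_losses = [2.1, 1.7, 1.5, 1.6, 1.8]
print(select_best(illustrative_losses))  # (2, 1.5)
```

A validation loss that rises again while the training loss keeps falling is the usual sign of overfitting, which is why the best checkpoint is often not the last one.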

In [20]:
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

sequence_length = 100
batch_size = 256
embedding_dim = 8
hidden_dim = 512
learning_rate = 1e-3
num_epochs = 30

alphabet = "abcdefghijklmnopqrstuvwxyz0123456789 .!?"

network = NextCharLSTM(alphabet_size=len(alphabet), embedding_dim=embedding_dim, hidden_dim=hidden_dim)
network.to(device)

########## YOUR SOLUTION HERE ##########

def get_loss_and_train(model, lr=0.001):
    criterion = nn.CrossEntropyLoss()
    # Optimize the parameters of the model that was passed in.
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    return criterion, opt

criterion, opt = get_loss_and_train(network, learning_rate)

train = TextDataset(path='trump_train.txt', length=sequence_length, alphabet=alphabet)
val_dataset = TextDataset(path='trump_val.txt', length=sequence_length, alphabet=alphabet)

train_loader = DataLoader(train, batch_size=batch_size, shuffle=False, drop_last=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

Loss = []
Validation_loss = []
best_validation_loss = float("inf")

for i in range(num_epochs):
    # Average the per-batch losses into one value per epoch.
    loss = torch.as_tensor(epoch(train_loader, network, criterion, opt)).float().mean().item()
    validation_loss = torch.as_tensor(epoch(val_loader, network, criterion)).float().mean().item()

    Loss.append(loss)
    Validation_loss.append(validation_loss)

    # Persist the model that performs best on the validation set.
    # The file name "best_model.pt" is an arbitrary choice.
    if validation_loss < best_validation_loss:
        best_validation_loss = validation_loss
        torch.save(network.state_dict(), "best_model.pt")

    print(f'Epoch: {i+1} Loss: {loss} Valid_Loss: {validation_loss}')


In [None]:
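plt.figure(figsize=(10, 10))
plt.title("learning curves")
plt.grid()
# Label both curves so they can be told apart in the legend.
plt.plot(Loss, label="training")
plt.plot(Validation_loss, label="validation")
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()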

## Exercise 6: Top-$k$ Accuracy

Write a function `topk_accuracy` that takes a list of integers $k$, a model, and a data loader and returns the top-$k$ accuracy of the model on the given data set for each $k$. A sample is considered to be classified correctly if the true label appears in the top-$k$ classes predicted by the model. Then load the best model from the previous exercise using `torch.load` and plot its top-$k$ accuracy as a function of $k$ for all possible values of $k$. Discuss the results. 
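
The definition of top-$k$ accuracy can be checked by hand on a tiny example. The sketch below works on plain lists of class scores instead of model outputs; the function name `topk_accuracy_from_scores` and the score values are assumptions made for illustration.

```python
def topk_accuracy_from_scores(scores, targets, ks):
    """scores: one list of class scores per sample; targets: true class indices."""
    accuracies = []
    for k in ks:
        hits = 0
        for row, target in zip(scores, targets):
            # Class indices ordered from highest to lowest score.
            ranking = sorted(range(len(row)), key=lambda c: row[c], reverse=True)
            hits += target in ranking[:k]
        accuracies.append(hits / len(targets))
    return accuracies

scores = [
    [0.1, 0.7, 0.2],  # ranking: 1, 2, 0
    [0.5, 0.3, 0.2],  # ranking: 0, 1, 2
    [0.2, 0.3, 0.5],  # ranking: 2, 1, 0
    [0.6, 0.1, 0.3],  # ranking: 0, 2, 1
]
targets = [1, 1, 0, 2]
print(topk_accuracy_from_scores(scores, targets, [1, 2, 3]))  # [0.25, 0.75, 1.0]
```

The accuracy can only grow with $k$ and always reaches 1 when $k$ equals the number of classes, which is a useful sanity check for the plot.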

In [None]:
########## YOUR SOLUTION HERE ##########
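def topk_accuracy(k_list, model, dataloader):
    correct = np.zeros(len(k_list))
    total = 0

    model.eval()
    with torch.no_grad():
        for data in dataloader:
            data = data.to(device).long()
            # Feed all but the last character; the targets are shifted by one,
            # so the model never sees the character it has to predict.
            logits, _ = model(data[:, :-1], None)
            y = data[:, 1:]

            # Class indices sorted by decreasing logit: (batch, seq_len, alphabet_size)
            ranking = torch.argsort(logits, dim=-1, descending=True)
            for j, k in enumerate(k_list):
                # A prediction is correct if the target is among the k top-ranked classes.
                hits = (ranking[..., :k] == y.unsqueeze(-1)).any(dim=-1)
                correct[j] += hits.sum().item()
            total += y.numel()

    return correct / total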


In [None]:
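import os

alphabet = "abcdefghijklmnopqrstuvwxyz0123456789 .!?"
net = NextCharLSTM(len(alphabet), embedding_dim, hidden_dim)

# Restore the best weights if a checkpoint exists (the file name is an assumption);
# otherwise fall back to the weights of the in-memory network.
checkpoint = "best_model.pt"
if os.path.exists(checkpoint):
    net.load_state_dict(torch.load(checkpoint))
else:
    net.load_state_dict(network.state_dict())

list_k = list(range(1, len(alphabet) + 1))
accuracies = topk_accuracy(list_k, net, val_loader)

# Plot the results
fig, ax = plt.subplots(figsize=(8, 6))

ax.set_xlabel("k")
ax.set_ylabel("Accuracy")
ax.set_title("Top-k Accuracy")

ax.plot(list_k, accuracies)

plt.show()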

## Exercise 7: Deterministic Text Generation

In this exercise we utilize the trained network to generate novel text. To do this, take some seed text, which can be chosen by the user, and feed it to the network. Subsequently, extrapolate new text by always appending the top-1 character according to the model prediction to the input sequence. Discuss the quality of your model as a text generator. 
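
The generation loop can be sketched independently of the trained network. In the toy example below, the table `toy_scores` is a hypothetical stand-in for a model: it assigns scores to the next character given only the last one, whereas the LSTM conditions on the whole prefix.

```python
# Hypothetical stand-in for a trained model: next-character scores given the last character.
toy_scores = {
    "a": {"b": 2.0, "a": 0.5},
    "b": {"a": 1.5, "c": 1.0},
    "c": {"a": 3.0},
}

def generate_greedy(seed, length):
    """Extend the seed to the given length by always appending the top-1 character."""
    text = seed
    while len(text) < length:
        scores = toy_scores[text[-1]]
        text += max(scores, key=scores.get)
    return text

print(generate_greedy("c", 7))  # cababab
```

Because the choice is deterministic, the same seed always produces the same continuation, and the toy output quickly falls into a repeating cycle.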

In [None]:
########## YOUR SOLUTION HERE ##########
enc = Encoder("abcdefghijklmnopqrstuvwxyz0123456789 .!?")

def det_text_generation(text, model, encoder, length):
    # Encode the seed text with the encoder that was passed in
    encoded_text = encoder(text)

    if len(encoded_text) >= length:
        raise ValueError("Target text length must be greater than the seed text length.")

    # Generate new text
    x = torch.zeros(size=(1, length), dtype=torch.long)
    x[0, :len(encoded_text)] = encoded_text

    model.eval()
    with torch.no_grad():
        for i in range(len(encoded_text), length):
            # Re-run the model on the text so far and append its most likely next character.
            logits, _ = model(x[:, :i], None)
            x[0, i] = torch.argmax(logits[0, -1, :])

    return encoder(x[0, :])


In [None]:
enc = Encoder("abcdefghijklmnopqrstuvwxyz0123456789 .!?")
print(det_text_generation("america", network, enc, 100))

## Exercise 8: Probabilistic Text Generation

Utilize your trained model as text generator as in the previous exercise but with one difference. Instead of always choosing the top-1 character make a probabilistic choice. The network prediction constitutes a probability distribution over the alphabet. Choose the next character by sampling from this distribution. Compare the results to those of the previous exercise and discuss the observed differences. 
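
Turning logits into a distribution and drawing from it can be sketched with the standard library. The helper names `softmax` and `sample_index` and the example logits are assumptions made for illustration; the random generator is seeded only to make the sketch reproducible.

```python
import math
import random

def softmax(logits):
    """Convert logits into probabilities that sum to one."""
    m = max(logits)  # subtract the maximum for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def sample_index(logits, rng):
    """Draw one class index according to the softmax distribution."""
    probs = softmax(logits)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]

rng = random.Random(0)
logits = [2.0, 1.0, 0.0]
counts = [0, 0, 0]
for _ in range(10000):
    counts[sample_index(logits, rng)] += 1

print(softmax(logits))
print(counts)  # the most likely class is drawn most often, but not always
```

Unlike the top-1 rule, sampling occasionally picks less likely characters, which breaks repetitive loops at the price of more spelling errors.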

In [None]:
from torch.distributions import Categorical

########## YOUR SOLUTION HERE ##########
def prob_text_generation(text, model, encoder, length):
    # Encode the seed text with the encoder that was passed in
    encoded_text = encoder(text)

    if len(encoded_text) >= length:
        raise ValueError("Target text length must be greater than the seed text length.")

    # Generate new text
    x = torch.zeros(size=(1, length), dtype=torch.long)
    x[0, :len(encoded_text)] = encoded_text

    model.eval()
    with torch.no_grad():
        for i in range(len(encoded_text), length):
            logits, _ = model(x[:, :i], None)
            # The logits of the last time step define a distribution over the alphabet.
            distribution = Categorical(logits=logits[0, -1, :])
            x[0, i] = distribution.sample().item()

    return encoder(x[0, :])

In [None]:
enc = Encoder("abcdefghijklmnopqrstuvwxyz0123456789 .!?")
print(prob_text_generation("america", network, enc, 100))

## Exercise 9: Visualize Neurons

Visualize the values of the 512 neurons while the trained model processes some user-defined text. Take a look at the last figure of [this blog](https://openai.com/blog/unsupervised-sentiment-neuron/) (which is also a good read) to get an idea of how to do the visualization. You can install and use the package `colorama` for that. Can you figure out certain responsibilities of certain neurons?
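
The colouring idea can be tried out before any model is involved. The sketch below uses raw ANSI escape sequences from the standard library; `colorama` provides readable names for such sequences. The function name `colorize` and the activation values are made up for illustration.

```python
def colorize(text, values):
    """Give each character a green background for positive values, red otherwise."""
    reset = "\033[0m"
    green_bg, red_bg = "\033[42m", "\033[41m"
    pieces = []
    for ch, v in zip(text, values):
        pieces.append((green_bg if v > 0 else red_bg) + ch)
    return "".join(pieces) + reset

# Made-up activations of a single neuron, one value per character.
illustrative_values = [0.8, 0.6, -0.2, -0.9, 0.1]
print(colorize("great", illustrative_values))
```

Printing one such line per neuron makes it possible to scan for units whose sign flips at spaces, punctuation, or particular letters.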

In [None]:
# provides readable names for ANSI escape sequences
from colorama import Fore, Back, Style

########## YOUR SOLUTION HERE ##########
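def visualize_neurons(text, model, encoder, neurons=(0,)):
    encoded_text = encoder(text)
    decoded_text = encoder(encoded_text)
    print(decoded_text)

    # Hidden state of every LSTM unit at every time step: (seq_len, hidden_dim)
    with torch.no_grad():
        lstm_out, _ = model.lstm(model.embedding(encoded_text.long().unsqueeze(0)))
    neuron_value = lstm_out.squeeze(0).numpy()

    # Print the text once per selected neuron. The background shows the sign of
    # the activation, the brightness whether its magnitude exceeds 0.5.
    for n in neurons:
        line = ""
        for ch, v in zip(decoded_text, neuron_value[:, n]):
            back = Back.GREEN if v > 0 else Back.RED
            style = Style.BRIGHT if abs(v) > 0.5 else Style.DIM
            line += back + style + ch
        print(f"{n:3d} {line}{Style.RESET_ALL}")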

In [None]:
enc = Encoder("abcdefghijklmnopqrstuvwxyz0123456789 .!?")
text = "Isn't he a great guy."
visualize_neurons(text, network, enc)

## Bonus Exercise (3 Points):

Adapt your code from the previous exercises such that the model runs in the many-to-one setting, i.e., it should read `l-1` characters of a sample sequence and predict the `l`-th character. Train/validate the model in the many-to-one setting and compare it to the many-to-many setting in terms of top-$k$ accuracy on the validation set and probabilistic text generation. Visualize your results. What are the pros and cons? 
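
The difference between the two settings lies only in how inputs and targets are cut from a sample. A minimal sketch on a plain string, with hypothetical helper names:

```python
def many_to_many(x):
    """Inputs x[:l-1]; one target per input position."""
    return x[:-1], x[1:]

def many_to_one(x):
    """Inputs x[:l-1]; a single target, the l-th character."""
    return x[:-1], x[-1]

sample = "hello"
print(many_to_many(sample))  # ('hell', 'ello')
print(many_to_one(sample))   # ('hell', 'o')
```

A sample of length `l` thus provides `l-1` training targets in the many-to-many setting but only one in the many-to-one setting, which is worth keeping in mind when comparing the two.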

In [None]:
########## YOUR SOLUTION HERE ##########
