# Shakespeare poetry generation with recurrent neural networks (RNNs)

In this notebook we will take Shakespeare's sonnets and train a recurrent neural network to generate poetry in a similar style. We will do so by training a simple character-based RNN that takes in one character and learns to predict the likelihood of the next one.
During inference we can give the trained network a random character and let it sample an entire poem: at each time step, the previously sampled character (the network's output) is fed back as the input to the next time step.


This notebook is loosely based on the following Towards Data Science post: https://towardsdatascience.com/writing-like-shakespeare-with-machine-learning-in-pytorch-d77f851d910c . The author uses a PyTorch LSTM for poetry generation. For convenience, we have reused the author's data preprocessing steps and have added a from-scratch RNN implementation, followed by the PyTorch LSTM model in the later part of this notebook.

After data loading, we will first learn how to implement a recurrent neural network in PyTorch, with the equations written from scratch, similar to our previous NumPy implementations. Because this custom math lives inside a PyTorch model, we can still rely on PyTorch's automatic differentiation and backward pass, so we no longer need to implement the gradients ourselves. This is very helpful if we ever plan on implementing custom functions or layers in the future.

If you would first like to implement both RNNs and LSTMs entirely from scratch in NumPy, including the forward and gradient calculations, Andrew Ng's deeplearning.ai Deep Learning Specialization on Coursera has a sequence models course (course 5) with an excellent first notebook on this: https://www.coursera.org/learn/nlp-sequence-models


In [0]:
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Parameter

# Check whether GPU is available and can be used
# if CUDA is found then device is set accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if not torch.cuda.is_available():
    print("Consider changing your run-time to GPU or training will be slow.")

## The data: Shakespeare's sonnets 

Shakespeare's sonnets can be found at the following URL featuring all of his works: http://shakespeare.mit.edu/

For convenience, we have extracted the plain text of the sonnets from https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt into a separate text file and added it to the class repository. We will download it from there:

In [0]:
!pip install wget
import wget

# download the data
data = wget.download('https://raw.githubusercontent.com/ccc-frankfurt/Practical_ML_SS21/main/week06/sonnets.txt')

We can open the text file and print an excerpt.

In [0]:
# Open shakespeare text file and read the data
with open('sonnets.txt', 'r') as f:
    text = f.read()
    
# print an excerpt of the text 
print(text[:200])

As we are interested in a character-based neural network, we will now create a mapping from characters to numbers so that we can do our matrix calculations with numerical data. One simple way is to assign every unique character an integer index. Note that the code below builds the vocabulary from a Python set, so the order of the indices is arbitrary (not alphabetical) and can differ between runs. If we print our excerpt again, we can see the corresponding numerical value of each character.

In [0]:
# We create two dictionaries:
# 1. int2char, which maps integers to characters
# 2. char2int, which maps characters to integers
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# Encode the text
encoded = np.array([char2int[ch] for ch in text])

# Again showing the excerpt, but this time as integers 
encoded[:200]
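
If a reproducible mapping is wanted, one option is to sort the unique characters before enumerating them. The following is a minimal sketch on a made-up toy string (the `toy_*` names are illustrative and not part of the notebook); it also checks that encoding followed by decoding returns the original text.

```python
# Toy text standing in for the sonnets (assumption: any string behaves the same way)
toy_text = "shall i compare thee"

# Sorting the unique characters makes the index assignment deterministic
toy_chars = tuple(sorted(set(toy_text)))
toy_int2char = dict(enumerate(toy_chars))
toy_char2int = {ch: ii for ii, ch in toy_int2char.items()}

# Encode to integers, then decode back to characters
toy_encoded = [toy_char2int[ch] for ch in toy_text]
toy_decoded = "".join(toy_int2char[ii] for ii in toy_encoded)

print(toy_chars)
print(toy_encoded[:10])
print(toy_decoded == toy_text)
```

With a sorted vocabulary, a model saved in one session can be reloaded in another without the character indices silently changing.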

### Data loader: batching

We now have our entire text file encoded as integers, which serves as our dataset. Next, we need to define our data loader. The part that is still missing is splitting the encoded text into mini-batches of fixed-length sequences together with their targets. Let us define this method:

In [0]:
# Defining method to make mini-batches for training
def get_batches(arr, batch_size, seq_length):
    # determine the flattened batch size, i.e. sequence length times batch size
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr)//batch_size_total
    
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))
    
    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n+seq_length]
        # The targets
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y
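
To see what the generator yields, it can help to run it on a tiny integer array. The sketch below uses a condensed variant (`toy_batches`, a hypothetical name) that builds the targets with `np.roll` instead of the try/except above; for arrays trimmed to full batches this should produce the same inputs and targets.

```python
import numpy as np

def toy_batches(arr, batch_size, seq_length):
    # Condensed variant of get_batches, repeated so this cell runs on its own
    n_batches = len(arr) // (batch_size * seq_length)
    arr = arr[:n_batches * batch_size * seq_length].reshape((batch_size, -1))
    # Shift every row left by one; the last column wraps around to column 0
    shifted = np.roll(arr, -1, axis=1)
    for n in range(0, arr.shape[1], seq_length):
        yield arr[:, n:n + seq_length], shifted[:, n:n + seq_length]

toy = np.arange(40)                      # stands in for the encoded text
batches = list(toy_batches(toy, batch_size=2, seq_length=5))

x0, y0 = batches[0]
print(len(batches))   # number of mini-batches
print(x0)             # inputs of the first mini-batch
print(y0)             # targets: the inputs shifted by one position
```

Each row of a mini-batch is a contiguous stretch of the text, and consecutive mini-batches continue where the previous one stopped, which is what allows the hidden state to be carried from one batch to the next.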

### Targets/Labels

We will treat our problem as a classification task: given an input character, the model predicts the likelihood of each possible next character, and the most likely class/character is the one with the highest softmax probability. Our model's output is thus a vector containing one score per unique character, which the softmax turns into a probability.
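
As a small illustration of this classification view, the sketch below turns a made-up score vector into probabilities with a softmax and computes the cross-entropy for one target class. The numbers are invented; in the notebook, `nn.CrossEntropyLoss` applies the log-softmax to the model's raw scores internally, so the model itself does not need a softmax layer during training.

```python
import numpy as np

# Made-up raw scores for a vocabulary of 4 characters
logits = np.array([2.0, 0.5, -1.0, 0.0])
target = 0  # index of the true next character

# Numerically stable softmax: subtract the maximum before exponentiating
shifted = logits - logits.max()
probs = np.exp(shifted) / np.exp(shifted).sum()

# Cross-entropy for a single example is the negative log-probability of the target
loss = -np.log(probs[target])

print(probs)
print(probs.argmax())
print(loss)
```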

Since we want to be able to feed our model's output back as input for the next time step, we should also give the network a one-hot encoded character as the input instead of just an integer, similar to what we have seen on our lecture's last slide. 
This way the network gets as input a one-hot vector of length corresponding to the number of total unique characters and predicts the likelihood for each character as output (for the next character in the sequence). We will thus write a function that converts our encoded characters from integers to one-hot vectors.

In [0]:
def one_hot_encode(arr, n_labels):
    
    # Initialize the encoded array
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    
    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    
    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot
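
A quick way to sanity-check the encoding is to compare it with indexing into an identity matrix, which should produce the same array for integer input. This is a sketch on a made-up 2x3 array, not a replacement for the function above.

```python
import numpy as np

toy = np.array([[0, 2, 1],
                [3, 3, 0]])   # shape (batch=2, sequence=3)
n_labels = 4

# Row i of the identity matrix is the one-hot vector for label i
one_hot_toy = np.eye(n_labels, dtype=np.float32)[toy]

print(one_hot_toy.shape)   # (2, 3, 4)
print(one_hot_toy[0, 1])   # one-hot vector for label 2
```

Taking `argmax` over the last axis recovers the original integers, which is the conversion needed later to turn model outputs back into characters.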

## A simple RNN

We will start with writing a simple RNN in PyTorch. To get a better understanding of how the RNN model works, we will not be using PyTorch's convenience RNN implementation, but write the main portion by hand ourselves. We will later use the convenience functions for the much more complicated LSTMs. 

Note that we could in principle do the same thing in pure NumPy, but the advantage of implementing the forward logic in PyTorch is that we can use automatic differentiation for our backward pass and do not need to implement backpropagation through time ourselves.

What we will learn here is:
1. How to write a recurrent neural network (the forward pass)
2. How to implement custom mathematical equations in the forward pass of a PyTorch model and leverage the automatic backward. 
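
The second point can be illustrated in isolation before turning to the RNN. The sketch below defines a hypothetical toy layer (`TinyAffine`, not part of the notebook) whose weights are registered as `Parameter` objects and whose forward pass is written by hand; calling `backward()` then fills in the gradients without any manual derivation.

```python
import torch
import torch.nn as nn
from torch.nn import Parameter

class TinyAffine(nn.Module):
    # Hypothetical toy layer for illustration only
    def __init__(self, n_in, n_out):
        super().__init__()
        # Parameters are registered with the module and seen by optimizers
        self.weight = Parameter(torch.zeros(n_in, n_out))
        self.bias = Parameter(torch.zeros(n_out))
        nn.init.xavier_uniform_(self.weight)

    def forward(self, x):
        # Hand-written math; autograd records every operation used here
        return torch.tanh(x @ self.weight + self.bias)

layer = TinyAffine(3, 2)
out = layer(torch.ones(4, 3))   # batch of 4 inputs with 3 features each
out.sum().backward()            # gradients are computed automatically

print([name for name, _ in layer.named_parameters()])
print(layer.weight.grad.shape)
```

The same pattern (register `Parameter` tensors in `__init__`, write the math in `forward`) is what the RNN class below relies on.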

In [0]:
class RNN(nn.Module):
    def __init__(self, chars, device, hidden_sz, drop_prob=0.5):
        super().__init__()
        
        self.device = device
        
        # creating character dictionaries
        # we already have this code on the top, but giving it to our model 
        # will be convenient for doing predictions later
        # i.e. doing conversions from text to integers to one-hot & vice-versa
        self.n_chars = len(chars)
        self.int2char = dict(enumerate(chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        self.hidden_sz = hidden_sz
        
        # Note that this class inherits from the torch neural network class
        # Instead of using a pre-built function we will write the math ourselves
        # For this reason we will first need to define "Parameters()", that 
        # the PyTorch graph keeps track of and can optimize. In other words,
        # let's give our class the weights & the bias that the RNN will need. 
        
        # Set the parameters correctly:
        self.weight_ih = Parameter(torch.zeros(..., ...))
        self.weight_hh = Parameter(torch.zeros(..., ...))
        self.bias_hh = Parameter(torch.zeros(...))
        
        # Now that we have defined the RNN cell, let us define the output layer
        # We will use a dropout layer to prevent overfitting and then 
        # follow with a conventional linear layer (matrix multiplication) that 
        # maps the RNN cell's output (the hidden state of the network) to the 
        # class output. Remember that the class output corresponds to a 
        # vector whose length is the number of unique characters. 
        
        # define a dropout layer
        self.dropout = nn.Dropout(drop_prob)
        
        # define the final, fully-connected output layer. We can use a 
        # PyTorch nn function here (or you could add the corresponding math
        # below and assign an additional weight & bias at the top). 
        # We can see that we can create very custom models this way
        
        # Set the final layer correctly
        self.fc = nn.Linear(..., ...)
        
        # We have assigned the Parameters above, but we will need to also 
        # initialize them. Let's write a function for that and initialize
        # our weights and bias. 
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.weight_ih)
        nn.init.xavier_uniform_(self.weight_hh)
        nn.init.zeros_(self.bias_hh)
    
    def forward(self, x, h_t):
        """Assumes x is of shape (batch, sequence, feature)"""
        bs, seq_sz, _ = x.size() # batch size, sequence size, feature size
        hidden_seq = []
        
        # Given an input and an initial hidden state, calculate the next hidden
        # state for each sequence element.
        # We append all the hidden states to a list so that we can 
        # concatenate them afterwards and feed them to our last linear layer
        # all at once. This avoids looping through the final output layer, 
        # which no longer depends on other time steps. 
        
        # loop through the sequence
        for t in range(...):
            x_t = ... # slice the appropriate input element (the sequence is dimension 1)
            h_t = ... # implement the RNN cell computation
            hidden_seq.append(h_t.unsqueeze(0)) 
            
        # Do the concatenation and reshaping for convenience
        hidden_seq = torch.cat(hidden_seq, dim=0)
        # reshape from shape (sequence, batch, feature) to (batch, sequence, feature)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        
        # Stack up the RNN outputs using view so that we can process the last 
        # layer in parallel
        r_output = hidden_seq.contiguous().view(-1, self.hidden_sz)
        
        # pass through a dropout layer
        out = ... 
        
        # Calculate fully connected layer output that yields our class vector
        out = ...
        
        return out, h_t
    
    def init_hidden(self, batch_size=1):
        ''' Initializes hidden state '''
        # This is a convenience function so that we can initialize a hidden
        # state to zero when we start prediction on a sequence. Every further
        # step will then depend on the previous hidden state. 
        
        # Create a new tensor of size batch_size x hidden_sz,
        # initialized to zero, for the RNN's hidden state.
        weight = next(self.parameters()).data
        h_t = weight.new(batch_size, self.hidden_sz).zero_().to(self.device)
        
        return h_t


The only thing missing is our training loop. It will look very similar to everything we have previously written, with two main differences:

1. Our model is now also dependent on the hidden state and thus takes it as input and returns it as an additional output. 
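2. Because the hidden state is carried over from one batch to the next without being detached, each backward pass reaches back into the computation graphs of earlier batches. We therefore pass "retain_graph=True" to "loss.backward()" so that PyTorch keeps those graphs instead of freeing them after the first backward pass.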
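
A commonly used alternative, which is not what this notebook does, is to detach the hidden state at the start of every batch. This keeps the values of the hidden state but cuts its history, so backpropagation is truncated at the batch boundary and `retain_graph=True` is no longer needed. The sketch below shows the idea with PyTorch's built-in `nn.RNN` as a stand-in model, random inputs and a dummy loss; all sizes are made up.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Stand-in model (assumption: any recurrent module that takes and returns h)
rnn = nn.RNN(input_size=4, hidden_size=8, batch_first=True)
optimizer = torch.optim.SGD(rnn.parameters(), lr=0.1)

h = torch.zeros(1, 2, 8)   # (num_layers, batch, hidden)

for step in range(3):
    x = torch.randn(2, 5, 4)   # (batch, sequence, feature)

    # Keep the values of h but drop the graph of the previous batch
    h = h.detach()

    optimizer.zero_grad()
    out, h = rnn(x, h)
    loss = out.pow(2).mean()   # dummy loss for illustration
    loss.backward()            # works without retain_graph=True
    optimizer.step()

print(h.shape)
```

The trade-off is that gradients no longer flow across batch boundaries, while memory use per step stays bounded by the sequence length.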

In [0]:
# Declaring the train method
def train(model, data, device, optimizer, criterion, epochs=10, batch_size=10,
          seq_length=50, clip=5):
    model.train()
    
    for epoch in range(epochs):
        # initialize first hidden states with zeros
        h = model.init_hidden(batch_size)
        
        for x, y in get_batches(data, batch_size, seq_length):
            # One-hot encode our data, make them torch tensors & cast to device
            
            x = one_hot_encode(x, model.n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            inputs, targets = inputs.to(device), targets.to(device)

            # zero accumulated gradients
            model.zero_grad()
            
            # get the output and hidden state from the model
            output, h = ...
            
            # calculate the loss and perform backprop
            # because we have flattened our batch and sequence in the model to 
            # be able to speed up the connection of the last fully-connected 
            # layer we now also need to view/flatten our target here
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward(retain_graph=True)
            
            # we use an additional trick of clipping gradients to avoid 
            # exploding gradients, which is a prominent problem in RNNs, just
            # as the opposite problem of vanishing gradients.
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()
            
            
        print("Epoch: {}/{}:".format(epoch + 1, epochs),
              "Loss: {:.4f}:".format(loss.item()))

In [0]:
# Define the model
n_hidden=512
model = ...

# Hyperparameters
batch_size = 128
seq_length = 100
epochs = 300 # start with 50 or similar if you are debugging 
# train much longer if you want good results

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# train the model
train(model, encoded, device, optimizer, criterion, epochs=epochs,
      batch_size=batch_size, seq_length=seq_length)

You should observe the loss decreasing steadily. In fact, you can see that training has not fully converged yet. If you want to spend the time later to see how well you can get your RNN to perform, try training it for longer, until convergence. 

Once we have trained the model it will be interesting to use it for prediction. To generate new content we would like to feed in an initial sequence or even just a single character and see what the model generates for the rest of the sequence conditioned on our input. 

Let us write the logic for that:

In [0]:
def predict(model, char, device, h=None, top_k=5):
        ''' Given a character & hidden state, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        
        # tensor inputs
        x = np.array([[model.char2int[char]]])
        x = one_hot_encode(x, model.n_chars)
        inputs = torch.from_numpy(x).to(device)
        
        with torch.no_grad():
            # get the output of the model
            out, h = ...

            # get the character probabilities
            # move to cpu for further processing with numpy etc. 
            p = F.softmax(out, dim=1).data.cpu()

            # get the top characters with highest likelihood
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()

            # select the likely next character with some element of randomness
            # for more variability
            p = p.numpy().squeeze()
            char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the predicted character (decoded from its integer index) and the hidden state
        return model.int2char[char], h
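
The sampling step inside `predict` can be looked at on its own. The following sketch reproduces the top-k idea in plain NumPy on a made-up probability vector: keep the `top_k` most likely classes, renormalize their probabilities so they sum to one, and draw from that restricted distribution.

```python
import numpy as np

rng = np.random.default_rng(0)

# Made-up softmax output over a vocabulary of 5 characters
probs = np.array([0.05, 0.40, 0.10, 0.30, 0.15])
top_k = 3

# Indices of the k largest probabilities
top_idx = np.argsort(probs)[-top_k:]

# Renormalize so the kept probabilities sum to one
top_p = probs[top_idx] / probs[top_idx].sum()

# Draw many samples to see which classes can be produced at all
draws = rng.choice(top_idx, size=1000, p=top_p)
print(sorted(set(draws.tolist())))
```

Restricting the draw to the most likely characters avoids sampling very unlikely characters while still leaving room for variation between generated poems.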

In [0]:
def sample(model, size, device, prime='A', top_k=None):
    # method to generate new text based on a "prime"/initial sequence. 
    # Basically, the outer loop convenience function that calls the above
    # defined predict method. 
    model.eval() # eval mode
    
    # Calculate model for the initial prime characters
    chars = [ch for ch in prime]
    with torch.no_grad():
        # initialize hidden with 0 in the beginning. Set our batch size to 1 
        # as we wish to generate one sequence only. 
        h = ...
        for ch in prime:
            char, h = predict(...) # use the predict function

        # append the characters to the sequence
        chars.append(char)

        # Now pass in the previous/last character and get a new one
        # Repeat this process for the desired length of the sequence to be 
        # generated
        for ii in range(size):
            char, h = predict(...) # use the predict function
            chars.append(char)

    return ''.join(chars)

### Generating poems

We are now set to call our sample method with our trained model, a prime sequence and a desired sequence length to be generated. 

In [0]:
print(sample(model, 1000, device, prime='A', top_k=5))

We can see that our RNN typically starts out correctly and is sometimes able to generate correct words, but it quickly degenerates into gibberish because it fails to capture long-term dependencies. 

We will now implement a PyTorch LSTM to see how to improve upon this.

## Long Short Term Memory (LSTM)

Let's take our recurrent neural network class that we have defined above and replace the simple RNN cell with one or even multiple stacked LSTM cells. 

If you want a challenge, you can try implementing this by hand, similarly to the RNN cell we have defined. However, if you would rather skip that tour-de-force exercise, you can go ahead and use PyTorch's "nn.LSTM()" convenience module: https://pytorch.org/docs/stable/nn.html#torch.nn.LSTM 

You can try using a stack of 2 LSTM hidden layers to simply replace the RNN cell.
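
Before wiring the LSTM into the model, it can be useful to check the tensor shapes that `nn.LSTM` expects and returns. The sketch below uses made-up toy sizes and zero inputs; note that with `batch_first=True` only the input and output tensors put the batch dimension first, while the hidden and cell states keep the layout (layers, batch, hidden).

```python
import torch
import torch.nn as nn

# Toy sizes chosen for illustration only
n_features, n_hidden, n_layers = 6, 8, 2
batch, seq = 3, 5

lstm = nn.LSTM(input_size=n_features, hidden_size=n_hidden,
               num_layers=n_layers, batch_first=True)

x = torch.zeros(batch, seq, n_features)        # (batch, sequence, feature)
h0 = torch.zeros(n_layers, batch, n_hidden)    # initial hidden state
c0 = torch.zeros(n_layers, batch, n_hidden)    # initial cell state

out, (h_n, c_n) = lstm(x, (h0, c0))

print(out.shape)   # (batch, sequence, n_hidden)
print(h_n.shape)   # (n_layers, batch, n_hidden)
print(c_n.shape)   # (n_layers, batch, n_hidden)
```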

In [0]:
class LSTM(nn.Module):
    
    def __init__(self, chars, device, n_hidden=256, n_layers=2, drop_prob=0.5):
        super().__init__()
        
        self.device = device
        
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        
        # creating character dictionaries
        # we already have this code on the top, but giving it to our model 
        # will be convenient for doing predictions later
        # i.e. doing conversions from text to integers to one-hot & vice-versa
        self.n_chars = len(chars)
        self.int2char = dict(enumerate(chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        # define the LSTM
        # we no longer need to care about weight initialization as PyTorch
        # will handle this for us now. 
        # When defining PyTorch's nn.LSTM() set "batch_first=True" to assign
        # the batch size to the first dimension (instead of the sequence) to
        # stay consistent with our RNN implementation and re-use our code. 
        self.lstm = nn.LSTM(...)
        
        # define a dropout layer
        self.dropout = nn.Dropout(drop_prob)
        
        # define the final, fully-connected output layer
        self.fc = nn.Linear(..., ...)
        
    def forward(self, x, hidden):
        ''' Forward pass through the network. 
            The inputs are x, and the hidden & cell state in a tuple. '''
        
        # get the outputs and the new hidden states from the LSTM. 
        # Note that the hidden variable now is a tuple of hidden and cell state
        # in contrast to the RNN that just had the hidden state. 
        # Because we are using the PyTorch LSTM we do not need to implement
        # the loop anymore as the sequence will be handled internally. 
        r_output, hidden = self.lstm(x, hidden)
        
        # pass through a dropout layer
        out = self.dropout(r_output)
        
        # Stack up the LSTM outputs using view so that we can process the last 
        # layer in parallel
        out = out.contiguous().view(-1, self.n_hidden)
        
        # Calculate fully connected layer output that yields our class vector
        out = self.fc(out)
        
        return out, hidden
    
    def init_hidden(self, batch_size=1):
        ''' Initializes hidden state '''
        # This is a convenience function so that we can initialize the hidden
        # states to zero when we start prediction on a sequence. Every further
        # step will then depend on the previous hidden states (c and h). 
        
        # Create a tuple of two new tensors with sizes
        # n_layers x batch_size x n_hidden, initialized to zero for the
        # LSTM hidden and cell states. 
        weight = next(self.parameters()).data

        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(self.device),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(self.device))
        
        return hidden

We can use the exact same code to train our newly defined LSTM model. Let's try with the same number of hidden units and 2 stacked LSTM layers. 

In [0]:
# Define the model
n_hidden=512
n_layers=2

model = LSTM(chars, device, n_hidden, n_layers).to(device)

# Declaring the hyperparameters
batch_size = 128
seq_length = 100
epochs = 300 # start with 50 or similar if you are debugging 
# train much longer if you want good results

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# train the model
train(model, encoded, device, optimizer, criterion, epochs=epochs,
      batch_size=batch_size, seq_length=seq_length)

We can observe that our model is able to achieve a much lower loss than before with our simple RNN implementation. This should now also be reflected when we generate/sample new sonnets. 

Again, you can observe that the loss still continues to improve, even after 300 epochs. For the best results, try training the model longer. 

In [0]:
# Generating new text
print(sample(model, 1000, device, prime='A', top_k=5))

Admittedly, our generated examples still differ noticeably from the original Shakespeare texts. However, if we compare this output to our RNN's output, we can see that the LSTM generates proper words much more consistently and sometimes produces portions of sentences with improved grammar. There now seems to be more overall structure. 