# IS319 - Deep Learning

## TP3 - Recurrent neural networks

Credits: Andrej Karpathy

The goal of this TP is to experiment with recurrent neural networks for character-level language modeling, generating text that resembles the training data.

In [14]:
import torch
import torch.nn as nn
import torch.optim as optimizer
import numpy as np
import torch.nn.functional as F
import torch.distributions as distributions
import matplotlib.pyplot as plt
device = (
    "cuda"
    if torch.cuda.is_available()
    # else "mps"
    # if torch.backends.mps.is_available() # For macOS
    else "cpu"
)
print(f'Using {device}')

Using cpu


## 1. Text data preprocessing

Several text datasets are provided; feel free to experiment with different ones throughout the TP. At the beginning, use a small subset of a given dataset (for example, only 10k characters).

In [40]:
dir_datasets = "./"
# text_data_fname = 'baudelaire.txt'  # ~0.1m characters (French)
# text_data_fname = 'proust.txt'      # ~7.3m characters (French)
text_data_fname = 'shakespeare.txt' # ~0.1m characters (English)
# text_data_fname = 'lotr.txt'        # ~2.5m characters (English)
# text_data_fname = 'doom.c'          # ~1m characters (C Code)
# text_data_fname = 'linux.c'         # ~11.5m characters (C code)

with open(dir_datasets + text_data_fname, 'r') as f:
    text_data = f.read()
# text_data = text_data[:10_000]  # uncomment to use a small subset
print(f'Dataset `{text_data_fname}` contains {len(text_data)} characters.')
print('Excerpt of the dataset:')
print(text_data[:2000])

Dataset `shakespeare.txt` contains 95665 characters.
Excerpt of the dataset:
    SONNETS



TO THE ONLY BEGETTER OF
THESE INSUING SONNETS
MR. W. H. ALL HAPPINESS
AND THAT ETERNITY
PROMISED BY
OUR EVER-LIVING POET WISHETH
THE WELL-WISHING
ADVENTURER IN
SETTING FORTH
T. T.


I.

FROM fairest creatures we desire increase,
That thereby beauty's rose might never die,
But as the riper should by time decease,
His tender heir might bear his memory:
But thou, contracted to thine own bright eyes,
Feed'st thy light'st flame with self-substantial fuel,
Making a famine where abundance lies,
Thyself thy foe, to thy sweet self too cruel.
Thou that art now the world's fresh ornament
And only herald to the gaudy spring,
Within thine own bud buriest thy content
And, tender churl, makest waste in niggarding.
  Pity the world, or else this glutton be,
  To eat the world's due, by the grave and thee.

II.

When forty winters shall beseige thy brow,
And dig deep trenches in thy beauty's field,
Thy youth's p

**(Question)** Create a character-level vocabulary for your text data. Create two dictionaries: `ctoi` mapping each character to an index, and the reverse `itoc` mapping each index to its corresponding character. Implement the functions to convert text to tensor and tensor to text using these mappings. Apply these functions to some text data.

In [41]:
# Create the vocabulary and the two mapping dictionaries
def create_vocab(text):
    # dict.fromkeys drops duplicates while keeping first-appearance order
    vocab = list(dict.fromkeys(text))
    ctoi = {char: idx for idx, char in enumerate(vocab)}
    itoc = {idx: char for idx, char in enumerate(vocab)}
    return vocab, ctoi, itoc


# Implement the function converting text to tensor
def text_to_tensor(text, ctoi):
    return torch.tensor([ctoi[char] for char in text], dtype=torch.long)


# Implement the function converting tensor to text
def tensor_to_text(tensor, itoc):
    return "".join(itoc[idx] for idx in tensor.tolist())

# Apply your functions to some text data

vocab, ctoi, itoc = create_vocab(text_data)

print(vocab)
print(ctoi)
print(itoc)

example_tensor = text_to_tensor(text_data, ctoi)
print(example_tensor)

example_text = tensor_to_text(example_tensor, itoc)
# verify integrity
assert example_text == text_data

[' ', 'S', 'O', 'N', 'E', 'T', '\n', 'H', 'L', 'Y', 'B', 'G', 'R', 'F', 'I', 'U', 'M', '.', 'W', 'A', 'P', 'D', 'V', '-', 'f', 'a', 'i', 'r', 'e', 's', 't', 'c', 'u', 'w', 'd', 'n', ',', 'h', 'b', 'y', "'", 'o', 'm', 'g', 'v', 'p', 'l', ':', 'k', 'z', 'x', '!', ';', '?', 'C', 'q', 'j', 'X', 'K', 'J', '[', ']']
{' ': 0, 'S': 1, 'O': 2, 'N': 3, 'E': 4, 'T': 5, '\n': 6, 'H': 7, 'L': 8, 'Y': 9, 'B': 10, 'G': 11, 'R': 12, 'F': 13, 'I': 14, 'U': 15, 'M': 16, '.': 17, 'W': 18, 'A': 19, 'P': 20, 'D': 21, 'V': 22, '-': 23, 'f': 24, 'a': 25, 'i': 26, 'r': 27, 'e': 28, 's': 29, 't': 30, 'c': 31, 'u': 32, 'w': 33, 'd': 34, 'n': 35, ',': 36, 'h': 37, 'b': 38, 'y': 39, "'": 40, 'o': 41, 'm': 42, 'g': 43, 'v': 44, 'p': 45, 'l': 46, ':': 47, 'k': 48, 'z': 49, 'x': 50, '!': 51, ';': 52, '?': 53, 'C': 54, 'q': 55, 'j': 56, 'X': 57, 'K': 58, 'J': 59, '[': 60, ']': 61}
{0: ' ', 1: 'S', 2: 'O', 3: 'N', 4: 'E', 5: 'T', 6: '\n', 7: 'H', 8: 'L', 9: 'Y', 10: 'B', 11: 'G', 12: 'R', 13: 'F', 14: 'I', 15: 'U', 16

## 2. Setup a character-level recurrent neural network

**(Question)** Set up a simple embedding layer with `nn.Embedding` to project character indices to `embedding_dim`-dimensional vectors. Explain precisely how this layer works and what its outputs are for a given input sequence.

In [42]:
# n_vocab : the total number of unique indices that the embedding layer can handle.
# n_dim : the size of the vector space in which the indices will be embedded.
n_vocab, n_dim = len(vocab), 16

# Instantiate the embedding layer
emb_layer = nn.Embedding(n_vocab, embedding_dim=n_dim)

# Embed every character index of the example tensor
emb_data = emb_layer(example_tensor)

print(emb_data)
print(emb_data.shape)

tensor([[ 0.2131, -1.1736,  0.0767,  ..., -0.3563, -0.6911, -0.5352],
        [ 0.2131, -1.1736,  0.0767,  ..., -0.3563, -0.6911, -0.5352],
        [ 0.2131, -1.1736,  0.0767,  ..., -0.3563, -0.6911, -0.5352],
        ...,
        [ 1.5456,  0.2271, -2.0628,  ...,  0.2706,  0.0348,  0.8645],
        [ 0.2865,  0.3737, -0.7090,  ..., -0.5581, -1.3705,  0.4577],
        [ 1.3353,  0.6194, -0.1677,  ...,  0.6436,  1.5125, -0.0368]],
       grad_fn=<EmbeddingBackward0>)
torch.Size([95665, 16])
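
The cell above prints the embeddings but leaves the "how does it work" part of the question open. As a minimal sketch (the toy sizes and the `toy_*` names are assumptions for illustration, not part of the TP), the snippet below checks that `nn.Embedding` behaves like a trainable lookup table: its weight is a `(vocab_size, dim)` matrix, and the forward pass returns one row per input index.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical toy sizes, much smaller than the ones used in the notebook
toy_vocab_size, toy_dim = 5, 3
toy_emb = nn.Embedding(toy_vocab_size, toy_dim)

# A sequence of 4 character indices (index 2 appears twice)
toy_indices = torch.tensor([2, 0, 2, 4])
toy_vectors = toy_emb(toy_indices)

# The forward pass selects rows of the weight matrix
same_as_lookup = torch.equal(toy_vectors, toy_emb.weight[toy_indices])

print(toy_vectors.shape)  # one vector of size toy_dim per input index
print(same_as_lookup)
```

This would also explain why the first rows of `emb_data` printed above are identical: the dataset starts with several space characters, which all map to the same row of the embedding matrix.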


**(Question)** Setup a single-layer RNN with `nn.RNN` (without defining a custom class). Use `hidden_dim` size for hidden states. Explain precisely the outputs of this layer for a given input sequence.

In [43]:
input_size, hidden_size = 16, 16
rnn_layer = nn.RNN(input_size, hidden_size)

# Initialize the hidden state
hidden_state = torch.zeros(1, hidden_size)

# run the embedded data through the RNN
output_sequence, final_hidden_state = rnn_layer(emb_data, hidden_state)

print(output_sequence.shape, output_sequence)
print(final_hidden_state)

torch.Size([95665, 16]) tensor([[ 0.8377, -0.2862,  0.4819,  ..., -0.6358,  0.7918, -0.0943],
        [ 0.8505, -0.1561,  0.5117,  ..., -0.2866,  0.5960, -0.1933],
        [ 0.8643, -0.2143,  0.4815,  ..., -0.3118,  0.5495, -0.2887],
        ...,
        [ 0.3187, -0.2441,  0.3486,  ...,  0.6295,  0.5231,  0.5456],
        [-0.5798, -0.3524, -0.6550,  ...,  0.4541,  0.8113, -0.5964],
        [ 0.5324, -0.0520, -0.5687,  ...,  0.4035,  0.1437, -0.4652]],
       grad_fn=<SqueezeBackward1>)
tensor([[ 0.5324, -0.0520, -0.5687,  0.6769, -0.5412, -0.4323,  0.0864,  0.2950,
          0.7322,  0.8644,  0.3394,  0.5966,  0.8700,  0.4035,  0.1437, -0.4652]],
       grad_fn=<SqueezeBackward1>)


## Answer :
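`output_sequence` contains the hidden state of the RNN at every time step of the input sequence. Since the input is an unbatched sequence of shape `(seq_len, input_size)`, its shape is `(seq_len, hidden_size)`, here `(95665, 16)`. These are hidden states, not character predictions; a linear layer on top is needed to turn them into scores over the vocabulary. `final_hidden_state` is the hidden state after the last time step, of shape `(num_layers, hidden_size)`, here `(1, 16)`. For a single-layer, unidirectional RNN it equals the last row of `output_sequence`, as the printed values confirm.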
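
To make the description of the RNN outputs concrete, here is a small sketch that recomputes them by hand. It assumes the default `tanh` nonlinearity and a recent PyTorch version that accepts unbatched inputs (as the cell above does); the `toy_*` names and sizes are illustrative only.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

toy_rnn = nn.RNN(input_size=4, hidden_size=3)  # single layer, tanh, unidirectional
toy_inputs = torch.randn(6, 4)                 # unbatched: (seq_len, input_size)
h0 = torch.zeros(1, 3)                         # (num_layers, hidden_size)

with torch.no_grad():
    outputs, h_n = toy_rnn(toy_inputs, h0)

    # Recompute the recurrence h_t = tanh(W_ih x_t + b_ih + W_hh h_{t-1} + b_hh)
    h = h0[0]
    manual_states = []
    for x_t in toy_inputs:
        h = torch.tanh(toy_rnn.weight_ih_l0 @ x_t + toy_rnn.bias_ih_l0
                       + toy_rnn.weight_hh_l0 @ h + toy_rnn.bias_hh_l0)
        manual_states.append(h)
    manual_states = torch.stack(manual_states)

print(outputs.shape, h_n.shape)
print(torch.allclose(outputs, manual_states, atol=1e-6))  # outputs are the hidden states
print(torch.allclose(outputs[-1], h_n[0]))                # last output is the final state
```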
****

**(Question)** Implement a simple training loop to overfit on a small input sequence. The loss function should be a categorical cross entropy on the predicted characters. Monitor the loss function value over the iterations.

In [44]:
# Sample a small input sequence into tensor `input_seq` and store its corresponding expected sequence into tensor `target_seq`
input_seq = torch.arange(0, 40).long()
target_seq = (input_seq+1).clone() 

# Implement a training loop overfitting an input sequence and monitoring the loss function
# The learning rate is set on the optimizer, so it is not a parameter of this function
def train_overfit(model, input_seq, target_seq, optim, loss_function, n_iters=200, check_iter=50):

    for i in range(n_iters):
        # Forward pass
        output, hidden = model(input_seq.unsqueeze(0))  # Add batch dimension
        loss = loss_function(output.squeeze(0), target_seq)

        # Backward pass and optimization
        optim.zero_grad()
        loss.backward()
        optim.step()

        if (i + 1) % check_iter == 0:
            print(f"Iteration {i + 1}/{n_iters}, Loss: {loss.item()}")

# Initialize a model and make it overfit the input sequence
class simpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(simpleRNN, self).__init__()
        self.emb = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.lin = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.emb(x)
        output, hidden = self.rnn(x)
        output = self.lin(output)
        return output, hidden

# Initialize the model, loss function, and optimizer (Adam)
input_size, hidden_size, output_size, learning_rate = 200, 16, 200, 0.002
model = simpleRNN(input_size, hidden_size, output_size)
optim = optimizer.Adam(model.parameters(), lr=learning_rate)
loss_function = nn.CrossEntropyLoss()

# train the model
train_overfit(model, input_seq, target_seq, optim, loss_function, n_iters=900)

Iteration 50/900, Loss: 3.5179476737976074
Iteration 100/900, Loss: 1.9047702550888062
Iteration 150/900, Loss: 0.8824202418327332
Iteration 200/900, Loss: 0.4462552070617676
Iteration 250/900, Loss: 0.26752108335494995
Iteration 300/900, Loss: 0.18084551393985748
Iteration 350/900, Loss: 0.13251593708992004
Iteration 400/900, Loss: 0.10254301130771637
Iteration 450/900, Loss: 0.08233128488063812
Iteration 500/900, Loss: 0.0679052546620369
Iteration 550/900, Loss: 0.05718304589390755
Iteration 600/900, Loss: 0.0489538349211216
Iteration 650/900, Loss: 0.04247773066163063
Iteration 700/900, Loss: 0.03727369382977486
Iteration 750/900, Loss: 0.03301691263914108
Iteration 800/900, Loss: 0.029481202363967896
Iteration 850/900, Loss: 0.026505444198846817
Iteration 900/900, Loss: 0.023972200229763985
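
The loss monitored above is a categorical cross entropy computed on raw scores (logits). As a hedged sketch with made-up toy values, the snippet below checks that `F.cross_entropy` is the mean negative log-probability that a softmax over the logits assigns to each expected index.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)

toy_logits = torch.randn(5, 7)               # (seq_len, n_classes), raw scores
toy_targets = torch.tensor([1, 0, 6, 3, 3])  # expected next index at each step

loss = F.cross_entropy(toy_logits, toy_targets)

# Same quantity by hand: log-softmax, pick the target entries, average, negate
log_probs = F.log_softmax(toy_logits, dim=-1)
manual_loss = -log_probs[torch.arange(5), toy_targets].mean()

print(loss.item(), manual_loss.item())
```

Under this definition, a model predicting a uniform distribution over the 200 output classes used above would have a loss of ln(200), roughly 5.3, which gives a reference point for the values printed during training.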


**(Question)** Implement a `predict_argmax` method for your `RNN` model. Then, verify your overfitting: use some characters of your input sequence as context to predict the remaining ones. Experiment with the current model and analyze the results.

In [45]:
class CharRNN(simpleRNN):
    def predict_argmax(self, context_tensor, n_predictions):
        predictions = []

        with torch.no_grad():
            # Run the whole context through the RNN once to build the hidden state
            x = self.emb(context_tensor.unsqueeze(0))  # add batch dimension
            output, hidden = self.rnn(x)

            # Predict the next n_predictions indices
            for _ in range(n_predictions):
                # The logits at the last time step score every candidate next index
                logits = self.lin(output[:, -1])
                predicted_index = logits.argmax(dim=-1)
                predictions.append(predicted_index.item())

                # Feed the prediction back in, carrying the hidden state forward
                x = self.emb(predicted_index.unsqueeze(0))
                output, hidden = self.rnn(x, hidden)

        return predictions

# Initialize a model and make it overfit as above
# Then verify your overfitting by predicting characters given some context
model = CharRNN(input_size, hidden_size, output_size)
optim = optimizer.Adam(model.parameters(), lr=learning_rate)
loss_function = nn.CrossEntropyLoss()

train_overfit(model, input_seq, target_seq, optim, loss_function, n_iters=900)

Iteration 50/900, Loss: 3.439159393310547
Iteration 100/900, Loss: 1.9932798147201538
Iteration 150/900, Loss: 1.0686637163162231
Iteration 200/900, Loss: 0.5870904922485352
Iteration 250/900, Loss: 0.35546037554740906
Iteration 300/900, Loss: 0.23519167304039001
Iteration 350/900, Loss: 0.16645558178424835
Iteration 400/900, Loss: 0.12433061748743057
Iteration 450/900, Loss: 0.09685086458921432
Iteration 500/900, Loss: 0.07793130725622177
Iteration 550/900, Loss: 0.0642877146601677
Iteration 600/900, Loss: 0.05410720780491829
Iteration 650/900, Loss: 0.04632621258497238
Iteration 700/900, Loss: 0.0402187816798687
Iteration 750/900, Loss: 0.03532419353723526
Iteration 800/900, Loss: 0.031332828104496
Iteration 850/900, Loss: 0.028027767315506935
Iteration 900/900, Loss: 0.025253888219594955


In [46]:
# predict 3 indices
print(model.predict_argmax(input_seq[:35], n_predictions=3))

[35, 36, 37]


Using the argmax function to predict the next character can yield a deterministic generator always predicting the same characters. Instead, it is common to predict the next character by sampling from the distribution of output predictions, adding some randomness into the generator.

**(Question)** Implement a `predict_proba` method for your `RNN` model. It should be very similar to `predict_argmax`, but instead of using argmax, it should randomly sample from the output predictions. To do that, you can use the `torch.distributions.Categorical` class and its `sample()` method. Verify that your method correctly adds some randomness.
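
Before implementing the method, the difference between the two decoding strategies can be sketched on a single, made-up vector of logits (the values and `toy_*` names are assumptions for illustration). Argmax always returns the same index, while `Categorical` draws indices in proportion to the softmax probabilities.

```python
import torch
import torch.distributions as distributions

torch.manual_seed(0)

toy_logits = torch.tensor([2.0, 1.0, 0.5, -1.0])

# Greedy decoding: the same index every time
greedy = [toy_logits.argmax().item() for _ in range(10)]

# Sampling: indices are drawn according to softmax(toy_logits)
dist = distributions.Categorical(logits=toy_logits)
sampled = dist.sample((1000,))  # 1000 independent draws
frequencies = torch.bincount(sampled, minlength=4).float() / 1000

print(greedy)
print(dist.probs)   # softmax of the logits
print(frequencies)  # empirical frequencies, close to dist.probs
```

Passing `logits=` rather than `probs=` lets the distribution apply the softmax itself, so the raw outputs of the linear layer can be used directly.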

In [47]:
class CharRNN(CharRNN):
    def predict_proba(self, context_tensor, n_predictions):
        predictions = []

        with torch.no_grad():
            # Run the whole context through the RNN once to build the hidden state
            x = self.emb(context_tensor.unsqueeze(0))  # add batch dimension
            output, hidden = self.rnn(x)

            # Predict the next n_predictions characters by sampling from the distribution
            for _ in range(n_predictions):
                # The logits at the last time step score every candidate next index
                logits = self.lin(output[:, -1])

                # Use a Categorical distribution to sample from the predicted probabilities
                categorical_dist = distributions.Categorical(logits=logits)
                predicted_index = categorical_dist.sample()
                predictions.append(predicted_index.item())

                # Feed the sample back in, carrying the hidden state forward
                x = self.emb(predicted_index.unsqueeze(0))
                output, hidden = self.rnn(x, hidden)

        return predictions

# Verify that your predictions are not deterministic anymore
model = CharRNN(input_size, hidden_size, output_size)
optim = optimizer.Adam(model.parameters(), lr=learning_rate)
loss_function = nn.CrossEntropyLoss()

train_overfit(model, input_seq, target_seq, optim, loss_function, n_iters=900)

Iteration 50/900, Loss: 3.517578125
Iteration 100/900, Loss: 1.9130096435546875
Iteration 150/900, Loss: 0.9699044227600098
Iteration 200/900, Loss: 0.5434590578079224
Iteration 250/900, Loss: 0.3417798578739166
Iteration 300/900, Loss: 0.23311099410057068
Iteration 350/900, Loss: 0.1664937287569046
Iteration 400/900, Loss: 0.12370067834854126
Iteration 450/900, Loss: 0.09567565470933914
Iteration 500/900, Loss: 0.07620358467102051
Iteration 550/900, Loss: 0.062127985060214996
Iteration 600/900, Loss: 0.05183090642094612
Iteration 650/900, Loss: 0.044098202139139175
Iteration 700/900, Loss: 0.038113728165626526
Iteration 750/900, Loss: 0.03336089104413986
Iteration 800/900, Loss: 0.029505740851163864
Iteration 850/900, Loss: 0.026323989033699036
Iteration 900/900, Loss: 0.023659957572817802


In [48]:
# predict 3 indices
print(model.predict_proba(input_seq[:30], n_predictions=3))

[30, 31, 32]


## 3. Train the RNN model on text data

**(Question)** Adapt your previous code to implement a proper training loop for a text dataset. To do so, we need to specify a sequence length `seq_len`, acting similarly to the batch size in classic neural networks. Then, you can either randomly sample sequences of length `seq_len` from the text dataset over `n_iters` iterations, or properly loop over the text dataset for `n_epochs` epochs (with a random starting point for each epoch to ensure different sequences), to make sure the whole dataset is seen by the model. Feel free to adjust training and model parameters empirically. Start with a small model and a small subset of the text dataset, then move on to larger experiments. Remember to use GPU if available.
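
As a sketch of the epoch-based option described above, the snippet below cuts a toy "dataset" into consecutive windows of length `toy_seq_len`, starting from a random offset, and pairs each window with the same window shifted by one position. The toy tensor and the `toy_*` names are assumptions for illustration only.

```python
import torch

torch.manual_seed(0)

toy_data = torch.arange(23)  # stand-in for the tensor of character indices
toy_seq_len = 5

# Random offset in [0, toy_seq_len): windows are aligned differently at each epoch,
# yet almost all of the data is still visited
offset = torch.randint(0, toy_seq_len, (1,)).item()

pairs = []
for i in range(offset, toy_data.size(0) - toy_seq_len, toy_seq_len):
    inputs = toy_data[i:i + toy_seq_len]           # characters t .. t+seq_len-1
    targets = toy_data[i + 1:i + toy_seq_len + 1]  # the next character at each position
    pairs.append((inputs, targets))

for inputs, targets in pairs:
    print(inputs.tolist(), "->", targets.tolist())
```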

In [52]:
# Create the text dataset, compute its mappings and convert it to tensor
vocab, ctoi, itoc = create_vocab(text_data)
data_tensor = text_to_tensor(text_data, ctoi)
seq_len = 20
# Initialize training parameters
input_size, hidden_size, output_size, learning_rate = len(vocab), 64, len(vocab), 0.005

# Initialize a character-level RNN model
model = CharRNN(input_size, hidden_size, output_size).to(device)
optim = optimizer.Adam(model.parameters(), lr=learning_rate)
loss_function = nn.CrossEntropyLoss()

# Setup the training loop
# Regularly record the loss and sample from the model to monitor what is happening
def train_loop(model, data_tensor, seq_len, n_epochs, optim, loss_function, device=device):
    model.train()

    for epoch in range(n_epochs):
        # Random offset in [0, seq_len): sequences are aligned differently at each
        # epoch while (almost) the whole dataset is still seen
        start_index = np.random.randint(0, seq_len)

        for i in range(start_index, data_tensor.size(0) - seq_len, seq_len):
            input_seq = data_tensor[i:i+seq_len].unsqueeze(0).to(device)
            target_seq = data_tensor[i+1:i+seq_len+1].to(device)

            optim.zero_grad()
            output, hidden = model(input_seq)

            loss = loss_function(output.squeeze(0), target_seq)
            loss.backward()
            optim.step()

        # This is the loss of the last sequence of the epoch only, so it is noisy
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch + 1}/{n_epochs}, Loss: {loss.item()}")

n_epochs = 500
train_loop(model, data_tensor, seq_len, n_epochs, optim, loss_function)


Epoch 10/500, Loss: 2.170841693878174
Epoch 20/500, Loss: 2.259021520614624
Epoch 30/500, Loss: 1.9983110427856445
Epoch 40/500, Loss: 2.1692848205566406
Epoch 50/500, Loss: 2.2065696716308594
Epoch 60/500, Loss: 2.2631304264068604
Epoch 70/500, Loss: 2.474623918533325
Epoch 80/500, Loss: 2.31829833984375
Epoch 90/500, Loss: 2.199216365814209
Epoch 100/500, Loss: 2.444514751434326
Epoch 110/500, Loss: 2.188258171081543
Epoch 120/500, Loss: 2.559018850326538
Epoch 130/500, Loss: 2.0517821311950684
Epoch 140/500, Loss: 2.432243824005127
Epoch 150/500, Loss: 2.6098146438598633
Epoch 160/500, Loss: 2.2503933906555176
Epoch 170/500, Loss: 2.2001070976257324
Epoch 180/500, Loss: 2.5504813194274902
Epoch 190/500, Loss: 2.6394217014312744
Epoch 200/500, Loss: 2.303760051727295
Epoch 210/500, Loss: 2.5553977489471436
Epoch 220/500, Loss: 2.3157849311828613
Epoch 230/500, Loss: 2.4115264415740967
Epoch 240/500, Loss: 2.5328922271728516
Epoch 250/500, Loss: 2.238132953643799
Epoch 260/500, Loss: 

**(Question)** From your trained model, play around with its predictions: start with a custom input sequence and ask the model to predict the rest. Analyze and comment on your results.

In [53]:
start_text = "to the only begetter of these insuing"
model.eval()
n_chars = 10

with torch.no_grad():
    input_seq = text_to_tensor(start_text, ctoi).to(device)
    predicted_indices = model.predict_proba(input_seq, n_predictions=n_chars)

# The output layer has len(vocab) units, so every predicted index is a valid key of itoc
generated_text = start_text + "".join(itoc[idx] for idx in predicted_indices)



In [54]:
print(generated_text)

to the only begetter of these insuinghino qutwa


## Answer :
****

## 4. Experiment with different RNN architectures

**(Question)** Experiment with different RNN architectures. Potential ideas are multi-layer RNNs, GRUs and LSTMs. All models can be extended to multi-layer using the `num_layers` parameter. Analyze and comment on your results.
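
One possible starting point, given only as a sketch: a model whose recurrent cell is selected by name, so that `nn.RNN`, `nn.GRU` and `nn.LSTM` can be compared with the same surrounding code. The class `CharRecurrentModel`, its arguments and the toy sizes are hypothetical and not part of the TP; no training results are implied.

```python
import torch
import torch.nn as nn

class CharRecurrentModel(nn.Module):
    """Hypothetical variant of the earlier model with a selectable recurrent cell."""

    def __init__(self, n_vocab, hidden_size, cell="gru", num_layers=2):
        super().__init__()
        cells = {"rnn": nn.RNN, "gru": nn.GRU, "lstm": nn.LSTM}
        self.emb = nn.Embedding(n_vocab, hidden_size)
        self.rnn = cells[cell](hidden_size, hidden_size,
                               num_layers=num_layers, batch_first=True)
        self.lin = nn.Linear(hidden_size, n_vocab)

    def forward(self, x, hidden=None):
        x = self.emb(x)
        # For nn.LSTM, `hidden` is a tuple (h, c); None means zero initial states
        output, hidden = self.rnn(x, hidden)
        return self.lin(output), hidden

# Compare output shapes and parameter counts on a random toy batch
toy_batch = torch.randint(0, 62, (1, 20))  # (batch, seq_len) of character indices
for cell in ("rnn", "gru", "lstm"):
    toy_model = CharRecurrentModel(n_vocab=62, hidden_size=32, cell=cell)
    logits, hidden = toy_model(toy_batch)
    n_params = sum(p.numel() for p in toy_model.parameters())
    print(cell, tuple(logits.shape), n_params)
```

Because `forward` keeps the same `(output, hidden)` return convention as the earlier model, such a class should be usable with the training loop above, although prediction methods that carry the hidden state would need to handle the LSTM tuple.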

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

## Answer :
****