<img src="https://s3.amazonaws.com/weclouddata/images/logos/wcd_logo_new_2.png" width="10%">
<h1><center>RNN Exercise</center></h1>

In this exercise, we will build a character-level Recurrent Neural Network (RNN) using PyTorch and train it to generate text one character at a time from a small set of sample sentences.

## Import Libraries

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import requests

## Prepare the Dataset

In [2]:
texts = [
    "In the burgeoning cityscape, where skyscrapers pierce the clouds and the buzz of urban life is never-ending, the stories of a million souls intertwine, each one a thread in the tapestry of a metropolis.",
    "Beneath the canopy of the ancient forest, where light filters through leaves and the air is alive with the whispers of nature, the old secrets of the earth are guarded by the timeless sentinels of the woods.",
    "On the tempestuous seas, where the waves are mountains and the wind roars with the fury of the gods, mariners navigate by starlight, their hearts as boundless as the ocean they traverse.",
    "In the silent expanse of the desert, where sand dunes rise like waves in a frozen sea and the sun reigns unchallenged, the beauty of the barren is a testament to the extremes of our world.",
    "Above the peaks of the highest mountains, where the air is thin and the edge of the sky seems a hand's breadth away, the horizon stretches into infinity, a reminder of the vastness of our planet."
]


In [3]:
# Build the vocabulary (list of unique characters); don't forget <eos>, <pad>, <bos>
# Create a mapping from characters to indices (char2int) and from indices to characters (int2char)
# your code here
# Sort the characters so the index mapping is the same on every run
vocab = sorted(set(''.join(texts))) + ['<eos>', '<pad>', '<bos>']
print(list(''.join(texts)) + ['<eos>', '<pad>', '<bos>'])
print(f' unique characters: \n {vocab}')
print('-'*140)
int2char = dict(enumerate(vocab))
print(f'\n dictionary int2char: \n {int2char}')
print('-'*140)
char2int = {char: index for index, char in int2char.items()}
print(f'\n dictionary char2int: \n {char2int}')
print('-'*140)

['I', 'n', ' ', 't', 'h', 'e', ' ', 'b', 'u', 'r', 'g', 'e', 'o', 'n', 'i', 'n', 'g', ' ', 'c', 'i', 't', 'y', 's', 'c', 'a', 'p', 'e', ',', ' ', 'w', 'h', 'e', 'r', 'e', ' ', 's', 'k', 'y', 's', 'c', 'r', 'a', 'p', 'e', 'r', 's', ' ', 'p', 'i', 'e', 'r', 'c', 'e', ' ', 't', 'h', 'e', ' ', 'c', 'l', 'o', 'u', 'd', 's', ' ', 'a', 'n', 'd', ' ', 't', 'h', 'e', ' ', 'b', 'u', 'z', 'z', ' ', 'o', 'f', ' ', 'u', 'r', 'b', 'a', 'n', ' ', 'l', 'i', 'f', 'e', ' ', 'i', 's', ' ', 'n', 'e', 'v', 'e', 'r', '-', 'e', 'n', 'd', 'i', 'n', 'g', ',', ' ', 't', 'h', 'e', ' ', 's', 't', 'o', 'r', 'i', 'e', 's', ' ', 'o', 'f', ' ', 'a', ' ', 'm', 'i', 'l', 'l', 'i', 'o', 'n', ' ', 's', 'o', 'u', 'l', 's', ' ', 'i', 'n', 't', 'e', 'r', 't', 'w', 'i', 'n', 'e', ',', ' ', 'e', 'a', 'c', 'h', ' ', 'o', 'n', 'e', ' ', 'a', ' ', 't', 'h', 'r', 'e', 'a', 'd', ' ', 'i', 'n', ' ', 't', 'h', 'e', ' ', 't', 'a', 'p', 'e', 's', 't', 'r', 'y', ' ', 'o', 'f', ' ', 'a', ' ', 'm', 'e', 't', 'r', 'o', 'p', 'o', 'l', 'i',
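
Iterating over a Python `set` of strings can yield a different order from one interpreter session to the next, so a mapping built directly from a set may not match a model saved in an earlier session. The sketch below shows one way to keep the mapping stable by sorting the characters first, and checks that encoding and decoding round-trip. It uses a toy corpus, and the names `sample_texts`, `toy_vocab`, `encode`, and `decode` are illustrative assumptions, not part of the exercise.

```python
# Toy corpus standing in for `texts`; all names here are illustrative only.
sample_texts = ["abba", "cab"]
special_tokens = ["<pad>", "<bos>", "<eos>"]

# Sort the unique characters so the mapping is identical on every run.
chars = sorted(set("".join(sample_texts)))
toy_vocab = special_tokens + chars

toy_int2char = dict(enumerate(toy_vocab))
toy_char2int = {ch: i for i, ch in toy_int2char.items()}


def encode(text):
    """Map a string to a list of integer indices."""
    return [toy_char2int[ch] for ch in text]


def decode(indices):
    """Map a list of integer indices back to a string."""
    return "".join(toy_int2char[i] for i in indices)


encoded = encode("cab")
print(encoded)          # [5, 3, 4]
print(decode(encoded))  # cab
```

Placing the special tokens first is only a convention chosen for this sketch; any fixed order works as long as it is used consistently.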

In [4]:
maxlen = len(max(texts, key=len))
print("The longest string has {} characters".format(maxlen))

The longest string has 207 characters


## Create Input and Target Sequences

In [5]:
# Vocabulary size
vocab_size = len(char2int)

In [6]:
# Special token indices
bos_idx = char2int['<bos>']
eos_idx = char2int['<eos>']
pad_idx = char2int['<pad>']

In [8]:
# Create the lists that will hold our input and target sequences
# Note: in this exercise, train_X and train_Y are lists of **integer indices**, **not** one-hot encoded vectors.
train_X = []
train_Y = []

# Iterate over the texts and turn each one into an input/target pair of integer indices
# Finish the for loop below following the instructions
for text in texts:

    # Convert text to list of integer indices and prepend with <bos> and append <eos>
    # Your code here
    indexed_text = [bos_idx] + [char2int[char] for char in text] + [eos_idx]

    # Build the target sequence: the input shifted one position to the left
    # (the next character at every step), with one extra <eos> appended to keep the lengths equal
    # Your code here
    target_text = indexed_text[1:] + [eos_idx]

    # Pad shorter sequences with <pad>
    # Every sequence should end up with length maxlen + 2 (<bos> + text + <eos>)
    # Your code here
    padding_length = maxlen + 2 - len(indexed_text)
    if padding_length > 0:
        indexed_text += [pad_idx] * padding_length
        target_text += [pad_idx] * padding_length


    # Append the extracted sequences to the training lists
    # Your code here
    train_X.append(indexed_text)
    train_Y.append(target_text)

In [None]:
# Quick check
for x,y in zip(train_X[0], train_Y[0]):
    print(int2char[x], '->', int2char[y])

<bos> -> I
I -> n
n ->  
  -> t
t -> h
h -> e
e ->  
  -> b
b -> u
u -> r
r -> g
g -> e
e -> o
o -> n
n -> i
i -> n
n -> g
g ->  
  -> c
c -> i
i -> t
t -> y
y -> s
s -> c
c -> a
a -> p
p -> e
e -> ,
, ->  
  -> w
w -> h
h -> e
e -> r
r -> e
e ->  
  -> s
s -> k
k -> y
y -> s
s -> c
c -> r
r -> a
a -> p
p -> e
e -> r
r -> s
s ->  
  -> p
p -> i
i -> e
e -> r
r -> c
c -> e
e ->  
  -> t
t -> h
h -> e
e ->  
  -> c
c -> l
l -> o
o -> u
u -> d
d -> s
s ->  
  -> a
a -> n
n -> d
d ->  
  -> t
t -> h
h -> e
e ->  
  -> b
b -> u
u -> z
z -> z
z ->  
  -> o
o -> f
f ->  
  -> u
u -> r
r -> b
b -> a
a -> n
n ->  
  -> l
l -> i
i -> f
f -> e
e ->  
  -> i
i -> s
s ->  
  -> n
n -> e
e -> v
v -> e
e -> r
r -> -
- -> e
e -> n
n -> d
d -> i
i -> n
n -> g
g -> ,
, ->  
  -> t
t -> h
h -> e
e ->  
  -> s
s -> t
t -> o
o -> r
r -> i
i -> e
e -> s
s ->  
  -> o
o -> f
f ->  
  -> a
a ->  
  -> m
m -> i
i -> l
l -> l
l -> i
i -> o
o -> n
n ->  
  -> s
s -> o
o -> u
u -> l
l -> s
s ->  
  -> i
i -> n
n 
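
The same shift-and-pad logic is easier to inspect on a very short string. The following is a minimal sketch under assumed toy indices (`BOS`, `EOS`, `PAD`, and `toy_char2int` are hypothetical stand-ins for `bos_idx`, `eos_idx`, `pad_idx`, and `char2int`); `make_pair` is an illustrative helper, not a function defined in this notebook.

```python
# Hypothetical toy indices; in the notebook these come from char2int.
BOS, EOS, PAD = 0, 1, 2
toy_char2int = {"h": 3, "i": 4, "y": 5, "o": 6}


def make_pair(text, target_len):
    """Build one (input, target) pair, each padded to target_len tokens."""
    inputs = [BOS] + [toy_char2int[ch] for ch in text] + [EOS]
    # The target at position t is the input at position t + 1.
    targets = inputs[1:] + [EOS]
    padding = [PAD] * (target_len - len(inputs))
    return inputs + padding, targets + padding


longest = 3  # length of the longest toy string, e.g. "yoh"
x, y = make_pair("hi", longest + 2)
print(x)  # [0, 3, 4, 1, 2]
print(y)  # [3, 4, 1, 1, 2]
```

Both lists have the same length, and every target is the token that follows the corresponding input, which is what the quick check above prints character by character.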

## Define the RNN Model

In [9]:
# Implement the RNN model following the instructions and docstrings below
# Use nn.RNN() for the recurrent layers

class RNNModel(nn.Module):
    """
    A class for creating an RNN model which can process sequences of data by using an embedding layer followed by recurrent layers and a final fully connected layer to produce outputs corresponding to the vocabulary size.

    Attributes:
        embedding (nn.Embedding): An embedding layer that converts input indices into dense vectors of a specified size.
        rnn (nn.RNN): The RNN layers that sequentially process the data, maintaining a hidden state through the sequence.
        fc (nn.Linear): A linear layer that projects the RNN layer outputs to a space with dimensionality equal to the size of the vocabulary.

    """
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, drop_prob=0):
        """
        Initialize the RNNModel with the given parameters.

        Args:
            vocab_size (int): The number of items in the vocabulary.
            embedding_dim (int): The size of the embedding vector.
            hidden_dim (int): The number of features in the hidden state of the RNN.
            output_dim (int): The size of the output vector (should be equal to the vocab size).
            n_layers (int): The number of stacked RNN layers.
            drop_prob (float): The dropout rate.
        """
        super(RNNModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        # Create an embedding layer (nn.Embedding) with vocab_size and embedding_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Create an RNN layer (nn.RNN) with embedding_dim, hidden_dim, n_layers, and dropout; batch_first=True makes the input and output tensors use the (batch, seq, feature) layout.
        self.rnn = nn.RNN(embedding_dim, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        # Create a fully connected layer (nn.Linear) with hidden_dim and output_dim
        self.fc = nn.Linear(hidden_dim, output_dim)


    def forward(self, x, hidden):
        """
        Forward pass of the RNN model.

        Args:
            x (Tensor): A batch of input sequences represented as token indices.
            hidden (Tensor): The initial hidden state of the RNN.

        Returns:
            Tensor, Tensor: The output logits for each sequence at each timestep, and the final hidden state.
        """
        # Apply the embedding layer on x
        embedded = self.embedding(x)
        # Pass the embedded input and hidden state through the RNN layer
        output, hidden = self.rnn(embedded, hidden)
        # Prepare the output from RNN for the fully connected layer. Hint: use contiguous().view()
        output = output.contiguous().view(-1, self.hidden_dim)
        # Apply the fully connected layer on the RNN output
        output = self.fc(output)
        return output, hidden


    def init_hidden(self, batch_size):
        """
        Initializes the hidden state to zero for the start of a new batch processing.

        Args:
            batch_size (int): The size of the batch for which to create the initial hidden state.

        Returns:
            Tensor: A new tensor of zeros for the initial hidden state of the RNN with the appropriate dimensions.
        """
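        # Allocate the zeros on the same device and with the same dtype as the model parameters,
        # so this method does not depend on a global `device` variable
        weight = next(self.parameters())
        hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim, device=weight.device, dtype=weight.dtype)
        return hidden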
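
To see why `forward` reshapes the RNN output before the linear layer, it can help to trace the tensor shapes through the same three layers outside the class. This is a sketch with deliberately small, made-up sizes (they are not the hyperparameters used in this exercise).

```python
import torch
import torch.nn as nn

# Illustrative sizes only; much smaller than the ones used in this exercise.
vocab_size, embedding_dim, hidden_dim, n_layers = 12, 4, 8, 2
batch_size, seq_len = 3, 5

embedding = nn.Embedding(vocab_size, embedding_dim)
rnn = nn.RNN(embedding_dim, hidden_dim, n_layers, batch_first=True)
fc = nn.Linear(hidden_dim, vocab_size)

x = torch.randint(0, vocab_size, (batch_size, seq_len))  # token indices
h0 = torch.zeros(n_layers, batch_size, hidden_dim)       # initial hidden state

embedded = embedding(x)                          # (batch, seq, embedding_dim)
output, hidden = rnn(embedded, h0)               # output: (batch, seq, hidden_dim)
flat = output.contiguous().view(-1, hidden_dim)  # (batch * seq, hidden_dim)
logits = fc(flat)                                # (batch * seq, vocab_size)

print(embedded.shape)  # torch.Size([3, 5, 4])
print(output.shape)    # torch.Size([3, 5, 8])
print(hidden.shape)    # torch.Size([2, 3, 8])
print(logits.shape)    # torch.Size([15, 12])
```

Note that `batch_first=True` affects only the input and output tensors; the hidden state keeps the layout `(n_layers, batch, hidden_dim)`. The flattened logits have one row per timestep, which is the layout `nn.CrossEntropyLoss` expects alongside a 1-D target tensor.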

## Training the Model

In [10]:
# Check if GPU/CUDA is available and set the device accordingly
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [11]:
# TODO: Set the hyperparameters
# Define the size of the vocabulary (number of unique characters in the dataset)
# Define the dimension size for the embeddings
# Define the number of features in the hidden state of the RNN
# The output dimension is the size of the vocabulary
# Define the number of RNN layers
# Your Code Here
embedding_dim = 256
hidden_dim = 512
n_layers = 2
output_dim = vocab_size


# Initialize the RNN model using the hyperparameters
# move the model to device
# Your Code Here
model = RNNModel(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers).to(device)


# Initialize the loss function as CrossEntropyLoss
# Initialize the optimizer as Adam, with learning rate of 0.001
# Your Code Here
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# Set the number of epochs for training
# Your Code Here
epochs = 100


# Start the training loop
# Iterate over each epoch
# In each epoch, you need to:
# 1. Initialize the hidden state; the batch size is 1
# 2. Iterate over each sample in the dataset
# 3. Print the loss every 10 epochs
# And in each iteration, you need to:
# 1. Convert the current input and target sequences to tensors and move them to device;
#    add a batch dimension to the input
# Hint: you need to use unsqueeze() to deal with the dimension
# 2. Zero the gradients before running the forward pass
# 3. Perform the forward pass through the model
# 4. Compute the loss between the outputs and the targets; the model output is already flattened to
#    (batch * seq, vocab_size), so the targets must be a 1-D tensor of the same length (use view() if needed)
# 5. Perform the backward pass to compute the gradients
# 6. Update the parameters based on the gradients
# 7. Detach the hidden state to prevent backpropagating through the entire history
# Your Code Here
for epoch in range(epochs):
    hidden = model.init_hidden(1)  # Batch size is 1

    for i in range(len(train_X)):
        # Convert input and target sequences to tensors and move to device
        inputs = torch.tensor(train_X[i], dtype=torch.long).unsqueeze(0).to(device)
        targets = torch.tensor(train_Y[i], dtype=torch.long).to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output, hidden = model(inputs, hidden)

        # Compute the loss
        loss = criterion(output, targets)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Detach hidden state
        hidden = hidden.detach()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

Epoch [10/100], Loss: 0.6117
Epoch [20/100], Loss: 0.0883
Epoch [30/100], Loss: 0.0190
Epoch [40/100], Loss: 0.0117
Epoch [50/100], Loss: 0.0091
Epoch [60/100], Loss: 0.0076
Epoch [70/100], Loss: 0.0066
Epoch [80/100], Loss: 0.0059
Epoch [90/100], Loss: 0.0054
Epoch [100/100], Loss: 0.0050
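
The loop above uses `nn.CrossEntropyLoss()` with its default settings, so `<pad>` positions in the targets count toward the loss like any other character. One possible variation, which this notebook does not use, is to pass `ignore_index` so that padded positions are excluded from the average. The sketch below illustrates the effect on random logits; `pad_token` and the tensor sizes are assumptions made for the example.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

pad_token = 0                # hypothetical pad index for this toy example
logits = torch.randn(6, 5)   # 6 timesteps, 5-symbol vocabulary
targets = torch.tensor([3, 1, 4, 2, pad_token, pad_token])

plain_loss = nn.CrossEntropyLoss()(logits, targets)
masked_loss = nn.CrossEntropyLoss(ignore_index=pad_token)(logits, targets)

# With ignore_index, the mean is taken over the non-pad positions only.
keep = targets != pad_token
manual_loss = nn.CrossEntropyLoss()(logits[keep], targets[keep])

print(plain_loss.item(), masked_loss.item(), manual_loss.item())
```

In this notebook the equivalent change would be `nn.CrossEntropyLoss(ignore_index=pad_idx)`. The loss values it reports would then differ from the ones printed above, since they would no longer include the padded positions.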


## Text Generation

In [12]:
# Write a generate_text function that uses the model to generate a string starting from the given start_string

def generate_text(start_string, model, length=1000, device='cpu'):

    # Ensure the model is in evaluation mode
    model.eval()

    # Convert the start string to a list of integers (character indices).
    input_eval = [char2int[s] for s in start_string]

    # Convert this list to a PyTorch tensor and add a batch dimension.
    input_eval = torch.tensor(input_eval, device=device).unsqueeze(0)

    # Initialize the hidden state of the model.
    hidden = model.init_hidden(1).to(device)

    generated = start_string

    with torch.no_grad():
        for _ in range(length):

            # Pass the input and hidden state to the model to obtain the next output and new hidden state.
            output, hidden = model(input_eval, hidden)

            # Only take the last character's logits from the output to make predictions.
            last_char_logits = output[-1]

            # Convert the logits to probabilities for sampling.
            probabilities = torch.nn.functional.softmax(last_char_logits, dim=-1)

            # Randomly select the next character based on the probability distribution.
            predicted_id = torch.multinomial(probabilities, num_samples=1).item()

            # Append the predicted character to the generated text.
            generated += int2char[predicted_id]

            # Prepare the input for the next prediction step.
            input_eval = torch.tensor([[predicted_id]], device=device)

            # If the model predicts the end of sentence token, stop the generation.
            if int2char[predicted_id] == '<eos>':
                return generated

    return generated

# Generate and print the text
print(generate_text("On ", model, 300, device))

On the tempestuous seas, where the waves are mountains and the wind roars with the fury of the gods, mariners navigate by starlight, their hearts as boundless as the ocean they traverse.<eos>
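
The sampling step in `generate_text` draws the next character from the softmax distribution over the logits. A common variation, shown here only as a sketch and not as part of the exercise, is to divide the logits by a temperature before the softmax: values below 1 concentrate probability on the most likely characters, and a temperature of 1 reproduces the sampling used above. The helper name `sample_next` and the logits are made up for illustration.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)


def sample_next(logits, temperature=1.0):
    """Sample one index from logits after dividing them by a temperature."""
    probabilities = F.softmax(logits / temperature, dim=-1)
    index = torch.multinomial(probabilities, num_samples=1).item()
    return index, probabilities


logits = torch.tensor([2.0, 1.0, 0.5, 0.1])  # made-up logits for a 4-symbol vocabulary

sampled_index, p_default = sample_next(logits, temperature=1.0)
_, p_sharp = sample_next(logits, temperature=0.5)
greedy_index = torch.argmax(logits).item()   # deterministic alternative to sampling

print(p_default)
print(p_sharp)
print(greedy_index)  # 0
```

Because the model here was trained for 100 epochs on only five sentences, it assigns nearly all probability to the memorized next character, which is likely why the generated text above reproduces a training sentence verbatim.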
