### Question 3. Text Generation with PyTorch Gated Recurrent Unit for the Shakespeare dataset

#### To be run on GPUs

In [1]:
import numpy as np
import pandas as pd
from collections import Counter
import time,math

import tensorflow as tf

#### 3.1 (1) PyTorch imports

In [2]:
# Your Code Here
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset


torch.__version__

'1.6.0'

In [3]:
# Prefer the first CUDA GPU when one is visible to PyTorch; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

device(type='cpu')

In [4]:
class Data(Dataset):
    """Word-level next-word-prediction dataset built from the Shakespeare corpus.

    Each item is a pair of index tensors of length ``sequence_length``: the input
    window and the same window shifted one word to the right (the targets).

    Attributes set by ``__init__``:
        seq_length:     window length, in words.
        words:          the corpus as a list of whitespace-separated tokens.
        uniq_words:     vocabulary, most frequent word first.
        index_to_word / word_to_index: the two directions of the vocabulary mapping.
        words_indexes:  ``words`` translated to vocabulary indices.
    """

    def __init__(self, sequence_length):
        self.seq_length = sequence_length
        self.words = self.load_words()
        self.uniq_words = self.get_uniq_words()

        self.index_to_word = dict(enumerate(self.uniq_words))
        self.word_to_index = {word: index for index, word in self.index_to_word.items()}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self):
        """Fetch the corpus through ``tf.keras.utils.get_file`` and return its tokens."""
        path_to_file = tf.keras.utils.get_file(
            'shakespeare.txt',
            'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
        # Context manager so the file handle is closed deterministically.
        with open(path_to_file, 'rb') as corpus_file:
            text = corpus_file.read().decode()
        # str.split() with no argument already splits on every whitespace run,
        # newlines included, so no prior newline replacement is needed.
        return text.split()

    def get_uniq_words(self):
        """Return the distinct words ordered by descending frequency.

        Ties keep first-occurrence order (Counter preserves insertion order and
        ``sorted`` is stable).
        """
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        # One item per start position that still leaves room for the shifted
        # target window. Clamp at 0 so a corpus shorter than the window yields
        # an empty dataset instead of a negative length (which makes len() raise).
        return max(0, len(self.words_indexes) - self.seq_length)

    def __getitem__(self, index):
        """Return ``(inputs, targets)``; targets are the inputs shifted by one word."""
        stop = index + self.seq_length
        return (
            torch.tensor(self.words_indexes[index:stop]),
            torch.tensor(self.words_indexes[index + 1:stop + 1]),
        )


In [5]:
# Every training example is a window of this many consecutive words.
sequence_length = 50

# Constructing the dataset loads the corpus and indexes every word.
dataset = Data(sequence_length=sequence_length)


Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


#### 3.2 (2) Create the data loader with a mini-batch size of 256

In [7]:
# Your Code Here
# Mini-batch size required by the exercise; windows are drawn in random order.
batch = 256
dataset_loader = DataLoader(dataset, shuffle=True, batch_size=batch)


#### 3.3 (8) Create the GRU model class.

* The size of the embedding vector should be 256

* The number of layers in the GRU section should be 2

* Enable a dropout of 20% in the GRU section

* The output layer should be a linear layer

* Hint: GRUs do not have a cell state

In [19]:
# Your Code Here
class GRU_Model(nn.Module):
    """Word-level language model: embedding -> stacked GRU -> linear layer over the vocabulary.

    The GRU is created with PyTorch's default time-major layout
    (``batch_first=False``): ``forward`` treats dim 0 of its input as time and
    dim 1 as the batch, and the hidden state is ``(num_layers, batch, hidden)``.

    Args:
        dataset: object exposing ``uniq_words``; its length is the vocabulary size.
        embedding_size: width of the word embeddings (default 256).
        gru_size: GRU hidden width (default 256).
        gru_layers: number of stacked GRU layers (default 2).
        drop_rate: dropout applied between GRU layers (default 0.2).
    """

    def __init__(self, dataset, embedding_size=256, gru_size=256, gru_layers=2, drop_rate=0.2):
        super().__init__()
        self.gru_size = gru_size
        self.embedding_size = embedding_size
        self.gru_layers = gru_layers
        self.drop_rate = drop_rate

        n_vocab = len(dataset.uniq_words)

        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=self.embedding_size,
        )
        self.GRU = nn.GRU(
            # The GRU consumes embedding vectors, so its input width is the
            # embedding width (previously wired to gru_size, which only worked
            # because both happened to be 256).
            input_size=self.embedding_size,
            hidden_size=self.gru_size,
            num_layers=self.gru_layers,
            dropout=self.drop_rate,
        )
        self.out = nn.Linear(self.gru_size, n_vocab)

    def forward(self, x, prev_state=None):
        """Run one forward pass.

        Args:
            x: integer word indices; dim 0 is read as time and dim 1 as batch.
            prev_state: hidden state ``(gru_layers, batch, gru_size)``; ``None``
                lets nn.GRU start from zeros.

        Returns:
            ``(logits, state)`` where ``logits`` has shape ``x.shape + (n_vocab,)``
            and ``state`` is the updated hidden state (a GRU has no cell state).
        """
        embed = self.embedding(x)
        output, state = self.GRU(embed, prev_state)
        logits = self.out(output)

        return logits, state

    def init_state(self, seq_length):
        """Return an all-zero hidden state of shape ``(gru_layers, seq_length, gru_size)``.

        The parameter keeps its historical name, but the value sets the middle
        dimension of the state, which nn.GRU interprets as the batch size.
        The tensor is allocated on the same device as the model's weights.
        """
        return torch.zeros(
            self.gru_layers, seq_length, self.gru_size,
            device=self.embedding.weight.device,
        )


#### 3.4  (2) Instantiate the model

In [20]:
# Your Code Here
# Move the parameters to the selected device here, before the optimizer is
# built from them; otherwise GPU inputs would meet CPU weights in training.
model = GRU_Model(dataset).to(device)


#### 3.5 (2) Use a CrossEntropy loss and the Adam optimizer with a learning rate of 0.001

In [21]:
# Your Code Here
# Cross-entropy over the vocabulary, optimised with Adam at a step size of 1e-3.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


#### 3.6 (5) Create the training loop

* After each epoch, print the epoch number, the perplexity, and the loss

* Use a mini-batch size of 256

In [None]:
# Your Code Here
max_epochs = 20
model.train()

start = time.time()
for epoch in range(max_epochs):
    # Token-weighted running sum so the epoch mean is exact even when the
    # final mini-batch is smaller than the rest.
    loss_sum, n = 0.0, 0
    # Iterate the loader directly: the batch index is unused, and this avoids
    # rebinding the notebook-level `batch` (the mini-batch size).
    for x, y in dataset_loader:
        # The loader yields (batch, seq) tensors, but the model's nn.GRU uses
        # the default time-major layout, so feed it (seq, batch).
        x = x.to(device).t().contiguous()
        y = y.to(device)

        # Fresh zero state sized by the actual batch. Batches are shuffled, so
        # a state carried over from the previous batch would belong to
        # unrelated text (and would not fit a smaller final batch).
        state_h = model.init_state(x.size(1)).to(device)

        optimizer.zero_grad()

        y_pred, _ = model(x, state_h)
        # y_pred is (seq, batch, vocab); CrossEntropyLoss expects the class
        # dimension second, i.e. (batch, vocab, seq) against y of (batch, seq).
        loss = criterion(y_pred.permute(1, 2, 0), y)

        loss.backward()
        optimizer.step()

        loss_sum += loss.item() * y.numel()
        n += y.numel()

    # Report the epoch-mean loss so it agrees with the perplexity printed
    # beside it (previously only the last mini-batch's loss was shown).
    epoch_loss = loss_sum / n
    pp = np.round(math.exp(epoch_loss))
    print(f"epoch {epoch+1} time {np.round(time.time()-start,2)} sec perplexity {pp} loss {epoch_loss}")
    start = time.time()


#### 3.7 (5) Predict the test data

In [None]:


def predict(dataset, model, text, next_words=100):
    """Sample ``next_words`` words that continue ``text``.

    The prompt is fed once as a time-major sequence of shape (len(prompt), 1);
    after that only the newly sampled word is fed, with the hidden state
    carried forward. Sampling runs in eval mode (dropout off) and without
    autograd; the model's previous train/eval mode is restored on exit.

    Args:
        dataset: provides ``word_to_index`` and ``index_to_word``.
        model: module exposing ``init_state(n)`` and ``forward(x, state)``
            returning ``(logits, state)`` with time on dim 0 and batch on dim 1.
        text: whitespace-separated prompt; every word must be in the vocabulary.
        next_words: number of words to generate.

    Returns:
        list of str: the prompt words followed by the generated words.

    Raises:
        ValueError: if ``text`` contains no words.
        KeyError: if a prompt word is not in the vocabulary.
    """
    words = text.split()
    if not words:
        raise ValueError("text must contain at least one word")
    unknown = [w for w in words if w not in dataset.word_to_index]
    if unknown:
        raise KeyError(f"words not in the vocabulary: {unknown}")

    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    dev = first_param.device if first_param is not None else torch.device("cpu")

    was_training = model.training
    model.eval()  # disable dropout while sampling
    try:
        with torch.no_grad():
            # A single sequence is being generated, so the batch dimension is 1.
            state_h = model.init_state(1).to(dev)
            pending = list(words)  # first step consumes the whole prompt
            for _ in range(next_words):
                x = torch.tensor(
                    [[dataset.word_to_index[w]] for w in pending], device=dev
                )  # shape (len(pending), 1)
                y_pred, state_h = model(x, state_h)

                # Logits of the last time step for the only batch element.
                last_word_logits = y_pred[-1][0]
                p = torch.nn.functional.softmax(last_word_logits, dim=0).cpu().numpy()
                word_index = np.random.choice(len(last_word_logits), p=p)
                next_word = dataset.index_to_word[word_index]
                words.append(next_word)
                # The state already encodes everything fed so far; only the
                # new word needs to go in next.
                pending = [next_word]
    finally:
        model.train(was_training)

    return words


In [None]:
# Generate a continuation seeded with a single prompt word (100 words, the function's default).
words = predict(dataset, model, 'Romeo', next_words=100)

In [None]:
# Stitch the prompt and sampled words back into one space-separated string for display.
generated_text = ' '.join(words)
generated_text