In [4]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

import functools
from pathlib import Path
from tqdm.notebook import tqdm

'tqdm' is a Python library that allows you to output a smart progress bar by wrapping any iterable. Also, shows the estimated time remaining for the iterable.
For import, -conda activate pytorchenv
            -conda install tqdm

# Download dataset

'BeautifulSoup' is a Python library for pulling data out of HTML and XML files. That makes it easy to scrape information from web pages.

In [2]:
import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoup

base_url = "http://shakespeare.mit.edu/Poetry/"
base_dataset_dir = 'shakespeare_sonnets'

## Get page with sonnet list
res = requests.get(urljoin(base_url, "sonnets.html"))
assert res.status_code == 200

## Get all sonnet links
soup = BeautifulSoup(res.text)
all_links = [link.get('href') for link in soup.find_all('a')]
all_links = [link for link in all_links if link.startswith('sonnet')]

## Download each sonnet
for link in tqdm(all_links):
  # Get web page with the sonnet
  res = requests.get(urljoin(base_url, link))
  assert res.status_code == 200
  # Convert to proper text
  soup = BeautifulSoup(res.text)
  sonnet_text = soup.find('blockquote').get_text()
  # Save file
  sonnet_file = Path(base_dataset_dir) / link.replace('html', 'txt')
  sonnet_file.parent.mkdir(exist_ok=True, parents=True) # Create parent dir, if required
  with open(sonnet_file, 'w') as f:
    f.write(sonnet_text)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=154.0), HTML(value='')))




Data processing:
Read from file -> Random Crop -> Character-level encoding
->
            One-hot-encoding -> To tensor -> Batch

# Dataset definition

In [9]:
class ShakespeareDataset(Dataset):
    
    def __init__(self, dataset_dir , transform = None):
        # Convert dataset_dir to a Path object
        dataset_dir = Path(dataset_dir)
        
        # Load sonnet from each text file in dataset_dir
        self.sonnet_list = []
        for sonnet_file in dataset_dir.iterdir():
            with open(sonnet_file,'r') as f:
                sonnet_text = f.read()
            self.sonnet_list.append(sonnet_text)
                
        # Save the transformation
        self.transform = transform
        
    def __len__(self):
        return len(self.sonnet_list)
    
    def __getitem__(self,idx):
        # Get sonnet text
        sample = self.sonnet_list[idx]
        
        # Transform if defined
        if self.transform:
            sample = self.transform(sample)
            
        return sample
    
    dataset_dir = 'shakespeare_sonnets'
    dataset = ShakespeareDataset(dataset_dir)
    
    
    index = 1
    print(f'SONNET AT INDEX {index}')
    print(dataset[index])

SONNET AT INDEX 1
O, how much more doth beauty beauteous seem
By that sweet ornament which truth doth give!
The rose looks fair, but fairer we it deem
For that sweet odour which doth in it live.
The canker-blooms have full as deep a dye
As the perfumed tincture of the roses,
Hang on such thorns and play as wantonly
When summer's breath their masked buds discloses:
But, for their virtue only is their show,
They live unwoo'd and unrespected fade,
Die to themselves. Sweet roses do not so;
Of their sweet deaths are sweetest odours made:
  And so of you, beauteous and lovely youth,
  When that shall fade, my verse distills your truth.



## Data transormation

#### Random Crop

Choose a random substring (encoded) of length "crop_len" from sample

In [14]:
class RandomCrop():
    
    def __init__(self,crop_len):
        self.crop_len = crop_len
    
    
    def __call__(self,sample):
        total_chars = len(sample)
        if total_chars <= self.crop_len: # do not crop if samplae is shorter than crop_len
            return sample
        # Randomly choose an index inside a valid range
        start_index = np.random.randint(0, total_chars - self.crop_len)
        end_index = start_index + self.crop_len
    
        # Crop the sample
        return sample[start_index: end_index]

random_crop = RandomCrop(crop_len=30)
cropped_sample = random_crop(dataset[0])
print("CROPPED SAMPLE")
print(cropped_sample)
    
    

CROPPED SAMPLE
must, each day say o'er the ve


#### Character-level encoder

The 'ord' and 'chr' functions simply convert a character to the corresponding ASCII code and return a list of the encoding.

In [19]:
class EncodeText():
    
    def __call__(self,text):
        encoded_text = [ord(t) for t in text]
        return encoded_text
    
    
encode_text = EncodeText()
encoded_text = encode_text("testt")
print(encoded_text)

[116, 101, 115, 116, 116]


In [21]:
class DecodeText():
    
    def __call__(self,encoded_text):
        decoded_text = [chr(et) for et in encoded_text]
        decoded_text = functools.reduce(lambda x, y: x+y, decoded_text)
        return decoded_text
    
decode_text = DecodeText()
decoded_text = decode_text(encoded_text)
print(decoded_text)
    

testt


  From a list of encoded characters (list of values from 0 to 255), convert the 
  ASCII code to the corresponding letter and return a single string.

#### One-hot encoder

Convert each encoded character in "sample" in a one-hot representation.

In [25]:
class OneHotEncoder():
    
    def __init__(self, alphabet_len):
        self.alphabet_len = alphabet_len
    
    def __call__(self,sample):
    
    # Create one hot matrix
        onehot = np.zeros([len(sample), self.alphabet_len])
        tot_chars = len(sample)
        onehot[np.arange(tot_chars),sample] = 1
    
        return onehot


one_hot_encoder = OneHotEncoder(10)
test_sample = [1,2,5,6,4]
onehot = one_hot_encoder(test_sample)
print(onehot)

[[0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]


## To Tensor

In [28]:
 # Convert one hot encoded text to pytorch tensor
class ToTensor():
    
    def __call__(self,sample):
        return torch.tensor(sample).float()
    
to_tensor = ToTensor()
tensor = to_tensor(onehot)
print(onehot)

[[0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]


### Composed transform
##### Define the preprocessing pipeline, random crop-> character encoding -> one-hot encoding -> to tensor

In [31]:
crop_len = 50
alphabet_len = 255

transform = transforms.Compose([
    RandomCrop(crop_len),
    EncodeText(),
    OneHotEncoder(alphabet_len),
    ToTensor()
])

# Test all the chain
transformed_sample = transform(dataset[0])
print(transformed_sample)
print(transformed_sample.shape)

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
torch.Size([50, 255])


### Dataloader

In [33]:
# Redefine the dataset with the composed transformation

dataset_dir = 'shakespeare_sonnets'
dataset = ShakespeareDataset(dataset_dir, transform=transform)

### Define the dataloader to enable batching and shuffling
dataloader = DataLoader(dataset, batch_size=52, shuffle=True)

# Test dataloader output
batch_sample = next(iter(dataloader))
print(batch_sample.shape)

torch.Size([52, 50, 255])


## Network
##### with LSTM, softmax activation

In [42]:
class Network(nn.Module):
   
    def __init__(self, input_size, hidden_units, layers_num, dropout_prob=0):
        super().__init__()
    
        # define recurrent layers
        self.rnn = nn.LSTM(input_size=input_size,
                        hidden_size = hidden_units,
                        num_layers = layers_num,
                        dropout = dropout_prob,
                        batch_first = True)
    
        # define output layer
        self.out = nn.Linear(hidden_units, input_size)
    
    
    def forward(self,x,state=None):
        
       # LSTM
        x, rnn_state = self.rnn(x,state)
    
    # Linear layer
        x = self.out(x)
    
    # Remember to return also the RNN state, you will need it

Define Network

In [43]:
input_size = 255
hidden_units = 128
layers_num = 128
dropout_prob = 0.3
net = Network(input_size,hidden_units,layers_num,dropout_prob)

In [45]:
# Test network with one batch
batch_sample = next(iter(dataloader))
    
# Test the network output
out, rnn_state = net(batch_sample)
print(f"Out shape: \t\t{out.shape}")
print(f"Hidden state shape: \t{rnn_state[0].shape}")
print(f"Cell state shape: \t{rnn_state[1].shape}")

TypeError: cannot unpack non-iterable NoneType object

### Loss and optimizer

In [49]:
# Define the optimizer
optimizer = torch.optim.RMSprop(net.parameters())

# Define the loss function
loss_fn = nn.CrossEntropyLoss()

### Training loop

In [50]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'training device: {device}')

net.to(device) # move network to the proper device
net.train() # network in training mode

# Iterate through the dataloader for num_epochs
num_epochs = 1000
for num_epochs in tqdm(range(num_epochs)):
    epoch_losses = []
    for batch_sample in dataloader:
        
        # Move samples to proper device
        batch_sample = batch_sample.to(device)
        
        #P repare network input and labels
        net_input = batch_sample[:,:-1,:]
        labels= batch_sample[:,1:, :]
        
        # Forward pass
        # Clear previous recorded gradients
        optimizer.zero_grad()
        
        # Forward pass
        net_out, _ = net(net_input)
## we dont need to rnn state at this point, we can ignore the output with "_"

        #Update network
        labels = labels.argmax(dim=-1)
        net_out = net_out.permute([0,2,1])
        loss = loss_fn(net_out, labels)
        
        # Backward pass
        loss.backward()
        
        # Update
        optimizer.step()
        
        # Save batch loss
        epoch_losses.append(loss.data.cpu().numpy())
        
        # print avg epoch loss
        print(f'epocsh {num_epochs + 1} loss: {np.mean(epoch_losses)}')
        

training device: cpu


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1000.0), HTML(value='')))




TypeError: cannot unpack non-iterable NoneType object