# Char-RNN

In [1]:
# data wrangling
import pandas as pd
import numpy as np

# data viz
import matplotlib.pyplot as plt
import seaborn as sns

# pytorch framework
import torch
from torch import nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, utils
from torch.nn.utils.rnn import pad_sequence

# pytorch-lightning framework
import pytorch_lightning as pl

# utils
import os
import shutil

# Data Wrangling

In [2]:
with open('../data/shakespeare_midsummer_nights_dream.txt', 'r') as f:
    text = f.read()
# filtering gutenberg's introduction
text = text[945:]
text[:100]

'ACT I\nScene I. Athens. A room in the Palace of Theseus\nScene II. The Same. A Room in a Cottage\n\nACT '

In [3]:
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])

## Dataloader

In [4]:
class ToTensor(object):
    def __call__(self, sample):
        return torch.from_numpy(sample).int()

class SequenceData(Dataset):
    def __init__(self, encoded:np.array, seq_length_per_sample:int, transform=None):
        self.encoded = encoded.copy()
        self.transform = transform
        self.seq_length_per_sample = seq_length_per_sample

    def __len__(self):
        return len(self.encoded)//self.seq_length_per_sample
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        K = self.seq_length_per_sample
        sample = {}
        sample['x'] = self.encoded[idx*K:((idx+1)*K)]
        sample['y'] = self.encoded[idx*K:((idx+1)*K) + 1][1:]
        if self.transform:
            sample['x'] = self.transform(sample['x'])
            sample['y'] = self.transform(sample['y'])
        return sample

K = 10
seq_dataset = SequenceData(
    encoded = encoded,
    transform=ToTensor(),
    seq_length_per_sample = K
)

for index, i in enumerate(seq_dataset):
    print('INDEX', index, 'indices...', np.arange(index*K, ((index+1)*K)))
    print('ENCODED ARR', encoded[index*K:((index+1)*K)])
    y, x = i['y'], i['x']
    print('...Our dataset')
    print('x', x)
    print('y', y)
    print('\n')
    if index == 1:
        break


INDEX 0 indices... [0 1 2 3 4 5 6 7 8 9]
ENCODED ARR [72 14 47 40 86 83 62  0 15 16]
...Our dataset
x tensor([72, 14, 47, 40, 86, 83, 62,  0, 15, 16], dtype=torch.int32)
y tensor([14, 47, 40, 86, 83, 62,  0, 15, 16, 15], dtype=torch.int32)


INDEX 1 indices... [10 11 12 13 14 15 16 17 18 19]
ENCODED ARR [15 40 86 77 40 72 46 59 15 16]
...Our dataset
x tensor([15, 40, 86, 77, 40, 72, 46, 59, 15, 16], dtype=torch.int32)
y tensor([40, 86, 77, 40, 72, 46, 59, 15, 16, 70], dtype=torch.int32)




In [5]:
seq_length_per_sample = 100
dataset_dict = {
    'train': (
        SequenceData(
            encoded = encoded,
            transform=ToTensor(),
            seq_length_per_sample = seq_length_per_sample
        )
    ),
    'val': (
        SequenceData(
            encoded = encoded[-1000:],
            transform=ToTensor(),
            seq_length_per_sample = seq_length_per_sample
        )
    )
}

In [6]:
def pad_collate(batch):
  (xx, yy) = [i['x'] for i in batch], [i['y'] for i in batch]
  xx_pad = pad_sequence(xx, batch_first=True, padding_value=0)
  yy_pad = pad_sequence(yy, batch_first=True, padding_value=0)

  return {'x': xx_pad, 'y': yy_pad}

seq_dataloader = DataLoader(
    dataset_dict['train'],
    batch_size=10,
    num_workers=os.cpu_count(),
    shuffle=False,
    collate_fn=pad_collate
)

for index, sample in enumerate(seq_dataloader):
    print('INDEX', index, 'indices...', np.arange(index*K, ((index+1)*K)))
    print('ENCODED ARR', encoded[index*K:((index+1)*K)][0:10])
    y, x = sample['y'], sample['x']
    print('...Our dataset')
    print('x', x.shape, x[0, :10])
    print('y', y.shape, y[0, :10])
    print('\n')
    if index == 1:
        break

INDEX 0 indices... [0 1 2 3 4 5 6 7 8 9]
ENCODED ARR [72 14 47 40 86 83 62  0 15 16]
...Our dataset
x torch.Size([10, 100]) tensor([72, 14, 47, 40, 86, 83, 62,  0, 15, 16], dtype=torch.int32)
y torch.Size([10, 100]) tensor([14, 47, 40, 86, 83, 62,  0, 15, 16, 15], dtype=torch.int32)


INDEX 1 indices... [10 11 12 13 14 15 16 17 18 19]
ENCODED ARR [15 40 86 77 40 72 46 59 15 16]
...Our dataset
x torch.Size([10, 100]) tensor([46, 15, 18, 70, 40,  4, 16, 40, 46, 59], dtype=torch.int32)
y torch.Size([10, 100]) tensor([15, 18, 70, 40,  4, 16, 40, 46, 59, 15], dtype=torch.int32)




# Modelling

In [7]:
for index, sample in enumerate(seq_dataloader):
    print('INDEX', index, 'indices...', np.arange(index*K, ((index+1)*K)))
    print('ENCODED ARR', encoded[index*K:((index+1)*K)][0:10])
    y, x = sample['y'], sample['x']
    print('...Our dataset')
    print('x', x.shape, x[0, :10])
    print('1 hot enc', F.one_hot(x.long(), len(chars)).shape, F.one_hot(x.long(), len(chars)).dtype)
    print('y', y.shape, y[0, :10])
    print('\n')
    if index == 1:
        break

INDEX 0 indices... [0 1 2 3 4 5 6 7 8 9]
ENCODED ARR [72 14 47 40 86 83 62  0 15 16]
...Our dataset
x torch.Size([10, 100]) tensor([72, 14, 47, 40, 86, 83, 62,  0, 15, 16], dtype=torch.int32)
1 hot enc torch.Size([10, 100, 88]) torch.int64
y torch.Size([10, 100]) tensor([14, 47, 40, 86, 83, 62,  0, 15, 16, 15], dtype=torch.int32)


INDEX 1 indices... [10 11 12 13 14 15 16 17 18 19]
ENCODED ARR [15 40 86 77 40 72 46 59 15 16]
...Our dataset
x torch.Size([10, 100]) tensor([46, 15, 18, 70, 40,  4, 16, 40, 46, 59], dtype=torch.int32)
1 hot enc torch.Size([10, 100, 88]) torch.int64
y torch.Size([10, 100]) tensor([15, 18, 70, 40,  4, 16, 40, 46, 59, 15], dtype=torch.int32)




In [24]:
class CharNN(pl.LightningModule):
    def __init__(self, input_size, output_size, hidden_dim, n_layers, dataset_dict, dropout, char2int, int2char):
        super().__init__()

        self.dataset_dict = dataset_dict

        self.criterion = nn.CrossEntropyLoss()

        self.char2int = char2int
        self.int2char = int2char

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.input_size = input_size
        self.output_size = output_size
        self.hc = None

        self.lstm = nn.LSTM(
            input_size=self.input_size, 
            hidden_size=self.hidden_dim, 
            num_layers=self.n_layers, 
            dropout=dropout, 
            batch_first=True
        )
        self.dropout = nn.Dropout(p=dropout)
        self.fc = nn.Linear(hidden_dim, output_size)
    
    
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x n_seqs x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        return (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_(),
                weight.new(self.n_layers, batch_size, self.hidden_dim).zero_())
    

    def forward(self, x, hc=None):
        if hc is None:
            hc = self.hc

        x = F.one_hot(x.long(), self.output_size).float()

        # Because pytorch-lightning has a validation step where it train with a small dataset first...
        if hc is not None and hc[0].shape[1] != x.shape[0]:
            hc = None

        outs, (h, c) = self.lstm(x, hc)

        self.hc = (h.detach(),c.detach())
        outs = outs.reshape(-1, self.hidden_dim)
        x = self.fc(outs)
        return x, (h,c)
    
    def predict(self, char, hc=None, top_k=None, temperature=1):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        x = torch.tensor([[self.char2int[char]]])
        out, (h,c) = self.forward(x, hc)
        out = out / temperature

        p = F.softmax(out, dim=1).detach()
        
        if top_k is None:
            top_ch = torch.arange(self.output_size)
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.squeeze()
            
        p = p.squeeze().cpu().detach()
        char = np.random.choice(top_ch.cpu().detach().numpy(), p=p.numpy())
            
        return self.int2char[char], (h,c)

    def training_step(self, batch, batch_idx):
        x, y = batch['x'], batch['y']
        y_hat, (h, c) = self(x)
        loss = self.criterion(y_hat, y.flatten().long())
        self.log("train_loss", loss)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch['x'], batch['y']
        y_hat, (h, c) = self(x)
        loss = self.criterion(y_hat, y.flatten().long())
        self.log("val_loss", loss, prog_bar=True)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001)
        return optimizer
    
    def train_dataloader(self):
        return DataLoader(
            self.dataset_dict['train'],
            batch_size=50,
            num_workers=os.cpu_count(),
            shuffle=False,
            collate_fn=pad_collate
        )
    
    def val_dataloader(self):
        return DataLoader(
            self.dataset_dict['val'],
            batch_size=50,
            num_workers=os.cpu_count(),
            shuffle=False,
            collate_fn=pad_collate
        )

model = CharNN(
    input_size=len(chars),
    output_size=len(chars),
    hidden_dim=512,
    n_layers=2,
    dataset_dict=dataset_dict,
    dropout=0.3,
    char2int=char2int,
    int2char=int2char
    
)

shutil.rmtree('../lightning_logs', ignore_errors=True)

trainer = pl.Trainer(
    default_root_dir='../',
    log_every_n_steps=5,
    max_epochs=10,
    enable_progress_bar=True,
    gradient_clip_val=5,
    gradient_clip_algorithm="norm"
)

trainer.fit(model)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
Missing logger folder: ../lightning_logs

  | Name      | Type             | Params
-----------------------------------------------
0 | criterion | CrossEntropyLoss | 0     
1 | lstm      | LSTM             | 3.3 M 
2 | dropout   | Dropout          | 0     
3 | fc        | Linear           | 45.1 K
-----------------------------------------------
3.4 M     Trainable params
0         Non-trainable params
3.4 M     Total params
13.517    Total estimated model params size (MB)


Epoch 9: 100%|██████████| 24/24 [00:44<00:00,  1.87s/it, loss=2.59, v_num=0, val_loss=2.760]


## Generating outputs

In [49]:
def generate_samples(net, n=100, hc=None, prime='We', top_k=None):
    net.eval()
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    for ch in prime:
        char, hc = net.predict(ch, hc, top_k=top_k)
    
    # Now pass in the previous character and get a new one
    for ii in range(n):
        char, hc = net.predict(chars[-1], hc, top_k=top_k, temperature=1)
        chars.append(char)

    return ''.join(chars)

print(generate_samples(model, n=1000))

Wers feeghe aktemu dlertotertand bumaclussinc
pho:
Pn ild me mounrt; lutoatirdgancmeteety; tfanmcoiggonmend anttu r se; onker, haliren ir kistoroanmorinlesI Dudpp umatt ot nutheit abguon is n! Dlipot s sothilt be tofet toise aelt eiSs.

NDLRMTR, Hy Lx le:
ou cewink le ikikn ler meseod’ sheu) I s aacTlidindiut tooveitheesmesgleiond sbead.
DP[Oo ihe lim?

YaOLSYPTEMASSTOEBETLEOIH.
M. Dhine thad l’ fen lutitus, anhoce Rtoy I od monnslaatb cey are llanpaucn f allris; lruf and Etinlarh’t sorer cesey nnaa)um corheve tles feavd d batt sove tevt indeit  larurghed dhien pasnthaand,
Te!
H8nUEHELSEMMs d  met thuate Istasditin, fancethe pith  f ord piiret pemervorun™sage.B
.IITO
YTMEEtUfas ho tove huranhasthanl,,Were!:Yod carifoeknd imesu, ecsdin colru
PDTOANLR
OS.
Tocat wusranthithead ugd ase nand fans.
Weees. ™omcis ti the Qlrelepbe1taw ow wuinr, Tor sut aht on wouthitfeukf, pthed antefse lhyk tiqtoot thoiddemed w fholiro yhol I owlen ald ffof foDd kot. I  ligwcLiugb bil. Yhede Pn ondercocteifga