# NLP with PyTorch

In [1]:
import torch
from torch import nn
import torch.nn.functional as F

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
with open('./PYTORCH_NOTEBOOKS/Data/shakespeare.txt', 'r', encoding='utf8') as f:
    text = f.read()

In [3]:
type(text)

str

In [4]:
print(text[:1000])


                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else this glutton be,
    To eat the world's due, by the grave and thee.


                     2
  When forty winters shall besiege thy brow,
  And dig deep trenches in thy beauty's field,
  Thy youth's proud livery so gazed on now,
  Will be a tattered weed of small worth held:  
  Then being asked, where all thy beauty lies,
  Where all the treasure of thy lusty days;
  To say within thine own deep su

In [5]:
len(text)

5445609

In [7]:
all_characters = set(text)
len(all_characters)

84

In [9]:
decoder = dict(enumerate(all_characters))

In [11]:
encoder = {char: ind for ind, char in decoder.items()}

In [12]:
encoded_text = np.array([encoder[char] for char in text])

In [13]:
encoded_text[:500]

array([ 5, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83,
       83, 83, 83, 83, 83, 47,  5, 83, 83, 27, 13, 38, 71, 83, 59, 21, 28,
       13,  4, 80, 63, 83, 19, 13,  4, 21, 63, 56, 13,  4, 80, 83,  2,  4,
       83, 39,  4, 80, 28, 13,  4, 83, 28, 30, 19, 13,  4, 21, 80,  4, 53,
        5, 83, 83, 74,  1, 21, 63, 83, 63,  1,  4, 13,  4, 42, 34, 83, 42,
        4, 21, 56, 63, 34, 10, 80, 83, 13, 38, 80,  4, 83, 71, 28, 57,  1,
       63, 83, 30,  4, 17,  4, 13, 83, 39, 28,  4, 53,  5, 83, 83, 26, 56,
       63, 83, 21, 80, 83, 63,  1,  4, 83, 13, 28,  8,  4, 13, 83, 80,  1,
       38, 56, 54, 39, 83, 42, 34, 83, 63, 28, 71,  4, 83, 39,  4, 19,  4,
       21, 80,  4, 53,  5, 83, 83, 45, 28, 80, 83, 63,  4, 30, 39,  4, 13,
       83,  1,  4, 28, 13, 83, 71, 28, 57,  1, 63, 83, 42,  4, 21, 13, 83,
        1, 28, 80, 83, 71,  4, 71, 38, 13, 34, 31,  5, 83, 83, 26, 56, 63,
       83, 63,  1, 38, 56, 83, 19, 38, 30, 63, 13, 21, 19, 63,  4, 39, 83,
       63, 38, 83, 63,  1

In [14]:
def one_hot_encoder(encoded_text, num_uni_chars):

    # encoded_text --> batch of encoded text
    # num_uni_chars --> len(set(text))

    one_hot = np.zeros((encoded_text.size, num_uni_chars))
    one_hot = one_hot.astype(np.float32)
    one_hot[np.arange(one_hot.shape[0]), encoded_text.flatten()] = 1.0
    one_hot = one_hot.reshape((*encoded_text.shape, num_uni_chars))
    return one_hot

In [20]:
def generate_batches(encoded_text, samp_per_batch = 10, seq_len = 50):
    # X : encoded text of len seq_len
    # Y : encoded text shifted by 1

    char_per_batch = samp_per_batch * seq_len
    num_batches_avail = int(len(encoded_text)/char_per_batch)
    encoded_text = encoded_text[:num_batches_avail*char_per_batch]
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    for n in range(0, encoded_text.shape[1], seq_len):
        x = encoded_text[:, n:n+seq_len]
        y = np.zeros_like(x)
        try:
            y[:, :-1] = x[:, 1:]
            y[:,-1] = encoded_text[:,n+seq_len]
        except:
            y[:, :-1] = x[:,1:]
            y[:,-1] = encoded_text[:,0]
        
        yield x, y

In [21]:
sample_text = encoded_text[:20]

In [22]:
sample_text

array([ 5, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83, 83,
       83, 83, 83])

In [24]:
batch_generator = generate_batches(sample_text, samp_per_batch=2, seq_len=5)
x, y = next(batch_generator)
x,y

(array([[ 5, 83, 83, 83, 83],
        [83, 83, 83, 83, 83]]),
 array([[83, 83, 83, 83, 83],
        [83, 83, 83, 83, 83]]))

In [25]:
class CharModel(nn.Module):
    def __init__(self, all_chars, num_hidden = 256, num_layers = 4, drop_prob = 0.5, use_gpu = False):
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.use_gpu = use_gpu
        self.num_hidden = num_hidden
        
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char:ind for ind, char in decoder.items()}

        self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout = drop_prob, batch_first = True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))

    def forward(self, x, hidden):
        lstm_output, hidden = self.lstm(x, hidden)
        drop_output = self.dropout(lstm_output)
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        final_out = self.fc_linear(drop_output)
        return final_out, hidden

    def hidden_state(self, batch_size):
        if self.use_gpu:
            hidden = (torch.zeros(self.num_layers, batch_size, self.num_hidden).cuda(), torch.zeros(self.num_layers, batch_size, self.num_hidden).cuda())
        else:
            hidden = (torch.zeros(self.num_layers, batch_size, self.num_hidden), torch.zeros(self.num_layers, batch_size, self.num_hidden))
        return hidden

In [26]:
model = CharModel(all_chars=all_characters,
                  num_hidden=512,
                  num_layers = 3,
                  drop_prob=0.5,
                  use_gpu=True)

In [27]:
train_percent = 0.9
train_ind = int(len(encoded_text) * train_percent)
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

In [28]:
# VARIABLES
epochs = 60
batch_size = 100

seq_len = 100
tracker = 0
num_chars = max(encoded_text) + 1

In [29]:
optimizer = torch.optim.Adam(model.parameters(), lr = 1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
model.train()

if model.use_gpu:
    model.cuda()

for i in range(epochs):
    hidden = model.hidden_state(batch_size)
    for x, y in generate_batches(train_data, batch_size, seq_len):
        tracker += 1
        x = one_hot_encoder(x, num_chars)
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)

        if model.use_gpu:
            inputs = inputs.cuda()
            targets = targets.cuda()

        hidden = tuple([state.data for state in hidden])
        optimizer.zero_grad()
        lstm_output, hidden = model.forward(inputs, hidden)
        loss = criterion(lstm_output, targets.view(batch_size*seq_len).long())
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm = 5)
        optimizer.step()

        if tracker%25 == 0:
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()

            for x, y in generate_batches(val_data, batch_size, seq_len):
                x = one_hot_encoder(x, num_chars)
                inputs = torch.from_numpy(x)
                targets = torch.from_numpy(y)
                if model.use_gpu:
                    inputs = inputs.cuda()
                    targets = targets.cuda()
                
                val_hidden = tuple([state.data for state in val_hidden])
                lstm_output, val_hidden = model.forward(inputs, val_hidden)
                val_loss = criterion(lstm_output, targets.view(batch_size*seq_len).long())
            model.train()
            print(f'EPOCH: {i+1} Step: {tracker} VAL LOSS: {val_loss.item()}')

In [None]:
def predict_next_char(model, char, hidden = None, k = 1):
    encoded_text = model.encoder[char]
    encoded_text = np.array([[encoded_text]])
    encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
    inputs = torch.from_numpy(encoded_text)

    if model.use_gpu:
        inputs = inputs.cuda()
    
    hidden = tuple([state.data for state in hidden])
    lstm_out, hidden = model(inputs, hidden)
    probs = F.softmax(lstm_out, dim = 1).data
    if model.use_gpu:
        probs = probs.cpu()

    probs, index_positions = probs.topk(k)
    index_positions = index_positions.numpy().squeeze()
    probs = probs.numpy().flatten()
    probs = probs/probs.sum()
    char = np.random.choice(index_positions, p = probs)
    return model.decoder[char], hidden

In [None]:
def generate_text(model, size, seed = "The", k = 1):
    if model.use_gpu:
        model.cuda()
    else:
        model.cpu()

    model.eval()
    output_chars = [c for c in seed]
    hidden = model.hidden_state(1)
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)
    output_chars.append(char)
    for i in range(size):
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k = k)
        output_chars.append(char)
    
    return ' '.join(output_chars)

In [None]:
print(generate_text(model, 1000, seed = 'The', k = 3))