# NLP

- Networks do not understand raw text so all text has to be encoded.
- Then it needs to be one-hot encoded

# Libraries

In [3]:
import torch
from torch import nn
import torch.nn.functional as F

from google.colab import drive
drive.mount('/content/drive')

import time
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# GPU check
torch.cuda.is_available()

True

# Data

## Read data

In [5]:
with open('/content/drive/MyDrive/AI Data/shakespeare.txt','r', encoding='utf8') as f:
    text = f.read()

In [6]:
print(text[:1000])


                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else this glutton be,
    To eat the world's due, by the grave and thee.


                     2
  When forty winters shall besiege thy brow,
  And dig deep trenches in thy beauty's field,
  Thy youth's proud livery so gazed on now,
  Will be a tattered weed of small worth held:  
  Then being asked, where all thy beauty lies,
  Where all the treasure of thy lusty days;
  To say within thine own deep su

## Encoding text

We will create a set of all characters, assign an id to each character, and build two dictionaries one with id > text, another with text > id

In [7]:
# Create a set of all unique characters in the text
all_characters = set(text)
len(all_characters)

84

In [8]:
# Create a decoder that reads the ID and returns the character.
# Assign an ID to each character and save it in a dictionary
decoder = dict(enumerate(all_characters))
decoder

{0: '-',
 1: '!',
 2: 'Z',
 3: 'H',
 4: '1',
 5: 'J',
 6: 'Y',
 7: 'M',
 8: '>',
 9: 'q',
 10: 'k',
 11: 'N',
 12: 'C',
 13: 'm',
 14: '\n',
 15: 'l',
 16: 'i',
 17: 'r',
 18: 'e',
 19: '5',
 20: '7',
 21: '8',
 22: 't',
 23: 'O',
 24: ']',
 25: '9',
 26: 'g',
 27: '0',
 28: 'X',
 29: '}',
 30: ')',
 31: "'",
 32: 'x',
 33: 'R',
 34: '|',
 35: 's',
 36: 'F',
 37: 'I',
 38: 'U',
 39: 'T',
 40: '?',
 41: 'A',
 42: '4',
 43: '6',
 44: 'b',
 45: ':',
 46: 'w',
 47: '&',
 48: '.',
 49: ',',
 50: 'a',
 51: 'y',
 52: 'u',
 53: 'o',
 54: 'B',
 55: 'E',
 56: ';',
 57: 'L',
 58: '(',
 59: 'n',
 60: 'j',
 61: 'd',
 62: 'p',
 63: '`',
 64: '[',
 65: 'h',
 66: '<',
 67: 'V',
 68: '3',
 69: 'W',
 70: 'v',
 71: ' ',
 72: 'D',
 73: 'c',
 74: 'z',
 75: 'G',
 76: 'S',
 77: '"',
 78: '_',
 79: 'P',
 80: 'K',
 81: 'f',
 82: 'Q',
 83: '2'}

In [9]:
# Create an encoder that reads the character and returns the ID.
encoder = {char:ind for ind,char in decoder.items()}
encoder

{'\n': 14,
 ' ': 71,
 '!': 1,
 '"': 77,
 '&': 47,
 "'": 31,
 '(': 58,
 ')': 30,
 ',': 49,
 '-': 0,
 '.': 48,
 '0': 27,
 '1': 4,
 '2': 83,
 '3': 68,
 '4': 42,
 '5': 19,
 '6': 43,
 '7': 20,
 '8': 21,
 '9': 25,
 ':': 45,
 ';': 56,
 '<': 66,
 '>': 8,
 '?': 40,
 'A': 41,
 'B': 54,
 'C': 12,
 'D': 72,
 'E': 55,
 'F': 36,
 'G': 75,
 'H': 3,
 'I': 37,
 'J': 5,
 'K': 80,
 'L': 57,
 'M': 7,
 'N': 11,
 'O': 23,
 'P': 79,
 'Q': 82,
 'R': 33,
 'S': 76,
 'T': 39,
 'U': 38,
 'V': 67,
 'W': 69,
 'X': 28,
 'Y': 6,
 'Z': 2,
 '[': 64,
 ']': 24,
 '_': 78,
 '`': 63,
 'a': 50,
 'b': 44,
 'c': 73,
 'd': 61,
 'e': 18,
 'f': 81,
 'g': 26,
 'h': 65,
 'i': 16,
 'j': 60,
 'k': 10,
 'l': 15,
 'm': 13,
 'n': 59,
 'o': 53,
 'p': 62,
 'q': 9,
 'r': 17,
 's': 35,
 't': 22,
 'u': 52,
 'v': 70,
 'w': 46,
 'x': 32,
 'y': 51,
 'z': 74,
 '|': 34,
 '}': 29}

In [10]:
# Encode the text as a numpy array
encoded_text = np.array([encoder[char] for char in text])
encoded_text[:100]

array([14, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71,
       71, 71, 71, 71, 71,  4, 14, 71, 71, 36, 17, 53, 13, 71, 81, 50, 16,
       17, 18, 35, 22, 71, 73, 17, 18, 50, 22, 52, 17, 18, 35, 71, 46, 18,
       71, 61, 18, 35, 16, 17, 18, 71, 16, 59, 73, 17, 18, 50, 35, 18, 49,
       14, 71, 71, 39, 65, 50, 22, 71, 22, 65, 18, 17, 18, 44, 51, 71, 44,
       18, 50, 52, 22, 51, 31, 35, 71, 17, 53, 35, 18, 71, 13, 16])

## One-hot encoding

We will be creating a one-hot encoding matrix of all characters in the text.

In [11]:
def one_hot_encoder(encoded_text, num_uni_chars):
    '''
    Returns a one-hot encoded matrix of shape (encoded_text.size, unique_characters)
    Parameters
    ----------
    - encoded_text [np.array]: batch of encoded text
    - num_uni_chars [int]: number of unique characters in the text
    '''
    
    # Create a matrix of zeros
    one_hot = np.zeros((encoded_text.size,num_uni_chars))
    
    # Convert the matrix to Float32 to ensure Torch compatibility
    one_hot = one_hot.astype(np.float32)
    
    # One-hot encode original matrix
    one_hot[np.arange(one_hot.shape[0]),encoded_text.flatten()] = 1.0
    
    # Reshape to match the batch size. 
    one_hot = one_hot.reshape((*encoded_text.shape,num_uni_chars))
    
    return one_hot
    

In [12]:
# Sample
one_hot_encoder(np.array([0,2,2,3,1]),4)

array([[1., 0., 0., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 0., 1.],
       [0., 1., 0., 0.]], dtype=float32)

## Training batches

The training batches target data will be the data shifted by one position. Instead of providing only the next letter, the entire context will be provided. This will allow the network to learn the gramatical rules, not just the most probable letter.

In [13]:
# Sample
t = [c for c in 'Hello there']
print(f'X = {t[:-1]}')
print(f'y = {t[1:]}')

X = ['H', 'e', 'l', 'l', 'o', ' ', 't', 'h', 'e', 'r']
y = ['e', 'l', 'l', 'o', ' ', 't', 'h', 'e', 'r', 'e']


We need to create a batch generator that will reshape the data to be of shape (batches, elements per batch)

In [14]:
# Example
sample_text = np.arange(100)
print(f'Original text = {sample_text}')
print(f'\nTransformed to 5 batches = \n{sample_text.reshape(10,-1)}')

Original text = [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
 96 97 98 99]

Transformed to 5 batches = 
[[ 0  1  2  3  4  5  6  7  8  9]
 [10 11 12 13 14 15 16 17 18 19]
 [20 21 22 23 24 25 26 27 28 29]
 [30 31 32 33 34 35 36 37 38 39]
 [40 41 42 43 44 45 46 47 48 49]
 [50 51 52 53 54 55 56 57 58 59]
 [60 61 62 63 64 65 66 67 68 69]
 [70 71 72 73 74 75 76 77 78 79]
 [80 81 82 83 84 85 86 87 88 89]
 [90 91 92 93 94 95 96 97 98 99]]


In [15]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    '''
    Generator object to create training batches as requested.
    Parameters
    ----------
    - encoded_text [np.array]: encoded text to be batched
    - samp_per_batch [int]: samples per batch that will be created
    - seq_len [int]: number of characters to include in each sample
    
    Output
    -------
    X [np.array]: encoded text of length seq_len
    y [np.array]: X shifted by one position to the right
    '''
    
    # Calculate total number of characters per batch
    chars_per_batch = samp_per_batch * seq_len
    
    # Calculate the total number of batches that can be made
    num_batches = int(len(encoded_text)/chars_per_batch)
    
    # Remove extra characters that won't fit into a batch
    encoded_text = encoded_text[:num_batches*chars_per_batch]
    
    # Reshape encoded text
    encoded_text = encoded_text.reshape((samp_per_batch,-1))
    
    # Generate sequences
    for i in range(0,encoded_text.shape[1],seq_len):
        
        X = encoded_text[:,i:i+seq_len]
        y = np.zeros_like(X)
        
        # Insert in y the x values shifted by one place. X is one position smaller than y
        y[:,:-1] = X[:,1:] 
        
        # Code block to handle last row in the data
        try:
            # Insert the following value. This is different to i:i+seq_len as i:i+seq_len 
            # will not include i+seq_len item. i+seq_len will only include that item
            y[:,-1] = encoded_text[:i+seq_len] 
        except:
            # Insert the first value instead of the last value
            y[:, -1] = encoded_text[:, 0]
            
        yield X,y

In [16]:
# Test sample text
test = np.arange(1000)
test_batches = generate_batches(test)
tx, ty = next(test_batches)

tx[0], ty[0]

(array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
        17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
        34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]),
 array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34,
        35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49,  0]))

## Train/test split

In [17]:
# Split data into trianing and testing sets
train_pct = 0.9
train_idx = int(len(encoded_text)*train_pct)
train_data = encoded_text[:train_idx]
test_data = encoded_text[train_idx:]

# Define LSTM model

In [18]:
class CharModel(nn.Module):

  def __init__(self,all_chars,num_hidden=256,num_layers=4,drop_prob=0.5,use_gpu=False):

    # Instantiate nn.Module
    super().__init__()

    # Set model parameters
    self.drop_prob = drop_prob
    self.num_layers = num_layers
    self.num_hidden = num_hidden
    self.use_gpu = use_gpu

    # Create encoder and decoder
    self.all_chars = all_chars
    self.decoder = dict(enumerate(set(all_chars)))
    self.encoder = {char:idx for idx,char in decoder.items()}

    # Define the layers
    self.lstm = nn.LSTM(len(self.all_chars),self.num_hidden,self.num_layers,
                        dropout=self.drop_prob,batch_first=True)
    self.dropout = nn.Dropout(self.drop_prob)
    self.fc_linear = nn.Linear(self.num_hidden,len(self.all_chars))

  def forward(self,x,hidden):
    lstm_out, self.hidden = self.lstm(x,hidden)
    drop_output = self.dropout(lstm_out)
    drop_output = drop_output.contiguous().view(-1,self.num_hidden)
    final_out = self.fc_linear(drop_output)

    return final_out,hidden

  def hidden_state(self,batch_size):
    if self.use_gpu:
      hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda(),
                torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda())
    else:
      hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden),
                torch.zeros(self.num_layers,batch_size,self.num_hidden))
      
    return hidden


In [19]:
model = CharModel(all_chars=all_characters,num_hidden=512,
                   num_layers=3,drop_prob=0.5,use_gpu=True)

In [20]:
total_param = [p.numel() for p in model.parameters()]
print(f'{sum(total_param):,}')

5,470,292


## Optimiser and Loss

In [21]:
optimiser = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()

# Training the model

In [None]:
# Set training variables
epochs = 60
batch_size = 100
seq_len = 100
tracker = 0
num_char = max(encoded_text)+1

In [None]:

start_time = time.time()

model.train()

# Use CUDA if GPU available
if model.use_gpu:
    model.cuda()

for i in range(epochs):
  
    # Initialise hidden states
    hidden = model.hidden_state(batch_size)

    # Train on batches
    for x,y in generate_batches(train_data, batch_size, seq_len):
        
        tracker +=1
        
        # Create one-hot encoded matrix of input
        x = one_hot_encoder(x, num_char)

        # Create tensors from numpy arrays
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)

        # Drop to GPU if use_gpu is true
        if model.use_gpu:
            inputs = inputs.cuda()
            targets = targets.cuda()

        ##################
        #### Training ####
        ##################

        # Reset the hidden states to avoid them backpropagating through the entire
        # training history. 
        hidden = tuple([state.data for state in hidden])

        model.zero_grad()

        # Crete predictions
        lstm_out, hidden = model.forward(inputs, hidden)
        
        # Calculate loss
        loss = criterion(lstm_out, targets.view(batch_size*seq_len).long())
        # Backpropagate
        loss.backward()

        # Clip gradients to avoid an exploding gradient problem
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)

        # Iterate optimiser over all parameters
        optimiser.step()

        ####################
        #### Validation ####
        ####################

        if tracker % 25==0:
        
            # Initialise validation hidden states
            val_hidden = model.hidden_state(batch_size)

            # List to save loss
            val_losses = []

            # Set the model on evaluation mode
            model.eval()

            for x,y in generate_batches(test_data, batch_size, seq_len):
                
                # Create one-hot encoded matrix of input
                x = one_hot_encoder(x, num_char)

                # Create tensors from numpy arrays
                inputs = torch.from_numpy(x)
                targets = torch.from_numpy(y)

                # Drop to GPU if use_gpu is true
                if model.use_gpu:
                    inputs = inputs.cuda()
                    targets = targets.cuda()

                # Reset the hidden states to avoid them backpropagating through the entire
                # training history. 
                val_hidden = tuple([state.data for state in val_hidden])

                # Crete predictions
                lstm_out, val_hidden = model.forward(inputs, val_hidden)
                
                # Calculate loss
                val_loss = criterion(lstm_out, targets.view(batch_size*seq_len).long())
                val_losses.append(val_loss.item())

            # Set the model to train after validation loop
            model.train()
            
            print(f"Epoch: {i} Step: {tracker} Val Loss: {val_loss.item()}")

print(f'Training time: {(time.time()-start_time)/60:.2f} mins')

Epoch: 0 Step: 25 Val Loss: 3.2229514122009277
Epoch: 0 Step: 50 Val Loss: 3.2003657817840576
Epoch: 0 Step: 75 Val Loss: 3.136951446533203
Epoch: 0 Step: 100 Val Loss: 3.005025863647461
Epoch: 0 Step: 125 Val Loss: 2.8695871829986572
Epoch: 0 Step: 150 Val Loss: 2.729569673538208
Epoch: 0 Step: 175 Val Loss: 2.6371586322784424
Epoch: 0 Step: 200 Val Loss: 2.543445587158203
Epoch: 0 Step: 225 Val Loss: 2.4671242237091064
Epoch: 0 Step: 250 Val Loss: 2.3978536128997803
Epoch: 0 Step: 275 Val Loss: 2.319489002227783
Epoch: 0 Step: 300 Val Loss: 2.2505581378936768
Epoch: 0 Step: 325 Val Loss: 2.203413963317871
Epoch: 0 Step: 350 Val Loss: 2.1635212898254395
Epoch: 0 Step: 375 Val Loss: 2.128622531890869
Epoch: 0 Step: 400 Val Loss: 2.094848871231079
Epoch: 0 Step: 425 Val Loss: 2.0643391609191895
Epoch: 0 Step: 450 Val Loss: 2.043923854827881
Epoch: 0 Step: 475 Val Loss: 2.007802724838257
Epoch: 1 Step: 500 Val Loss: 1.9787120819091797
Epoch: 1 Step: 525 Val Loss: 1.957869529724121
Epoch:

## Save the model

In [None]:
# Save the trained model
model_name = f'/content/drive/MyDrive/Colab Notebooks/models/hidden{model.num_hidden}_layers{model.num_layers}_{time.strftime("%Y%M%d%H%m",time.gmtime(time.time()))}_shakespeare.net'
torch.save(model.state_dict(),model_name)

# Predictions

We will be geneting two functions, one function to generate the next character prediction and another one to generate strings of sentences.

## Load the model

In [49]:
# MUST MATCH THE EXACT SAME SETTINGS AS MODEL USED DURING TRAINING!

model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True,
)

In [50]:
model.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/Final_Shakespeare.net'))
model.eval()

CharModel(
  (lstm): LSTM(84, 512, num_layers=3, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc_linear): Linear(in_features=512, out_features=84, bias=True)
)

## Prediction functions

In [51]:
def predict_next_char(model, char, hidden=None, k=1):

    # Encode the text
    encoded_text = model.encoder[char]

    # Convert to a numpy array with 1 observation of 1 feature [[x]]
    # to maintain the same dimensions
    encoded_text = np.array([[encoded_text]])

    # One-hot encode the character in a matrix of all characters
    encoded_text = one_hot_encoder(encoded_text,len(model.all_chars))

    # Convert to a tensor and load to GPU if required
    inputs = torch.from_numpy(encoded_text)
    if model.use_gpu:
        inputs = inputs.cuda()

    # Initialise the hidden states
    hidden = tuple([state.data for state in hidden])

    # Generate predictions
    lstm_out, hidden = model.lstm(inputs, hidden)

    # Pass through a softmax layer to generate probabilities
    probs = F.softmax(lstm_out, dim=1).data

    # Move probabilities back to CPU to use numpy
    if model.use_gpu:
        probs = probs.cpu()
    
    # Extract the top k probability items from the probabilities array
    probs, idx_pos = probs.topk(k)

    # Remove any axis of length one 
    idx_pos = idx_pos.numpy().squeeze()

    # Flatten the array to a one-dimensional array
    probs = probs.numpy().flatten()

    # Convert to probabilities per index
    probs = probs/probs.sum()

    # Randomly chose a character based on probabilities
    char = np.random.choice(idx_pos, p=probs)

    # Return the encoded value of the predicted char and the hidden state
    return model.decoder[char], hidden


In [52]:
def generate_text(model, size, seed='The', k=1):

    # Load model on gpu if required
    if model.use_gpu:
        model.cuda()
    else:
        model.cpu()

    # Set the model on evaluation mode
    model.eval()

    # Begin output from initial seed
    output_chars = [c for c in seed]

    # Initiate the hidden state
    hidden = model.hidden_state(batch_size=1)

    # Predict the next character for every character in seed
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)
    
    # Add initial characters to output
    output_chars.append(char)

    # Generate for required size
    for i in range(size):

        # Predict based off very last letter in output_chars
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)

        # Append predictex character
        output_chars.append(char)

    # Return the string of predicted text
    return ''.join(output_chars)

In [53]:
print(generate_text(model, 1000, seed='The ', k=3))

The ,!----!---,-!,-,!,---,,--!,,!-!-,!-!!-!,,,,,!-!,,-!!!,!!-,!!!-,,,-,-!,!,,,-!!,,,-,-!!---!,!!-!!!,!,--!-,,-,,---,---!,------,,!--!-,!-!,!-,-!-!!-,!-!-!,!-!,,--!!-!!,,,,,,-,-!!-,!!!!-!,---!,,,,!!---!,,,--!,--!!,,!!!!,,,!!-!-,,!-,,--!-,!-!,!--,!-!!!,-!!!,!,,!!,,,!---!,,!,-,---,,,!!,,!-!!-,,!--!---,-,!----!!!--,!-,-----!--,!,,-,!---,,,!!,!---!-,,,,--!-!-,-,,!!-!!!!-!!,-,-,!!,-!!-,,,,,-!,!,,!!,,!!!!,!,!-,-!,!!,!,,!,!!----,!---,-!,,--,!,,--!!--!!,-,!!,!,,--!!-!--!!,-!--,--!!-,,---!!!,,-!!--,!,!!,-,----!!!---,,--!-,,!-!!---!-,-!,!,-,-!,,,----,-,,!!-,,-,---!,!!--,!,!!!!-!!--,-,!,---!,,,!-!-,,,,--,!,!-,-,,,,,-!-!!!!!-,-,-!!-!--,,,!,-!!,--!,,,!-,---,!-!,!,--!----,,,!-!-,,!!!!!,-,!-!!!--,!!!!-,,,--!--,!,!!!,!-!,-,,--!!,!-!,!-,!,-,!-,,-,-,,!--,,,-!!-!--!-!--!--!!!,!!!,,-!--,,!,!-!!-,!!-!,!!-!-,,!-!!,!,!--,,!!,-!!!,-!!--!,---,,--,,,-!!!,,!,,,!,!,,,!!,-!!-,!,,---,--,!-,,,-,-!!-!--!,-!,----!,---,---,,!,!-!!,-!!!!!!,,--!,-,-!!-,,,-,,!,!-!!,!,!-!-!---!,---,----!,-!!!,!!-,-!,!--!,!,!-,!--!!,!!--!,-,