In [2]:
import torch
from torch import nn 
import torch.nn.functional as F 

import numpy as np 
import matplotlib.pyplot as plt 
%matplotlib inline 

## Explore Data

In [3]:
file = r'/../shakespeare.txt'

In [5]:
with open(file, encoding = "utf8") as f: 
    text = f.read()

In [93]:
print(text[:1000])


                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else this glutton be,
    To eat the world's due, by the grave and thee.


                     2
  When forty winters shall besiege thy brow,
  And dig deep trenches in thy beauty's field,
  Thy youth's proud livery so gazed on now,
  Will be a tattered weed of small worth held:  
  Then being asked, where all thy beauty lies,
  Where all the treasure of thy lusty days;
  To say within thine own deep su

In [94]:
len(text)

5445609

In [95]:
all_characters = set(text)
print(all_characters)

{'c', 'O', '"', '3', 'T', 'W', 'A', 'j', 'u', '!', 'g', '?', 'f', '8', 'l', '6', 'K', 'k', ']', 'D', 'd', '.', '1', '0', 'm', 'S', '(', '5', 'V', '_', ' ', 'a', '<', 'Z', 'y', 'G', 'R', ',', 'P', 'o', 'r', ':', 'Y', ')', 'z', 'X', '&', 'M', "'", 't', 'J', 'E', 'L', 'w', 's', 'B', 'I', 'x', '\n', 'N', 'b', 'F', 'H', '[', '4', '}', 'e', 'C', '2', '`', '9', 'n', ';', 'Q', 'q', '|', '7', 'h', '>', 'i', 'v', 'p', 'U', '-'}


## Build Encoder to understand theory of NLP

In [96]:
### create a dict  
decoder = dict(enumerate(all_characters))

In [97]:
### encode the dict that is created by the decoder 
encoder = {char : index for index, char in decoder.items()}

In [162]:
### Use the encoder to iterate through the text and give all the characters a number 
encoded_text = np.array([encoder[char] for char in text])

In [163]:
encoded_text[:50]

array([58, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30,
       30, 30, 30, 30, 30, 22, 58, 30, 30, 61, 40, 39, 24, 30, 12, 31, 79,
       40, 66, 54, 49, 30,  0, 40, 66, 31, 49,  8, 40, 66, 54, 30, 53])

In [106]:
### the number 30 is just a space 
decoder[30]

' '

##### One hot encoding

In [69]:
def one_hot_encoder(encoded_text, number_unique_characters): 
    
    """
    The encoder creates a matrix with the size of --> encoded_text * encoded_text
    
    """
    
    one_hot = np.zeros((encoded_text.size, number_unique_characters))
    
    one_hot = one_hot.astype(np.float32) ### pytorch needs this precision 
    
    one_hot[np.arange(one_hot.shape[0]), encoded_text.flatten()] = 1.0
    
    one_hot = one_hot.reshape((*encoded_text.shape, number_unique_characters))
    
    return one_hot

In [77]:
### Thats an example how the one_hot_encoder works

example_array = np.array([1,2,0])
one_hot_encoder(example_array, 3)

array([[0., 1., 0.],
       [0., 0., 1.],
       [1., 0., 0.]], dtype=float32)

In [101]:
encoded_text = one_hot_encoder(encoded_text, len(set(encoded_text)))

## Generate Training Batches

In [111]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    
    '''
    Generate (using yield) batches for training.
    
    X: Encoded Text of length seq_len
    Y: Encoded Text shifted by one
    
    Example:
    
    X:
    
    [[1 2 3]]
    
    Y:
    
    [[ 2 3 4]]
    
    encoded_text : Complete Encoded Text to make batches from
    batch_size : Number of samples per batch
    seq_len : Length of character sequence
       
    '''
    
    # Total number of characters per batch
    # Example: If samp_per_batch is 2 and seq_len is 50, then 100
    # characters come out per batch.
    char_per_batch = samp_per_batch * seq_len
    
    
    # Number of batches available to make
    # Use int() to roun to nearest integer
    num_batches_avail = int(len(encoded_text)/char_per_batch)
    
    # Cut off end of encoded_text that
    # won't fit evenly into a batch
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]
    
    
    # Reshape text into rows the size of a batch
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    

    # Go through each row in array.
    for n in range(0, encoded_text.shape[1], seq_len):
        
        # Grab feature characters
        x = encoded_text[:, n:n+seq_len]
        
        # y is the target shifted over by 1
        y = np.zeros_like(x)
       
        #
        try:
            y[:, :-1] = x[:, 1:]
            y[:, -1]  = encoded_text[:, n+seq_len]
            
        # FOR POTENTIAL INDEXING ERROR AT THE END    
        except:
            y[:, :-1] = x[:, 1:]
            y[:, -1] = encoded_text[:, 0]
            
        yield x, y

In [121]:
batch_generator = generate_batches(sample_text,samp_per_batch=2,seq_len=5)

In [122]:
x, y = next(batch_generator)

## Modelling

In [123]:
class CharModel(nn.Module):
    
    def __init__(self, all_chars, num_hidden=256, num_layers=4,drop_prob=0.5,use_gpu=False):
        
        
        # SET UP ATTRIBUTES
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu
        
        #CHARACTER SET, ENCODER, and DECODER
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char: ind for ind,char in decoder.items()}
        
        
        self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout=drop_prob, batch_first=True)
        
        self.dropout = nn.Dropout(drop_prob)
        
        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))
      
    
    def forward(self, x, hidden):
                  
        
        lstm_output, hidden = self.lstm(x, hidden)
        
        
        drop_output = self.dropout(lstm_output)
        
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        
        
        final_out = self.fc_linear(drop_output)
        
        
        return final_out, hidden
    
    
    def hidden_state(self, batch_size):
        '''
        Used as separate method to account for both GPU and CPU users.
        '''
        
        if self.use_gpu:
            
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda(),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda())
        else:
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden))
        
        return hidden
        

#### Instance of the Model

In [125]:
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=False,
)

#### Total Parameters

In [127]:
total_param  = []
for p in model.parameters():
    total_param.append(int(p.numel()))

In [130]:
sum(total_param) ### should be rougly equal to the size of input data

5470292

#### Optimizer and Loss 

In [126]:
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()

## Training and Validation Data

In [171]:
# percentage of data to be used for training
train_percent = 0.7

In [172]:
encoded_text = encoded_text[:500000]

In [173]:
int(len(encoded_text) * (train_percent))

350000

In [174]:
train_ind = int(len(encoded_text) * (train_percent))

In [175]:
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

## Model Training

##### Variables

In [176]:
# Epochs to train for
epochs = 50
# batch size 
batch_size = 128

# Length of sequence
seq_len = 100

# for printing report purposes
# always start at 0
tracker = 0

# number of characters in text
num_char = max(encoded_text)+1

##### Training

In [177]:
len(train_data)

350000

In [178]:
# Set model to train
model.train()


# Check to see if using GPU
if model.use_gpu:
    model.cuda()

for i in range(epochs):
    
    hidden = model.hidden_state(batch_size)
    
    
    for x,y in generate_batches(train_data, batch_size, seq_len):
        
        tracker += 1
        
        # One Hot Encode incoming data
        x = one_hot_encoder(x,num_char)
        
        # Convert Numpy Arrays to Tensor
        
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)
        
        # Adjust for GPU if necessary
        
        if model.use_gpu:
            
            inputs = inputs.cuda()
            targets = targets.cuda()
            
        # Reset Hidden State
        # If we dont' reset we would backpropagate through all training history
        hidden = tuple([state.data for state in hidden])
        
        model.zero_grad()
        
        lstm_output, hidden = model.forward(inputs,hidden)
        loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
        loss.backward()
        
        # POSSIBLE EXPLODING GRADIENT PROBLEM!
        # LET"S CLIP JUST IN CASE
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)
        
        optimizer.step()
        
        
        
        ###################################
        ### CHECK ON VALIDATION SET ######
        #################################
        
        if tracker % 25 == 0:
            
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()
            
            for x,y in generate_batches(val_data,batch_size,seq_len):
                
                # One Hot Encode incoming data
                x = one_hot_encoder(x,num_char)
                

                # Convert Numpy Arrays to Tensor

                inputs = torch.from_numpy(x)
                targets = torch.from_numpy(y)

                # Adjust for GPU if necessary

                if model.use_gpu:

                    inputs = inputs.cuda()
                    targets = targets.cuda()
                    
                # Reset Hidden State
                # If we dont' reset we would backpropagate through 
                # all training history
                val_hidden = tuple([state.data for state in val_hidden])
                
                lstm_output, val_hidden = model.forward(inputs,val_hidden)
                val_loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
                val_losses.append(val_loss.item())
            
            # Reset to training model after val for loop
            model.train()
            
            print(f"Epoch: {i} Step: {tracker} Val Loss: {val_loss.item()}")

Epoch: 0 Step: 25 Val Loss: 3.1671459674835205
Epoch: 1 Step: 50 Val Loss: 3.076900005340576
Epoch: 2 Step: 75 Val Loss: 2.9281005859375
Epoch: 3 Step: 100 Val Loss: 2.7887871265411377
Epoch: 4 Step: 125 Val Loss: 2.678755760192871
Epoch: 5 Step: 150 Val Loss: 2.558563470840454
Epoch: 6 Step: 175 Val Loss: 2.426154136657715
Epoch: 7 Step: 200 Val Loss: 2.33539080619812
Epoch: 8 Step: 225 Val Loss: 2.243366241455078
Epoch: 9 Step: 250 Val Loss: 2.165147304534912
Epoch: 10 Step: 275 Val Loss: 2.1109201908111572
Epoch: 11 Step: 300 Val Loss: 2.065962076187134
Epoch: 12 Step: 325 Val Loss: 2.026796817779541
Epoch: 12 Step: 350 Val Loss: 1.9852628707885742
Epoch: 13 Step: 375 Val Loss: 1.9609087705612183
Epoch: 14 Step: 400 Val Loss: 1.9281138181686401
Epoch: 15 Step: 425 Val Loss: 1.904717206954956
Epoch: 16 Step: 450 Val Loss: 1.8838211297988892
Epoch: 17 Step: 475 Val Loss: 1.8585619926452637
Epoch: 18 Step: 500 Val Loss: 1.8366867303848267
Epoch: 19 Step: 525 Val Loss: 1.819007515907287

## Saving the model

In [179]:
# Be careful to overwrite our original name file!
model_name = 'example.net'

In [180]:
torch.save(model.state_dict(),model_name)

## Load Model 

In [181]:
# MUST MATCH THE EXACT SAME SETTINGS AS MODEL USED DURING TRAINING!

model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True,
)

In [182]:
model.load_state_dict(torch.load(model_name))
model.eval()

CharModel(
  (lstm): LSTM(84, 512, num_layers=3, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc_linear): Linear(in_features=512, out_features=84, bias=True)
)

In [183]:
def predict_next_char(model, char, hidden=None, k=1):
        
        # Encode raw letters with model
        encoded_text = model.encoder[char]
        
        # set as numpy array for one hot encoding
        # NOTE THE [[ ]] dimensions!!
        encoded_text = np.array([[encoded_text]])
        
        # One hot encoding
        encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
        
        # Convert to Tensor
        inputs = torch.from_numpy(encoded_text)
        
        # Check for CPU
        if(model.use_gpu):
            inputs = inputs.cuda()
        
        
        # Grab hidden states
        hidden = tuple([state.data for state in hidden])
        
        
        # Run model and get predicted output
        lstm_out, hidden = model(inputs, hidden)

        
        # Convert lstm_out to probabilities
        probs = F.softmax(lstm_out, dim=1).data
        
        
        
        if(model.use_gpu):
            # move back to CPU to use with numpy
            probs = probs.cpu()
        
        
        # k determines how many characters to consider
        # for our probability choice.
        # https://pytorch.org/docs/stable/torch.html#torch.topk
        
        # Return k largest probabilities in tensor
        probs, index_positions = probs.topk(k)
        
        
        index_positions = index_positions.numpy().squeeze()
        
        # Create array of probabilities
        probs = probs.numpy().flatten()
        
        # Convert to probabilities per index
        probs = probs/probs.sum()
        
        # randomly choose a character based on probabilities
        char = np.random.choice(index_positions, p=probs)
       
        # return the encoded value of the predicted char and the hidden state
        return model.decoder[char], hidden

In [189]:
def generate_text(model, size, seed='The', k=1):
    
    # CHECK FOR GPU
    if(model.use_gpu):
        model.cuda()
    else:
        model.cpu()
    
    # Evaluation mode
    model.eval()
    
    # begin output from initial seed
    output_chars = [c for c in seed]
    
    # intiate hidden state
    hidden = model.hidden_state(1)
    
    # predict the next character for every character in seed
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)
    
    # add initial characters to output
    output_chars.append(char)
    
    # Now generate for size requested
    for i in range(size):
        
        # predict based off very last letter in output_chars
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
        
        # add predicted character
        output_chars.append(char)
    
    # return string of predicted text
    return ''.join(output_chars)