# 5.2 Assignment: LSTM Sequence Completion

 Daniel Nascimento  -  danielsdn0725@gmail.com

### 1 - Modules and dataset

In [2]:
from __future__ import print_function
import torch
from torch import nn
from torch.autograd import Variable
import torch.nn.functional as F
import math
from torch import nn, Tensor


import os
import random as rnd
import numpy as np
import pickle
import time
import string

import torchvision.utils as vutils
from torchvision import transforms
from torch.utils.data import Subset
from torch.utils.data import DataLoader

from tqdm import tqdm
import matplotlib.pyplot as plt
import sklearn.metrics as metrics
import matplotlib.pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

  from .autonotebook import tqdm as notebook_tqdm


device(type='cuda')

In [3]:
#  The file input.txt - is my root, if you need to salve it in a different folder, please informa here:
# root_folder=''
# data_folder_name='./'
# model_folder_name=''

root_folder=''
data_folder_name='./data'
model_folder_name=''

filename='input.txt'
DATA_PATH = os.path.abspath(os.path.join(root_folder, data_folder_name))
model_dir = os.path.abspath(os.path.join(root_folder, model_folder_name))
train_path = os.path.join(DATA_PATH, filename)

seed = 1

In [9]:
def load_text_data(filename, init_dialog=False):
    ''' Load the texts from the filename, splitting by lines and removing empty strings.
        Setting init_dialog = True will remove lines where the character who is going to speak is indicated
    '''
    sentences = []
    with open(filename, 'r') as reader:
      for line in reader:
            if init_dialog or ':' not in line:
                sentences.append(line[:-1])
                
    return sentences



def clean_text(sentences, alpha=False):
    ''' Cleaning process of the text'''
    if alpha:
        # Remove non alphabetic character
        cleaned_text = [''.join([t.lower() for t in text if t.isalpha() or t.isspace()]) for text in sentences]
    else:
        # Simply lower the characters
        cleaned_text = [t.lower() for t in sentences]
    # Remove any emoty string
    cleaned_text = [t for t in cleaned_text if t!='']
    
    return cleaned_text


class CharVocab: 
    ''' Create a Vocabulary for '''
    def __init__(self, type_vocab,pad_token='<PAD>', eos_token='<EOS>', unk_token='<UNK>'): #Initialization of the type of vocabulary
        self.type = type_vocab
        #self.int2char ={}
        self.int2char = []
        if pad_token !=None:
            self.int2char += [pad_token]
        if eos_token !=None:
            self.int2char += [eos_token]
        if unk_token !=None:
            self.int2char += [unk_token]
        self.char2int = {}
        
    def __call__(self, text):       
        chars = set(''.join(text))

        self.int2char += list(chars)
        self.char2int = {char: ind for ind, char in enumerate(self.int2char)}

In [5]:
sentences = load_text_data(train_path)
print('Number of sentences: ', len(sentences))
print(sentences[:20])

Number of sentences:  29723
['Before we proceed any further, hear me speak.', '', 'Speak, speak.', '', 'You are all resolved rather to die than to famish?', '', 'Resolved. resolved.', '', 'First, you know Caius Marcius is chief enemy to the people.', '', "We know't, we know't.", '', "Let us kill him, and we'll have corn at our own price.", "Is't a verdict?", '', '', 'One word, good citizens.', '', 'We are accounted poor citizens, the patricians good.', 'would yield us but the superfluity, while it were']


In [8]:
# Clean the sentences
sentences = clean_text(sentences, False)
# entences in a one long string
sentences = ' '.join(sentences)
print('Number of characters: ', len(sentences))
print(sentences[:200])

Number of characters:  894875
before we proceed any further, hear me speak. speak, speak. you are all resolved rather to die than to famish? resolved. resolved. first, you know caius marcius is chief enemy to the people. we know't


In [19]:
vocab = CharVocab('char',None,None,'<UNK>')
vocab(sentences)
#print('Length of vocabulary: ', len(vocab.int2char))
#print('Int to Char: ', vocab.int2char)
#print('Char to Int: ', vocab.char2int)


if not os.path.exists(DATA_PATH): 
    os.makedirs(DATA_PATH)
    

with open(os.path.join(DATA_PATH, 'char_dict.pkl'), "wb") as f:
    pickle.dump(vocab.char2int, f)

with open(os.path.join(DATA_PATH, 'int_dict.pkl'), "wb") as f:
    pickle.dump(vocab.int2char, f)
    
    
# Check or create the directory 
if not os.path.exists(DATA_PATH): # in you environment, you have this path
    os.makedirs(DATA_PATH)
    
# Save the dictionary to data path dir  
with open(os.path.join(DATA_PATH, 'char_dict.pkl'), "wb") as f:
    pickle.dump(vocab.char2int, f)

with open(os.path.join(DATA_PATH, 'int_dict.pkl'), "wb") as f:
    pickle.dump(vocab.int2char, f)    

### 2 - Encoding the dataset and batch creation

In [20]:
def one_hot_encode(indices, dict_size):
    ''' Define one hot encode matrix for our sequences'''
    # Creating a multi-dimensional array with the desired output shape
    # Encode every integer with its one hot representation
    features = np.eye(dict_size, dtype=np.float32)[indices.flatten()]
    
    # Finally reshape it to get back to the original array
    features = features.reshape((*indices.shape, dict_size))
            
    return features

def encode_text(input_text, vocab, one_hot = False):
    ''' Encode the input_text replacing the char by its integer number based on the dictionary vocab'''
    # Replace every char by its integer value based on the vocabulary
    output = [vocab.char2int.get(character,0) for character in input_text]
    
    if one_hot:
    # One hot encode every integer of the sequence
        dict_size = len(vocab.char2int)
        return one_hot_encode(output, dict_size)
    else:
        return np.array(output)

In [None]:
# Encode the train dataset
train_data = encode_text(sentences, vocab, one_hot = False)

# Create the input sequence, from 0 to len-1
input_seq=train_data[:-1]
# Create the target sequence, from 1 to len. It is right-shifted one place
target_seq=train_data[1:]
#print('\nOriginal text:')
#print(sentences[:100000])
#print('\nEncoded text:')
#print(train_data[:100000])
#print('\nInput sequence:')
#print(input_seq[:100000])
#print('\nTarget sequence:')
#print(target_seq[:100000])

# Save the encoded text to a file
encoded_data = os.path.join(DATA_PATH, 'input_data.pkl')
with open(encoded_data, 'wb') as fp:
    pickle.dump(train_data, fp)
    
    
    
print('Encoded characters: ',train_data[100:102])
print('One-hot-encoded characters: ',one_hot_encode(train_data[100:102], 100))    

In [26]:
def batch_generator_sequence(features_seq, label_seq, batch_size, seq_len):
    """Generator function that yields batches of data (input and target)

    Args:
        features_seq: sequence of chracters, feature of our model.
        label_seq: sequence of chracters, the target label of our model
        batch_size (int): number of examples (in this case, sentences) per batch.
        seq_len (int): maximum length of the output tensor.

    Yields:
        x_epoch: sequence of features for the epoch
        y_epoch: sequence of labels for the epoch
    """
    # calculate the number of batches
    num_batches = len(features_seq) // (batch_size * seq_len)
    if num_batches == 0:
        raise ValueError("No batches created. Use smaller batch size or sequence length.")

    rounded_len = num_batches * batch_size * seq_len
    x = np.reshape(features_seq[: rounded_len], [batch_size, num_batches * seq_len])
    y = np.reshape(label_seq[: rounded_len], [batch_size, num_batches * seq_len])
    
    epoch = 0
    while True:
        x_epoch = np.split(np.roll(x, -epoch, axis=0), num_batches, axis=1)
        y_epoch = np.split(np.roll(y, -epoch, axis=0), num_batches, axis=1)
        for batch in range(num_batches):
            yield x_epoch[batch], y_epoch[batch]
        epoch += 1

### 3 - Model

In [27]:
class RNNModel(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_dim, n_layers, drop_rate=0.2):
        super(RNNModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embedding_size = embedding_size
        self.n_layers = n_layers
        self.vocab_size = vocab_size
        self.drop_rate = drop_rate
        self.char2int = None
        self.int2char = None    
        # Dropout layer
        self.dropout = nn.Dropout(drop_rate)
        # LSTM Layer
        self.rnn = nn.LSTM(embedding_size, hidden_dim, n_layers, dropout=drop_rate, batch_first = True)
        # Fully connected layer
        self.decoder = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, x, state):
        rnn_out, state = self.rnn(x, state)
        rnn_out = self.dropout(rnn_out)
        rnn_out = rnn_out.contiguous().view(-1, self.hidden_dim)
        logits = self.decoder(rnn_out)

        #print('Output model shape: ', logits.shape)
        return logits, state
    
    def init_state(self, device, batch_size=1):
        """
        initialises rnn states.
        """
        return (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device))

    def predict(self, input):
        logits, hidden = self.forward(input)
        probs = F.softmax(logits)
        probs = probs.view(input.size(0), input.size(1), probs.size(1))
        return probs, hidden

### 4 - Training

In [28]:
def train_main(model, optimizer, loss_fn, batch_data, num_batches, val_batches, batch_size, seq_len, n_epochs, clip_norm, device):
    
    for epoch in range(1, n_epochs + 1):
        epoch_losses = []
        hidden = model.init_state(device, batch_size)
        for i in tqdm(range(num_batches-val_batches), desc="Epoch {}/{}".format(epoch, n_epochs+1)):
            input_batch, target_batch = next(batch_data)
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            input_data = torch.from_numpy(input_batch)
            target_data = torch.from_numpy(target_batch)

            hidden = tuple(([Variable(var.data) for var in hidden]))
            input_data = input_data.to(device)

            model.train()
            optimizer.zero_grad() # Clears existing gradients from previous epoch

            output, hidden = model(input_data, hidden)
            output = output.to(device)
            target_data = target_data.to(device)
            target_data = torch.reshape(target_data, (batch_size*seq_len,))
            loss = loss_fn(output, target_data.view(batch_size*seq_len))
            epoch_losses.append(loss.item()) #data[0]
        
            loss.backward() # backpropagation and  gradients
            nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
            
            optimizer.step() # Updates the weights 
    
        model.eval()
        val_hidden = model.init_state(device, batch_size)
        val_losses = []
        for i in tqdm(range(val_batches), desc="Val Epoch {}/{}".format(epoch, n_epochs+1)):
            input_batch, target_batch = next(batch_data)
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            input_data = torch.from_numpy(input_batch)
            target_data = torch.from_numpy(target_batch)
            hidden = tuple(([Variable(var.data) for var in val_hidden]))
            input_data = input_data.to(device)
            output, hidden = model(input_data, hidden)
            output = output.to(device)
            target_data = target_data.to(device)
            target_data = torch.reshape(target_data, (batch_size*seq_len,))
            loss = loss_fn(output, target_data.view(batch_size*seq_len))

            val_losses.append(loss.item())

        model.train()                  
        #if epoch%2 == 0:
        print('Epoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
        print("Train Loss: {:.4f}".format(np.mean(epoch_losses)), end=' ')
        print("Val Loss: {:.4f}".format(np.mean(val_losses)))
        
    return epoch_losses

In [29]:
# Hyperparameters for training
n_epochs = 50
lr=0.001
batch_size=64
maxlen=64
clip_norm=5
val_fraction = 0.1


hidden_dim = 64 #64
n_layers = 1
embedding_size=len(vocab.char2int)
dict_size = len(vocab.char2int)
drop_rate = 0.2

# Set the device for training
print('Device: ', device)
# Set a seed to reproduce experiments
torch.manual_seed(seed)

Device:  cuda


<torch._C.Generator at 0x7f93de3c8ed0>

In [33]:
model = RNNModel(dict_size,embedding_size, hidden_dim, n_layers).to(device)
print(model)
# Define Loss, Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

input_seq = input_seq[:100000]
target_seq = target_seq[:100000]
print(len(input_seq))

RNNModel(
  (dropout): Dropout(p=0.2, inplace=False)
  (rnn): LSTM(38, 64, batch_first=True, dropout=0.2)
  (decoder): Linear(in_features=64, out_features=38, bias=True)
)
100000


In [None]:
# Calculate the number of batches to train
num_batches = len(input_seq) // (batch_size*maxlen)
val_batches = int(num_batches*val_fraction)
# Create the batch data generator
batch_data = batch_generator_sequence(input_seq, target_seq, batch_size, maxlen)
losses = train_main(model, optimizer, criterion, batch_data, num_batches, val_batches, batch_size, 
                    maxlen, n_epochs, clip_norm, device)

In [35]:
# Save the param
model_info_path = os.path.join(model_dir, 'model_info.pth')
with open(model_info_path, 'wb') as f:
    model_info = {
        'n_layers': n_layers,
        'embedding_dim': embedding_size,
        'hidden_dim': hidden_dim,
        'vocab_size': dict_size,
        'drop_rate': drop_rate
    }
    torch.save(model_info, f)

# Save the model param
model_path = os.path.join(model_dir, 'model.pth')
with open(model_path, 'wb') as f:
    torch.save(model.state_dict(), f)

### 5 - Testing

In [36]:
def sample_from_probs(probs, top_n=10):
    """
    truncated weighted random choice.
    """
    _, indices = torch.sort(probs)
    probs[indices.data[:-top_n]] = 0
    sampled_index = torch.multinomial(probs, 1)
    return sampled_index

def predict_probs(model, hidden, character, vocab):
    character = np.array([[vocab.char2int[c] for c in character]])
    character = one_hot_encode(character, model.vocab_size)
    character = torch.from_numpy(character)
    character = character.to(device)
    
    out, hidden = model(character, hidden)

    prob = nn.functional.softmax(out[-1], dim=0).data

    return prob, hidden

In [48]:
def generate_from_text(model, out_len, vocab, top_n=1, start='hey'):
    model.eval() # eval mode
    start = start.lower()

    chars = [ch for ch in start]
    size = out_len - len(chars)
    state = model.init_state(device, 1)
    
    for ch in chars:
        probs, state = predict_probs(model, state, ch, vocab)
        next_index = sample_from_probs(probs, top_n)

    for ii in range(size):
        probs, state = predict_probs(model, state, chars, vocab)
        next_index = sample_from_probs(probs, top_n)
        chars.append(vocab.int2char[next_index.data[0]])

    return ''.join(chars)

In [57]:
text_predicted = generate_from_text(model, 30, vocab, 3, 'Antonio wants a coffee ')
print(text_predicted)
print(len(text_predicted))

antonio wants a coffee has the
30


In [58]:
def generate_from_char(model, out_len, vocab, top_n=1, start='hey'):
    model.eval() # eval mode
    start = start.lower()
    chars = [ch for ch in start]
    size = out_len - len(chars)
    state = model.init_state(device, 1)
    for ch in chars:
        probs, state = predict_probs(model, state, ch, vocab)
        next_index = sample_from_probs(probs, top_n)
        
    chars.append(vocab.int2char[next_index.data[0]])   
    
    for ii in range(size-1):
        probs, state = predict_probs(model, state, chars[-1], vocab)
        next_index = sample_from_probs(probs, top_n)
        chars.append(vocab.int2char[next_index.data[0]])

    return ''.join(chars)

In [59]:
text_predicted = generate_from_char(model, 30, vocab, 3, 'Antonio wants a coffee prepared by ')
print(text_predicted)
print(len(text_predicted))

antonio wants a coffee prepared by t
36


In [60]:
def predict_char(model, character, vocab):
    # One-hot encoding our input to fit into the model
    character = np.array([[vocab.char2int[c] for c in character]])
    #character = one_hot_encode(character, len(vocab.char2int), character.shape[1], 1)
    character = one_hot_encode(character, model.vocab_size)
    character = torch.from_numpy(character)
    # Generate set the device
    character = character.to(device)
    
    model.eval() # eval mode
    # Generate the initial hidden state
    state = model.init_state(device, 1)

    out, hidden = model(character, state)

    prob = nn.functional.softmax(out[-1], dim=0).data
    # Taking the class with the highest probability score from the output
    m = torch.max(prob, dim=0)
    char_ind = torch.max(prob, dim=0)[1].item()

    return vocab.int2char[char_ind], hidden

In [63]:
t,_ = predict_char(model, 'we want a coffee servide by ', vocab)
print('Initial string: ', t)

Initial string:  t
