### Character Level text generator

This notebook is the first notebook in the character level text generation. We are going to create a model that will generate text. 

_"The model will be fed with a word and will predict what the next character in the sentence will be. This process will repeat itself until we generate a sentence of our desired length"._

### Imports


In [22]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.autograd import Variable

import os, time, pickle, string, random
import numpy as np

torch.__version__

'1.9.0+cu102'

### Device

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

### Data
We are going to use the dataset that I've downloaded [here](https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt) we are then going to load it from google drive and the steps will be as follows:


In [6]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [7]:
file_path = "/content/drive/My Drive/NLP Data/text-gen/input.txt"
os.path.exists(file_path)

True

### Loading the dataset.

First, we'll define the sentences that we want our model to output the first few characters. Our dataset is a text file containing Shakespeare's plays or books that we will extract sequence of chars to use as input to our model. Then our model will learn how to complete sentences like "Shakespeare would do".

### SEEDS


In [8]:
SEED = 42

torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
np.random.seed(SEED)
random.seed(SEED)

### Loading the data

In [9]:
def load_text_data(filename, init_dialog=False):
  """
  Setting init_dialog = True will remove lines where the character who is going to speak is indicate
  """
  sentences = []
  with open(filename, 'r') as reader:
    for line in reader:
      if init_dialog or ":" not in line:
        sentences.append(line[:-1])

  return sentences
  

In [10]:
sentences = load_text_data(file_path)
print('Number of sentences: ', len(sentences))
print(sentences[:20])

Number of sentences:  29723
['Before we proceed any further, hear me speak.', '', 'Speak, speak.', '', 'You are all resolved rather to die than to famish?', '', 'Resolved. resolved.', '', 'First, you know Caius Marcius is chief enemy to the people.', '', "We know't, we know't.", '', "Let us kill him, and we'll have corn at our own price.", "Is't a verdict?", '', '', 'One word, good citizens.', '', 'We are accounted poor citizens, the patricians good.', 'would yield us but the superfluity, while it were']


### Data Cleaning
We will convert to lowercase the text and remove non alphanumeric chracters (a parameter configuration).

In [11]:
def clean_text(sentences, alpha=False):
  if alpha:
    # Remove non alphabetic character
    cleaned_text = [''.join([t.lower() for t in text if t.isalpha() or t.isspace()])
      for text in sentences]
  else:
    cleaned_text = [t.lower() for t in sentences]
  
  return [t for t in cleaned_text if t!='']
  

In [12]:
# Clean the sentences
sentences = clean_text(sentences)
# Join all the sentences in a one long string
sentences = ' '.join(sentences)
print('Number of characters: ', len(sentences))
print(sentences[:100])

Number of characters:  894876
before we proceed any further, hear me speak. speak, speak. you are all resolved rather to die than 


### Creating the dictionary

Now we'll create a dictionary out of all the characters that we have in the sentences and map them to an integer. This will allow us to convert our input characters to their respective integers (char2int) and viceversa (int2char).

In [13]:
class CharVocab:
  def __init__(self, type_vocab,
               pad_token = "<pad>",
               eos_token = "<eos>",
               unk_token = "<unk>"
               ):
    self.type = type_vocab
    self.int2char = []
    if pad_token !=None:
      self.int2char += [pad_token]
    if eos_token !=None:
      self.int2char += [eos_token]
    if unk_token !=None: 
      self.int2char += [unk_token]

    self.char2int = {}
  
  def __call__(self, text):
    chars = set(''.join(text))
    self.int2char += list(chars)
    self.char2int = {char: ind for ind, char in enumerate(self.int2char)}

In [14]:

vocab = CharVocab('char', None, None,'<unk>')
vocab(sentences)
print('Length of vocabulary: ', len(vocab.int2char))
print('Int to Char: ', vocab.int2char)
print('Char to Int: ', vocab.char2int)

Length of vocabulary:  38
Int to Char:  ['<unk>', '.', 'q', 'p', 'n', 't', ',', 's', 'o', 'e', 'h', 'j', 'g', '!', 'u', '-', '3', 'a', 'b', 'r', 'i', '?', '&', 'c', 'k', 'x', 'm', 'y', 'z', "'", ' ', 'v', 'l', ';', '$', 'd', 'f', 'w']
Char to Int:  {'<unk>': 0, '.': 1, 'q': 2, 'p': 3, 'n': 4, 't': 5, ',': 6, 's': 7, 'o': 8, 'e': 9, 'h': 10, 'j': 11, 'g': 12, '!': 13, 'u': 14, '-': 15, '3': 16, 'a': 17, 'b': 18, 'r': 19, 'i': 20, '?': 21, '&': 22, 'c': 23, 'k': 24, 'x': 25, 'm': 26, 'y': 27, 'z': 28, "'": 29, ' ': 30, 'v': 31, 'l': 32, ';': 33, '$': 34, 'd': 35, 'f': 36, 'w': 37}


We are then going to save the dictionary.

In [15]:
with open( "/content/drive/My Drive/NLP Data/text-gen/char_dict.plk", "wb") as f:
  pickle.dump(vocab.char2int, f)

with open( "/content/drive/My Drive/NLP Data/text-gen/int_dict.plk", "wb") as f:
  pickle.dump(vocab.int2char, f)

print("Done")


Done


### Creating the input data and labels for training.

As we're going to predict the next character in the sequence at each time step, we'll have to divide each sentence into

* **Input data:** The last input character should be excluded as it does not need to be fed into the model
** *Target/Ground Truth Label:** One time-step ahead of the Input data as this will be the "correct answer" for the model at each time step corresponding to the input data



In [16]:
def one_hot_encode(indices, dict_size):
  features = np.eye(dict_size, dtype="float32")[indices.flatten()]
  features = features.reshape((*indices.shape, dict_size))
  return features

def encode_text(input_text, vocab, one_hot=False):
  ''' Encode the input_text replacing the char by its integer number based on the dictionary vocab'''
  output = [vocab.char2int.get(character,0) for character in input_text]
  if one_hot:
    dict_size = len(vocab.char2int)
    return one_hot_encode(output, dict_size)
  else:
    return np.array(output)


Now, we can encode our text, replacing every character by the integer value in the dictionary. When we have our dataset unified and prepared, we should do a quick check and see an example of the data our model will be trained on. This is generally a good idea as it allows you to see how each of the further processing steps affects the reviews and it also ensures that the data has been loaded correctly.

In [18]:
train_data = encode_text(sentences, vocab, one_hot=False)

# Create the input sequence, from 0 to len-1
input_seq =train_data[:-1]

# Create the target sequence, from 1 to len. It is right-shifted one place

target_seq=train_data[1:]
print('\nOriginal text:')
print(sentences[:100])
print('\nEncoded text:')
print(train_data[:100])
print('\nInput sequence:')
print(input_seq[:100])
print('\nTarget sequence:')
print(target_seq[:100])



Original text:
before we proceed any further, hear me speak. speak, speak. you are all resolved rather to die than 

Encoded text:
[18  9 36  8 19  9 30 37  9 30  3 19  8 23  9  9 35 30 17  4 27 30 36 14
 19  5 10  9 19  6 30 10  9 17 19 30 26  9 30  7  3  9 17 24  1 30  7  3
  9 17 24  6 30  7  3  9 17 24  1 30 27  8 14 30 17 19  9 30 17 32 32 30
 19  9  7  8 32 31  9 35 30 19 17  5 10  9 19 30  5  8 30 35 20  9 30  5
 10 17  4 30]

Input sequence:
[18  9 36  8 19  9 30 37  9 30  3 19  8 23  9  9 35 30 17  4 27 30 36 14
 19  5 10  9 19  6 30 10  9 17 19 30 26  9 30  7  3  9 17 24  1 30  7  3
  9 17 24  6 30  7  3  9 17 24  1 30 27  8 14 30 17 19  9 30 17 32 32 30
 19  9  7  8 32 31  9 35 30 19 17  5 10  9 19 30  5  8 30 35 20  9 30  5
 10 17  4 30]

Target sequence:
[ 9 36  8 19  9 30 37  9 30  3 19  8 23  9  9 35 30 17  4 27 30 36 14 19
  5 10  9 19  6 30 10  9 17 19 30 26  9 30  7  3  9 17 24  1 30  7  3  9
 17 24  6 30  7  3  9 17 24  1 30 27  8 14 30 17 19  9 30 17 32 32 30 19
  

Now we can save our encoded dataset to a file, so we can restore it whenever it is necessary. It is important to note the format of the data that we are saving as we will need to know it when we write the training code. In our case, we will save the dataset as a pickle object, it is the array containing the whole dataset encoded as an integer value for every character.

In [19]:
with open( "/content/drive/My Drive/NLP Data/text-gen/input_data.plk", "wb") as f:
  pickle.dump(train_data, f)

print("Done")

Done


Lets check our one-hot-encode function that we will use later during the training phase:

In [20]:
print('Encoded characters: ',train_data[100:102])
print('One-hot-encoded characters: ',one_hot_encode(train_data[100:102], 28))


Encoded characters:  [5 8]
One-hot-encoded characters:  [[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0.]]


### Creating a batch data generator

When training on the dataset, we need to extract a batch size examples from the inputs and targets, forward and backward the RNN and then repite the iteration with another batch size examples. A batch generator will help us to extract a batch size examples from our datasets.

In [21]:
def batch_generator_sequence(
    features_seq, label_seq, batch_size, seq_len
  ):
  """Generator function that yields batches of data (input and target)

  Args:
      features_seq: sequence of chracters, feature of our model.
      label_seq: sequence of chracters, the target label of our model
      batch_size (int): number of examples (in this case, sentences) per batch.
      seq_len (int): maximum length of the output tensor.

  Yields:
      x_epoch: sequence of features for the epoch
      y_epoch: sequence of labels for the epoch
  """
  num_batches = len(features_seq) //(batch_size * seq_len)
  if num_batches == 0:
    raise ValueError("No batches created. Use smaller batch size or sequence length.")

  # calculate effective length of text to use
  rounded_len = num_batches * batch_size * seq_len
  # Reshape the features matrix in batch size x num_batches * seq_len
  x = np.reshape(features_seq[: rounded_len], [batch_size, num_batches * seq_len])
  # Reshape the target matrix in batch size x num_batches * seq_len
  y = np.reshape(label_seq[: rounded_len], [batch_size, num_batches * seq_len])

  epoch = 0
  while True:
    # roll so that no need to reset rnn states over epochs
    x_epoch = np.split(np.roll(x, -epoch, axis=0), num_batches, axis=1)
    y_epoch = np.split(np.roll(y, -epoch, axis=0), num_batches, axis=1)
    for batch in range(num_batches):
        yield x_epoch[batch], y_epoch[batch]
    epoch += 1


### Define RNN model

The model is very simple:
* An LSTM layer to encode the input (there is no need for an embedding layer because the data is one-hot-encoded)
* A dropout layer to reduce overfitting
* The decoder, a fully connected layer mapping to a vocabulary size outputs

The output provides the probability of every item in the vocabulary to be the next char.


In [23]:
class RNNModel(nn.Module):
  def __init__(self, vocab_size, embedding_size,
               hidden_dim, n_layers, dropout=.2):
    super(RNNModel, self).__init__()

    self.hidden_dim = hidden_dim
    self.embedding_size = embedding_size
    self.n_layers = n_layers
    self.vocab_size = vocab_size
    self.dropout = dropout
    self.char2int = None
    self.int2char = None

    self.dropout = nn.Dropout(dropout)
    self.rnn = nn.LSTM(embedding_size, hidden_dim, 
                        n_layers, dropout=dropout, batch_first = True)
    self.decoder = nn.Linear(hidden_dim, vocab_size)

  def forward(self, x, state):
    # input shape: [batch_size, seq_len, embedding_size]
    out, state = self.rnn(x, state)
    # out shape: [batch_size, seq_len, rnn_size]
    # hidden shape: [2, num_layers, batch_size, rnn_size]
    out = self.dropout(out)
    out = out.contiguous().view(-1, self.hidden_dim)

    logits = self.decoder(out)
    # output shape: [seq_len * batch_size, vocab_size]
    return logits, state

  def init_state(self, device, batch_size=1):
    """
    initialises rnn states.
    """
    return (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device))
    
  def predict(self, input):
    # input shape: [seq_len, batch_size]
    logits, hidden = self.forward(input)
    # logits shape: [seq_len * batch_size, vocab_size]
    # hidden shape: [2, num_layers, batch_size, rnn_size]
    probs = F.softmax(logits)
    # shape: [seq_len * batch_size, vocab_size]
    probs = probs.view(input.size(0), input.size(1), probs.size(1))
    # output shape: [seq_len, batch_size, vocab_size]
    return probs, hidden


### Training the model
During training:
* In every epoch get the next batch data, move the tensors to the device, call the model (Forward pass), calculate the loss function, get the gradients and update the weights.

In [24]:
from tqdm import tqdm

In [41]:
def train_main(model, optimizer, loss_fn,
               batch_data, num_batches, val_batches, 
               batch_size, seq_len, n_epochs,
               clip_norm, device
  ):
  # Training Run
  for epoch in range(1, n_epochs + 1):
    # Store the loss in every batch iteration
    #epoch_losses = torch.Tensor(num_batches)
    epoch_losses = []
    # Init the hidden state
    hidden = model.init_state(device, batch_size)
    # Train all the batches in every epoch
    print("Epoch {}/{}".format(epoch, n_epochs+1))
    for i in range(num_batches-val_batches):
      # Get the next batch data for input and target
      input_batch, target_batch = next(batch_data)
      # Onr hot encode the input data
      input_batch = one_hot_encode(input_batch, model.vocab_size)
      # Tranform to tensor
      input_data = torch.from_numpy(input_batch)
      target_data = torch.from_numpy(target_batch)
      # Create a new variable for the hidden state, necessary to calculate the gradients
      hidden = tuple(([Variable(var.data) for var in hidden]))
      # Move the input data to the device
      input_data = input_data.to(device)
      #print('Input shape: ', input_data.shape)
      #print('Hidden shape: ', hidden[0].shape, hidden[1].shape)
      # Set the model to train and prepare the gradients
      model.train()
      optimizer.zero_grad() # Clears existing gradients from previous epoch
      # Pass Fordward the RNN
      output, hidden = model(input_data, hidden)
      #print('Output shape: ', output.shape)
      output = output.to(device)
      #print('Output shape: ', output.shape)
      #print('Target shape; ', target_data.shape)
      # Move the target data to the device
      target_data = target_data.to(device)
      #print('Target shape; ', target_data.shape)
      target_data = torch.reshape(target_data, (batch_size*seq_len,))
      #print('Target shape; ', target_data.shape)
      loss = loss_fn(output, target_data.view(batch_size*seq_len))
      #print(loss)
      # Save the loss
      #epoch_losses[i] = loss.item() #data[0]
      epoch_losses.append(loss.item()) #data[0]
  
      loss.backward() # Does backpropagation and calculates gradients
      # clip gradient norm
      nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
      
      optimizer.step() # Updates the weights accordingly

    # Now, when epoch is finished, evaluate the model on validation data
    model.eval()
    val_hidden = model.init_state(device, batch_size)
    val_losses = []
    print("Val Epoch {}/{}".format(epoch, n_epochs+1))
    for i in range(val_batches):
      # Get the next batch data for input and target
      input_batch, target_batch = next(batch_data)
      # Onr hot encode the input data
      input_batch = one_hot_encode(input_batch, model.vocab_size)
      # Tranform to tensor
      input_data = torch.from_numpy(input_batch)
      target_data = torch.from_numpy(target_batch)
      # Create a new variable for the hidden state, necessary to calculate the gradients
      hidden = tuple(([Variable(var.data) for var in val_hidden]))
      # Move the input data to the device
      input_data = input_data.to(device)
      # Pass Fordward the RNN
      output, hidden = model(input_data, hidden)
      #print('Output shape: ', output.shape)
      output = output.to(device)
      #print('Output shape: ', output.shape)
      #print('Target shape; ', target_data.shape)
      # Move the target data to the device
      target_data = target_data.to(device)
      #print('Target shape; ', target_data.shape)
      target_data = torch.reshape(target_data, (batch_size*seq_len,))
      #print('Target shape; ', target_data.shape)
      loss = loss_fn(output, target_data.view(batch_size*seq_len))
      #print(loss)
      # Save the loss
      val_losses.append(loss.item()) #data[0]

    model.train()                  
    print('Epoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
    print("Train Loss: {:.4f}".format(np.mean(epoch_losses)), end=' ')
    print("Val Loss: {:.4f}".format(np.mean(val_losses)))
    print()

    if epoch != n_epochs:
      print("> Next epoch")
  return epoch_losses


After defining the model above, we'll have to instantiate the model with the relevant parameters and define our hyperparamters as well. The hyperparameters we're defining below are:

* n_epochs: Number of Epochs --> This refers to the number of times our model will go through the entire training dataset
* lr: Learning Rate --> This affects the rate at which our model updates the weights in the cells each time backpropogation is done
  * A smaller learning rate means that the model changes the values of the weight with a smaller magnitude
  * A larger learning rate means that the weights are updated to a larger extent for each time step
* batch_size: Number of examples to train on every train step
maxlen: Length of the input sequence of char
* embedding_size: the vocab size because the input feature is one-hot-encoded
* hidden_dim: the number of hidden units in our LSTM module
* n_layers: number of layers of our LSTM module

In [35]:
# Define hyperparameters for training
n_epochs = 5
lr=0.01
batch_size=64
maxlen=64
clip_norm=5
val_fraction = 0.1

# Define hypeparameters of the model
hidden_dim = 64 #64
n_layers = 1
embedding_size=len(vocab.char2int)
dict_size = len(vocab.char2int)
drop_rate = 0.2

Optimizer and loss functions

In [36]:
model = RNNModel(dict_size,embedding_size, hidden_dim, n_layers)
model

  "num_layers={}".format(dropout, num_layers))


RNNModel(
  (dropout): Dropout(p=0.2, inplace=False)
  (rnn): LSTM(38, 64, batch_first=True, dropout=0.2)
  (decoder): Linear(in_features=64, out_features=38, bias=True)
)

Counting model parameters.

In [37]:
def count_trainable_params(model):
  n_t_params =sum(p.numel() for p in model.parameters() if p.requires_grad == True)
  return n_t_params

print(f'The model has {count_trainable_params(model):,} trainable parameters')


The model has 29,094 trainable parameters


In [38]:
model = model.to(device)
print(model)

RNNModel(
  (dropout): Dropout(p=0.2, inplace=False)
  (rnn): LSTM(38, 64, batch_first=True, dropout=0.2)
  (decoder): Linear(in_features=64, out_features=38, bias=True)
)


In [39]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [42]:

# Calculate the number of batches to train
num_batches = len(input_seq) // (batch_size*maxlen)
val_batches = int(num_batches*val_fraction)

# Create the batch data generator
batch_data = batch_generator_sequence(input_seq, target_seq, batch_size, maxlen)
losses = train_main(model, optimizer, criterion, batch_data, num_batches, val_batches, batch_size, 
                    maxlen, n_epochs, clip_norm, device)


Epoch 1/6
Val Epoch 1/6
Epoch: 1/5............. Train Loss: 1.7377 Val Loss: 1.6951

> Next epoch
Epoch 2/6
Val Epoch 2/6
Epoch: 2/5............. Train Loss: 1.7166 Val Loss: 1.6787

> Next epoch
Epoch 3/6
Val Epoch 3/6
Epoch: 3/5............. Train Loss: 1.6984 Val Loss: 1.6667

> Next epoch
Epoch 4/6
Val Epoch 4/6
Epoch: 4/5............. Train Loss: 1.6862 Val Loss: 1.6552

> Next epoch
Epoch 5/6
Val Epoch 5/6
Epoch: 5/5............. Train Loss: 1.6749 Val Loss: 1.6461



### Predicting the input sequences


In [43]:
def sample_from_probs(probs, top_n=10):
    """
    truncated weighted random choice.
    """
    _, indices = torch.sort(probs)
    # set probabilities after top_n to 0
    probs[indices.data[:-top_n]] = 0
    #print(probs.shape)
    sampled_index = torch.multinomial(probs, 1)
    return sampled_index

def predict_probs(model, hidden, character, vocab):
    # One-hot encoding our input to fit into the model
    character = np.array([[vocab.char2int[c] for c in character]])
    #character = one_hot_encode(character, len(vocab.char2int), character.shape[1], 1)
    character = one_hot_encode(character, model.vocab_size)
    character = torch.from_numpy(character)
    character = character.to(device)
    
    out, hidden = model(character, hidden)

    prob = nn.functional.softmax(out[-1], dim=0).data

    return prob, hidden

Let’s test our model now and see what kind of output we will get.

In [45]:
def generate_from_text(model, out_len, vocab, top_n=1, start='hey'):
    model.eval() # eval mode
    start = start.lower()
    # First off, run through the starting characters
    chars = [ch for ch in start]
    size = out_len - len(chars)
    # Generate the initial hidden state
    state = model.init_state(device, 1)
    
    # Warm up the initial state, predicting on the initial string
    for ch in chars:
        #char, state = predict(model, ch, state, top_n=top_k)
        probs, state = predict_probs(model, state, ch, vocab)
        next_index = sample_from_probs(probs, top_n)

    # Now pass in the previous characters and get a new one
    for ii in range(size):
        #char, h = predict_char(model, chars, vocab)
        probs, state = predict_probs(model, state, chars, vocab)
        next_index = sample_from_probs(probs, top_n)
        # append to sequence
        chars.append(vocab.int2char[next_index.data[0]])

    return ''.join(chars)

In [46]:
text_predicted = generate_from_text(model, 30, vocab, 3, 'we want ')
print(text_predicted)
print(len(text_predicted))

we want and the these she thee
30


In [49]:

def generate_from_char(model, out_len, vocab, top_n=1, start='hey'):
    model.eval() # eval mode
    start = start.lower()
    # First off, run through the starting characters
    chars = [ch for ch in start]
    size = out_len - len(chars)
    # Generate the initial hidden state
    state = model.init_state(device, 1)
    # Warm up the initial state, predicting on the initial string
    for ch in chars:
        #char, state = predict(model, ch, state, top_n=top_k)
        probs, state = predict_probs(model, state, ch, vocab)
        next_index = sample_from_probs(probs, top_n)
        
    # Include the last char predicted to the predicted output
    chars.append(vocab.int2char[next_index.data[0]])   
    
    # Now pass in the previous characters and get a new one
    for ii in range(size-1):
        #char, h = predict_char(model, chars, vocab)
        probs, state = predict_probs(model, state, chars[-1], vocab)
        next_index = sample_from_probs(probs, top_n)
        # append to sequence
        chars.append(vocab.int2char[next_index.data[0]])

    return ''.join(chars)

In [50]:
text_predicted = generate_from_char(model, 30, vocab, 3, 'we want ')
print(text_predicted)
print(len(text_predicted))

we want and strike and this so
30


### Conclusion

In the next Notebook we will try to improve the model performance.