### Recurent Neural Network for text generation (character level)

Adapted from: https://github.com/spro/practical-pytorch/blob/master/char-rnn-generation/char-rnn-generation.ipynb

In [247]:
import time
import matplotlib.pyplot as plt
import numpy as np
import string
import random
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.utils.data

In [5]:
random_seed = 77777

#### Import the corpus - Compilation of Shakespeare texts

In [6]:
# All characters available
all_characters = string.printable
vocab = set(all_characters)
n_characters = len(vocab)
chunk_len = 200

# give every character an index
word_to_ix = {word: i for i, word in enumerate(vocab)}
ix_to_word = {i: word for i, word in enumerate(vocab)}

file = open('./shakespeare.txt').read()
file_len = len(file)
print('The file length is: ', file_len)
print()
print("All existing characters in the corpus: \n"+str(all_characters))
print("Number of unique characters: "+str(n_characters))

The file length is:  1115393

All existing characters in the corpus: 
0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ 	

Number of unique characters: 100


#### Helper function to create chunks of text

In [16]:
def random_chunk(chunk_len):
    start_index = random.randint(0, file_len - chunk_len)
    end_index = start_index + chunk_len + 1
    return file[start_index:end_index]

print(random_chunk(20))

constant.

KING EDWAR


#### Targets

The input will be all characters up to the last, and the target will be all characters from the first. <br>
Example: <br>
Text:"We shall go!" --> Input:"We shall go" --> Target: "e shall go!"

In [87]:
def textToWin(text, seq_len, step_size):
    inputs = []
    outputs = []
    for i in range(0, len(text) - seq_len, step_size):
        window = text[i:seq_len+i]
        inputs.append(window)
    outputs = [i for i in text[seq_len::step_size]]
    return inputs, outputs

_s= file[:6]
print(a)
inptest, outtest = textToWin(_s, 1, 1)
print("x sample size = ", len(inptest), inptest)
print("y sample size = ", len(outtest), outtest)

First 
x sample size =  5 ['F', 'i', 'r', 's', 't']
y sample size =  5 ['i', 'r', 's', 't', ' ']


In [96]:
def textToTensor(text, seq_len, step_size):
    inputs, outputs = textToWin(text, seq_len, step_size)
    X = torch.zeros(len(inputs), seq_len).long()
    y = torch.zeros(len(inputs)).long()
    for i, seq in enumerate(inputs):
        for t, char in enumerate(seq):
            X[i, t] = word_to_ix[seq[t]]
        y[i] = word_to_ix[outputs[i]]
    # outputs X, y - (sample_size, seq_len), (sample_size) with value 0 < c < n_letters
    return X, y

test_text = file[:6]
testX, testy = textToTensor(test_text, 1, 1)
print(testX, testy)

tensor([[ 46],
        [ 67],
        [ 81],
        [ 82],
        [ 11]]) tensor([ 67,  81,  82,  11,  87])


#### Create one-hot encoder function

In [89]:
def one_hot_encode(arr, n_labels):
    
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot

In [183]:
def get_one_hot_train_set(vectors):
    _aux = torch.FloatTensor()
    for i in range(vectors.size()[0]):
        for j in vectors[i]:
            _aux = torch.cat((_aux, torch.FloatTensor(one_hot_encode(np.array([[j]]),100))), 1)
        #x_one_hot = _aux.view(vectors.size()[0],-1)
    return _aux

#### Example of text being one-hot-vectorized

In [186]:
test_text = file[:5]
print("Example string: "+str(test_text))
testX, testy = textToTensor(test_text, 1, 1)
print(testX)
print("One hot representation: "+str(get_one_hot_train_set(testX).size())+" \n "+str(get_one_hot_train_set(testX)))

Example string: First
tensor([[ 46],
        [ 67],
        [ 81],
        [ 82]])
One hot representation: torch.Size([1, 4, 100]) 
 tensor([[[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.],
         [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
           0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
      

#### Hyperparameters

In [272]:
n_epochs = 200
num_hidden = 100
num_hidden_layers = 1
learning_rate = 0.005

sequence_length = 5
step_size=1
batch_size = 128

#### Create iterator with minibatches

In [None]:
# Get text into indices
idx_X, y = textToTensor(file[:20000], sequence_length, step_size)
# one-hot-vectorize the train set
X = get_one_hot_train_set(idx_X)
X = X.view(idx_X.size()[0],-1)

sample_size = len(y)
train_size = int(0.9*sample_size)

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:], y[train_size:]

train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, num_workers=1, drop_last=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, num_workers=1, drop_last=True)

print (len(X)/N)
print(len(train_loader) + len(val_loader))

#### Model

In [None]:
torch.manual_seed(random_seed)

In [269]:
class RNN(nn.Module):
    def __init__(self, input_size, sequence_length, num_hidden, num_output, num_hidden_layers, batch_size):
        super(RNN, self).__init__()
        
        self.input_size = input_size
        self.sequence_length = sequence_length
        self.num_hidden = num_hidden
        self.num_hidden_layers = num_hidden_layers
        self.num_output = num_output
        self.batch_size = batch_size
        
        self.rnn = nn.RNN(input_size, num_hidden, num_hidden_layers, batch_first=True, nonlinearity="relu")        
        self.linear_out = nn.Linear(num_hidden, num_output)
    
    def forward(self, x, hidden):
        x = x.view(self.batch_size, self.sequence_length, self.input_size)
        output, hidden = self.rnn(x, hidden)
        output = self.linear_out(output.view(-1, self.num_output))
        return output, hidden
    
    def init_hidden(self):
        return Variable(torch.zeros(self.num_hidden_layers, self.batch_size, self.num_hidden))

#### Loss function, model, optimizer

In [270]:
model = RNN(n_characters, sequence_length, num_hidden, n_characters, num_hidden_layers, batch_size)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
loss_func = nn.CrossEntropyLoss()

#### Setup training

In [271]:
plot_train, plot_val = [], []
best_val_loss = 100.0

for epoch in range(n_epochs):
    start_time = time.time()
    train_loss, train_acc = 0.0, 0.0
    val_loss, val_acc = 0.0, 0.0
    hidden = model.init_hidden()

    # train
    for batch_idx, (data, target) in enumerate(train_loader):
        # forward, backward, optimize
        hidden = hidden.detach()
        optimizer.zero_grad()        
        output, hidden = model(data, hidden)
        loss = loss_func(output, target)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()/len(train_loader)


    # evaluate with validation set
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            # forward only
            hidden = hidden.detach()
            output, hidden = model(data, hidden)
            loss = loss_func(output, target)

            val_loss += loss.item()/len(val_loader)

    plot_train.append(train_loss)
    plot_val.append(val_loss)

    print('[%d] train loss: %.3f val loss: %.3f time: %.3f' % (epoch + 1, train_loss, val_loss, time.time() - start_time))
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_loss_weight')
        print('saving least val loss model from epoch [%d]'% (epoch+1))

[1] train loss: 6.779 val loss: 2.615 time: 0.704
saving least val loss model from epoch [1]
[2] train loss: 2.571 val loss: 2.508 time: 0.717
saving least val loss model from epoch [2]
[3] train loss: 2.496 val loss: 2.484 time: 0.787
saving least val loss model from epoch [3]
[4] train loss: 2.470 val loss: 2.476 time: 0.795
saving least val loss model from epoch [4]
[5] train loss: 2.457 val loss: 2.472 time: 0.821
saving least val loss model from epoch [5]
[6] train loss: 2.448 val loss: 2.463 time: 0.824
saving least val loss model from epoch [6]
[7] train loss: 2.440 val loss: 2.466 time: 0.815
[8] train loss: 2.434 val loss: 2.462 time: 0.825
saving least val loss model from epoch [8]
[9] train loss: 2.428 val loss: 2.463 time: 0.808
[10] train loss: 2.423 val loss: 2.460 time: 0.841
saving least val loss model from epoch [10]
[11] train loss: 2.418 val loss: 2.459 time: 0.816
saving least val loss model from epoch [11]
[12] train loss: 2.414 val loss: 2.462 time: 0.816
[13] tra

Process Process-759:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/hveiga/Desktop/Data_Science/dl-venv/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 52, in _worker_loop
    r = index_queue.get()


KeyboardInterrupt: 

  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/queues.py", line 335, in get
    res = self._reader.recv_bytes()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt


#### Setup testing

To evaluate the network we will feed one character at a time, use the outputs of the network as a probability distribution for the next character, and repeat. <br>
To start the generation we pass a priming string to start building up the hidden state, from which we then generate one character at a time.

In [70]:
def test(prime_str='A', predict_len=100, temperature=0.8):
    hidden = model.init_hidden()
    prime_input = char_tensor(prime_str)
    predicted = prime_str

    # Use priming string to "build up" hidden state
    for p in range(len(prime_str) - 1):
        _, hidden = model(prime_input[p].view(1, batch_size, -1), hidden)
    inp = prime_input[-1]
    
    for p in range(predict_len):
        output, hidden = model(inp.view(1, batch_size, -1), hidden)
        
        # Sample from the network as a multinomial distribution
        output_dist = output.data.view(-1).div(temperature).exp()
        top_i = torch.multinomial(output_dist, 1)[0]
        
        # Add predicted character to string and use as next input
        predicted_char = all_characters[top_i]
        predicted += predicted_char
        inp = char_tensor(predicted_char)

    return predicted