In [None]:
# Importing libraries
import os

import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [None]:
# Read the whole corpus into memory and normalise it to lower case as `text`
with open('data/Grimm_text.txt', 'r', encoding='utf-8') as f:
    text = f.read().lower()

In [None]:
# Preview the first 100 characters of the lower-cased corpus
text[:100]

In [None]:
# Build the vocabulary and map each character to an integer and vice versa

# We create two dictionaries:
# 1. int2char, which maps integers to characters
# 2. char2int, which maps characters to integers
#
# The vocabulary is sorted so that the character <-> integer mapping is the same
# on every kernel start. Iterating a bare set of strings follows hash order,
# which Python randomises per process, so ids would otherwise change run to run.
chars = tuple(sorted(set(text)))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

In [None]:
# Turn the corpus into an array holding one integer id per character, in order
encoded = np.array(list(map(char2int.__getitem__, text)))

In [None]:
# Preview the integer ids of the first 100 characters (values depend on char2int)
encoded[:100]

In [None]:
# Defining method to encode one hot labels
# Generation is character by character and the vocabulary is small (68 symbols here), so one-hot encoding is good enough

def one_hot_encode(arr, n_labels):
    '''One-hot encode an integer array of any shape.

       Arguments
       ---------
       arr: Array (or nested sequence) of integer label indices; each index
            must be a valid position along an axis of length n_labels
       n_labels: Length of the one-hot axis that is appended

       Returns
       -------
       float32 array of shape (*arr.shape, n_labels) with a single 1.0 per
       label along the last axis.
    '''
    arr = np.asarray(arr)

    # One row per label. `arr.size` works for any number of dimensions, whereas
    # multiplying the unpacked shape only works for exactly two.
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(arr.size), arr.ravel()] = 1.

    # Finally restore the original leading dimensions
    return one_hot.reshape((*arr.shape, n_labels))

In [None]:
# Defining method to make mini-batches for training, batch is generated by "reshape", instead of rolling window
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.

       The usable prefix of arr is reshaped into batch_size rows and each row
       is walked left to right in non-overlapping windows of seq_length
       characters. Any trailing characters that do not fill a complete batch
       are dropped from the inputs.

       Arguments
       ---------
       arr: 1-D array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence

       Yields
       ------
       (x, y) pairs of shape (batch_size, seq_length), where y[i, j] is the
       character that follows x[i, j] in arr. Only when arr has no character
       after the last input does the very last target wrap around to arr[0].
       y is a C-contiguous copy; x is a view into arr.

       Raises
       ------
       ValueError: if batch_size or seq_length is not positive
    '''
    if batch_size <= 0 or seq_length <= 0:
        raise ValueError('batch_size and seq_length must be positive')

    arr = np.asarray(arr)
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr) // batch_size_total
    n_used = n_batches * batch_size_total
    if n_used == 0:
        return

    # Keep only enough characters to make full batches
    inputs = arr[:n_used]

    # Targets are the inputs shifted left by one in the *flat* corpus, so the
    # last target of a row is the first character of the next row (its true
    # successor) rather than the first character of the same row.
    targets = np.empty_like(inputs)
    targets[:-1] = inputs[1:]
    targets[-1] = arr[n_used] if n_used < len(arr) else inputs[0]

    # Reshape into batch_size rows
    inputs = inputs.reshape((batch_size, -1))
    targets = targets.reshape((batch_size, -1))

    # iterate through the rows, one sequence at a time (step = sequence length)
    for n in range(0, inputs.shape[1], seq_length):
        x = inputs[:, n:n+seq_length]
        # copy() keeps y contiguous so it can be flattened downstream without a copy
        y = targets[:, n:n+seq_length].copy()
        yield x, y

In [None]:
# Detect CUDA once; later cells branch on this flag to place the model and tensors
train_on_gpu = torch.cuda.is_available()
print('Training on GPU!' if train_on_gpu
      else 'No GPU available, training on CPU; consider making n_epochs very small.')

In [None]:
# Declaring the model
class CharRNN(nn.Module):
    ''' Character-level language model: stacked LSTM -> dropout -> linear layer.

        Arguments
        ---------
        tokens: Sequence of vocabulary characters; position = integer id
        n_hidden: Size of the LSTM hidden state (and of its output vectors)
        n_layers: Number of vertically stacked LSTM layers
        drop_prob: Dropout probability, used between LSTM layers and on the LSTM output
        lr: Learning rate, only stored on the instance as `self.lr`
    '''

    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # creating character dictionaries
        self.chars = tokens #vocab lib
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        #define the LSTM
        #LSTM hyperparameter:
        #input size = one-hot-encoding size = len of vocab lib
        #n_hidden = LSTM output vector size
        #n_layers = number of vertically stacked lstm layers.
        #Inter-layer dropout only exists when there is more than one layer;
        #passing a non-zero value with a single layer just triggers a warning.
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob if n_layers > 1 else 0.0,
                            batch_first=True)

        #define a dropout layer
        self.dropout = nn.Dropout(drop_prob)

        #define the final, fully-connected output layer
        #fc layer is required, as n_hidden <> vocab size: with batch_first=True the
        #lstm output is (batch size, sequence length, n_hidden), and the fc layer
        #maps every n_hidden vector to one score per character of the vocab lib
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        ''' Forward pass through the network.

            x: one-hot input of shape (batch size, sequence length, vocab size)
            hidden: (hidden state, cell state) tuple for the LSTM, or None

            Returns (out, hidden): `out` holds unnormalised scores of shape
            (batch size * sequence length, vocab size); `hidden` is the new
            LSTM state tuple. '''

        #get the outputs and the new hidden state from the lstm
        #hidden has two tensors, one for hidden state and one for cell state
        r_output, hidden = self.lstm(x, hidden)

        #pass through a dropout layer
        out = self.dropout(r_output)

        #Stack the lstm outputs of all sequences into rows of n_hidden values,
        #this is the 2-D layout the cross entropy loss function expects
        out = out.contiguous().view(-1, self.n_hidden)

        #put output through the fully-connected layer
        #no last layer activation at training, as we are using cross entropy loss, which is equivalent to have logsoftmax + nllloss
        out = self.fc(out)

        # return the final output and the hidden state
        return out, hidden

In [None]:
# Declaring the train method
def train(net, data, epochs, batch_size, seq_length_set, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Train a network, running `epochs` epochs for every sequence length in turn.

        Arguments
        ---------

        net: CharRNN network
        data: encoded (integer) text data to train the network
        epochs: Number of epochs to train for each entry of seq_length_set
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length_set: Iterable of sequence lengths (character steps per
                        mini-batch); the training split is run `epochs` times
                        for each length, in the given order
        lr: learning rate
        clip: gradient clipping (maximum gradient norm)
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss

    '''
    #set the net to training mode, this enables drop out
    net.train()

    #use adam optimizer and cross entropy loss, as the task is one label classification
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # create training and validation data
    # the last val_frac of the data is held out for validation
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if(train_on_gpu):
        net.cuda()

    counter = 0
    n_chars = len(net.chars)
    for seq_length in seq_length_set:
        for e in range(epochs):
            # initialize hidden state
            h = init_hidden(net,batch_size)

            for x, y in get_batches(data, batch_size, seq_length):
                counter += 1

                # One-hot encode our data and make them Torch tensors
                x = one_hot_encode(x, n_chars)
                inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

                if(train_on_gpu):
                    inputs, targets = inputs.cuda(), targets.cuda()

                # detach h from the previous batch's graph, so that backprop
                # stops at the batch boundary while the state values carry over
                h = tuple([each.detach() for each in h])

                # zero accumulated gradients
                net.zero_grad()

                # get the output from the model
                output, h = net(inputs, h)

                # calculate the loss and perform backprop
                loss = criterion(output, targets.view(batch_size*seq_length).long())
                loss.backward()

                # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
                nn.utils.clip_grad_norm_(net.parameters(), clip)
                opt.step()

                # Do one run of validation on validation set every print_every steps.
                if counter % print_every == 0:
                    # Get validation loss
                    val_h = init_hidden(net,batch_size)
                    val_losses = []
                    # validation feeds one character per step; the state carries the context
                    val_seq_length = 1

                    #set network to eval mode (disables drop out)
                    net.eval()

                    # no_grad must be used as a context manager: calling it as a bare
                    # statement has no effect, and the graph would then keep growing
                    # through val_h over the whole validation set.
                    with torch.no_grad():
                        for val_x, val_y in get_batches(val_data, batch_size, val_seq_length):
                            # One-hot encode our data and make them Torch tensors
                            val_x = one_hot_encode(val_x, n_chars)
                            inputs, targets = torch.from_numpy(val_x), torch.from_numpy(val_y)
                            if(train_on_gpu):
                                inputs, targets = inputs.cuda(), targets.cuda()

                            output, val_h = net(inputs, val_h)
                            val_loss = criterion(output, targets.view(batch_size*val_seq_length).long())

                            #no backward, so model parameter not changed while test on validation set
                            val_losses.append(val_loss.item())

                    net.train() # reset to train mode after iterating through validation data

                    # an empty validation split (too little held-out data for one batch) reports nan
                    mean_val_loss = np.mean(val_losses) if val_losses else float('nan')

                    print("Epoch: {}/{}...".format(e+1, epochs),
                          "Step: {}...".format(counter),
                          "Loss: {:.4f}...".format(loss.item()),
                          "Val Loss: {:.4f}".format(mean_val_loss))

In [None]:
def init_hidden(net, batch_size):
    ''' Build the initial, all-zero LSTM state for `net`.

        Returns a tuple (hidden state, cell state). Each tensor has size
        n_layers x batch_size x n_hidden and is created from the first
        parameter of the network; both are moved to the GPU when the
        notebook-level flag `train_on_gpu` is set.
    '''
    reference = next(net.parameters()).data
    state_size = (net.n_layers, batch_size, net.n_hidden)

    states = []
    for _ in range(2):  # first the hidden state, then the cell state
        state = reference.new(*state_size).zero_()
        states.append(state.cuda() if train_on_gpu else state)

    return tuple(states)

In [None]:
# Model size hyperparameters: LSTM hidden size and number of stacked LSTM layers
n_hidden=512
n_layers=2

In [None]:
# Build the model over the corpus vocabulary; drop_prob and lr keep their defaults
net = CharRNN(chars, n_hidden, n_layers)

In [None]:
# Show the module layout (LSTM, dropout and fully-connected layer)
print(net)

In [None]:
# Declaring the hyperparameters
batch_size = 128
# Sequence lengths 10, 30, ..., 190, visited in random order during training.
# The array is deliberately NOT called `set`: that would shadow the builtin and
# break any later (re-)run of a cell that calls set(...).
seq_lengths = np.linspace(10, 190, 10)
np.random.shuffle(seq_lengths)
seq_length_set = [int(length) for length in seq_lengths]
n_epochs = 2 # start smaller if you are just testing initial behavior

In [None]:
# Train the model: n_epochs passes over the training split for each sequence length
train(
    net,
    encoded,
    epochs=n_epochs,
    batch_size=batch_size,
    seq_length_set=seq_length_set,
    lr=0.001,
    print_every=50,
)

In [None]:
# Saving the model
model_name = 'model/CharRNN.pt'

# Everything needed to rebuild the network: constructor sizes, vocabulary and weights
checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

# Create the target directory first, so that a missing `model/` folder cannot
# make the save fail after training has already finished
os.makedirs(os.path.dirname(model_name), exist_ok=True)

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

In [None]:
# Defining a method to generate the next character
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.

        Arguments
        ---------
        net: Trained network exposing chars, char2int and int2char
        char: The current character; must be a key of net.char2int
        h: LSTM state from the previous call (None lets the LSTM start from zeros)
        top_k: If given, sample only among the top_k most likely characters
               (capped at the vocabulary size); otherwise sample from all of them

        Returns the predicted character and the new hidden state.

        Raises
        ------
        KeyError: if char is not part of the network's vocabulary
    '''

    # input is the one-hot encoding of a single character: shape (1, 1, vocab size)
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x)

    if(train_on_gpu):
        inputs = inputs.cuda()

    # During generation the hidden state is not reset: it is taken from the
    # previous character and passed on, so the network still remembers the
    # text so far although the sequence length here is 1.
    # no_grad keeps the returned state free of autograd history, so memory
    # does not grow with the number of generated characters.
    with torch.no_grad():
        out, h = net(inputs, h)
        # get the character probabilities, here we use softmax
        p = F.softmax(out, dim=1)

    p = p.cpu()

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(min(top_k, p.shape[1]))
        # reshape(-1) rather than squeeze(): with top_k=1 squeeze() would give
        # 0-d arrays, which np.random.choice cannot use as candidates/weights
        top_ch = top_ch.numpy().reshape(-1)

    # select the likely next character with some element of randomness
    p = p.numpy().reshape(-1)
    char = np.random.choice(top_ch, p=p/p.sum())

    # return the predicted character and the hidden state
    return net.int2char[int(char)], h

In [None]:
# Declaring a method to generate new text
def sample(net, size, prime='The', top_k=None):
    ''' Generate text with `net`, starting from the characters in `prime`.

        Arguments
        ---------
        net: Trained network (moved to GPU or CPU according to train_on_gpu)
        size: Number of prediction steps after the prime has been consumed
        prime: Non-empty seed text. A character missing from the vocabulary is
               fed to the network as its lower-case form when that form is known
               (the corpus in this notebook is lower-cased); the returned
               string still starts with `prime` exactly as given
        top_k: Passed on to predict()

        Returns `prime` followed by size + 1 generated characters (one is
        produced while consuming the prime, then one per step).

        Raises
        ------
        ValueError: if prime is empty
        KeyError: if a prime character is unknown even in lower case
    '''
    if not prime:
        raise ValueError('prime must contain at least one character')

    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()

    net.eval() # eval mode

    chars = [ch for ch in prime]
    h = init_hidden(net,1)

    # generation never back-propagates, so no autograd history is needed
    with torch.no_grad():
        # First off, run through the prime characters;
        # only the hidden state and the very last prediction are kept
        for ch in prime:
            if ch not in net.char2int and ch.lower() in net.char2int:
                ch = ch.lower()
            char, h = predict(net, ch, h, top_k=top_k)

        chars.append(char)

        # Now pass in the previous character and get a new one
        for ii in range(size):
            char, h = predict(net, chars[-1], h, top_k=top_k)
            chars.append(char)

    return ''.join(chars)

In [None]:
# Generate text from the trained model, sampling among the 5 most likely characters at each step
print(
    sample(
        net,
        2000,
        prime='<hero in the wood>',
        top_k=5,
    )
)