In [None]:
# Mount Google Drive at /content/drive so the corpus files stored there can be
# copied into the runtime by the next cell.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import shutil

# Copy both corpora from the mounted Drive folder into sample_data/.
# The paths are relative, so this cell relies on the notebook's working directory.
source_dir = 'drive/MyDrive/Colab Notebooks/Word_RNN/'
destination_path = '../content/sample_data/'

for corpus_name in ('shakespeare.txt', 'sherlock.txt'):
    file_path = source_dir + corpus_name
    copied_to = shutil.copyfile(file_path, destination_path + corpus_name)

# Last expression of the cell: shows the destination of the final copy.
copied_to

'../content/sample_data/sherlock.txt'

In [None]:
# !ls sample_data
# print()
# !ls ../
# print()
# !ls ../content

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import string
import random
import matplotlib.pyplot as plt
import timeit

In [None]:
# Read the whole training corpus into memory as a single string.
with open('sample_data/shakespeare.txt', 'r') as corpus_file:
    text = corpus_file.read()

# Distinct characters that occur in the corpus (handed to CharRNN as `tokens`).
chars = tuple(set(text))

# Vocabulary: every character of string.printable, numbered by its position
# in the iteration order of the set.
# NOTE(review): set iteration order for strings can differ between interpreter
# sessions (hash randomisation), so these indices are only stable within one
# kernel session - confirm before persisting a trained model.
printable_vocab = tuple(set(string.printable))
char2int = {ch: ii for ii, ch in enumerate(printable_vocab)}
# int2char = dict(enumerate(chars))
# char2int = {ch: ii for ii, ch in int2char.items()}

# Integer-encode the corpus; a character outside string.printable raises KeyError.
encoded = np.array(list(map(char2int.__getitem__, text)))

# Function to one-hot encode the characters
def one_hot_encode(arr, n_labels):
    """One-hot encode an integer array along a new trailing axis.

    Arguments
    ---------
    arr: integer array (or array-like) of label indices, of any rank;
         every value must lie in [0, n_labels)
    n_labels: size of the one-hot axis

    Returns
    -------
    float32 array of shape (*arr.shape, n_labels) containing a single 1.0
    per label position.
    """
    arr = np.asarray(arr)
    # arr.size works for any number of dimensions (the previous
    # np.multiply(*arr.shape) only accepted exactly two).
    oh = np.zeros((arr.size, n_labels), dtype=np.float32)
    oh[np.arange(arr.size), arr.ravel()] = 1.
    return oh.reshape((*arr.shape, n_labels))

# Function to one-hot encode groups of 2 characters, in attempt to increase contextual range
def one_hot_encode_double(arr, n_labels, pad_index=76):
    """One-hot encode consecutive pairs of labels as a single combined label.

    Each pair (a, b) of neighbouring columns is mapped to the index
    n_labels*a + b and one-hot encoded over n_labels**2 classes, in an
    attempt to increase the contextual range of each time step.

    Arguments
    ---------
    arr: integer array of shape (batch_size, seq_length) with values in
         [0, n_labels)
    n_labels: number of distinct single-character labels
    pad_index: label inserted at the front of every row when seq_length is
               odd, so that the padded length is even. The default of 76 is
               the value the original code hard-coded for " "; pass the
               actual index of the padding character for the vocabulary in
               use, since that index is not fixed.

    Returns
    -------
    float32 array of shape (batch_size, ceil(seq_length / 2), n_labels**2).
    Example with n_labels = 3: the row [1, 2, 0, 0] becomes the pair labels
    [5, 0], i.e. one-hot vectors with a 1 at positions 5 and 0.
    """
    if arr.shape[1] % 2 == 1:
        arr = np.insert(arr, 0, pad_index, axis=1)

    # Combine even/odd columns into one label per pair.
    pair_labels = (n_labels * arr[:, 0::2] + arr[:, 1::2]).astype(int)

    oh = np.zeros((*pair_labels.shape, n_labels**2), dtype=np.float32)
    # Broadcast (batch, 1) against (pairs,) to address one cell per pair.
    oh[np.arange(oh.shape[0])[:, None], np.arange(oh.shape[1]), pair_labels] = 1
    return oh

# Global flag: True when PyTorch can see a CUDA device. The training and
# sampling code reads it to decide whether to move the model and tensors
# to the GPU.
train_on_gpu = torch.cuda.is_available()

In [None]:
# Scratch check of the pair-encoding logic used by one_hot_encode_double,
# disabled by wrapping it in a string literal (nothing inside is executed).
'''
n_labels = 12
array = np.array(([0,1,2,3],[4,5,6,7],[8,9,10,11]))
new = np.zeros((array.shape[0], int(array.shape[1]/2)), dtype=int)
oh = np.zeros((*(new.shape), n_labels**2), dtype=np.float32) # *(new.shape) here because .shape returns a tuple
for i in range(0, array.shape[1], 2):
    new[:,int(i/2)] = n_labels*array[:,i] + array[:,i+1]
oh[np.arange(oh.shape[0])[:,None], np.arange(oh.shape[1]), new] = 1
#oh = oh.reshape((*(new.shape), n_labels**2))
array, new, oh.shape
print(new.shape, oh.shape)
print(new)
oh
'''

# a = np.array(([1,2],[3,4],[5,6]))
# array2 = np.array((a, 6+a, 12+a, 18+a)).transpose(0,2,1)
# print(array2)
# test1 = np.array(([1,2],[0,0],[0,0],[0,0]))
# print(array2.shape, test1.shape)
# array2[np.arange(array2.shape[0])[:, None], np.arange(array2.shape[1]), test1] = 1
# array2

# n_labels = 12
# array = np.array(([0,1,2,3],[4,5,6,7],[8,9,10,11]))
# oh = np.zeros((np.multiply(*(array.shape)), n_labels), dtype=np.float32)
# oh[np.arange(oh.shape[0]), array.flatten()] = 1.
# oh = oh.reshape((*(array.shape), n_labels))
# oh

('b', 'd', 'a', 'e', 'g', 'c', 'f')

In [None]:
class CharRNN(torch.nn.Module):
    """Character-level LSTM language model over the string.printable vocabulary.

    The network consumes one-hot vectors of width len(self.allchars) with the
    batch dimension first, and produces one row of unnormalised scores
    (logits) over the same vocabulary for every time step.

    Arguments
    ---------
    tokens: characters of the training corpus; stored as `self.chars` but not
            used to size the network
    n_hidden: width of the LSTM hidden state
    n_layers: number of stacked LSTM layers
    drop_prob: stored as `self.drop_prob`; no dropout layer is currently
               active in this model
    """

    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.5): #, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden

        self.chars = tokens
        # Vocabulary and its two lookup tables. `int2char` is the exact
        # inverse of `char2int`, so indices produced by the network can be
        # decoded back to characters.
        self.allchars = tuple(set(string.printable))
        self.char2int = {ch: ii for ii, ch in enumerate(self.allchars)}
        self.int2char = dict(enumerate(self.allchars))

        # Dropout (both the LSTM's own and a separate layer) is disabled.
        self.lstm = torch.nn.LSTM(len(self.allchars), n_hidden, n_layers,
                                  batch_first=True)
        self.linear = torch.nn.Linear(n_hidden, len(self.allchars))

    def forward(self, x, hidden):
        """Run the LSTM over `x` and project every time step to vocabulary logits.

        Arguments
        ---------
        x: float tensor of shape (batch, seq, len(self.allchars))
        hidden: (h, c) tuple as returned by `init_hidden` or a previous call

        Returns
        -------
        (out, hidden) where `out` has shape (batch*seq, len(self.allchars))
        and `hidden` is the updated LSTM state.
        """
        r_output, hidden = self.lstm(x, hidden)
        # Stack all time steps of all sequences into rows for the linear layer.
        out = r_output.contiguous().view(-1, self.n_hidden)
        out = self.linear(out)
        return out, hidden

    def init_hidden(self, batch_size):
        """Return a zeroed (h, c) state of shape (n_layers, batch_size, n_hidden).

        The tensors are created with the dtype and device of the model's
        parameters, so the state always lives where the model lives.
        """
        weight = next(self.parameters())
        state_shape = (self.n_layers, batch_size, self.n_hidden)
        return (weight.new_zeros(state_shape), weight.new_zeros(state_shape))

In [None]:
# Character table used by seq_to_index: a character's index is its position
# in string.printable (a different ordering from the set-based char2int).
all_chars = string.printable

# Get a random sequence of the Shakespeare dataset.
def get_random_seq(arr, seq_len):
    """Return a random window of exactly seq_len + 1 consecutive items of `arr`.

    The extra item lets the caller split the window into an input (all but
    the last item) and a target (all but the first item) of length seq_len.

    Arguments
    ---------
    arr: sliceable sequence (string, list, array) to sample from
    seq_len: length of the input sequence; the window is one item longer

    Raises
    ------
    ValueError: if `arr` has fewer than seq_len + 1 items
    """
    last_start = len(arr) - seq_len - 1
    if last_start < 0:
        raise ValueError(
            "need at least seq_len + 1 = {} items, got {}".format(seq_len + 1, len(arr)))
    # random.randint includes both end points, so the largest start index
    # must still leave room for the full seq_len + 1 window.
    start_index = random.randint(0, last_start)
    end_index   = start_index + seq_len + 1
    return arr[start_index:end_index]

# Convert the sequence to index tensor.
def seq_to_index(seq):
    """Convert a batch of character sequences to a tensor of character indices.

    Arguments
    ---------
    seq: list of equally long character sequences, one per batch element

    Returns
    -------
    float tensor of shape (sequence length, batch size) in which entry
    [t][i] is the position of character t of sequence i in `all_chars`.
    The batch size is taken from len(seq) rather than from a global.

    Raises
    ------
    ValueError: if a character is not contained in `all_chars`
    """
    n_seqs = len(seq)
    tensor = torch.zeros(len(seq[0]), n_seqs)
    for i, one_seq in enumerate(seq):
        for t, char in enumerate(one_seq):
            tensor[t][i] = all_chars.index(char)
    return tensor

# Sample a mini-batch including input tensor and target tensor.
def get_input_and_target(arr=None, seq_len=128):
    """Sample a mini-batch of `batch_size` random windows as (input, target).

    Arguments
    ---------
    arr: character sequence to sample from; defaults to the global corpus
         string `text`
    seq_len: length of each input sequence

    Returns
    -------
    (input, target): the one-hot encoded inputs and the index-encoded
    targets, where each target sequence is its input shifted by one
    character.
    """
    if arr is None:
        arr = text
    # get_random_seq requires the data and the window length explicitly.
    seqs = [get_random_seq(arr, seq_len) for _ in range(batch_size)]
    # NOTE(review): seq_to_onehot is not defined in the visible part of this
    # notebook - confirm it exists before calling this function.
    inputs = seq_to_onehot([seq[:-1] for seq in seqs])
    target = seq_to_index([seq[1:] for seq in seqs]).long()
    return inputs, target

def get_batches(arr, batch_size, seq_length):
    '''Generate (x, y) training pairs of shape batch_size x seq_length.

       The data is cut down to a whole number of batches and laid out as
       batch_size rows; consecutive windows of seq_length columns are then
       yielded together with their targets.

       Arguments
       ---------
       arr: 1-D array of encoded characters
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence

       Yields
       ------
       x: window of inputs (a view into the reshaped data)
       y: the same window shifted left by one; its last column is the
          column following the window, or column 0 for the final window
    '''
    chars_per_batch = batch_size * seq_length
    full_batches = len(arr) // chars_per_batch

    # Drop the tail that cannot fill a complete batch, then lay the rest
    # out as batch_size rows.
    rows = arr[:full_batches * chars_per_batch].reshape((batch_size, -1))
    n_cols = rows.shape[1]

    for start in range(0, n_cols, seq_length):
        stop = start + seq_length
        x = rows[:, start:stop]

        # The column after the window is the last target; wrap to the first
        # column when the window is the final one.
        next_col = stop if stop < n_cols else 0

        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        y[:, -1] = rows[:, next_col]
        yield x, y

In [None]:
def _mean_validation_loss(net, criterion, val_data, batch_size, seq_length, n_chars):
    """Return the mean loss of `net` over all full batches of `val_data`.

    The network is switched to eval mode for the pass and back to train mode
    afterwards. Returns NaN when `val_data` is too short for a single batch.
    """
    val_h = net.init_hidden(batch_size)
    val_losses = []
    net.eval()
    # No gradients are needed for validation; skipping graph construction
    # saves memory and time without changing the loss values.
    with torch.no_grad():
        for x, y in get_batches(val_data, batch_size, seq_length):
            inputs = torch.from_numpy(one_hot_encode(x, n_chars))
            targets = torch.from_numpy(y)
            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            output, val_h = net(inputs, val_h)
            val_loss = criterion(output, targets.view(batch_size*seq_length).long())
            val_losses.append(val_loss.item())
    net.train() # reset to train mode after iterating through validation data
    return np.mean(val_losses) if val_losses else float('nan')


def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network

        Arguments
        ---------

        net: CharRNN network
        data: encoded text data (array of character indices) to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping (maximum gradient norm)
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss

        The last `val_frac` of `data` is held out for validation. Progress is
        printed every `print_every` steps; nothing is returned.
    '''
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()

    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if train_on_gpu:
        net.cuda()

    counter = 0
    n_chars = len(net.allchars)
    for e in range(epochs):
        # The hidden state is carried across batches within an epoch and
        # reset at the start of every epoch.
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode our data and make them Torch tensors
            inputs = torch.from_numpy(one_hot_encode(x, n_chars))
            targets = torch.from_numpy(y)
            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the hidden state from its history, otherwise
            # we'd backprop through the entire training history
            h = tuple(each.detach() for each in h)

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward()
            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            # loss stats
            if counter % print_every == 0:
                mean_val_loss = _mean_validation_loss(
                    net, criterion, val_data, batch_size, seq_length, n_chars)

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(mean_val_loss))

In [None]:
# def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
#     ''' Training a network

#         Arguments
#         ---------

#         net: CharRNN network
#         data: text data to train the network
#         epochs: Number of epochs to train
#         batch_size: Number of mini-sequences per mini-batch, aka batch size
#         seq_length: Number of character steps per mini-batch
#         lr: learning rate
#         clip: gradient clipping
#         val_frac: Fraction of data to hold out for validation
#         print_every: Number of steps for printing training and validation loss

#     '''
#     net.train()
#     opt = torch.optim.Adam(net.parameters(), lr=lr)
#     criterion = torch.nn.CrossEntropyLoss()

#     # create training and validation data
#     val_idx = int(len(data)*(1-val_frac))
#     data, val_data = data[:val_idx], data[val_idx:]

#     if(train_on_gpu):
#         net.cuda()

#     counter = 0
#     n_chars = len(net.chars)
#     for e in range(epochs):
#         h = net.init_hidden(batch_size)

#         for x, y in get_batches(data, batch_size, seq_length):
#             counter += 1

#             # One-hot encode our data and make them Torch tensors
#             x = one_hot_encode(x, n_chars)
#             inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

#             if(train_on_gpu):
#                 inputs, targets = inputs.cuda(), targets.cuda()

#             # Creating new variables for the hidden state, otherwise
#             # we'd backprop through the entire training history
#             h = tuple([each.data for each in h])

#             # zero accumulated gradients
#             net.zero_grad()

#             # get the output from the model
#             output, h = net(inputs, h)

#             # calculate the loss and perform backprop
#             loss = criterion(output, targets.view(batch_size*seq_length).long())
#             loss.backward()
#             # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
#             nn.utils.clip_grad_norm_(net.parameters(), clip)
#             opt.step()

#             # loss stats
#             if counter % print_every == 0:
#                 # Get validation loss
#                 val_h = net.init_hidden(batch_size)
#                 val_losses = []
#                 net.eval()
#                 for x, y in get_batches(val_data, batch_size, seq_length):
#                     # One-hot encode our data and make them Torch tensors
#                     x = one_hot_encode(x, n_chars)
#                     x, y = torch.from_numpy(x), torch.from_numpy(y)

#                     # Creating new variables for the hidden state, otherwise
#                     # we'd backprop through the entire training history
#                     val_h = tuple([each.data for each in val_h])

#                     inputs, targets = x, y
#                     if(train_on_gpu):
#                         inputs, targets = inputs.cuda(), targets.cuda()

#                     output, val_h = net(inputs, val_h)
#                     val_loss = criterion(output, targets.view(batch_size*seq_length).long())

#                     val_losses.append(val_loss.item())

#                 net.train() # reset to train mode after iterating through validation data

#                 print("Epoch: {}/{}...".format(e+1, epochs),
#                       "Step: {}...".format(counter),
#                       "Loss: {:.4f}...".format(loss.item()),
#                       "Val Loss: {:.4f}".format(np.mean(val_losses)))

In [None]:
# Model size.
n_hidden, n_layers = 500, 2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

# Training hyperparameters.
batch_size, seq_length = 64, 128
n_epochs = 20 # start small if you are just testing initial behavior

# NOTE(review): the saved output under this cell shows LSTM(65, 512, ...) with
# an `fc` layer, which does not match the current CharRNN definition
# (`linear`, n_hidden=500); presumably it comes from an earlier revision -
# re-run the cell to refresh it.

# train the model
train(net, encoded, n_epochs, batch_size, seq_length, lr=0.005, print_every=10)

CharRNN(
  (lstm): LSTM(65, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=512, out_features=65, bias=True)
)
Epoch: 1/20... Step: 10... Loss: 3.3938... Val Loss: 3.4161
Epoch: 1/20... Step: 20... Loss: 3.3138... Val Loss: 3.3518
Epoch: 1/20... Step: 30... Loss: 3.3406... Val Loss: 3.3435
Epoch: 1/20... Step: 40... Loss: 3.3547... Val Loss: 3.3398
Epoch: 1/20... Step: 50... Loss: 3.3342... Val Loss: 3.3369
Epoch: 1/20... Step: 60... Loss: 3.3174... Val Loss: 3.3234
Epoch: 1/20... Step: 70... Loss: 3.2727... Val Loss: 3.2890
Epoch: 2/20... Step: 80... Loss: 3.2362... Val Loss: 3.2238
Epoch: 2/20... Step: 90... Loss: 3.1777... Val Loss: 3.1539
Epoch: 2/20... Step: 100... Loss: 3.1179... Val Loss: 3.1103
Epoch: 2/20... Step: 110... Loss: 3.0802... Val Loss: 3.0415
Epoch: 2/20... Step: 120... Loss: 2.9732... Val Loss: 2.9293
Epoch: 2/20... Step: 130... Loss: 2.8755... Val Loss: 2.8048
Epoch: 2/20... Step: 140... Loss:

In [None]:
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.

        Arguments
        ---------
        net: trained CharRNN network
        char: the current character; must be a key of net.char2int
        h: LSTM hidden state from the previous step, or None to start from
           a fresh zero state
        top_k: if given, sample only among the top_k most likely characters;
               otherwise sample from the full distribution

        Returns the predicted character and the hidden state.
    '''
    # The network's input and output width is the full vocabulary size.
    n_chars = len(net.allchars)

    # tensor inputs, shape (batch=1, seq=1, n_chars)
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, n_chars)
    inputs = torch.from_numpy(x)

    if train_on_gpu:
        inputs = inputs.cuda()

    if h is None:
        h = net.init_hidden(1)
    # detach hidden state from history
    h = tuple(each.data for each in h)

    # get the output of the model; no gradients are needed for sampling
    with torch.no_grad():
        out, h = net(inputs, h)

    # get the character probabilities
    p = F.softmax(out, dim=1).data
    if train_on_gpu:
        p = p.cpu() # move to cpu

    # get top characters
    if top_k is None:
        top_ch = np.arange(n_chars)
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) keeps a 1-D array even when top_k == 1
        top_ch = top_ch.numpy().reshape(-1)

    # select the likely next character with some element of randomness
    p = p.numpy().reshape(-1).astype(np.float64)
    next_idx = np.random.choice(top_ch, p=p/p.sum())

    # net.allchars[i] is the character that net.char2int maps to i
    return net.allchars[int(next_idx)], h

In [None]:
def sample(net, size, prime='The', top_k=None):
    """Generate text from `net`, starting from the seed string `prime`.

    Arguments
    ---------
    net: trained CharRNN network
    size: number of prediction steps run after the seed has been consumed
    prime: non-empty seed string used to warm up the hidden state
    top_k: forwarded to `predict`; restricts sampling to the top_k most
           likely characters when given

    Returns
    -------
    `prime` followed by size + 1 generated characters (one predicted from
    the end of the seed, then one per step).

    Raises
    ------
    ValueError: if `prime` is empty, since there is then no character to
                predict from
    """
    if not prime:
        raise ValueError("prime must contain at least one character")

    if train_on_gpu:
        net.cuda()
    else:
        net.cpu()

    net.eval() # eval mode

    # First off, run through the prime characters to build up the hidden
    # state; only the prediction after the last seed character is kept.
    chars = list(prime)
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)

    # Now pass in the previous character and get a new one
    for _ in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

# Generate text from the network trained above, seeded with 'The' and sampling
# among the 5 most likely characters at every step.
print(sample(net, 1000, prime='The', top_k=5))

The hate that saint and be
To make you show them out the courtery.

DUKE OF YORK:
What is the stare to him true such a string
Wouther their sight of many a percess,
Than I were but the worshill tell my body;
Therefore thou stands me what thou his hours.

KING LEWWARD I::
Take you my black with the death weaping are.

KING RICHARD III:
And my brother all a world be sprect.

DUTES:
I would he was not subjecting: and then stone
That the did with a love, ale much and me.

LORD RISS:
The signess shall set his sentence with a live,
And too the stretch hath desires to thee,
Are thou are a might to me, steel the did;
But they here the dost bestery heart,
As the more paty best shall pray and take
Is such to the power of tellings:
I say it we been made as thou seest.

DUKE VINCENTIO:
Hark they shall not to-morrow of this brather,
And wo did to thy for the sean.

SICINIUS:
Thou art shouse to be the woods, and say their
forsent alone, a shill thrive, that stay's a people
A stan to thee to my stand