# Intro to RNNs
This notebook is loosely based on the FloydHub tutorial "A Beginner's Guide on Recurrent Neural Networks with PyTorch": https://blog.floydhub.com/a-beginners-guide-on-recurrent-neural-networks-with-pytorch/

Imports:

In [None]:
import torch
from torch import nn
import numpy as np
from timeit import default_timer as timer

In [None]:
# Read the corpus from the file named 'text', lowercase it, and keep only
# the first 16000 characters of the lowercased text.
with open('text', 'r') as corpus_file:
    raw_text = corpus_file.read()
full_text = raw_text.lower()[:16000]
print(full_text)

In [None]:
# Build the character vocabulary and the two lookup tables
# (index -> char and char -> index) used by the encoding helpers.
vocab = set(full_text)
# Enumerate the *sorted* vocabulary: iteration order of a set of strings
# changes between interpreter runs (string hash randomization), which would
# otherwise give every session a different char <-> index mapping and make
# results impossible to reproduce.
int2char = dict(enumerate(sorted(vocab)))
char2int = {char: ind for ind, char in int2char.items()}
print(char2int)

Helper functions:

In [14]:
def split_eq(text, no):
    """Split ``text`` into exactly ``no`` consecutive, equally sized chunks.

    Every chunk holds ``len(text) // no`` items. Items left over at the end
    of ``text`` (when its length is not a multiple of ``no``) are dropped.

    Args:
        text: A sliceable sequence (typically a string).
        no: Number of chunks to produce.

    Returns:
        A list of ``no`` slices of ``text``, all of the same length.

    Raises:
        ValueError: If ``text`` has fewer than ``no`` items (or ``no`` is
            negative), so that no non-empty chunk can be formed.
        ZeroDivisionError: If ``no`` is 0.
    """
    cnt = len(text) // no
    if cnt <= 0:
        raise ValueError(
            "cannot split {} items into {} non-empty chunks".format(len(text), no)
        )
    # Slice by chunk index rather than stepping through the whole text:
    # stepping yields extra chunks whenever the remainder is at least one
    # chunk long (e.g. 11 items with no=4 would give 5 chunks, not 4).
    return [text[i * cnt:(i + 1) * cnt] for i in range(no)]

def produce_targets(examples):
    """Derive next-item training pairs from ``examples``.

    For every example the input is the example without its last item and
    the target is the example without its first item, so ``target[k]`` is
    the item that follows ``input[k]``.

    Returns:
        A tuple ``(inputs, targets)`` of two lists, in the order of
        ``examples``.
    """
    drop_first = slice(1, None)
    drop_last = slice(None, -1)
    targets = [sequence[drop_first] for sequence in examples]
    inputs = [sequence[drop_last] for sequence in examples]
    return inputs, targets

def translate_to_int(examples):
    """Replace every character of every example by its index in ``char2int``.

    Returns:
        A list with one list of integer indices per example.

    Raises:
        KeyError: If a character is not present in ``char2int``.
    """
    return [[char2int[ch] for ch in example] for example in examples]

def translate_to_char(examples):
    """Turn sequences of integer indices back into strings via ``int2char``.

    Returns:
        A list with one string per example.

    Raises:
        KeyError: If an index is not present in ``int2char``.
    """
    return [''.join(int2char[idx] for idx in example) for example in examples]

def one_hot_encode(examples, dict_size=None):
    """One-hot encode integer sequences into a float32 array.

    Args:
        examples: A sequence of integer-index sequences, e.g. the output of
            ``translate_to_int``.
        dict_size: Size of the one-hot axis. Defaults to ``len(char2int)``.

    Returns:
        A ``float32`` array of shape
        ``(len(examples), length of the longest example, dict_size)``.
        Examples shorter than the longest one are padded with all-zero rows.
    """
    if dict_size is None:
        dict_size = len(char2int)
    # default=0 keeps an empty batch from raising inside max().
    seq_len = max(map(len, examples), default=0)
    features = np.zeros((len(examples), seq_len, dict_size), dtype=np.float32)

    for i, example in enumerate(examples):
        # Encode every position, including the last one: skipping it would
        # feed the model an all-zero vector for the final character, which
        # is the one that matters most when predicting what comes next.
        for pos, index in enumerate(example):
            features[i, pos, index] = 1
    return features

def to_model_format(inputs):
    """Convert text into the one-hot tensor that is fed to the model.

    Args:
        inputs: Either a single string or a collection of strings. A single
            string is treated as a batch containing just that string.

    Returns:
        The tensor obtained by translating the text with
        ``translate_to_int``, one-hot encoding it with ``one_hot_encode``
        and wrapping the resulting array with ``torch.from_numpy``.
    """
    batch = [inputs] if isinstance(inputs, str) else inputs
    return torch.from_numpy(one_hot_encode(translate_to_int(batch)))

def training_check(output):
    """Debug helper: print the highest-scoring entry of every row of ``output``.

    For each row it prints the maximum value, its index, the character
    ``int2char`` maps that index to, and the character found at the same
    row number in the first example of the module-level ``trans_inputs``.

    Known to be unfinished (see the TODO below); it is not called anywhere
    in the visible code.
    """
    # TODO: fix
    # NOTE(review): problems visible in this function:
    #   * `single_char` is never reassigned, so the guard built on it is
    #     always true and every entry equal to the row maximum is printed.
    #   * It reads the global `trans_inputs` and always looks at example 0;
    #     presumably this overruns once `output` has more rows than that
    #     example has characters -- confirm before relying on it.
    print("#########", len(output))
    single_char = None
    for char_no, out in enumerate(output):
        # Largest score in this row.
        mx = max(out)
        for no, val in enumerate(out):
            if val == mx and (not single_char or single_char == no):
                print(val, no, int2char[no], int2char[trans_inputs[0][char_no]])

Data configurations:

In [None]:
# Cut the corpus into equally sized examples, then derive the input/target
# pairs and their integer-encoded counterparts.
no_of_examples = 32
examples = split_eq(full_text, no_of_examples)
chars_per_example = len(examples[0])

inputs, targets = produce_targets(examples)
trans_inputs, trans_targets = (
    translate_to_int(sequences) for sequences in (inputs, targets)
)

print("Chars per example:", chars_per_example)

In [None]:
# Group the encoded examples into mini-batches of `per_batch` examples.
# `batches` holds the one-hot input tensors and `targets` the matching
# target-index tensors, in the same order.
per_batch = 16
no_of_batches = int(no_of_examples / per_batch)

batches, targets = [], []

for i in range(no_of_batches):
    window = slice(i * per_batch, (i + 1) * per_batch)
    input_seq = one_hot_encode(trans_inputs[window])
    target_seq = torch.Tensor(trans_targets[window])
    batches.append(torch.from_numpy(input_seq))
    targets.append(target_seq)

print("All targets:", len(trans_targets), sum(map(len, targets)), len(targets[0]), len(targets))

Model:

In [None]:
class Model(nn.Module):
    """Stacked ``nn.RNN`` followed by a linear layer applied at every time step.

    Args:
        input_size: Number of features per time step of the input.
        output_size: Number of output features per time step.
        hidden_dim: Size of the RNN hidden state.
        n_layers: Number of stacked RNN layers.
    """

    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """Run the network on ``x``, starting from a zero hidden state.

        Args:
            x: Input tensor whose first dimension is the batch (the RNN is
                built with ``batch_first=True``).

        Returns:
            A tuple ``(out, hidden)``. ``out`` holds the linear layer's
            output for every time step of every sequence, flattened to two
            dimensions ``(-1, output_size)``; ``hidden`` is the final
            hidden state returned by the RNN.
        """
        batch_size = x.size(0)
        hidden = self.init_hidden(batch_size)
        out, hidden = self.rnn(x, hidden)
        # Merge the batch and time dimensions so the linear layer (and the
        # loss) see one row per time step.
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        """Return an all-zero initial hidden state for ``batch_size`` sequences.

        The tensor has shape ``(n_layers, batch_size, hidden_dim)`` and is
        allocated with the dtype and device of the model's own parameters,
        so the model keeps working after ``.to(device)`` or ``.double()``.
        """
        reference = next(self.parameters())
        return reference.new_zeros(self.n_layers, batch_size, self.hidden_dim)

Training and model configuration:

In [None]:
# Hyperparameters, model instance, loss and optimizer.
# The model's input and output widths both equal the vocabulary size.
dict_size = len(char2int)
print(dict_size)

n_epochs = 2000
lr = 0.0048

model = Model(
    input_size=dict_size,
    output_size=dict_size,
    hidden_dim=12,
    n_layers=3,
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

Training:

In [None]:
# Train for `n_epochs` full passes over all batches, reporting the loss of
# each batch and the elapsed time every 25 epochs.
t1 = timer()
# run_model() switches the model to eval mode; switch back so that
# re-running this cell after a prediction trains in train mode.
model.train()
# Epochs are numbered 1..n_epochs inclusive so that exactly `n_epochs`
# epochs run and the progress message can reach "n_epochs/n_epochs".
for epoch in range(1, n_epochs + 1):
    for batch, target_batch in zip(batches, targets):
        optimizer.zero_grad()
        output, hidden = model(batch)
        # The model output has one row per time step, so the targets are
        # flattened to match; CrossEntropyLoss needs integer class indices.
        loss = criterion(output, target_batch.view(-1).long())
        loss.backward()
        optimizer.step()

        if epoch % 25 == 0:
            print('Epoch: {}/{}......'.format(epoch, n_epochs), end=' ')
            print("Loss: {:.4f}".format(loss.item()))
            t2 = timer()
            print("Elapsed:", t2 - t1)
t3 = timer()
print("Total seconds elapsed:", t3 - t1)

Prediction helpers:

In [None]:
def predict_next(model, input_string):
    """Predict the character most likely to follow ``input_string``.

    Args:
        model: A callable returning ``(out, hidden)`` for an encoded input,
            such as the ``Model`` defined in this notebook.
        input_string: The text seen so far; its characters must all be
            known to ``char2int``.

    Returns:
        A tuple ``(char, hidden)``: the predicted next character and the
        hidden state returned by the model.
    """
    encoded_input = to_model_format(input_string)
    # Inference only: no_grad() avoids building an autograd graph on every
    # call (and replaces the legacy `.data` access).
    with torch.no_grad():
        out, hidden = model(encoded_input)
        # `out` has one row per time step; the last row belongs to the
        # final character of the single input sequence.
        prob = nn.functional.softmax(out[-1], dim=0)

    # choosing one with highest probability
    char_ind = torch.argmax(prob, dim=0).item()
    return int2char[char_ind], hidden


def run_model(model, starting_seq, size=50):
    """Extend ``starting_seq`` by ``size`` predicted characters.

    The model is put into eval mode, the seed text is lowercased, and the
    whole text generated so far is passed to ``predict_next`` once per new
    character.

    Returns:
        The lowercased seed followed by the ``size`` generated characters.
    """
    model.eval()
    generated = starting_seq.lower()
    for _ in range(size):
        next_char, _hidden = predict_next(model, generated)
        generated = generated + next_char
    return generated

Prediction:

In [None]:
# Continue the seed text 'character ' using run_model's default length.
res = run_model(model, starting_seq='character ')
print(res)

Some results:

ep:1500 lr:0.004 1000chars: "character auddoc aouis thrts cmgei eheo  bwiogib throliaeddo"

ep:1500 lr:0.004 2000chars: "character serlpiip  oa doee  foerdtoitn aad ehd eou  eosu  o" 

ep:1000 lr:0.0045 4000chars: "character iat  iypheae  nimftercteitrtrotd ia .eta aat  aeto"

ep:1000 lr:0.0045 4000chars Inbatch:8 "character aodlt voot  eo csrtoe .p tpucsl\neopdsiuts tautor t"

ep:2400 lr:0.0045 4000chars Inbatch:8 "character ootteos meatcias eopneansen rast aagteos road iet "

ep:1800 lr:0.0045 8000chars Inbatch:32 "character lpian tpratttttttttttttttttttttttttttttttttttttttt", Loss:1.8095

ep:2800 lr:0.0047 8000chars Inbatch:32 "character  au dhr eat  uumsr eottrreent  au dhr eat  uumsr e" Loss:1.8013

ep:2000 lr:0.0048 8000chars Inbatch:16 Batches:2 "character  uhsist rrietsr yho nurtpr  rhaed  uhsist rrietsr " Loss:1.8369