In [0]:
import os
from io import open
import torch

class Dictionary(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = []

    def add_word(self, word):
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)


class Corpus(object):
    def __init__(self, path):
        self.dictionary = Dictionary()
        self.train = self.tokenize(os.path.join(path, 'shakespeare.txt'))

    def tokenize(self, path):
        """Tokenizes a text file."""
        assert os.path.exists(path)
        # Add words to the dictionary
        with open(path, 'r', encoding="utf8") as f:
            for line in f:
                for word in line:
                    self.dictionary.add_word(word)

        # Tokenize file content
        with open(path, 'r', encoding="utf8") as f:
            idss = []
            for line in f:
                ids = []
                for word in line:
                    ids.append(self.dictionary.word2idx[word])
                idss.append(torch.tensor(ids).type(torch.int64))
            ids = torch.cat(idss)

        return ids

In [0]:
path = 'data/'

In [0]:
corpus = Corpus(path)

In [0]:
def batchify(data, bsz):
    # Work out how cleanly we can divide the dataset into bsz parts.
    nbatch = data.size(0) // bsz
    # Trim off any extra elements that wouldn't cleanly fit (remainders).
    data = data.narrow(0, 0, nbatch * bsz)
    # Evenly divide the data across the bsz batches.
    data = data.view(bsz, -1).t().contiguous()
    return data.to(device)

def get_batch(source, i):
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len]#.view(-1)
    return data, target


In [0]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

class RNN(nn.Module):
    def __init__(self, input_size, emb_size, hidden_size, n_layers, dropout):
        super(RNN, self).__init__()
        self.input_size = input_size
        self.emb_size = emb_size
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        
        self.embedding = nn.Embedding(input_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, n_layers)
        self.fc = nn.Linear(hidden_size, input_size)
        self.drop = nn.Dropout(dropout)
    
    def forward(self, inputs, hidden):

        #inputs = [batch size]

        encoded = self.drop(self.embedding(inputs))

        #encoded = [batch size, emb dim]

        encoded = encoded.unsqueeze(0)

        #encoded = [1, batch size, emb dim]

        output, hidden = self.gru(encoded, hidden)

        #output = [1, batch size, hid dim * num directions]
        #hidden = [n layers * num directions, batch size, hid dim]

        output = self.fc(output)

        #output = [1, batch size, input size]

        output = output.view(-1, self.input_size)

        #output = [1*batch size, input size]

        return output, hidden
        
    def init_hidden(self, batch_size):

        return Variable(torch.zeros(self.n_layers, batch_size, self.hidden_size))

In [0]:
n_characters  = len(corpus.dictionary)
hidden_size = 256
emb_size = 128
n_layers = 2
lr = 0.005
dropout = 0.3

model = RNN(n_characters, emb_size, hidden_size, n_layers, dropout)

In [8]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 726,620 trainable parameters


In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = 'cpu'
print(device)

cuda


In [0]:
import torch.optim as optim


criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)


In [0]:
model=model.to(device)
criterion=criterion.to(device)

In [0]:
batch_size = 50
bptt = 200

train_loader = batchify(corpus.train, batch_size)


In [0]:
def generate(model, input_str='A', predict_len=100, temperature=0.8):
    input_list = [corpus.dictionary.word2idx[l] for l in input_str]
    prime_input = torch.tensor(input_list, dtype=torch.long)
    prime_input = Variable(prime_input.unsqueeze(1))
    hidden = model.init_hidden(1)

    predicted = input_str

    hidden = hidden.to(device)
    prime_input = prime_input.to(device)
    # Use priming string to "build up" hidden state
    for p in range(len(input_str)):
        _, hidden = model(prime_input[p], hidden)
        
    inp = prime_input[-1]
    inp = inp.to(device)
    for p in range(predict_len):
        output, hidden = model(inp, hidden)
        
        # Sample from the network as a multinomial distribution
        output_dist = output.data.view(-1).div(temperature).exp()
        top_i = torch.multinomial(output_dist, 1)[0]

        # Add predicted character to string and use as next input
        predicted_char = corpus.dictionary.idx2word[top_i]
        predicted += predicted_char
        
        inp = Variable(torch.tensor(corpus.dictionary.word2idx[predicted_char], dtype=torch.long).unsqueeze(0))
        inp = inp.to(device)
    return predicted

In [0]:
def train(model, iterator, criterion):
    clip = 0.25

    model.train()
    
    hidden = model.init_hidden(batch_size)
    hidden = hidden.to(device)
    
    # all_loss = []

    for batch, i in enumerate(range(0, iterator.size(0) - 1, bptt)):
        loss = 0
        data, targets = get_batch(iterator, i)

        seq_len_batched = data.shape[0]

        model.zero_grad()

        hidden = hidden.detach()
        

        for c in range(seq_len_batched):
            output, hidden = model(data[c], hidden)
            loss += criterion(output, targets[c])
      
        loss.backward()

        optimizer.step()
        
        total_loss = loss.item()/seq_len_batched
        
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss #/ log_interval
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | '
                    'loss {:5.2f} | ppl {:8.2f}'.format(
                epoch, batch, len(iterator) // bptt, lr,
                cur_loss, math.exp(cur_loss)))
            total_loss = 0
            print(generate(model, 'Wh', 100), '\n')

        # if batch % plot_every == 0:
        #     all_loss.append(total_loss)
        # return all_loss


In [0]:
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [16]:
N_EPOCHS = 4

best_valid_loss = float('inf')
counter = 0
patience = 2
log_interval = 100
plot_every = 10
all_loses = []
for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train(model, train_loader, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')


| epoch   0 |   100/  546 batches | lr 0.01 | loss  1.73 | ppl     5.64
Whow;
    What reveres fay twat this bid the crut and that sman bited in yet dots
     Geart it me mor 

| epoch   0 |   200/  546 batches | lr 0.01 | loss  1.50 | ppl     4.47
Whip!
  KING RAMINE. Speel thee, the will for they surm'd forside of great
    Shall same of a power o 

| epoch   0 |   300/  546 batches | lr 0.01 | loss  1.44 | ppl     4.22
Who;
    And he day all thou duty your blatter of you
    a blusadd dills, world for which a Graces;
  

| epoch   0 |   400/  546 batches | lr 0.01 | loss  1.39 | ppl     4.01
Whese.
  TOLUSTIAN. Ay, a must; sir.                                                                   

| epoch   0 |   500/  546 batches | lr 0.01 | loss  1.30 | ppl     3.68
Wha.
  BOYTTRIBUMERIA. Why, I do have so out others.
  GUIECER. Well, go no may to be well to thee.
   

Epoch: 01 | Epoch Time: 3m 7s
| epoch   1 |   100/  546 batches | lr 0.01 | loss  1.39 | ppl     4.02
Whalt?
  ROSA

In [0]:
def generate(model, input_str='A', predict_len=100, temperature=0.8):
    input_list = [corpus.dictionary.word2idx[l] for l in input_str]
    prime_input = torch.tensor(input_list, dtype=torch.long)
    prime_input = Variable(prime_input.unsqueeze(1))
    hidden = model.init_hidden(1)

    predicted = input_str

    hidden = hidden.to(device)
    prime_input = prime_input.to(device)
    # Use priming string to "build up" hidden state
    for p in range(len(input_str)):
        _, hidden = model(prime_input[p], hidden)
        
    inp = prime_input[-1]
    inp = inp.to(device)
    for p in range(predict_len):
        output, hidden = model(inp, hidden)
        
        # Sample from the network as a multinomial distribution
        output_dist = output.data.view(-1).div(temperature).exp()
        top_i = torch.multinomial(output_dist, 1)[0]

        # Add predicted character to string and use as next input
        predicted_char = corpus.dictionary.idx2word[top_i]
        predicted += predicted_char
        
        inp = Variable(torch.tensor(corpus.dictionary.word2idx[predicted_char], dtype=torch.long).unsqueeze(0))
        inp = inp.to(device)
    return predicted

In [53]:
i='P'
print(generate(model, i, 100), '\n')

PH70UMX.>>>>>>>>>>>>]>>7>`>>7]
  THIRD DICHLETER. [Ashal to Iside] O, there is it on;
    Dost meet.  



In [47]:
print(generate(model, 'Wh', 100), '\n')

Who
  Beneon. The King here is this welcome to bank an impoison.
    Thy too course enought and yet hi 

