In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
# Read the whole training corpus into memory as a single string.
with open('data/anna.txt', 'r') as corpus_file:
    text = corpus_file.read()

## Tokenization

In [4]:
# Build the character vocabulary. The characters are sorted because iterating
# a set of strings can yield a different order in every Python process (string
# hashing is randomised per process), which would change the integer codes
# between kernel restarts and make a model trained in one session unusable
# with the lookup tables built in another.
chars = tuple(sorted(set(text)))

# Lookup tables in both directions: index -> character and character -> index.
int2char = dict(enumerate(chars))
char2int = {ch: ind for ind, ch in int2char.items()}

# Every character of `text` is a key of `char2int` by construction, so index
# directly; `.get` would turn a failed lookup into a silent `None` entry.
encodedText = np.array([char2int[char] for char in text])

In [143]:
def one_hot_encode(arr, n_labels):
    """One-hot encode an integer array along a new trailing axis.

    Args:
        arr: numpy array of integer labels (any shape).
        n_labels: number of elements to use for each encoding, i.e. the
            length of every one-hot vector.

    Returns:
        float32 array of shape ``arr.shape + (n_labels,)`` with a single 1 in
        each vector, at the position given by the corresponding label.
    """
    flat_labels = arr.ravel()

    # Work on a 2-D (n_items, n_labels) table so a single fancy-indexing
    # assignment can set the one hot position of every row.
    encoded = np.zeros((flat_labels.size, n_labels), dtype=np.float32)
    row_ids = np.arange(flat_labels.size)
    encoded[row_ids, flat_labels] = 1

    # Restore the caller's leading dimensions.
    return encoded.reshape(arr.shape + (n_labels,))

In [6]:
def one_hot_decoding(arr):
    """Turn one-hot vectors back into their integer labels.

    Args:
        arr: array whose last axis holds one-hot vectors, for example shape
            (batch, seq_length, n_labels).

    Returns:
        Integer array with the last axis removed; each entry is the position
        of the largest value (the 1) in the corresponding one-hot vector.
    """
    # Decode the argument that was passed in, not a module-level variable, and
    # reduce over the last axis so any number of leading dimensions works.
    return np.argmax(np.asarray(arr), axis=-1)

## Test encoder and decoder

In [8]:
# Round-trip check: encode a small batch of label sequences, decode the result
# and print it so it can be compared with the labels below.
test_seq = np.array([
    [6, 27, 72, 78, 22, 11, 17, 64, 2, 63],
    [21, 47, 32, 64, 22, 27, 72, 22, 64, 72],
    [21, 47, 32, 64, 22, 27, 72, 22, 64, 72],
])
one_hot = one_hot_encode(test_seq, 100)

decoded = one_hot_decoding(one_hot)
print(decoded)

[[ 6 27 72 78 22 11 17 64  2 63]
 [21 47 32 64 22 27 72 22 64 72]
 [21 47 32 64 22 27 72 22 64 72]]


In [142]:
def get_batch(data, batch_size, seq_length):
    """Yield (input, target) windows for next-character training.

    ``data`` is cut down to a whole number of ``batch_size * seq_length``
    chunks and laid out as ``batch_size`` rows. Windows of ``seq_length``
    columns are then taken left to right; each target is its input shifted
    one column to the right.

    Args:
        data: 1-D numpy array of encoded characters.
        batch_size: number of rows (parallel sequences) per batch.
        seq_length: number of columns (time steps) per batch.

    Yields:
        Tuples ``(x, y)`` of arrays, both of shape (batch_size, seq_length).
    """
    chars_per_batch = batch_size * seq_length
    usable = (len(data) // chars_per_batch) * chars_per_batch
    rows = data[:usable].reshape((batch_size, -1))

    total_cols = rows.shape[1]
    start = 0
    while start < total_cols:
        stop = start + seq_length
        x = rows[:, start:stop]
        y = rows[:, start + 1:stop + 1]
        # The last window has no character after its final column, so its
        # target comes out one column short; such windows are not yielded.
        if x.shape == y.shape:
            yield x, y
        start = stop

## Define LSTM network

In [144]:
class CharacterLSTM(nn.Module):
    """Character-level LSTM that scores the next character of a sequence.

    One-hot encoded characters pass through a stacked LSTM, a dropout layer
    and a linear layer that produces one score per token.

    Args:
        tokens: sequence of the distinct characters. Its length fixes both the
            LSTM input width and the number of output scores.
        hidden_dim: size of the LSTM hidden state.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, applied between LSTM layers and to the
            LSTM output before the linear layer.
    """

    def __init__(self, tokens, hidden_dim, n_layers, dropout):
        super().__init__()
        self.tokens = tokens
        self.hidden_dim = hidden_dim
        # One output score per token. Derived from `tokens` so the class does
        # not depend on a module-level `output_dim` variable being defined.
        self.output_dim = len(tokens)
        self.n_layers = n_layers

        self.lstm = nn.LSTM(
            input_size=len(tokens),
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )

        # `self.dropout` is the dropout layer itself (not the probability).
        self.dropout = nn.Dropout(p=dropout)

        self.fc = nn.Linear(hidden_dim, self.output_dim)

    def forward(self, x, hidden):
        """Run the network on a batch of one-hot sequences.

        Args:
            x: float tensor of shape (batch, seq_length, len(tokens)).
            hidden: tuple of two tensors as returned by `initialize_hidden`
                or by a previous call.

        Returns:
            Tuple ``(out, hidden)``: ``out`` has shape
            (batch * seq_length, len(tokens)) and holds unnormalised scores;
            ``hidden`` is the updated LSTM state.
        """
        r_out, hidden = self.lstm(x, hidden)
        out = self.dropout(r_out)

        # Merge the batch and time axes so the linear layer scores every
        # time step independently.
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fc(out)

        return out, hidden

    def initialize_hidden(self, batch_size):
        """Return an all-zero LSTM state for ``batch_size`` sequences.

        Returns:
            Tuple of two zero tensors, each of shape
            (n_layers, batch_size, hidden_dim), created with the dtype and
            device of the model's first parameter.
        """
        reference = next(self.parameters())
        shape = (self.n_layers, batch_size, self.hidden_dim)

        return reference.new_zeros(shape), reference.new_zeros(shape)

## View model

In [76]:
# Small configuration used only to look at the layer layout of the network.
input_size = 32
hidden_dim = 132
output_dim = 32
n_layers = 2
dropout = 0.1

test_lstm = CharacterLSTM(chars, hidden_dim=hidden_dim, n_layers=n_layers, dropout=dropout)
print(test_lstm)

CharacterLSTM(
  (lstm): LSTM(83, 132, num_layers=2, batch_first=True, dropout=0.1)
  (dropout): Dropout(p=0.1, inplace=False)
  (fc): Linear(in_features=132, out_features=83, bias=True)
)


In [36]:
# Inspect the initial LSTM state for a batch of 32: a pair of all-zero tensors.
test_lstm.initialize_hidden(32)

(tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          ...,
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.]],
 
         [[0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          ...,
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.]]]),
 tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          ...,
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.]],
 
         [[0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          ...,
          [0., 0., 0.,  ..., 0., 0., 0.],
       

In [148]:
def train(data, hidden_dim, batch_size, seq_length, n_chars, n_layers, dropout, lr, epochs, print_every=10, val_frac=0.1):
    """Train a `CharacterLSTM` on integer-encoded text and return it.

    The last ``val_frac`` share of ``data`` is held out for validation. Every
    ``print_every`` optimisation steps the mean validation loss is computed
    and printed next to the latest training loss.

    Args:
        data: 1-D numpy array of encoded characters.
        hidden_dim: LSTM hidden state size.
        batch_size: sequences per batch.
        seq_length: time steps per batch.
        n_chars: width of the one-hot input vectors. The model itself is built
            from the module-level `chars`, so this must equal ``len(chars)``.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability passed to the model.
        lr: learning rate for Adam.
        epochs: number of passes over the training split.
        print_every: number of steps between validation reports.
        val_frac: fraction of ``data`` (taken from the end) used for validation.

    Returns:
        The trained model, left in training mode.
    """
    model = CharacterLSTM(chars, hidden_dim, n_layers, dropout)

    optim = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    val_idx = int(len(data) * (1 - val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    counter = 0

    for epoch in range(epochs):
        hidden = model.initialize_hidden(batch_size)
        model.train()

        for x, y in get_batch(data, batch_size, seq_length):
            counter += 1
            x = one_hot_encode(x, n_chars)

            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            # Carry the state values over to the next batch but cut the
            # autograd history, so backprop stops at the batch boundary.
            hidden = tuple(each.detach() for each in hidden)

            model.zero_grad()
            pred, hidden = model(inputs, hidden)

            loss = criterion(pred, targets.reshape(batch_size * seq_length).long())
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(model.parameters(), 5)
            optim.step()

            if counter % print_every == 0:
                model.eval()
                val_h = model.initialize_hidden(batch_size)
                val_losses = []
                # No gradients are needed for evaluation; skipping them avoids
                # building an autograd graph for every validation batch.
                with torch.no_grad():
                    for eval_x, eval_y in get_batch(val_data, batch_size, seq_length):
                        eval_x = one_hot_encode(eval_x, n_chars)

                        inputs, targets = torch.from_numpy(eval_x), torch.from_numpy(eval_y)

                        out, val_h = model(inputs, val_h)

                        vloss = criterion(out, targets.reshape(-1).long())

                        val_losses.append(vloss.item())

                # The validation split may be too short for even one batch;
                # report NaN then, without numpy's empty-mean warning.
                val_loss = np.mean(val_losses) if val_losses else float('nan')

                print("Epoch: {}/{}...".format(epoch+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(val_loss))
                model.train()
    return model

In [154]:
# Hyperparameters for the full training run.
data = encodedText
hidden_dim = 256
seq_length = 100
batch_size = 200
# The one-hot width must equal the size of the vocabulary the model is built
# from, so derive it from `chars` instead of hard-coding a number.
n_chars = len(chars)
n_layers = 2
dropout = 0.2
lr = 0.003
epochs = 20
print_every = 10
val_frac = 0.2
trained_model = train(data, hidden_dim, batch_size, seq_length, n_chars, n_layers, dropout, lr, epochs, print_every, val_frac)

Epoch: 1/20... Step: 10... Loss: 3.1639... Val Loss: 3.1365
Epoch: 1/20... Step: 20... Loss: 3.1186... Val Loss: 3.1277
Epoch: 1/20... Step: 30... Loss: 3.0989... Val Loss: 3.1191
Epoch: 1/20... Step: 40... Loss: 3.1042... Val Loss: 3.1187
Epoch: 1/20... Step: 50... Loss: 3.1064... Val Loss: 3.1154
Epoch: 1/20... Step: 60... Loss: 3.1286... Val Loss: 3.1116
Epoch: 1/20... Step: 70... Loss: 3.0930... Val Loss: 3.1030
Epoch: 1/20... Step: 80... Loss: 3.1025... Val Loss: 3.0821
Epoch: 1/20... Step: 90... Loss: 3.0538... Val Loss: 3.0256
Epoch: 1/20... Step: 100... Loss: 2.9201... Val Loss: 2.9347
Epoch: 1/20... Step: 110... Loss: 2.8194... Val Loss: 2.8072
Epoch: 1/20... Step: 120... Loss: 2.7251... Val Loss: 2.7015
Epoch: 1/20... Step: 130... Loss: 2.6798... Val Loss: 2.6383
Epoch: 1/20... Step: 140... Loss: 2.6009... Val Loss: 2.5927
Epoch: 1/20... Step: 150... Loss: 2.5383... Val Loss: 2.5239
Epoch: 1/20... Step: 160... Loss: 2.4609... Val Loss: 2.5047
Epoch: 1/20... Step: 170... Loss:

In [155]:
# Change the name to keep several saved models side by side.
model_name = 'rnn_x_epoch.net'

# Bundle the architecture settings, the vocabulary and the learned weights.
checkpoint = dict(
    n_hidden=trained_model.hidden_dim,
    n_layers=trained_model.n_layers,
    state_dict=trained_model.state_dict(),
    tokens=trained_model.tokens,
)

with open(model_name, 'wb') as checkpoint_file:
    torch.save(checkpoint, checkpoint_file)

In [230]:
def predict(net, test_char, hidden, topk=1):
    """Predict the character that follows ``test_char``.

    Args:
        net: trained network; called as ``net(inputs, hidden)`` and expected
            to return ``(scores, hidden)``.
        test_char: the current character; must be a key of `char2int`.
        hidden: LSTM state from `initialize_hidden` or a previous call.
        topk: number of most likely characters to choose from. With 1 (the
            default) the single most likely character is returned; with a
            larger value one of the ``topk`` candidates is sampled in
            proportion to its probability.

    Returns:
        Tuple ``(next_char, hidden)`` with the chosen character and the
        updated LSTM state.
    """
    x = np.array([[char2int[test_char]]])
    # Size the one-hot vector from the vocabulary rather than a fixed number.
    x = one_hot_encode(x, len(char2int))

    inputs = torch.from_numpy(x)
    test_h = tuple(each.detach() for each in hidden)

    # Run in evaluation mode so dropout is switched off while predicting, then
    # put the network back into whatever mode the caller had it in.
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            test_out, test_h = net(inputs, test_h)
    finally:
        net.train(was_training)

    p = F.softmax(test_out, dim=1)

    pro, ind = p.topk(topk)
    if topk == 1:
        choice = ind.item()
    else:
        # `pro`/`ind` have shape (1, topk); sample one candidate position
        # weighted by probability (multinomial accepts unnormalised weights).
        pick = torch.multinomial(pro[0], 1).item()
        choice = ind[0, pick].item()
    return int2char[choice], test_h

## Testing trained network

In [245]:
# Generate text one character at a time, feeding each prediction (and the
# LSTM state that came with it) back in as the next input.
test_char = 'T'
test_hidden = trained_model.initialize_hidden(1)
iterations = 150

generated = []
for _ in range(iterations):
    generated.append(test_char)
    test_char, test_hidden = predict(trained_model, test_char, test_hidden)

pred = ''.join(generated)
print(pred)


The conversation to her side of the same the strange of the same thing to the state of the conversation of the same thing to the starting to the conve
