## Recurrent Neural Network with LSTM layers
### Remixed into PyTorch from

https://machinelearningmastery.com/develop-word-based-neural-language-models-python-keras/

In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
def onehot(values, num_classes):
    output = torch.zeros((len(values), num_classes))
    output[np.arange(len(values)), values] = 1.
    return output

In [3]:
def split_words(paragraph):
    return paragraph.replace('\n', ' ').lower().split()

In [4]:
def generate_encoding(tokens):
    counts = {}
    for token in tokens:
        if token not in counts:
            counts[token] = 0
            
        counts[token] += 1
    
    sorted_tokens = sorted(counts.keys(), key=counts.get, reverse=True)

    encoding = dict(zip(sorted_tokens, range(1, 1+len(sorted_tokens))))

    return encoding

def tokenize(tokens, encoding):
    output = []
    for token in tokens:
        output.append(encoding[token])
    return output

In [5]:
def pre_pad(sequences, max_length, pad_value=0):
    padded = torch.full((len(sequences), max_length), pad_value, dtype=torch.long)
    for i, sequence in enumerate(sequences):
        padded[i, -len(sequence):] = torch.Tensor(sequence)

    return padded

In [6]:
def train_model(model, loss, optimizer, X, y, n_epoch=500):
    for epoch in range(n_epoch):

        total_loss = 0
        for i, inputs in enumerate(X):
            labels = y[i]

            optimizer.zero_grad()

            outputs = model(inputs)
            outputs = outputs.reshape((1, -1))

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        if epoch % (n_epoch // 10) == (n_epoch // 10) - 1:
            print(f'{epoch+1:4.0f} | loss: {total_loss / 2000:.3f}')
            total_loss = 0.0
        
    return model

### Model 1: One word in, one word out

In [7]:
def word_by_word_seq(model, encoding, seed_text, n_words):
    curr_text, result = seed_text, seed_text

    for _ in range(n_words):
        # Text -> Int
        encoded = tokenize(split_words(curr_text), encoding)
        encoded = torch.Tensor(encoded).reshape((1, -1)).to(torch.long)

        # predict word
        word_vec = model.forward(encoded).argmax()

        # predicted idx -> word
        for word, idx in encoding.items():
            if idx == word_vec:
                curr_text = word
                break

        #
        result += f" {curr_text}"

    return result

In [8]:
data = """Jack and Jill went up the hill\n
		To fetch a pail of water\n
		Jack fell down and broke his crown\n
		And Jill came tumbling after\n """

# Text -> Integer
encoding = generate_encoding(split_words(data))
encoded = tokenize(split_words(data), encoding)

vocab_size = len(encoding) + 1
print(f"Vocabulary Size: {vocab_size}")

# Word : Word sequences
sequences = [encoded[i-1:i+1] for i in range(1, len(encoded))]
sequences = torch.Tensor(sequences).to(torch.long).reshape((-1, 1, 2))
print(f"Total Sequences: {sequences.shape[0]}")

X, y = sequences[:, :, :-1], sequences[:, :, -1]

Vocabulary Size: 22
Total Sequences: 24


In [9]:
# Model
class RNN(nn.Module):
    def __init__(self):
        super().__init__()

        self.l1 = nn.Embedding(vocab_size, 10)
        self.l2 = nn.LSTM(10, 50)
        self.l3 = nn.Linear(50, vocab_size)

    def forward(self, x):
        x = self.l1(x)
        x, _ = self.l2(x)
        x = F.tanh(x)
        x = F.sigmoid(self.l3(x))
        return x

model = RNN()
print(repr(model))

RNN(
  (l1): Embedding(22, 10)
  (l2): LSTM(10, 50)
  (l3): Linear(in_features=50, out_features=22, bias=True)
)


In [10]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=.001)

In [11]:
# Train
model = train_model(model, criterion, optimizer, X, y, n_epoch=500)

  50 | loss: 0.029
 100 | loss: 0.027
 150 | loss: 0.027
 200 | loss: 0.026
 250 | loss: 0.026
 300 | loss: 0.026
 350 | loss: 0.026
 400 | loss: 0.026
 450 | loss: 0.026
 500 | loss: 0.026


In [12]:
print(word_by_word_seq(model, encoding, 'Jack', 6))

Jack and jill went up the hill


### Model 2: One line in, one word out

In [13]:
def seq_to_word_seq(model, encoding, max_length, seed_text, n_words):
    in_text = seed_text

    for _ in range(n_words):
        encoded = tokenize(split_words(in_text), encoding)
        encoded = pre_pad([encoded], max_length)

        yhat = model.forward(encoded).argmax()

        out_word = ''
        for word, index in encoding.items():
            if index == yhat:
                out_word = word
                break

        in_text += f" {out_word}"

    return in_text

In [14]:
data = """Jack and Jill went up the hill\n
		To fetch a pail of water\n
		Jack fell down and broke his crown\n
		And Jill came tumbling after\n """

# Text -> Integer
encoding = generate_encoding(split_words(data))
encoded = tokenize(split_words(data), encoding)

vocab_size = len(encoding) + 1
print(f"Vocabulary Size: {vocab_size}")

# Line based sequences
sequences = list()
for line in data.split('\n'):
    encoded = tokenize(split_words(line), encoding)
    for i in range(1, len(encoded)):
        sequence = encoded[:i+1]
        sequences.append(sequence)
print(f"Total Sequences: {len(sequences)}")

# Pad
max_length = max([len(seq) for seq in sequences])
sequences = pre_pad(sequences, max_length).reshape((-1, 1, max_length)).to(torch.long)
print(f"Max Sequence Length: {max_length}")

X, y = sequences[:, :, :-1], sequences[:, :, -1]

Vocabulary Size: 22
Total Sequences: 21
Max Sequence Length: 7


In [15]:
# Model
class RNN(nn.Module):
    def __init__(self, input_length):
        super().__init__()

        self.input_length = input_length

        self.l1 = nn.Embedding(vocab_size, 10)
        self.l2 = nn.LSTM(10 * self.input_length, 50)
        self.l3 = nn.Linear(50, vocab_size)

    def forward(self, x):
        x = self.l1(x)
        x, _ = self.l2(x.reshape((-1, 1, 10 * self.input_length)))
        x = F.tanh(x)
        x = F.sigmoid(self.l3(x))
        return x

model = RNN(max_length-1)
print(repr(model))

RNN(
  (l1): Embedding(22, 10)
  (l2): LSTM(60, 50)
  (l3): Linear(in_features=50, out_features=22, bias=True)
)


In [16]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=.0001)

In [17]:
# Train
model = train_model(model, criterion, optimizer, X, y, n_epoch=500)

  50 | loss: 0.031
 100 | loss: 0.028
 150 | loss: 0.027
 200 | loss: 0.027
 250 | loss: 0.026
 300 | loss: 0.026
 350 | loss: 0.025
 400 | loss: 0.025
 450 | loss: 0.025
 500 | loss: 0.024


In [18]:
print(seq_to_word_seq(model, encoding, max_length-1, 'Jack', 4))
print(seq_to_word_seq(model, encoding, max_length-1, 'Jill', 4))

Jack and jill went up
Jill fell jill went up


### Model 3: Two words in, one word out

In [19]:
def two_to_one_seq(model, encoding, max_length, seed_text, n_words):
    in_text = seed_text

    for _ in range(n_words):
        encoded = tokenize(split_words(in_text), encoding)[-max_length:]
        encoded = pre_pad([encoded], max_length)

        yhat = model.forward(encoded).argmax()

        out_word = ''
        for word, index in encoding.items():
            if index == yhat:
                out_word = word
                break

        in_text += f" {out_word}"

    return in_text

In [20]:
data = """Jack and Jill went up the hill\n
		To fetch a pail of water\n
		Jack fell down and broke his crown\n
		And Jill came tumbling after\n """

# Text -> Integer
encoding = generate_encoding(split_words(data))
encoded = tokenize(split_words(data), encoding)

vocab_size = len(encoding) + 1
print(f"Vocabulary Size: {vocab_size}")

# Line based sequences
sequences = [encoded[i-2:i+1] for i in range(2, len(encoded))]
print(f"Total Sequences: {len(sequences)}")

# Pad
max_length = max([len(seq) for seq in sequences])
sequences = pre_pad(sequences, max_length).reshape((-1, 1, max_length)).to(torch.long)
print(f"Max Sequence Length: {max_length}")

X, y = sequences[:, :, :-1], sequences[:, :, -1]

Vocabulary Size: 22
Total Sequences: 23
Max Sequence Length: 3


In [21]:
# Model
class RNN(nn.Module):
    def __init__(self, input_length):
        super().__init__()

        self.input_length = input_length

        self.l1 = nn.Embedding(vocab_size, 10)
        self.l2 = nn.LSTM(10 * self.input_length, 50)
        self.l3 = nn.Linear(50, vocab_size)

    def forward(self, x):
        x = self.l1(x)
        x, _ = self.l2(x.reshape((-1, 1, 10 * self.input_length)))
        x = F.tanh(x)
        x = F.sigmoid(self.l3(x))
        return x

model = RNN(max_length-1)
print(repr(model))

RNN(
  (l1): Embedding(22, 10)
  (l2): LSTM(20, 50)
  (l3): Linear(in_features=50, out_features=22, bias=True)
)


In [22]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=.0001)

In [23]:
# Train
model = train_model(model, criterion, optimizer, X, y, n_epoch=500)

  50 | loss: 0.035
 100 | loss: 0.034
 150 | loss: 0.032
 200 | loss: 0.031
 250 | loss: 0.030
 300 | loss: 0.029
 350 | loss: 0.029
 400 | loss: 0.028
 450 | loss: 0.027
 500 | loss: 0.027


In [24]:
print(two_to_one_seq(model, encoding, max_length-1, 'Jack and', 5))
print(two_to_one_seq(model, encoding, max_length-1, 'And Jill', 3))
print(two_to_one_seq(model, encoding, max_length-1, 'fell down', 5))
print(two_to_one_seq(model, encoding, max_length-1, 'pail of', 5))

Jack and jill his crown and jill
And Jill his crown and
fell down and broke his crown and
pail of water jack fell down and
