# Text Autocompletion with LSTM and Beam Search

This notebook uses the text from the book, [Frankenstein; Or, The Modern Prometheus by Mary Wollstonecraft Shelley](https://www.gutenberg.org/ebooks/42324) as the training dataset.

In [None]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM 
import tensorflow as tf
import logging
tf.get_logger().setLevel(logging.ERROR)

In [None]:
EPOCHS = 32
BATCH_SIZE = 256
INPUT_FILE_NAME = 'frankenstein.txt'
WINDOW_LENGTH = 40
WINDOW_STEP = 3
BEAM_SIZE = 8
NUM_LETTERS = 11
MAX_LENGTH = 50

In [None]:
# Open the text file
file = open(INPUT_FILE_NAME, 'r', encoding='utf-8-sig') # Decode with BOM
text = file.read()
file.close()

In [None]:
# Convert all text to lowercase,
# strip newlines and extra spaces.
text = text.lower().strip()
text = text.replace('\n', ' ')
text = text.replace('  ', ' ')

In [None]:
# Encode characters as indices
unique_chars = list(set(text))
char_to_index = dict((ch, index) for index, ch in enumerate(unique_chars))
index_to_char = dict((index, ch) for index, ch in enumerate(unique_chars))
encoding_width = len(char_to_index)

In [None]:
len(unique_chars), len(char_to_index)

In [None]:
# Create the training set
fragments = []
targets = []

# Subtract the WINDOW_LENGTH to allow extraction just to the last
# valid range.
for i in range(0, len(text) - WINDOW_LENGTH, WINDOW_STEP):
  fragments.append(text[i: i + WINDOW_LENGTH])
  targets.append(text[i + WINDOW_LENGTH])

In [None]:
# Examples 
fragments[20001], targets[20001]

In [None]:
# Convert to one-hot encoded training data.
X = np.zeros((len(fragments), WINDOW_LENGTH, encoding_width))
y = np.zeros((len(fragments), encoding_width))
for i, fragment in enumerate(fragments):
  for j, char in enumerate(fragment):
    X[i, j, char_to_index[char]] = 1
    target_char = targets[i]
    y[i, char_to_index[target_char]] = 1

In [None]:
# Building the model for training.
model = Sequential()
model.add(LSTM(128, return_sequences=True, dropout=0.2, recurrent_dropout=0.2, input_shape=(None, encoding_width)))
model.add(LSTM(128, dropout=0.2, recurrent_dropout=0.2))
model.add(Dense(encoding_width, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
model.summary()

In [None]:
# Training,
# validation split set to 5%, good performance is highly subjective here.
history = model.fit(X, y, validation_split=0.05, batch_size=BATCH_SIZE, epochs=EPOCHS, verbose=2, shuffle=True)

In [None]:
# Beam search implementation
# Create initial single beam represented by triplet.
# (probability, string, one-hot encoded string)
letters = 'the man '
one_hots = []
for i, char in enumerate(letters):
  x = np.zeros(encoding_width)
  x[char_to_index[char]] = 1
  one_hots.append(x)
beams = [(np.log(1.0), letters, one_hots)]

In [None]:
# Predict NUM_LETTERS into the future.
for i in range(NUM_LETTERS):
  minibatch_list = []
  # Create minibatch from one-hot encodings, and predict.
  for triple in beams:
    # Extract the one-hot enconding representation of each beam.
    minibatch_list.append(triple[2])
  minibatch = np.array(minibatch_list)
  # Predict the one-hot encode for potential next characters.
  y_predict = model.predict(minibatch, verbose=0)

  new_beams = []
  for j, softmax_vec in enumerate(y_predict):
    triple = beams[j]
    # Create BEAM_SIZE new beams from each existing beam.
    for k in range(BEAM_SIZE):
      # Get the probable character.
      char_index = np.argmax(softmax_vec)
      # Calculate the new probability.
      new_prob = triple[0] + np.log(softmax_vec[char_index])
      # Add new letter to the string.
      new_letters = triple[1] + index_to_char[char_index]
      # Encode the new character.
      x = np.zeros(encoding_width)
      x[char_index] = 1
      # Create a copy of the old one_hot encoded representation,
      # and append the new character (it's one-hot encoded version.)
      new_one_hots = triple[2].copy()
      new_one_hots.append(x)
      # Append the new beam to the list of beams.
      new_beams.append((new_prob, new_letters, new_one_hots))
      # Set that index to zero, so that np.argmax can find,
      # the next probable character. 
      softmax_vec[char_index] = 0
  # Prune tree to only keep BEAM_SIZE most probable beams.
  new_beams.sort(key=lambda tup: tup[0], reverse=True)
  beams = new_beams[0:BEAM_SIZE]

for item in beams:
  print(item[1])

PYTORCH

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
import numpy as np
from utils import train_model

In [None]:
# Device setup
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Training parameters reused as above.
# Create training examples.
fragments = []
targets = []
for i in range(0, len(text) - WINDOW_LENGTH, WINDOW_STEP):
    fragments.append(text[i: i + WINDOW_LENGTH])
    targets.append(text[i + WINDOW_LENGTH])

# Convert to one-hot encoded training data.
X = np.zeros((len(fragments), WINDOW_LENGTH, encoding_width), dtype=np.float32)
y = np.zeros(len(fragments), dtype=np.int64)
for i, fragment in enumerate(fragments):
    for j, char in enumerate(fragment):
        X[i, j, char_to_index[char]] = 1
    target_char = targets[i]
    y[i] = char_to_index[target_char]
    
# Train test split
train_X, test_X, train_y, test_y = train_test_split(
    X, y, test_size=0.05, random_state=0
)

# Create dataset objects
trainset = TensorDataset(torch.from_numpy(train_X), torch.from_numpy(train_y))
testset = TensorDataset(torch.from_numpy(test_X), torch.from_numpy(test_y))

In [None]:
# trainset.dtype

In [None]:
# Custom layer to handle the output of the recurrent neural network.
class LastTimestep(nn.Module):
  def forward(self, inputs):
    return inputs[1][0][1]  # Return hidden state and not the cell state of the last timestep.

In [None]:
# The model
model = nn.Sequential(
    nn.LSTM(encoding_width, 128, num_layers=2, dropout=0.2, batch_first=True),
    LastTimestep(),
    nn.Dropout(0.2),
    nn.Linear(128, encoding_width)
)

In [None]:
model

In [None]:
# Loss function and optimizer
optimizer = torch.optim.Adam(model.parameters())
loss_function = nn.CrossEntropyLoss()

# Training
train_model(model, device, EPOCHS, BATCH_SIZE, trainset, testset, optimizer, loss_function, 'acc')

In [None]:
# Create initial single beam represented by triplet
# (probability , string , one-hot encoded string).
# Generally the same as Tensorflow just a little more involved.
letters = 'i trembled '
one_hots = []
for i, char in enumerate(letters):
    x = np.zeros(encoding_width)
    x[char_to_index[char]] = 1
    one_hots.append(x)
beams = [(np.log(1.0), letters, one_hots)]

# Predict NUM_LETTERS into the future.
for i in range(NUM_LETTERS):
    minibatch_list = []
    # Create minibatch from one-hot encodings, and predict.
    for triple in beams:
        minibatch_list.append(triple[2])
    minibatch = np.array(minibatch_list, dtype=np.float32)

    # A quite more involved prediction.
    # Convert to pytorch tensors.
    inputs = torch.from_numpy(minibatch)
    # Send the input to the same device as the model.
    inputs = inputs.to(device)
    # Perform the prediction.
    outputs = model(inputs)
    # Run the output through the softmax to get character probabilities.
    outputs = F.softmax(outputs, dim=1)
    # Return the output to the cpu.
    y_predict = outputs.cpu().detach().numpy()

    new_beams = []
    for j, softmax_vec in enumerate(y_predict):
        triple = beams[j]
        # Create BEAM_SIZE new beams from each existing beam.
        for k in range(BEAM_SIZE):
            char_index = np.argmax(softmax_vec)
            new_prob = triple[0] + np.log(softmax_vec[char_index])
            new_letters = triple[1] + index_to_char[char_index]
            x = np.zeros(encoding_width)
            x[char_index] = 1
            new_one_hots = triple[2].copy()
            new_one_hots.append(x)
            new_beams.append((new_prob, new_letters, new_one_hots))
            softmax_vec[char_index] = 0
    # Prune tree to only keep BEAM_SIZE most probable beams.
    new_beams.sort(key=lambda tup: tup[0], reverse=True)
    beams = new_beams[0:BEAM_SIZE]
for item in beams:
    print(item[1])