RNN implementation from scratch with application to text generation.

In [78]:
import torch
import torch.nn as nn


class RNN(nn.Module):
    """Single-step vanilla recurrent cell with a linear read-out layer.

    One call to :meth:`forward` advances the network by one time step::

        h_new = tanh(in2h(x) + h2h(h0))
        out   = h2out(h_new)

    ``out`` contains raw scores (logits); no softmax is applied inside
    ``forward``. The ``softmax`` attribute is only a convenience for callers
    that want probabilities.

    Args:
        input_dim: Size of the last dimension of the input ``x``.
        hidden_dim: Size of the hidden state.
        output_dim: Size of the last dimension of the returned scores.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        # The input projection has no bias; the bias of ``h2h`` serves as the
        # single bias term of the pre-activation sum.
        self.in2h = nn.Linear(self.input_dim, self.hidden_dim, bias=False)
        self.h2h = nn.Linear(self.hidden_dim, self.hidden_dim)
        self.h2out = nn.Linear(self.hidden_dim, self.output_dim)

        self.tanh = nn.Tanh()
        # Not used by forward(); kept so existing code that reads
        # ``model.softmax`` keeps working.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h0=None):
        """Advance the cell by one time step.

        Args:
            x: Input tensor whose last dimension is ``input_dim``.
            h0: Previous hidden state, broadcastable against the projected
                input (last dimension ``hidden_dim``). When omitted, an
                all-zero state with the projected input's shape, dtype and
                device is used.

        Returns:
            Tuple ``(out, h_new)``: the unnormalised output scores and the
            new hidden state to feed into the next step.
        """
        projected = self.in2h(x)
        if h0 is None:
            # Default initial state: zeros on the same device/dtype as the
            # data, so callers do not need to build it themselves.
            h0 = torch.zeros_like(projected)

        recurrent = self.h2h(h0)
        h_new = self.tanh(recurrent + projected)
        out = self.h2out(h_new)

        return out, h_new

In [79]:
from torch.utils.data import Dataset, DataLoader

class text_data(Dataset):
    """Character-level dataset of fixed-length, non-overlapping windows.

    The text is mapped to integer indices through a vocabulary made of its
    sorted unique characters. Item ``i`` is a pair ``(X, Y)`` where ``X`` is
    the ``i``-th window of ``seq_length`` indices and ``Y`` is the same
    window shifted one character to the right (the next-character targets).

    Both tensors are returned as float32, so callers that need class indices
    must cast ``Y`` to an integer dtype themselves.

    Args:
        text: Source string.
        seq_length: Number of characters per window; must be >= 1.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.
    """

    def __init__(self, text, seq_length):
        if seq_length < 1:
            raise ValueError(f"seq_length must be >= 1, got {seq_length}")

        self.seq_length = seq_length

        self.vocab = sorted(set(text))
        self.vocab_size = len(self.vocab)
        self.text_size = len(text)

        self.idx2char = dict(enumerate(self.vocab))
        self.char2idx = {ch: i for i, ch in self.idx2char.items()}

        # Whole text encoded once as a list of vocabulary indices.
        self.input = self.text2vect(text)

    def __len__(self):
        """Return the number of complete (X, Y) windows.

        Window ``i`` reads indices up to ``(i + 1) * seq_length`` inclusive
        because Y is shifted by one, hence the ``- 1``. Integer arithmetic
        keeps every complete window and never yields a negative length.
        """
        return max(0, (len(self.input) - 1) // self.seq_length)

    def __getitem__(self, idx):
        """Return window ``idx`` as ``(X, Y)`` float tensors of length seq_length.

        Negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is outside the range of available windows.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError("text_data index out of range")

        start_idx = idx * self.seq_length
        end_idx = start_idx + self.seq_length

        X = torch.tensor(self.input[start_idx:end_idx]).float()
        # Targets are the inputs shifted one position to the right.
        Y = torch.tensor(self.input[start_idx + 1:end_idx + 1]).float()

        return X, Y

    def text2vect(self, text):
        """Encode a string as a list of vocabulary indices.

        Raises:
            KeyError: If ``text`` contains a character absent from the vocabulary.
        """
        return [self.char2idx[ch] for ch in text]

    def vect2text(self, vect):
        """Decode an iterable of vocabulary indices back into a string."""
        return "".join(self.idx2char[i] for i in vect)

In [80]:
import random

def generate_text(model: "RNN", dataset: "text_data", prediction_length: int = 100, hidden_dim=None) -> str:
    """Sample text from the model one character at a time.

    Generation starts from a uniformly random vocabulary character and a
    zero hidden state. At each step the last generated character's index is
    fed to the model, the returned scores are turned into probabilities with
    a softmax, and the next character is drawn from that distribution.

    The dataset is required to translate between characters and indices.

    Args:
        model: Recurrent model called as ``model(x, hidden)`` and returning
            ``(scores, new_hidden)``.
        dataset: Provides ``vocab``, ``char2idx``, ``idx2char`` and
            ``vect2text``.
        prediction_length: Total number of characters to return. The seed
            character is always emitted, so values below 1 still yield one
            character.
        hidden_dim: Width of the hidden state. Defaults to
            ``model.hidden_dim`` (falling back to 256 when the model has no
            such attribute).

    Returns:
        The generated string.
    """
    # Run on whatever device the model lives on instead of relying on a
    # module-level ``device`` variable being defined before this call.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    if hidden_dim is None:
        hidden_dim = getattr(model, "hidden_dim", 256)

    seed_idx = random.randrange(len(dataset.vocab))
    chars = [dataset.vect2text([seed_idx])]
    hidden = torch.zeros(1, hidden_dim, device=target_device)

    was_training = model.training
    model.eval()
    try:
        # Sampling needs no gradients; without no_grad the autograd graph
        # would grow through ``hidden`` at every step.
        with torch.no_grad():
            for _ in range(prediction_length - 1):
                last_idx = float(dataset.char2idx[chars[-1]])
                step_input = torch.tensor([[last_idx]], device=target_device)
                scores, hidden = model(step_input, hidden)
                probs = nn.functional.softmax(scores, dim=1)
                next_idx = torch.multinomial(probs, 1).item()
                chars.append(dataset.idx2char[next_idx])
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)

    return "".join(chars)

In [None]:
from torch.optim import Adam
import torch


# Hyper-parameters
hidden_dim = 256

lr = 0.001
epochs = 1000
batch_size = 16


device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using '{device}' device")

# Load data from text file; the context manager closes the file even if
# reading fails. The text is lower-cased before the vocabulary is built.
with open("/content/sample_data/data.txt", "r", encoding="utf-8") as file:
    data = file.read().lower()

print(data)

# Dataloader. drop_last=True discards the final incomplete batch, so every
# batch seen by the loop has exactly batch_size rows.
training_data = text_data(data, seq_length=25)
train_dataloader = DataLoader(
    training_data, batch_size=batch_size, shuffle=True, drop_last=True
)

# Define model: one scalar (the character index) in, one score per
# vocabulary entry out.
model = RNN(1, hidden_dim, training_data.vocab_size)
model.to(device)

loss_fn = nn.CrossEntropyLoss()
optim = Adam(model.parameters(), lr=lr)

train_losses = []
for epoch in range(epochs):
    # Re-assert training mode every epoch: the sampling call at the end of
    # the epoch calls model.eval().
    model.train()

    epoch_losses = []
    for x, y in train_dataloader:
        x = x.to(device)
        # CrossEntropyLoss expects integer class indices as targets.
        y = y.to(device).long()

        # Each batch starts from a fresh zero hidden state.
        hidden = torch.zeros(x.shape[0], hidden_dim, device=device)

        optim.zero_grad()

        # Unroll the sequence one character at a time, summing the
        # per-step losses so backward() propagates through every step.
        loss = 0
        for c in range(x.shape[1]):
            y_hat, hidden = model(x[:, c].reshape(x.shape[0], 1), hidden)
            loss = loss + loss_fn(y_hat, y[:, c])

        loss.backward()
        # Clip the global gradient norm to 3 before the optimizer step.
        nn.utils.clip_grad_norm_(model.parameters(), 3)
        optim.step()

        # Record the average loss per time step for this batch.
        epoch_losses.append(loss.item() / x.shape[1])

    # Store a plain float rather than a 0-d tensor.
    train_losses.append(torch.tensor(epoch_losses).mean().item())
    print(f"Epoch '{epoch}' -- Training loss = '{train_losses[epoch]}'")
    # Pass the hidden size explicitly instead of relying on the sampler's
    # default matching this script's value.
    print(generate_text(model, train_dataloader.dataset, hidden_dim=hidden_dim))