In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision # some builtin datasets
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

import numpy as np
import random
import math
import matplotlib.pyplot as plt

In [7]:
# device config: prefer the GPU when CUDA is usable, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("device:", device)

device: cuda


In [11]:
"""
2 inputs: 
# input tensor: a single item in the sequence
# hidden state tensor: initially zero

An RNN layer has 3 weight matrices
# Input Dense
# Hidden Dense
# Output Dense

2 outputs:
# new hidden state: activation(input dense * input + hidden dense * hidden state)
# output: output dense * new hidden state (raw scores; softmax is applied later by the loss / when sampling)

Task: predict next character
"""

'\n2 inputs: \n# input tensor: a single item in the sequence\n# hidden state tensor: initially zero\n\nA RNN layer has 3 weight matrices\n# Input Dense\n# Hidden Dense\n# Output Dense\n\n2 outputs:\n# new hidden state: activation(input dense * input + hidden dense * hidden state)\n# output: activation(output dense * new hidden state)\n\nTask: predict next character\n'

In [53]:
"""
Data preparation
"""
class TextDataset(Dataset):
    """
    Character-level text dataset.

    The text is encoded once, at construction time, as a list of integer
    character indices (the text itself is not stored). Items are produced by
    slicing that list into consecutive, non-overlapping windows.

    Attributes
    ----------
    seq_length - int: Sequence length
    chars - list(str): Sorted list of the unique characters in the text
    char_to_idx - dict: dictionary from character to index
    idx_to_char - dict: dictionary from index to character
    vocab_size - int: Vocabulary size
    data_size - int: total length of the text
    X - list(int): the full text encoded with char_to_idx
    """

    def __init__(self, text_data: str, seq_length: int = 25) -> None:
        """
        Inputs
        ------
        text_data: Full text data as string
        seq_length: sequence length. How many characters per index of the dataset.

        Raises
        ------
        ValueError: if seq_length is not positive
        """
        if seq_length <= 0:
            raise ValueError(f"seq_length must be a positive integer, got {seq_length}")

        self.chars = sorted(set(text_data))
        self.data_size, self.vocab_size = len(text_data), len(self.chars)

        # lookup tables to go from index to character and back
        self.idx_to_char = {i: ch for i, ch in enumerate(self.chars)}
        self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)}

        self.seq_length = seq_length
        self.X = self.string_to_vector(text_data)

    @property
    def X_string(self) -> str:
        """
        Returns X in string form
        """
        return self.vector_to_string(self.X)

    def __len__(self) -> int:
        """
        number of sequences

        We remove the last sequence to avoid conflicts with Y being shifted to the left
        This causes our model to never see the last sequence of text
        which is not a huge deal, but its something to be aware of

        Integer arithmetic is used, and the result is clamped at 0 so that a
        text shorter than one sequence (including the empty string) yields an
        empty dataset instead of a negative length.
        """
        return max(len(self.X) // self.seq_length - 1, 0)

    def __getitem__(self, index) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Returns the index-th (X, y) pair as float tensors of length seq_length.

        X and Y have the same shape, but Y is shifted left 1 position

        Negative indices count from the end, as for a list.

        Raises
        ------
        IndexError: if index is out of range. This is also what terminates
        plain iteration (`for X, y in dataset`), which relies on __getitem__
        raising IndexError past the last item.
        """
        num_sequences = len(self)
        if index < 0:
            # not in-place, so a tensor index passed by the caller is not mutated
            index = index + num_sequences
        if not 0 <= index < num_sequences:
            raise IndexError(
                f"index out of range for TextDataset with {num_sequences} sequences"
            )

        start_idx = index * self.seq_length
        end_idx = start_idx + self.seq_length

        X = torch.tensor(self.X[start_idx:end_idx], dtype=torch.float32)
        y = torch.tensor(self.X[start_idx + 1:end_idx + 1], dtype=torch.float32)
        return X, y

    def string_to_vector(self, name: str) -> list[int]:
        """
        Converts a string into a 1D vector with values from char_to_idx dictionary

        Inputs
        name: Name as string

        Outputs
        name_tensor: name represented as list of integers (1D vector)

        Raises
        KeyError: if the string contains a character that is not in char_to_idx

        sample:
        >>> string_to_vector('test')
        [20, 5, 19, 20]
        """
        return [self.char_to_idx[s] for s in name]

    def vector_to_string(self, vector: list[int]) -> str:
        """
        Converts a 1D vector into a string with values from idx_to_char dictionary

        Inputs
        vector: 1D vector with values in the range of idx_to_char

        Outputs
        vector_string: Vector converted to string

        Raises
        KeyError: if the vector contains a value that is not in idx_to_char

        sample:
        >>> vector_to_string([20, 5, 19, 20])
        'test'
        """
        return "".join(self.idx_to_char[i] for i in vector)

In [50]:
# test TextDataset (disabled smoke test: flip the condition to True to run it)
if False:
    # use any text file you want to learn
    # the context manager closes the file handle as soon as the text is read
    with open('data/dinos.txt', 'r') as text_file:
        data = text_file.read()
    data = data.lower()

    # Data size variables
    seq_length = 25
    batch_size = 3

    text_dataset = TextDataset(data, seq_length=seq_length)
    text_dataloader = DataLoader(text_dataset, batch_size)

    # test
    print("length:", len(data))
    print("unique characters:", text_dataset.chars)

    print("num_of_sequences:", len(text_dataset))

    examples = iter(text_dataloader)
    # iterators are advanced with the builtin next(); they have no .next() method
    X, y = next(examples)
    print(X.shape, y.shape)
    print(X[0,:])
    print(y[0,:])
    #print(text_dataset.X_string)
    print(text_dataset.vector_to_string(X[0,:].tolist()))

In [55]:
# Model
class RNN(nn.Module):
    '''
    Vanilla recurrent cell processing one sequence item per call.

    Holds three dense layers: input-to-hidden (no bias), hidden-to-hidden and
    hidden-to-output. The output is returned as raw scores (no activation).
    '''
    def __init__(self, input_size: int, hidden_size: int, output_size: int) -> None:
        super().__init__()

        self.input_size, self.hidden_size, self.output_size = (
            input_size,
            hidden_size,
            output_size,
        )

        # layers are created in the order i2h, h2h, h2o
        self.i2h = nn.Linear(input_size, hidden_size, bias=False)
        self.h2h = nn.Linear(hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)

    def forward(self, x, h) -> tuple[torch.Tensor, torch.Tensor]:
        '''
        Runs one recurrence step.

        x: input for the current step
        h: hidden state from the previous step

        Returns (output scores, new hidden state).
        '''
        pre_activation = self.i2h(x) + self.h2h(h)
        new_hidden = torch.tanh(pre_activation)
        return self.h2o(new_hidden), new_hidden

    def init_zero_hidden(self, batch_size=1) -> torch.Tensor:
        '''
        Returns an all-zero initial hidden state of shape (batch_size, hidden_size).
        The tensor does not track gradients.
        '''
        shape = (batch_size, self.hidden_size)
        return torch.zeros(shape, requires_grad=False)

In [67]:
def train(model: "RNN", data: DataLoader, epochs: int, optimizer: optim.Optimizer, loss_fn: nn.Module) -> dict[int, float]:
    """
    Trains the model for the specified number of epochs

    Each batch is fed to the model one sequence position at a time, starting
    from a zero hidden state; the per-position losses are summed and
    back-propagated once per batch.

    Inputs
    ------
    model: RNN model to train. Must provide `init_zero_hidden(batch_size=...)`
        and return `(output, hidden)` when called with `(input, hidden)`.
    data: Iterable DataLoader yielding (X, Y) batches of shape (batch, sequence)
    epochs: Number of epochs to train the model
    optimizer: Optimizer to use for each epoch
    loss_fn: Function to calculate loss

    Outputs
    -------
    train_losses: dictionary from epoch index (0-based) to the mean
        per-character loss over that epoch's batches
    """
    # Run on the device the model's parameters live on, so the function does not
    # depend on a notebook-level `device` variable having been defined.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    train_losses = {}

    model.train()
    print("=> Starting training")
    for epoch in range(epochs):
        epoch_losses = list()
        for X, Y in data:
            batch_size, seq_len = X.shape[0], X.shape[1]

            # 1. data
            X, Y = X.to(run_device), Y.to(run_device)
            hidden = model.init_zero_hidden(batch_size=batch_size).to(run_device)

            # 2. loss: one loss per sequence (sum over every position)
            loss = 0
            for c in range(seq_len):
                # the model expects a (batch, 1) input: one feature per character
                out, hidden = model(X[:, c].reshape(batch_size, 1), hidden)
                # targets are stored as floats; the loss needs class indices
                loss += loss_fn(out, Y[:, c].long())

            # 3. clear gradients
            model.zero_grad()

            # 4. Compute gradients
            loss.backward()

            # 5. Adjust learnable parameters
            # clip the gradient norm first to guard against exploding gradients
            nn.utils.clip_grad_norm_(model.parameters(), 3)
            optimizer.step()

            # normalise by sequence length so the value is a per-character loss
            epoch_losses.append(loss.detach().item() / seq_len)

        train_losses[epoch] = torch.tensor(epoch_losses).mean().item()
        if epoch % 100 == 0:
            print(f'=> epoch: {epoch + 1}, loss: {train_losses[epoch]}')

    return train_losses
        
        

In [192]:
if __name__ == "__main__":
    # use any text file you want to learn
    # the context manager closes the file handle as soon as the text is read
    with open('data/dinos.txt', 'r') as text_file:
        data = text_file.read()
    data = data.lower()

    # Settings
    seq_length = 25
    batch_size = 64
    hidden_size = 256

    text_dataset = TextDataset(data, seq_length=seq_length)
    text_dataloader = DataLoader(text_dataset, batch_size)
    # one output score per character of the vocabulary
    output_size = len(text_dataset.chars)

    # test
    print("length:", len(data))
    print("unique characters:", text_dataset.chars)

    # Model: input size 1 because each character is fed as a single number (its index)
    rnnModel = RNN(1, hidden_size, output_size).to(device)

    # Train variables
    epochs = 1000
    loss = nn.CrossEntropyLoss()
    optimizer = optim.RMSprop(rnnModel.parameters(), lr = 0.001)

    train(rnnModel, text_dataloader, epochs, optimizer, loss)


length: 19916
unique characters: ['\n', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
=> Starting training
=> epoch: 1, loss: 2.9525957107543945
=> epoch: 101, loss: 1.9783456325531006
=> epoch: 201, loss: 1.7968411445617676
=> epoch: 301, loss: 1.6473729610443115
=> epoch: 401, loss: 1.5338435173034668
=> epoch: 501, loss: 1.4334731101989746
=> epoch: 601, loss: 1.3648561239242554
=> epoch: 701, loss: 1.3004016876220703
=> epoch: 801, loss: 1.2423133850097656
=> epoch: 901, loss: 1.1996700763702393


In [193]:
# evaluation: switch the trained model to inference mode.
# eval() returns the module itself, so as the last expression of the cell it
# also displays the layer summary.
rnnModel.eval()

RNN(
  (i2h): Linear(in_features=1, out_features=256, bias=False)
  (h2h): Linear(in_features=256, out_features=256, bias=True)
  (h2o): Linear(in_features=256, out_features=27, bias=True)
)

In [205]:
prediction_len = 10

# first character: drawn uniformly at random from the vocabulary
# the hidden state is moved to the model's device once, up front
hidden = rnnModel.init_zero_hidden().to(device)
predicted = text_dataset.vector_to_string([random.randint(0, len(text_dataset.chars) - 1)])
print("starts with:", predicted)

# remaining prediction_len - 1 characters, sampled one at a time
# no_grad: this is inference only, so no autograd graph should be built through
# the recurrent hidden state
with torch.no_grad():
    for _ in range(prediction_len - 1):
        last_idx = text_dataset.char_to_idx[predicted[-1]]
        # shape (1, 1) = (batch, feature), the same layout the model is trained with
        X = torch.tensor([[last_idx]], dtype=torch.float32, device=device)
        out, hidden = rnnModel(X, hidden)
        # sample the next character from the predicted distribution
        result = torch.multinomial(nn.functional.softmax(out, dim=1), 1).item()
        predicted += text_dataset.idx_to_char[result]
predicted += "-"

print(predicted)

starts with: p
pngrga
tur-
