# RNN-based character level language model

Below you can find an implementation of a character-level, RNN-based language model that predicts the next character given the sequence of characters seen so far.

This simple implementation in PyTorch should give you a good overview of how such models behave and learn.

This notebook does not introduce any tasks; you are free to experiment with the code: check what the input looks like, how it is encoded, how it is passed to the network, and how the next character is chosen.

You can try to use this code to generate an output sequence after the training stops: simply provide the context (some beginning of the text) and it should generate the sequence character by character until the end of the sequence is reached (or, as a frequently used heuristic, until a given number of characters has been produced).


In [1]:
# install package for sentence splitting
# !pip install blingfire

# import pytorch-related stuff
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np

# requests will help us download some data
import requests

# blingfire can split texts into sentences
import blingfire as bf


def download_ulysses():
    """
    Download Ulysses -- a famous book written by J. Joyce -- from Project Gutenberg.

    Any failure (network error, timeout, or a non-200 HTTP status) is reported
    with a printed message and signalled to the caller by returning None.

    :return: The text of the book, or None if the download failed.
    """
    url = "https://www.gutenberg.org/files/4300/4300-0.txt"

    try:
        # Without an explicit timeout, requests may wait indefinitely
        # on an unresponsive server.
        response = requests.get(url, timeout=30)
    except requests.RequestException as error:
        # Keep the "print and return None" failure contract for network errors too.
        print("Failed to download:", error)
        return None

    if response.status_code != 200:
        print("Failed to download. Status code:", response.status_code)
        return None

    return response.text


def split_into_sentences(text):
    """
    Break a text into sentences with BlingFire and drop the blank ones.

    BlingFire returns the sentences as a single string with one sentence
    per line; each line is stripped of surrounding whitespace and lines
    that end up empty are discarded.

    :param text: The input text string.
    :return: A list of non-empty, stripped sentences.
    """
    one_sentence_per_line = bf.text_to_sentences(text)
    stripped_lines = (line.strip() for line in one_sentence_per_line.split('\n'))
    return [line for line in stripped_lines if line]



class CharRNN(nn.Module):
    """
    Simple RNN network with one input layer, one hidden layer, and one output layer.

    The network is applied one timestep at a time: each call to ``forward``
    consumes a single input vector together with the previous hidden state.

    :param input_size: Length of each input vector.
    :param hidden_size: Length of the hidden state vector.
    :param output_size: Length of the output vector produced at each step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()

        self.hidden_size = hidden_size                   # length of the hidden state vector
        self.rnn = nn.RNNCell(input_size, hidden_size)   # recurrent cell that consumes the current input and the previous hidden state
        self.fc = nn.Linear(hidden_size, output_size)    # output layer that transforms the hidden state into the output scores (i.e., decisions)

    def forward(self, x, hidden):
        """
        Run a single timestep of the network.

        :param x: Current input, passed to the recurrent cell.
        :param hidden: Hidden state from the previous timestep.
        :return: A tuple ``(output, hidden)`` with the output of the
                 fully-connected layer and the updated hidden state.
        """
        hidden = self.rnn(x, hidden)  # update hidden state based on current input and previous state
        output = self.fc(hidden)      # map the new hidden state to the output layer
        return output, hidden

    def init_hidden(self, batch_size):
        """
        Create the all-zero hidden state used at the first timestep,
        when there is no previous hidden state available.

        The tensor is created with the same dtype and on the same device as
        the recurrent cell's weights, so it stays compatible with the model
        after calls such as ``model.to(device)`` or ``model.double()``.

        :param batch_size: Number of sequences processed in parallel.
        :return: A zero tensor of shape ``(batch_size, hidden_size)``.
        """
        reference = self.rnn.weight_hh
        return torch.zeros(
            batch_size,
            self.hidden_size,
            dtype=reference.dtype,
            device=reference.device,
        )


def char_tensor(text, char_to_idx):
    """
    Encode a text as a matrix of one-hot vectors, one row per character.
    More on one-hot representation: https://www.kaggle.com/code/dansbecker/using-categorical-data-with-one-hot-encoding

    :param text: The string to encode.
    :param char_to_idx: Mapping from each character to its column index.
    :return: A tensor of shape ``(len(text), len(char_to_idx))`` that is zero
             everywhere except for a single 1 per row, placed in the column
             assigned to that row's character.
    """
    n_positions = len(text)
    vocab_size = len(char_to_idx)

    # Column to switch on for every row; the dtype is given explicitly so that
    # an empty text still yields a valid (integer) index tensor.
    columns = torch.tensor([char_to_idx[symbol] for symbol in text], dtype=torch.long)

    one_hot = torch.zeros(n_positions, vocab_size)
    one_hot[torch.arange(n_positions), columns] = 1  # set all the ones in a single indexed assignment
    return one_hot

if __name__ == "__main__":
    # Training script: download the book, build the character vocabulary,
    # and train the CharRNN to predict the next character of each sentence.

    text = download_ulysses()  # download the dataset
    if text is None:
        # download_ulysses() signals failure by returning None; stop with a clear
        # message instead of failing later inside the sentence splitter.
        raise RuntimeError("Could not download the training text.")

    sentences = split_into_sentences(text)  # one sentence will form one training example

    eos = '\\'  # special character used to encode the end of the sequence (sentence)

    # All characters found in the text plus the end-of-sequence marker.
    # The set union avoids a duplicate entry if the marker already occurs in
    # the text, and sorting makes the index assignment reproducible between runs.
    chars = sorted(set(text) | {eos})

    char_to_idx = {char: idx for idx, char in enumerate(chars)}  # mapping of known characters to their positions
    idx_to_char = {idx: char for char, idx in char_to_idx.items()}  # and a mapping of positions to characters

    input_size = len(chars)   # at each timestep we provide a single character, encoded as a one-hot vector over all known characters
    hidden_size = 128         # length of the hidden state vector
    output_size = len(chars)  # the hidden state is transformed into one score per known character (the candidate next characters)

    model = CharRNN(input_size, hidden_size, output_size)  # instantiate the model
    optimizer = optim.Adam(model.parameters(), lr=0.01)    # and configure training-related objects
    criterion = nn.CrossEntropyLoss()

    n_epochs = 500               # number of passes over the training sentences
    last_sentence_idx = 5000     # each epoch stops after training on the sentence with this index
    sentence_log_interval = 1000  # report progress every this many sentences
    epoch_log_interval = 50       # report the epoch loss every this many epochs

    for epoch in range(n_epochs):
        total_loss = 0  # cumulative loss over the epoch, used to observe progress

        for idx, instance in enumerate(sentences):
            hidden = model.init_hidden(1)  # zero "previous" hidden state for the first step of the sentence
            input_seq = char_tensor(instance, char_to_idx)  # encode the sentence as one-hot vectors

            # For each input character the target is the subsequent character;
            # the target of the last character is the end-of-sequence marker.
            target_seq = torch.tensor([char_to_idx[c] for c in instance[1:] + eos])

            optimizer.zero_grad()  # clear gradients left over from the previous sentence
            loss = 0  # per-sentence loss, summed over all characters
            for input_vector, target_index in zip(input_seq, target_seq):
                input_char = input_vector.unsqueeze(0)    # current character, shape (1, input_size)
                target_char = target_index.unsqueeze(0)   # next character (that we want to predict), shape (1,)
                output, hidden = model(input_char, hidden)  # check what the model produces
                loss += criterion(output, target_char)    # compare the produced scores with the true next character

            loss.backward()  # backpropagate through all timesteps of the sentence
            optimizer.step()
            total_loss += loss.item()

            if idx == last_sentence_idx:
                break

            if idx % sentence_log_interval == 0:
                print(f'Epoch {epoch}, sentence {idx} cumulative loss: {total_loss:.4f} item loss: {loss.item()}')

        if epoch % epoch_log_interval == 0:
            print(f'Epoch {epoch}, Loss: {total_loss:.4f}')


OSError: dlopen(/Users/Kuba/Library/Python/3.9/lib/python/site-packages/blingfire/libblingfiretokdll.dylib, 0x0006): tried: '/Users/Kuba/Library/Python/3.9/lib/python/site-packages/blingfire/libblingfiretokdll.dylib' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/Kuba/Library/Python/3.9/lib/python/site-packages/blingfire/libblingfiretokdll.dylib' (no such file), '/Users/Kuba/Library/Python/3.9/lib/python/site-packages/blingfire/libblingfiretokdll.dylib' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64'))