In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataclasses import dataclass, field
from typing import List, Tuple
from collections import Counter

## RNN Basic
### Token
For language model, we need to tokenize our words (or characters) into number. We can tokenize our inputs based on each input's frequency. We tokenize our input based on input's frequency because this allow computer to cache more frequently used words and improve training efficiency:

In [2]:
import string
import unicodedata


# Turn a Unicode string to plain ASCII, thanks to https://stackoverflow.com/a/518232/2809427
def unicode_to_ascii(s):
    allowed_characters = string.ascii_letters
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
        and c in allowed_characters
    )

class Tokenizer:

    def __init__(self, tokens:List[str], reserved_tokens:List[str]=['<pad>'], min_freq:int=0) -> None:
        self.unk = '<unk>'
        self.reserved_tokens = reserved_tokens

        counter = Counter(tokens)
        tokens_freq = sorted(counter.items(), key=lambda token:token[1], reverse=True)
        sorted_tokens = list([self.unk] + self.reserved_tokens + [
            token for token, freq in tokens_freq if freq > min_freq
        ])

        self.token_to_idx = {
            token:index for index, token in enumerate(sorted_tokens)
        }

        self.idx_to_token = {
            index:token for index, token, in enumerate(sorted_tokens)
        }

    def to_idx(self, token:str) -> int:
        if token not in self.token_to_idx:
            return self.token_to_idx[self.unk]

        return self.token_to_idx[token]
    
    def to_token(self, idx:int) -> str:
        if idx not in self.idx_to_token:
            return self.unk

        return self.idx_to_token[idx]
    
    def get_most_frequent(self, n: int) -> List[str]:
        res = []
        start_idx = len(self.unk) + len(self.reserved_tokens)
        for idx in range(n):
            res.append(self.to_token(start_idx + idx))
        return res
    
    def __getitem__(self, index):
        if isinstance(index, (list, tuple, slice)):
            return [self.__getitem__(i) for i in index]
        return self.to_idx(index)

    def __len__(self) -> int:
        return len(self.token_to_idx)

### Construct Dataset
We devided the input text into segments with the length of num_inputs.

In [3]:
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
class TextDataset(Dataset):

    def __init__(self, text_file:Path, num_steps:int) -> None:
        super().__init__()
        
        with text_file.open('r') as f:
            text = f.read()
            splitted_text = text.split()
            splitted_text = [self.preprocess_text(text).lower() for text in splitted_text]

        self.tokenizer = Tokenizer(splitted_text)
        tokenized = [self.tokenizer.to_idx(text) for text in splitted_text]

        array = torch.tensor([tokenized[i:i+num_steps+1]
                        for i in range(len(tokenized)-num_steps)])
        self.X, self.Y = array[:,:-1], array[:,1:]

    def preprocess_text(self, text) -> List[str]:
        return unicode_to_ascii(text)
    
    def get_vocab_size(self) -> int:
        return len(self.tokenizer)
    
    def __getitem__(self, index):
        if isinstance(index, (list, tuple)):
            return [self.__getitem__(idx) for idx in index]
        elif isinstance(index, slice):
            return [self.__getitem__(idx) for idx in range(*index.indices(len(self)))]
        return (self.X[index], self.Y[index])
    
    def __len__(self) -> int:
        return len(self.X)

### Language Model
Let each input in time t denoted as $x_t$. Our goal is to predict $x_t$ given $x_0, x_1, ..., x_{t-1}$.
The probability of a sequence of words with length t will then be:
$$
P(x_1, x_2, ..., x_t) = P(x_1) * \prod_{t=2}^T P(x_t  \mid  x_1, \ldots, x_{t-1})
$$

The probability of $x_1$ is $P(x_1)$.

The probability of $x_1$ and $x_2$ is $P(x_1, x_2) = P(x_2 | x_1) * P(x_1)$.

That is, the joint probability of $x_1$ and $x_2$ is just the probability of $x_1$ times the probability of $x_2$ given $x_1$.

We can treat the output of the model at each stage as the probability of $x_t$ given $x_1, ..., x_{t-1}$

### Perplexity
We can measure the cross-entropy loss averaged over all the tokens of a sequence with perplexity:
$$\frac{1}{n} \sum_{t=1}^n -\log P(x_t \mid x_{t-1}, \ldots, x_1),$$
where $P$ is given by a language model and $x_t$ is the actual token observed at time step $t$ from the sequence.
This makes the performance on documents of different lengths comparable. For historical reasons, scientists in natural language processing prefer to use a quantity called *perplexity*:

$$\exp\left(-\frac{1}{n} \sum_{t=1}^n \log P(x_t \mid x_{t-1}, \ldots, x_1)\right)$$

### RNN Model
RNN model is similar to MLP. The key difference is that instead of using hidden layer, RNN typically uses hidden states to store all the features from the previous samples. At each batch, the hidden state from the last batch will be multiplied with the current input to compute the current hiddent state. The current hidden state will then be multiplied by a weight to compute the output of the current stage.


![screenshot](resources/rnn_with_hidden_state.png)

The calculation of the hidden layer output of the current time step is determined by the input of the current time step together with the hidden layer output of the previous time step:

$$\mathbf{H}_t = \phi(\mathbf{X}_t \mathbf{W}_{\textrm{xh}} + \mathbf{H}_{t-1} \mathbf{W}_{\textrm{hh}}  + \mathbf{b}_\textrm{h}).$$

For time step $t$,
the output of the output layer is similar to the computation in the MLP:

$$\mathbf{O}_t = \mathbf{H}_t \mathbf{W}_{\textrm{hq}} + \mathbf{b}_\textrm{q}.$$


In [4]:
class RNN_Scratch(nn.Module):

    def __init__(self, num_inputs:int, num_hiddens:int):
        super().__init__()

        self.num_inputs = num_inputs
        self.num_hiddens = num_hiddens

        self.w_xh = nn.Parameter(
            torch.randn(num_inputs, num_hiddens)
        )

        self.b_xh = nn.Parameter(
            torch.randn(num_hiddens)
        )

        self.w_hh = nn.Parameter(
            torch.randn((num_hiddens, num_hiddens))
        )

        self.b_hh = nn.Parameter(
            torch.zeros(num_hiddens)
        )

    def forward(self, X, state=None):
        
        seq_size, batch_size, input_size = X.shape

        if state is None:
            device = self.w_xh.device
            state = torch.zeros((batch_size, self.num_hiddens), device=device)

        # Input size will be (steps, batchs, inputs)
        outputs = []
        h_t = state
        h_t_minus_1 = state

        for step in X:
            h_t = F.tanh(
                step @ self.w_xh
                + self.b_xh
                + h_t_minus_1 @ self.w_hh
                + self.b_hh
            )
            outputs.append(h_t)
            h_t_minus_1 = h_t

        outputs = torch.stack(outputs)
        return outputs, h_t

# Declare RNN from PyTorch and Scratch.
rnn = nn.RNN(10, 20, 1)
rnn_scratch = RNN_Scratch(10, 20)

# Let RNNs share the same weight.
rnn_scratch.w_xh = nn.Parameter(rnn.weight_ih_l0.T)
rnn_scratch.b_xh = nn.Parameter(rnn.bias_ih_l0)
rnn_scratch.w_hh = nn.Parameter(rnn.weight_hh_l0.T)
rnn_scratch.b_hh = nn.Parameter(rnn.bias_hh_l0)

# Sequence, Batch, Input
input = torch.randn(5, 3, 10)
h0 = torch.randn(1, 3, 20)
output, hn = rnn(input, h0)
output_scratch, hn_scratch = rnn_scratch(input, h0)

# Compare output
compare_func = lambda a, b, threshold: torch.abs(a - b) > threshold
compare_func(output, output_scratch, 0.0001).sum(), compare_func(hn, hn_scratch, 0.0001).sum()


(tensor(1200), tensor(0))

In [5]:
class RNN_LM_Scratch(nn.Module):

    def __init__(self, num_embeds:int, num_hiddens:int, num_vocabs:int, device):
        super().__init__()

        self.embedding = nn.Embedding(num_vocabs, num_embeds)
        self.rnn = RNN_Scratch(num_embeds, num_hiddens)
        self.num_vocabs = num_vocabs

        self.rnn.to(device)
        
        self.w_hq = nn.Parameter(
            torch.randn((num_hiddens, num_vocabs))
        )

        self.b_q = nn.Parameter(
            torch.zeros(num_vocabs)
        )

    def forward(self, X, state=None):

        # X: (batch, seq, intputs)
        emb = self.embedding(X)

        # Swap the embedding from (batch, seq, emb) -> (seq, batch, emb)
        emb = torch.swapaxes(emb, 0, 1)

        # The shape of the outputs will be (steps, batch, hidden)
        rnn_outputs, ht = self.rnn(emb, state)
        outputs = rnn_outputs @ self.w_hq + self.b_q

        # Return output as (batch, steps, outputs)
        return torch.swapaxes(outputs, 0, 1), ht



In [6]:
# data = d2l.TimeMachine(batch_size=1024, num_steps=32)
# rnn = RNNScratch(num_inputs=len(data.vocab), num_hiddens=32)
# model = RNNLMScratch(rnn, vocab_size=len(data.vocab), lr=1)
# trainer = d2l.Trainer(max_epochs=100, gradient_clip_val=1, num_gpus=1)
# trainer.fit(model, data)

batch_size, num_steps = 1024, 32

dataset = TextDataset(Path(r'shakespeare.txt'), num_steps)
train_num = int(len(dataset) * 0.8)
train_dataset = dataset[:train_num]
valid_dataset = dataset[train_num:]

train_loader = DataLoader(dataset=train_dataset,
                          batch_size=batch_size,
                          shuffle=True,
                          num_workers=2)

test_loader = DataLoader(dataset=valid_dataset,
                         batch_size=batch_size,
                         shuffle=False,
                         num_workers=True)

len(train_dataset), len(valid_dataset), dataset.get_vocab_size()

(14045, 3512, 3183)

In [7]:
def train_loop(model: nn.Module, train_loader: DataLoader, loss_func, optim, device, to_clip_grad=False):

    losses = []
    model.train()

    for i, (X, Y) in enumerate(train_loader):
        X, Y = X.to(device), Y.to(device)

        predict = model(X)
        vocab_size = model.num_vocabs
        loss = loss_func(predict.reshape(-1, vocab_size), Y.reshape(-1))

        optim.zero_grad()
        loss.backward()
        
        # Prevent the gradient from exploding (but can not prevent vanishing)
        if to_clip_grad:
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optim.step()

        losses.append(loss.item())
    
    return sum(losses) / len(losses)



In [None]:
import Trainers
import Trainers.trainers
import Trainers.trainers.core

class RNN_Trainer(Trainers.trainers.core.BaseTrainer):

    def train_loop(self) -> dict:
        train_statistic = {}
        loss = train_loop(self.model,
                          self.train_loader,
                          self.loss_fn,
                          self.optimizer,
                          self.device,
                          to_clip_grad=True)
        train_statistic['Train Loss'] = loss
        print(f'Train Loss: {loss}')
        return train_statistic

    def test_loop(self) -> dict:
        return {}

num_embeds = 512
num_hiddens = 64

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'device: {device}')
print(f'batch_size: {batch_size}')
print(f'num_steps: {num_steps}')
print(f'num_embeds: {num_embeds}')
print(f'num_hiddens: {num_hiddens}')
print(f'num_vocabs: {dataset.get_vocab_size()}')


model = RNN_LM_Scratch(num_embeds=num_embeds,
                       num_hiddens=num_hiddens,
                       num_vocabs=dataset.get_vocab_size(),
                       device=device)
model = model.to(device)
loss = torch.nn.CrossEntropyLoss()
optim = torch.optim.SGD(model.parameters(), lr=1)

trainer = RNN_Trainer(name='RNN_Vanilla',
                      model=model,
                      loss_fn=loss,
                      optimizer=optim,
                      train_loader=train_loader,
                      test_loader=test_loader,
                      device=device)
trainer.fit(100, graph=True, save_check_point=True)




device: cuda
batch_size: 1024
num_steps: 32
num_embeds: 512
num_hiddens: 64
num_vocabs: 3183


In [14]:
def predict(model: RNN_LM_Scratch, tokenizer: Tokenizer, words:str, num_of_predict:int, num_steps: int) -> list:
    """
    Predict num_of_predict of words given words.
    """

    def preprocess(words: str) -> torch.Tensor:
        # Split the sentence with whitespace.
        splitted_words = [unicode_to_ascii(word) for word in words.split()]
        splitted_words.extend(
            ['<pad>' for _ in range(num_steps - len(splitted_words))]
        )
        
        # Convert word to index
        word_index = [tokenizer.token_to_idx[word] for word in splitted_words]

        # Split input from words to [len(words), seq]
        inputs = [
            word_index[i:i+num_steps]
            for i in range(len(word_index) - num_steps + 1)
        ]

        return torch.tensor(inputs)

    model.eval()

    hidden = None
    inputs = preprocess(words).to(next(model.parameters()).device)
    res = words.split()

    for seq in inputs:
        seq = seq
        pred, hidden = model(inputs, hidden)

    for _ in range(num_of_predict):
        pred_index = torch.argmax(F.softmax(pred, dim=2), dim=2)

        pred, hidden = model(pred_index, hidden)
        pred_index = torch.argmax(F.softmax(pred.squeeze(), dim=1), dim=1)
        pred_words = [tokenizer.idx_to_token[int(idx.item())] for idx in pred_index]
        res.append(pred_words[0])
    
    return ' '.join(res)


checkpoint = torch.load(r'Checkpoints\RNN_Vanilla\20250526\RNN_Vanilla_epoch100_20250526_002807.pt', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])

result = predict(model=model,
                 tokenizer=dataset.tokenizer,
                 words='thou shall be',
                 num_of_predict=20,
                 num_steps=num_steps)
print(f'result: {result}')


result: thou shall be so i then force forebemoaned my self dost thou art missed sweetness old they measure too abysm bred on doth
