# Recurrent Neural Networks

In [19]:
import torch
from torch import nn
from torch.nn import functional as F
from matplotlib import pyplot as plt
from utils import d2l
import collections
import random
import re
%matplotlib inline

## 9.1 Working with Sequences

In [None]:
T = 1000
tau = 5

In [None]:
time = torch.arange(1, T+1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.randn(T) * 0.2

In [None]:
plt.plot(time, x)

In [None]:
features = [x[i : T-tau+i] for i in range(tau)]
features = torch.stack(features, 1)

features.shape

In [None]:
labels = x[tau:].unsqueeze(1)
labels.shape

In [None]:
features[:3], labels[:3]

In [None]:
train_slice = slice(0, 600)
test_slice = slice(600, None)
train_features = features[train_slice]
train_labels = labels[train_slice]
test_features = features[test_slice]
test_labels = labels[test_slice]

In [None]:
class LinearRegression(nn.Module):
    def __init__(self, in_features):
        super().__init__()
        self.linear = nn.Linear(in_features, 1)

    def forward(self, X):
        y = self.linear(X)
        return y

In [None]:
model = LinearRegression(tau)
model.parameters

In [None]:
loss = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

In [None]:
for epoch in range(100):
    result = model(train_features)
    l = loss(result, train_labels)
    optimizer.zero_grad()
    l.backward()
    optimizer.step()
    print(f'epoch {epoch + 1}, loss {l:f}')

In [None]:
one_step_preds = model(test_features).detach().numpy()
plt.plot(test_labels.numpy(), label='true')
plt.plot(one_step_preds, label='pred')

In [None]:
multistep = x[train_slice]

In [None]:
features_len = len(test_features)
for i in range(1, len(test_labels)+1):
    step_features = multistep[-tau:].unsqueeze(0)
    step_preds = model(step_features).detach().numpy()

    multistep = torch.cat((multistep, torch.tensor(step_preds).flatten()))


In [None]:
plt.plot(multistep.numpy(), label='pred')
plt.plot(x, label='true', linestyle='--')

## 9.2 Converting Raw Text into Sequence Data

In [None]:
def download_url(url, folder='data'):
    import os
    import requests
    if not os.path.exists(folder):
        os.makedirs(folder)
    fname = os.path.join(folder, url.split('/')[-1])
    if not os.path.isfile(fname):
        r = requests.get(url)
        with open(fname, 'wb') as f:
            f.write(r.content)
    return fname

In [None]:
URL = 'https://www.gutenberg.org/cache/epub/35/pg35.txt'
book = download_url(URL)
with open(book, 'r', encoding='utf-8') as f:
    raw_text = f.read()

In [None]:
def download_book():
    URL = 'https://www.gutenberg.org/cache/epub/35/pg35.txt'
    book = download_url(URL)
    with open(book, 'r', encoding='utf-8') as f:
        raw_text = f.read()
    return raw_text

In [None]:
def preprocess(text):
    text = text.lower()
    text = re.sub(r'[^a-z]+', ' ', text)
    return text

In [None]:
text = preprocess(raw_text)

In [None]:
text

In [None]:
class Vocab:  #@save
    """Vocabulary for text."""
    def __init__(self, tokens=[], min_freq=0, reserved_tokens=[]):
        # Flatten a 2D list if needed
        if tokens and isinstance(tokens[0], list):
            tokens = [token for line in tokens for token in line]
        # Count token frequencies
        counter = collections.Counter(tokens)
        self.token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                  reverse=True)
        # The list of unique tokens
        self.idx_to_token = list(sorted(set(['<unk>'] + reserved_tokens + [
            token for token, freq in self.token_freqs if freq >= min_freq])))
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if hasattr(indices, '__len__') and len(indices) > 1:
            return [self.idx_to_token[int(index)] for index in indices]
        return self.idx_to_token[indices]

    @property
    def unk(self):  # Index for the unknown token
        return self.token_to_idx['<unk>']

In [None]:
tokens = list(text)
vocab = Vocab(tokens)

In [None]:
vocab[tokens[:10]], tokens[:10]

In [None]:
words = text.split()
vocab = Vocab(words)

vocab.token_freqs[:10], vocab.token_freqs[-10:]

## 9.4 Recurrent Neural Networks

In [None]:
X, W_xh = torch.rand(3, 2), torch.rand(2, 5)
H, W_hh = torch.rand(3, 5), torch.rand(5, 5)

In [None]:
torch.mm(X, W_xh) + torch.mm(H, W_hh)

In [None]:
torch.mm(torch.cat((X, H), 1), torch.cat((W_xh, W_hh), 0))

## 9.5 Recurrent Neural Networks from Scratch

In [23]:
time_machine = d2l.TimeMachine(64, 10, 10112, 5056)

In [24]:
time_machine.X.dtype

torch.float32

In [25]:
len(time_machine.vocab)

28

In [37]:
class RNNScratch(nn.Module):
    def __init__(self, n_inputs, n_hidden, n_outputs):
        super().__init__()
        # Set hyperparameters
        self.n_inputs = n_inputs # Length of input sequence
        self.n_hidden = n_hidden # Number of hidden dimensions
        self.n_outputs = n_outputs # Length of output dimensions

        # Initialize latent layer parameters
        self.W_xh = self._init_parameter((n_inputs, n_hidden))
        self.W_hh = self._init_parameter((n_hidden, n_hidden))
        self.b_h = self._init_zeros((n_hidden,))

        # Initialize output layer parameters
        self.W_hq = self._init_parameter((n_hidden, n_outputs))
        self.b_q = self._init_zeros((n_outputs,))
    
    def _init_parameter(self, shape: tuple):
        # Initialize parameters with Xavier distribution
        return nn.Parameter(torch.nn.init.xavier_normal_(torch.empty(shape)))
    
    def _init_zeros(self, shape: tuple):
        # Initialize bias with zeros
        return nn.Parameter(torch.zeros(shape))
    
    def parameters(self):
        # Return all parameters
        params = [self.W_xh, self.W_hh, self.b_h, self.W_hq, self.b_q]
        for param in params:
            yield param

    def forward(self, X: torch.Tensor, hidden_state: torch.Tensor = None):
        if hidden_state is None:
            hidden_state = torch.zeros((X.shape[0], self.n_hidden))
        
        # Compute the new hidden state
        X_proj = torch.mm(X, self.W_xh)
        H_proj = torch.mm(hidden_state, self.W_hh)
        hidden_state = torch.tanh(X_proj + H_proj + self.b_h)
        
        # Compute output
        output = torch.mm(hidden_state, self.W_hq) + self.b_q
        
        return output, hidden_state

In [32]:
net = RNNScratch(n_inputs=10, n_hidden=20, n_outputs=len(time_machine.vocab))
net.W_xh.shape, net.W_hh.shape, net.b_h.shape, net.W_hq.shape, net.b_q.shape

(torch.Size([10, 20]),
 torch.Size([20, 20]),
 torch.Size([20]),
 torch.Size([20, 28]),
 torch.Size([28]))

In [33]:
train_data = time_machine.get_dataloader(False)
test_data = time_machine.get_dataloader(True)

In [46]:
def train_rnn_epoch(net, train_iterator, loss, optimizer):
    """Train a model for one epoch."""
    # Set the model to training mode
    net.train()
    # Initialize the total loss and number of samples
    total_loss, num_samples = 0, 0

    hidden_state = None
    for X, y in train_iterator:
        # Forward pass
        y_hat, hidden_state = net(X, hidden_state)

        # Detach the hidden state to prevent backpropagation through the entire sequence
        hidden_state = hidden_state.detach()

        # Compute the loss
        l = loss(y_hat, y[:, -1].long())
        
        # Backward pass
        optimizer.zero_grad()
        l.backward(retain_graph=True)
        optimizer.step()
        
        # Update the total loss and number of samples
        total_loss += l.item() * y.shape[0]
        num_samples += y.shape[0]
    return total_loss / num_samples

In [47]:
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
num_epochs = 10

In [48]:
net = RNNScratch(n_inputs=10, n_hidden=100, n_outputs=len(time_machine.vocab))

for epoch in range(num_epochs):
    train_loss = train_rnn_epoch(net, train_data, loss, optimizer)
    print(f'Epoch {epoch + 1}, Loss: {train_loss:.4f}')

Epoch 1, Loss: 4.1420
Epoch 2, Loss: 4.1420
Epoch 3, Loss: 4.1420
Epoch 4, Loss: 4.1420
Epoch 5, Loss: 4.1420
Epoch 6, Loss: 4.1420
Epoch 7, Loss: 4.1420
Epoch 8, Loss: 4.1420
Epoch 9, Loss: 4.1420
Epoch 10, Loss: 4.1420
