# ADAML workshop 3: Recurrent Neural Network



___

## Model implementation

In [17]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

class RNN:
    """
    Many-to-one vanilla (tanh) RNN implemented with NumPy only.

    The network reads a whole sequence, keeps the final hidden state and maps
    it linearly to the output. Training uses backpropagation through time
    (BPTT), element-wise gradient clipping and plain SGD.
    """

    def __init__(self, input_size, hidden_size, output_size, lr=1e-3, seed=1, loss=mean_squared_error):
        """
        input_size: number of features per time step
        hidden_size: number of hidden units
        output_size: number of output values per sequence
        lr: SGD learning rate
        seed: seed for the weight initialisation only (shuffling in `train`
              uses NumPy's global RNG)
        loss: callable used for reporting, called as loss(y_true, y_pred);
              the gradient in `bptt_update` is always the squared-error one
        """
        rng = np.random.RandomState(seed)
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        # Weights are drawn from a standard normal, biases start at zero.
        self.Wxh = rng.randn(hidden_size, input_size)
        self.Whh = rng.randn(hidden_size, hidden_size)
        self.Why = rng.randn(output_size, hidden_size)
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))
        self.lr = lr
        self.loss_fun = loss

    @staticmethod
    def _align_targets(y_pred, y_true):
        """Reshape y_true to y_pred's shape when both hold the same number of values."""
        # A flat (batch,) target vector would otherwise broadcast against
        # (batch, output_size) predictions into a (batch, batch) matrix.
        y_true = np.asarray(y_true)
        if y_true.shape != y_pred.shape and y_true.size == y_pred.size:
            y_true = y_true.reshape(y_pred.shape)
        return y_true

    def forward(self, X):
        """
        Feed X through the network.

        X: (batch, seq_len, input_size)
        Returns (h, y_pred): h is (batch, seq_len + 1, hidden_size) with the
        initial zero state at index 0; y_pred is (batch, output_size).
        """
        batch, seq_len, _ = X.shape
        h = np.zeros((batch, seq_len + 1, self.hidden_size))
        for t in range(seq_len):
            # Input of the current step as (batch, input_size)
            xt = X[:, t, :].reshape(batch, -1)
            # h_t = tanh(Wxh @ x_t + Whh @ h_{t-1} + bh), written row-wise
            pre = xt.dot(self.Wxh.T) + h[:, t, :].dot(self.Whh.T) + self.bh.T
            h[:, t + 1, :] = np.tanh(pre)
        # Only the last hidden state feeds the output layer.
        # NOTE: several hidden states could be used to give more context.
        y_pred = h[:, -1, :].dot(self.Why.T) + self.by.T
        return h, y_pred

    def loss(self, y_pred, y_true):
        """Return (loss value, y_pred - y_true) for one batch."""
        y_true = self._align_targets(y_pred, y_true)
        diff = y_pred - y_true
        # scikit-learn style metrics take the ground truth first; the order is
        # irrelevant for MSE but matters for any asymmetric loss.
        return self.loss_fun(y_true, y_pred), diff

    def bptt_update(self, X, h, y_pred, y_true):
        """
        Run backpropagation through time and apply one SGD step in place.

        X: (batch, seq_len, input_size)
        h: hidden states returned by `forward` for the same X
        y_pred: predictions returned by `forward` for the same X
        y_true: targets matching y_pred
        """
        batch, seq_len, _ = X.shape
        y_true = self._align_targets(y_pred, y_true)

        # Gradient accumulators, one per parameter
        dWxh = np.zeros_like(self.Wxh)
        dWhh = np.zeros_like(self.Whh)
        dWhy = np.zeros_like(self.Why)
        dbh = np.zeros_like(self.bh)
        dby = np.zeros_like(self.by)

        # Squared-error derivative, averaged over the batch: (batch, output_size)
        dy = (y_pred - y_true) * (2.0 / batch)
        # Output layer gradients come from the last hidden state only.
        h_last = h[:, -1, :].reshape(batch, self.hidden_size)
        dWhy += dy.T.dot(h_last)  # (output, hidden)
        dby += dy.T.sum(axis=1, keepdims=True)  # (output, 1)

        # Gradient flowing into the last hidden state: (batch, hidden)
        dh_next = dy.dot(self.Why)

        # Walk the sequence backwards, as in ordinary backpropagation.
        for t in reversed(range(seq_len)):
            ht = h[:, t + 1, :]  # (batch, hidden)
            ht_prev = h[:, t, :]  # (batch, hidden)
            # d tanh(z) / dz = 1 - tanh(z)^2
            dt = dh_next * (1 - ht**2)
            dbh += dt.T.sum(axis=1, keepdims=True)
            xt = X[:, t, :].reshape(batch, -1)
            dWxh += dt.T.dot(xt)  # (hidden, input)
            dWhh += dt.T.dot(ht_prev)  # (hidden, hidden)
            # Pass the gradient on to the previous time step.
            dh_next = dt.dot(self.Whh)

        # Element-wise clipping to limit exploding gradients.
        for grad in (dWxh, dWhh, dWhy, dbh, dby):
            np.clip(grad, -5, 5, out=grad)

        # SGD parameter update
        self.Wxh -= self.lr * dWxh
        self.Whh -= self.lr * dWhh
        self.Why -= self.lr * dWhy
        self.bh -= self.lr * dbh
        self.by -= self.lr * dby

    def train(self, X, y, epochs=50, batch_size=32, verbose=True):
        """
        Train with mini-batch SGD and return the list of per-epoch losses.

        X: (n_samples, seq_len, input_size)
        y: targets with one row per sample
        epochs: number of passes over the data
        batch_size: samples per parameter update
        verbose: print the loss on the first epoch and roughly ten times overall
        """
        assert len(X) == len(y), 'X and y must be the same length'
        n = X.shape[0]
        losses = []
        for epoch in range(1, epochs + 1):
            # Reshuffle every epoch (global NumPy RNG, independent of `seed`).
            idx = np.random.permutation(n)
            X_shuffled = X[idx]
            y_shuffled = y[idx]
            epoch_loss = 0.0

            for i in range(0, n, batch_size):
                xb = X_shuffled[i: i + batch_size]
                yb = y_shuffled[i: i + batch_size]
                h, y_pred = self.forward(xb)
                loss, _ = self.loss(y_pred, yb)
                # Weight by batch size so a shorter final batch is averaged correctly.
                epoch_loss += loss * xb.shape[0]
                self.bptt_update(xb, h, y_pred, yb)
            epoch_loss /= n
            losses.append(epoch_loss)
            if verbose and (epoch % max(1, epochs // 10) == 0 or epoch == 1):
                print(f"Epoch {epoch}/{epochs} - loss: {epoch_loss:.6f}")
        return losses

def generate_sine_sequences(n_samples=2000, seq_len=20, input_size=1, seed=0):
    """
    Build next-value prediction data from a noisy sine wave.

    n_samples: number of sequences
    seq_len: time steps per sequence
    input_size: features per time step
    seed: seed of the noise generator

    Returns (X, y) as float32: X is (n_samples, seq_len, input_size) and
    y is (n_samples, input_size), holding the value(s) of the flat series
    that directly follow the last step of each sequence.
    """
    rng = np.random.RandomState(seed)
    total = n_samples * seq_len * input_size
    x = np.linspace(0, 50, total)
    # One extra noise draw for the point after the end of the series; the
    # first `total` draws are the same ones a plain randn(total) would give.
    noise = rng.randn(total + 1)
    data = np.sin(x) + 0.1 * noise[:total]

    # Extend the series by one grid step instead of wrapping around to
    # data[0], which would hand the last sample a target from the start.
    step = x[1] - x[0] if total > 1 else 0.0
    next_value = np.sin(x[-1:] + step) + 0.1 * noise[total:]
    shifted = np.concatenate([data[1:], next_value])

    X = data.reshape(n_samples, seq_len, input_size)
    y_last = shifted.reshape(n_samples, seq_len, input_size)[:, -1, :]
    return X.astype(np.float32), y_last.astype(np.float32)

if False:
    # Disabled demo: fit the NumPy RNN on noisy sine data and plot its
    # test-set predictions. Flip the condition to run it.
    X, y = generate_sine_sequences(n_samples=1500, seq_len=20, input_size=1)

    # First 80% of the sequences for training, the rest for testing
    split = int(0.8 * X.shape[0])
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

    print(X_train.shape, y_train.shape)

    # Fit the network
    rnn = RNN(input_size=1, hidden_size=80, output_size=1, lr=1e-3)
    losses = rnn.train(X_train, y_train, epochs=600, batch_size=8, verbose=True)

    # Evaluate on the held-out sequences
    y_pred_test = rnn.forward(X_test)[1]
    test_loss = rnn.loss(y_pred_test, y_test)[0]
    print(f"\nTest MSE: {test_loss:.6f}\n")

    # Targets first, predictions second, on the same axes
    for series in (y_test, y_pred_test):
        plt.plot(series)
    plt.savefig("rnn_pred.png")
    plt.show()

___

## Classification model implementation



In [18]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

class ClassificationRNN(nn.Module):
    """
    Sequence classifier: a single-layer nn.RNN followed by a linear layer
    applied to the output of the last time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """
        input_size: embedding dimension of precomputed inputs
        hidden_size: number of hidden units
        output_size: number of classes
        """
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.hidden_size = hidden_size
        self.output_size = output_size

    def forward(self, x):
        """
        x: (batch, seq_len, input_size) - already embedded
        Returns the class logits, shape (batch, output_size).
        """
        # Create the initial state with the input's dtype and device so the
        # model keeps working after .double() or .to(device).
        h0 = torch.zeros(1, x.size(0), self.hidden_size, dtype=x.dtype, device=x.device)
        out, _ = self.rnn(x, h0)
        return self.fc(out[:, -1, :])  # output at the last time step

    def train_model(self, X, y, epochs=10, batch_size=32, lr=1e-3, verbose=True):
        """
        X: (n_samples, seq_len, input_size) - embedded floats
        y: (n_samples,) - integer labels
        epochs: number of passes over the data
        batch_size: samples per optimizer step
        lr: Adam learning rate
        verbose: print the average loss after every epoch
        Returns the list of per-epoch average losses.
        """
        # Follow whatever dtype/device the parameters currently have.
        param = next(self.parameters())

        # Wrap data (kept on the CPU; batches are moved to the model's device)
        X_tensor = torch.as_tensor(X, dtype=param.dtype)
        y_tensor = torch.as_tensor(y, dtype=torch.long)
        dataset = TensorDataset(X_tensor, y_tensor)
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # Loss and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=lr)

        losses = []
        for epoch in range(1, epochs + 1):
            self.train()
            total_loss = 0
            for xb, yb in loader:
                xb = xb.to(param.device)
                yb = yb.to(param.device)
                optimizer.zero_grad()
                # Call the module itself (not .forward) so registered hooks run.
                logits = self(xb)
                loss = criterion(logits, yb)
                loss.backward()
                optimizer.step()
                # Weight by batch size so a smaller last batch averages correctly.
                total_loss += loss.item() * xb.size(0)

            avg_loss = total_loss / len(dataset)
            losses.append(avg_loss)
            if verbose:
                print(f"Epoch {epoch}/{epochs} - loss: {avg_loss:.4f}")
        return losses

    def predict(self, X):
        """
        X: (n_samples, seq_len, input_size) - embedded floats
        Returns the predicted class index per sample as a NumPy array.
        Leaves the module in eval mode.
        """
        self.eval()
        param = next(self.parameters())
        X_tensor = torch.as_tensor(X, dtype=param.dtype, device=param.device)
        with torch.no_grad():
            logits = self(X_tensor)
            preds = torch.argmax(logits, dim=1)
        # .cpu() is required before .numpy() when the model lives on a GPU.
        return preds.cpu().numpy()

___

## Data onboarding

Fetch the data via the Kaggle API

In [19]:
import os
import pandas as pd
import kagglehub

# Fetch the latest version of the dataset from Kaggle
data_dir = kagglehub.dataset_download("tanishqdublish/text-classification-documentation")

# os.listdir() returns entries in arbitrary order, so select the CSV file
# explicitly (sorted for determinism) instead of taking whatever comes first.
csv_files = sorted(name for name in os.listdir(data_dir) if name.lower().endswith(".csv"))
if not csv_files:
    raise FileNotFoundError(f"No CSV file found in {data_dir!r}")
data_path = os.path.join(data_dir, csv_files[0])

data = pd.read_csv(data_path)
print(data.shape)
print(data.head())

(2225, 2)
                                                Text  Label
0  Budget to set scene for election\n \n Gordon B...      0
1  Army chiefs in regiments decision\n \n Militar...      0
2  Howard denies split over ID cards\n \n Michael...      0
3  Observers to monitor UK election\n \n Minister...      0
4  Kilroy names election seat target\n \n Ex-chat...      0


Split the data into features and labels

In [20]:
# Features are the raw texts, targets are the values of the 'Label' column.
X = data['Text']
y = data['Label']

___

## Data preprocessing

In [21]:
from sklearn.model_selection import train_test_split

Clean the text

In [22]:
import re

def clean_text(text):
    """
    Normalise a raw document for tokenisation.

    The text is lowercased, every character that is neither an ASCII letter
    nor whitespace (digits and punctuation included) is replaced by a space,
    and whitespace runs are collapsed to single spaces with the ends trimmed.
    """
    letters_only = re.sub(r"[^a-zA-Z\s]", " ", text.lower())
    collapsed = re.sub(r"\s+", " ", letters_only)
    return collapsed.strip()

# Run the cleaning function over every document of the feature Series.
X_clean = X.apply(clean_text)

Tokenize the text

In [23]:
from nltk import word_tokenize

from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

# Tokenise each cleaned document and lemmatise every token, yielding one
# list of lemmas per document.
X_tokenized = [
    [lemmatizer.lemmatize(token) for token in word_tokenize(document)]
    for document in X_clean
]


Create a vocabulary

In [24]:
from collections import Counter

# Flatten all documents into one token stream and count the occurrences.
all_tokens = []
for doc_tokens in X_tokenized:
    all_tokens.extend(doc_tokens)
counter = Counter(all_tokens)

# Ids 0 and 1 are reserved; every other token gets the next free id in
# first-seen order.
vocab = {"<pad>": 0, "<unk>": 1}
for tok in counter:
    vocab.setdefault(tok, len(vocab))

Encode the data

In [25]:
def encode_tokens(tokens, vocab):
    """
    Map tokens to their vocabulary ids.

    tokens: iterable of token strings
    vocab: dict from token to id; must contain "<unk>"
    Tokens missing from vocab are encoded with the "<unk>" id.
    """
    fallback = vocab["<unk>"]
    lookup = vocab.get
    return [lookup(word, fallback) for word in tokens]

# Convert every tokenised document into its list of vocabulary ids.
X_encoded = [encode_tokens(sent, vocab) for sent in X_tokenized]

Padding the data

In [26]:
# Length of the longest encoded document; all others are padded up to it.
max_len = max(map(len, X_encoded))

def pad_sequences(sequences, max_len, pad_value=0):
    """
    Pack variable-length id sequences into one integer array.

    sequences: list of id sequences (each must support slicing)
    max_len: number of columns of the result
    pad_value: id written after the end of shorter sequences

    Returns an int array of shape (len(sequences), max_len). Sequences longer
    than max_len are truncated to their first max_len ids instead of raising
    a broadcasting error, so max_len may be smaller than the longest input.
    """
    padded = np.full((len(sequences), max_len), pad_value, dtype=int)
    for i, seq in enumerate(sequences):
        kept = seq[:max_len]
        padded[i, :len(kept)] = kept
    return padded

# Pad every encoded document with id 0 up to the length of the longest one.
X_padded = pad_sequences(X_encoded, max_len)
print(X_padded.shape)  # (n_samples, seq_len)

(2225, 4474)


Embedding the data

In [27]:
embedding_dim = 10

# Random (untrained) embeddings: shape (vocab_size, embedding_dim).
# NOTE: id 0 ("<pad>") gets a random vector like every other token.
embedding_matrix = np.random.randn(len(vocab), embedding_dim)

batch_size, seq_len = X_padded.shape

# Integer-array indexing looks up the embedding row of every token id at
# once, giving shape (batch_size, seq_len, embedding_dim). It produces the
# same array as an element-by-element Python loop, without the ~10 million
# interpreter iterations.
X_embedded = embedding_matrix[X_padded]

print("X_embedded shape:", X_embedded.shape)

X_embedded shape: (2225, 4474, 10)


Split the data into training and test sets

In [28]:
# Hold out 20% of the samples for testing. No random_state is passed, so
# the partition differs between runs.
X_train, X_test, y_train, y_test = train_test_split(X_embedded, y, test_size=0.2)

# The label splits come back as pandas objects; convert them to NumPy arrays.
y_train = y_train.to_numpy()
y_test = y_test.to_numpy()

print(X_train.shape, X_test.shape)

(1780, 4474, 10) (445, 4474, 10)


___

## Model training

In [None]:
from sklearn.metrics import accuracy_score

# Network dimensions: the input width is the embedding dimension and there
# is one output per distinct label.
input_size = X_embedded.shape[-1]
hidden_size = 100
output_size = len(set(y))

model = ClassificationRNN(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
)

# Fit on the training split.
# NOTE(review): lr=0.1 is far above the train_model default of 1e-3 and may
# be why the accuracy printed below is so low - worth confirming with a
# smaller value.
losses = model.train_model(X_train, y_train, epochs=10, batch_size=10, lr=0.1)

# Class predictions for the held-out split
y_pred = model.predict(X_test)

# Training-loss curve and test accuracy
plt.plot(losses)
print(accuracy_score(y_test, y_pred))

In [None]:

# Print the test accuracy for the current y_test / y_pred.
print(accuracy_score(y_test, y_pred))

0.19550561797752808


___