# ADAML workshop 3: Recurrent Neural Network



___

## Model implementation

In [26]:
import numpy as np
import matplotlib.pyplot as plt

class RNN:
    def __init__(self, input_size, hidden_size, output_size, lr=1e-3, seed=1):
        rng = np.random.RandomState(seed)
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.Wxh = rng.randn(hidden_size, input_size)
        self.Whh = rng.randn(hidden_size, hidden_size)
        self.Why = rng.randn(output_size, hidden_size)
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))
        self.lr = lr

    def forward(self, X):
        # In forward model we feed the data X through the network.
        # X shape: (batch, seq_len, input_size)

        batch, seq_len, _ = X.shape
        h = np.zeros((batch, seq_len + 1, self.hidden_size))
        for t in range(seq_len):
            # Reshaping to correct input shape (batch, input_size)
            xt = X[:, t, :].reshape(batch, -1)
            # compute next hidden: h_t = tanh(Wxh@x_t + Whh@h_{t-1} + bh)
            pre = xt.dot(self.Wxh.T) + h[:, t, :].dot(self.Whh.T) + self.bh.T
            h[:, t + 1, :] = np.tanh(pre)
        # compute output using the last hidden state.
        # NOTE:  We could also use multiple hidden states to have more context.
        y_pred = h[:, -1, :].dot(self.Why.T) + self.by.T
        return h, y_pred  # h includes initial zero state at index 0

    def loss(self, y_pred, y_true):
        # MSE
        diff = y_pred - y_true
        return np.mean(diff**2), diff

    def bptt_update(self, X, h, y_pred, y_true):
        # Using backpropagation through time (bptt) to update the weights
        # X: (batch, seq_len, input_size)
        batch, seq_len, _ = X.shape

        # Initializing the gradients
        dWxh = np.zeros_like(self.Wxh)
        dWhh = np.zeros_like(self.Whh)
        dWhy = np.zeros_like(self.Why)
        dbh = np.zeros_like(self.bh)
        dby = np.zeros_like(self.by)

        # dy on outputs (MSE derivative)
        dy = (y_pred - y_true) * (2.0 / batch)  # shape (batch, output_size)
        # dWhy and dby from last hidden
        # (batch, hidden)
        h_last = h[:, -1, :].reshape(batch, self.hidden_size)
        dWhy += dy.T.dot(h_last)  # (output, hidden)
        dby += dy.T.sum(axis=1, keepdims=True)  # (output,1)

        # backprop into last hidden state
        dh_next = dy.dot(self.Why)  # (batch, hidden)

        # BPTT through time
        # NOTE: As in normal BP, we go the network backwards
        for t in reversed(range(seq_len)):
            ht = h[:, t + 1, :]  # (batch, hidden)
            ht_prev = h[:, t, :]  # (batch, hidden)
            # derivative through tanh
            dt = dh_next * (1 - ht**2)  # (batch, hidden)
            dbh += dt.T.sum(axis=1, keepdims=True)
            # dWxh: sum over batch of dt^T x_t
            xt = X[:, t, :].reshape(batch, -1)
            dWxh += dt.T.dot(xt)  # (hidden, input)
            # dWhh: dt^T h_{t-1}
            dWhh += dt.T.dot(ht_prev)
            # propagate dh to previous time step
            dh_next = dt.dot(self.Whh)

        # Gradient clipping to avoid exploding gradients.
        for grad in (dWxh, dWhh, dWhy, dbh, dby):
            np.clip(grad, -5, 5, out=grad)

        # SGD parameter update
        self.Wxh -= self.lr * dWxh
        self.Whh -= self.lr * dWhh
        self.Why -= self.lr * dWhy
        self.bh -= self.lr * dbh
        self.by -= self.lr * dby

    def train(self, X, y, epochs=50, batch_size=32, verbose=True):
        n = X.shape[0]
        losses = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_shuffled = X[idx]
            y_shuffled = y[idx]
            epoch_loss = 0.0

            # Creating batches, feeding through the network,
            # computing loss, and updating the gradients
            for i in range(0, n, batch_size):
                xb = X_shuffled[i: i + batch_size]
                yb = y_shuffled[i: i + batch_size]
                h, y_pred = self.forward(xb)
                loss, _ = self.loss(y_pred, yb)
                epoch_loss += loss * xb.shape[0]
                self.bptt_update(xb, h, y_pred, yb)
            epoch_loss /= n
            losses.append(epoch_loss)
            if verbose and (epoch % max(1, epochs // 10) == 0 or epoch == 1):
                print(f"Epoch {epoch}/{epochs} - loss: {epoch_loss:.6f}")
        return losses

def generate_sine_sequences(n_samples=2000, seq_len=20, input_size=1, seed=0):
    rng = np.random.RandomState(seed)
    x = np.linspace(0, 50, n_samples * seq_len * input_size)
    data = np.sin(x) + 0.1 * rng.randn(n_samples * seq_len * input_size)
    X = data.reshape(n_samples, seq_len, input_size)
    rolled = np.roll(data, -1).reshape(n_samples, seq_len, input_size)
    y_last = rolled[:, -1, :]
    return X.astype(np.float32), y_last.astype(np.float32)

if False: 
    # Creating the data
    X, y = generate_sine_sequences(n_samples=1500, seq_len=20, input_size=1)
    # train/test split
    split = int(0.8 * X.shape[0])
    X_train, y_train = X[:split], y[:split]
    X_test, y_test = X[split:], y[split:]

    print(X_train.shape, y_train.shape)

    # Training the network
    rnn = RNN(input_size=1, hidden_size=80, output_size=1, lr=1e-3)
    losses = rnn.train(X_train, y_train, epochs=600, batch_size=8, verbose=True)

    # Test set
    _, y_pred_test = rnn.forward(X_test)
    test_loss, _ = rnn.loss(y_pred_test, y_test)
    print(f"\nTest MSE: {test_loss:.6f}\n")

    # Plotting the predictions.
    plt.plot(y_test)
    plt.plot(y_pred_test)
    plt.savefig("rnn_pred.png")
    plt.show()

___

## Classification model implementation



In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader

class ClassificationRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size):
        super(ClassificationRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        x = self.embedding(x)
        h0 = torch.zeros(1, x.size(0), hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])
        return out

___

## Data onboarding

Fetch the data from kaggle API

In [18]:
import os
import pandas as pd
import kagglehub

# Fetch the latest version of the dataset from kaggle
data_dir = kagglehub.dataset_download("tanishqdublish/text-classification-documentation")
data_path = os.path.join(data_dir, os.listdir(data_dir)[0]);

data = pd.read_csv(data_path)
print(data.shape)
print(data.head())

(2225, 2)
                                                Text  Label
0  Budget to set scene for election\n \n Gordon B...      0
1  Army chiefs in regiments decision\n \n Militar...      0
2  Howard denies split over ID cards\n \n Michael...      0
3  Observers to monitor UK election\n \n Minister...      0
4  Kilroy names election seat target\n \n Ex-chat...      0


Split the data to features and labels

In [19]:
X, y = data['Text'], data['Label']

___

## Data preprocessing

In [20]:
import re 
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

nltk.download("wordnet")
lemmatizer = WordNetLemmatizer()

def clean_text(text):

    # Lowercase and remove non-alphabetic characters
    text = text.lower()
    text = re.sub(r'[^a-z\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()

    words = text.split()

    # Lemmatization
    words = [lemmatizer.lemmatize(word) for word in words]

    # Stop-word removal
    # words = [word for word in words if word not in stopwords]

    return text #' '.join(words)

# Apply the cleaning function
X = X.apply(clean_text)

[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/eliaseskelinen/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [21]:
# import nltk
# import re
# nltk.download('punkt')

# # # Tokenize each sentence
# # for i, x in enumerate(X): 
# #     #tokens = nltk.word_tokenize(x)
# #     tokens = nltk.word_tokenize(x)
# #     X[i] = ' '.join(tokens)

In [22]:
X, y = X.to_numpy(), y.to_numpy()
print(X.shape)

(2225,)


Vectorize the data

In [23]:
import numpy as np
import re

# Basic tokenization
def tokenize(sentence):
    return re.findall(r"\b\w+\b", sentence.lower())

tokenized = [tokenize(s) for s in X]

# Build vocabulary
vocab = {}
for sent in tokenized:
    for word in sent:
        if word not in vocab:
            vocab[word] = len(vocab) + 1  # start indexing at 1

sequences = [[vocab[word] for word in sent] for sent in tokenized]
# e.g., [[1, 2, 3, 4, 1, 5], [6, 7, 8, 9], [10, 11, 12]]

max_len = max(len(seq) for seq in sequences)
padded = np.zeros((len(sequences), max_len), dtype=int)

for i, seq in enumerate(sequences):
    padded[i, :len(seq)] = seq
# shape = (batch, seq_len)

vocab_size = len(vocab) + 1  # +1 for padding index 0
embedding_dim = 300  # features per token

embedding_matrix = np.random.randn(vocab_size, embedding_dim)
embeddings = embedding_matrix[padded]  # shape = (batch, seq_len, features)

rnn_input = np.transpose(embeddings, (2, 1, 0))


Split the data to training and test sets

In [24]:
from sklearn.model_selection import train_test_split

#X_train, X_test, y_train, y_test = train_test_split(rnn_input, y, test_size=0.2)


___

## Model training

In [None]:
rnn = RNN(input_size=1, hidden_size=80, output_size=1, lr=1e-3)

print(rnn_input.shape, y.shape)
rnn.train(rnn_input, y)

(300, 4396, 2225) (2225,)


In [None]:
df = pd.read_csv("/content/IMDB Dataset.csv", names=["text", "label"])

le = LabelEncoder()
df['label'] = le.fit_transform(df['label'])

train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)

vocab = {word for phrase in df['text'] for word in phrase}
word_to_idx = {word: idx for idx, word in enumerate(vocab, start=1)}

max_length = df['text'].str.len().max()

def encode_and_pad(text):
    encoded = [word_to_idx[word] for word in text]
    return encoded + [0] * (max_length - len(encoded))

train_data['text'] = train_data['text'].apply(encode_and_pad)
test_data['text'] = test_data['text'].apply(encode_and_pad)
vocab_size = len(vocab) + 1
embed_size = 128
hidden_size = 128
output_size = 2 
model = ClassificationRNN(vocab_size, embed_size, hidden_size, output_size)

___