In [243]:
!uv venv

Using CPython [36m3.10.17[39m
Creating virtual environment at: [36m.venv[39m
Activate with: [32msource .venv/bin/activate[39m


In [244]:
!source .venv/bin/activate
!uv pip install numpy

[2mUsing Python 3.12.9 environment at: /Users/ElliotPhua/miniconda3[0m
[2mAudited [1m1 package[0m [2min 4ms[0m[0m


In [245]:
from typing import List
import numpy as np
import re

corpus = "" # corpus

def preprocess_corpus(corpus: str) -> tuple[dict[str, int], dict[int, str], np.ndarray, int]:
    """
    Args:
        corpus (str)
    Returns:
        dict[str, int]: ind_dict[k=word, v=index]
        dict[int, str]: str_dict[k=index, v=word]
        np.ndarray: an ndarray of word indices corresponding to the input corpus (ground truth)
    """
    tokens = re.findall(r"\b\w+\b", corpus)[:20000]
    ind_dict = {} # k=word, v=ind
    str_dict = {} # k=ind, v=word
    count = 0
    for word in tokens:
        if word not in ind_dict:
            ind_dict[word] = count
            str_dict[count] = word
            count += 1
    
    data_ind = np.array([ind_dict[w] for w in tokens])
    
    return ind_dict, str_dict, data_ind, count

def make_sequences(data_ind: np.ndarray, seq_len:int=10) -> tuple[np.ndarray, np.ndarray]:
    X = np.array([data_ind[i:i+seq_len] for i in range(len(data_ind) - seq_len)], dtype=int)
    Y = np.array([data_ind[j:j+seq_len] for j in range(1, len(data_ind) - seq_len + 1)], dtype=int)
    return X, Y


In [246]:
# Recurrent Neural Network
import numpy as np

def get_h_t(x_t:int, ind_dict: dict[str, int], prev_h:np.ndarray, w_e:np.ndarray, w_hh:np.ndarray, w_xh:np.ndarray, b_h:np.ndarray) -> np.ndarray:
    '''
    h_t <- f(x_t, h_t-1)
    Get current state memory, dependent on current word embedding e_t and state memory at previous time step prev_h, and using the weight embedding w_hh.
    Args:
        x_t (int): The current target word's index
        ind_dict (dict[str, int]): indices dictionary, k=word, v=corresponding row index, length |v|
        prev_h (np.ndarray): The state memory at previous time step t-1, with shape H x 1
        w_e (np.ndarray): word embedding matrix, shape |v| x d
        w_hh (np.ndarray): weight embedding for state memory, shape H x H
        w_xh (np.ndarray): weight embedding to transform target word embedding into space of state memory embedding, shape H x d
        b_h (np.ndarray): bias with shape H x 1
    Returns:
        np.ndarray: the current state memory h_t, shape H x 1
    '''
    one_hot = np.zeros((len(ind_dict), 1)) # shape |v| x 1
    one_hot[x_t] = 1 # one-hot encoding for current word index
    e_t = w_e.T@one_hot # Get the word embedding for the current word index, shape d x 1
    h_t = np.tanh(w_xh@e_t + w_hh@prev_h + b_h) # tanh prevents exploding gradients and gives hidden state non-linearity
    return h_t

def predict_o_t(h_t:np.ndarray, w_hy:np.ndarray, b_y:np.ndarray) -> np.ndarray:
    '''
    Get the output raw score for all candidate output words
    Args:
        h_t (np.ndarray): the current state memory embedding, shape H x 1
        w_hy (np.ndarray): the output score embedding, shape |v| x H
        b_y (np.ndarray): the bias for output score, shape |v| x 1
    Returns:
        np.ndarray: the raw score vector for all candidate words, shape |v| x 1
    '''
    return w_hy@h_t + b_y

def softmax(o_t: np.ndarray) -> np.ndarray:
    """
    softmax(o_i) = exp(o_i - C)/sum[j in T](exp(o_j - C)), numerical stability
    Args:
        o_t (np.ndarray): the output vector of raw scores at time step t, shape |v| x 1
    Returns:
        np.ndarray: the probability vector for all candidate output word, shape |v| x 1
    """
    exp_z = np.exp(o_t - np.max(o_t))
    return exp_z/np.sum(exp_z)
    
def get_most_likely_output_ind(p_t:np.ndarray) -> int:
    index_of_max_p = np.argmax(p_t)
    return int(index_of_max_p)

def loss_at_time_step(p_t: np.ndarray, i: int, v_len: int) -> float:
    """
    Cross entropy loss.
    Args:
        p_t (np.ndarray): the output probability vector for all candidate words at time step t, shape: |v| x 1
        i (int): index of target word
        v_len (int): vocab length
    Returns:
        float: the loss at time step t
    """
    y_true = np.zeros((v_len, 1))
    y_true[i] = 1
    loss_t = -np.log(y_true.T@p_t + 1e-12)
    return loss_t[0]

    

In [247]:

# init weights manually
class Weights():
    def __init__(self, ind_dict: dict[str, int], H:int=128, d:int=64) -> None:
        self.H = H # hidden state size
        self.d = d # word embedding sizes
        self.v = len(ind_dict) # vocab length
        self.c = 0.01
        self.w_e = np.random.randn(self.v, self.d) * self.c
        self.w_xh = np.random.randn(self.H, self.d) * self.c
        self.b_h = np.zeros((self.H, 1)) # biases are offsets, no symmetry issue; can start with neutral offset. weights are init randomly so that they can learn diff parts of the data distribution
        self.w_hh = np.random.randn(self.H, self.H) * self.c
        self.w_hy = np.random.randn(self.v, self.H) * self.c
        self.b_y = np.zeros((self.v, 1))

In [248]:
# forward propagation

def forward_prop(x_seq: np.ndarray, y_seq: np.ndarray, h_prev: np.ndarray, ind_dict: dict[str, int], model:Weights) -> tuple:
    w_e, w_xh, w_hh, w_hy, b_h, b_y = model.w_e, model.w_xh, model.w_hh, model.w_hy, model.b_h, model.b_y
    
    total_loss, correct_pred = 0, 0
    h_cache, o_cache, p_cache = {}, {}, {}
    h_cache[-1] = np.copy(h_prev)
    
    for t in range(len(x_seq)):
        h_cache[t] = get_h_t(x_seq[t], ind_dict, h_cache[t-1], w_e, w_hh, w_xh, b_h)
        o_cache[t] = predict_o_t(h_cache[t], w_hy, b_y)
        p_cache[t] = softmax(o_cache[t])
        pred_ind = get_most_likely_output_ind(p_cache[t])
        correct_pred += pred_ind == y_seq[t]
        l_t = loss_at_time_step(p_cache[t], y_seq[t], len(ind_dict))
        total_loss += l_t
    
    accuracy_score = float(correct_pred / y_seq.size)
    cache = (x_seq, h_cache, p_cache)
    return total_loss, accuracy_score, h_cache[len(x_seq)-1], cache    

In [249]:
# backpropagation through time (BPTT)
# we want to train w_e, w_xh, b_x, w_hh, w_hy, b_y

def backprop_tt(y_seq: np.ndarray, model:Weights, cache:tuple, learning_rate:float=0.1):
    w_e, w_xh, w_hh, w_hy, b_h, b_y = (
        model.w_e,
        model.w_xh,
        model.w_hh,
        model.w_hy,
        model.b_h,
        model.b_y,
    )

    dw_e = np.zeros_like(w_e)
    dw_xh = np.zeros_like(w_xh)
    dw_hh = np.zeros_like(w_hh)
    db_h = np.zeros_like(b_h)
    dw_hy = np.zeros_like(w_hy)
    db_y = np.zeros_like(b_y) # shape |v| x 1

    x_cache, h_cache, p_cache = cache

    dh_next = np.zeros_like(h_cache[0])

    for t in reversed(range(len(y_seq))):
        x_t = x_cache[t]
        y_true = y_seq[t]

        # dL/dy_t = p_t - one-hot y_t
        d_o = p_cache[t].copy() 
        d_o[y_true] -= 1 # derivative of softmax + cross entropy

        dw_hy += d_o @ h_cache[t].T 
        db_y += d_o

        dh_t = w_hy.T @ d_o + dh_next
        dh_raw = (1-h_cache[t]**2) * dh_t # tanh derivative

        db_h += dh_raw
        e_t = w_e[x_t].reshape(1, -1)
        dw_xh += dh_raw @ e_t
        dw_hh += dh_raw @ h_cache[t-1].T
        dw_e[x_t] += (w_xh.T @ dh_raw).flatten()
        dh_next = w_hh.T @ dh_raw

    # Gradient clipping to prevent exploding gradient
    for grad in [dw_xh, dw_hh, dw_hy, db_h, db_y, dw_e]:
        np.clip(grad, -5, 5, out=grad)

    # update weights sgd
    model.w_e -= learning_rate * dw_e
    model.w_xh -= learning_rate * dw_xh
    model.w_hh -= learning_rate * dw_hh
    model.w_hy -= learning_rate * dw_hy
    model.b_h -= learning_rate * db_h
    model.b_y -= learning_rate * db_y

    return model

In [250]:
from sklearn.model_selection import train_test_split

fp = "data/text8"

with open(fp, 'r') as f:
    corpus = f.read().lower()

ind_dict, str_dict, data_indices, vocab_len = preprocess_corpus(corpus)

X, Y = make_sequences(data_indices, seq_len=5)
print(X.shape, Y.shape)
print(len(ind_dict))

(19995, 5) (19995, 5)
4290


In [258]:
# if shuffle = False --> preserve order, if shuffle = True --> faster convergence if huge dataset
X_train, X_test, y_train, y_test = train_test_split(X,Y, test_size=0.2, random_state=42, shuffle=True)
X_train_smol, y_train_smol = X_train[:500], y_train[:500]


In [252]:
# initialise input embedding, w_xh, w_hh, w_hy
model = Weights(ind_dict)
print(model.w_e.shape, model.w_xh.shape, model.w_hh.shape, model.w_hy.shape)

(4290, 64) (128, 64) (128, 128) (4290, 128)


In [253]:
# manual training

def RNN_train(X_train: np.ndarray, y_train: np.ndarray, ind_dict: dict[str, int], model: Weights, hidden_size:int=128, d:int=64, epochs:int=100, learning_rate=0.3e-1) -> Weights:
    
    for epoch in range(epochs):
        total_epoch_loss = 0
        h_prev = np.zeros((hidden_size, 1))
        
        total_correct  = 0
        total_words = 0
        
        for i in range(len(X_train)):
            x_seq = X_train[i]
            y_seq = y_train[i]
            
            total_loss, accuracy_score, h_prev, cache = forward_prop(x_seq, y_seq, h_prev, ind_dict, model)
            
            total_epoch_loss += total_loss
            
            total_correct += accuracy_score * y_seq.size
            total_words += y_seq.size
            model = backprop_tt(y_seq, model, cache, learning_rate)  # Fixed order of arguments
        
        avg_loss = total_epoch_loss / len(X_train)
        epoch_accuracy = total_correct / total_words
        print(f"Epoch {epoch+1}/{epochs}: Average Loss: {avg_loss}, Accuracy Score: {epoch_accuracy}")
    
    return model

In [259]:
model = RNN_train(X_train_smol, y_train_smol, ind_dict, model)

Epoch 1/100: Average Loss: [30.38594098], Accuracy Score: 0.0508
Epoch 2/100: Average Loss: [28.30600715], Accuracy Score: 0.058
Epoch 3/100: Average Loss: [26.51425503], Accuracy Score: 0.0584
Epoch 4/100: Average Loss: [25.4367435], Accuracy Score: 0.064
Epoch 5/100: Average Loss: [24.47576544], Accuracy Score: 0.0584
Epoch 6/100: Average Loss: [23.84344435], Accuracy Score: 0.0716
Epoch 7/100: Average Loss: [23.30761517], Accuracy Score: 0.0652
Epoch 8/100: Average Loss: [22.7599109], Accuracy Score: 0.0796
Epoch 9/100: Average Loss: [22.49833956], Accuracy Score: 0.0792
Epoch 10/100: Average Loss: [22.24451067], Accuracy Score: 0.088
Epoch 11/100: Average Loss: [22.04581514], Accuracy Score: 0.086
Epoch 12/100: Average Loss: [21.49996201], Accuracy Score: 0.1064
Epoch 13/100: Average Loss: [21.41550184], Accuracy Score: 0.0988
Epoch 14/100: Average Loss: [21.0578449], Accuracy Score: 0.116
Epoch 15/100: Average Loss: [21.08194796], Accuracy Score: 0.1156
Epoch 16/100: Average Loss:

In [260]:
def RNN_test(model: Weights, X_test: np.ndarray, y_test: np.ndarray) -> None:
    total_correct = 0
    total_words = 0
    total_loss = 0
    
    h_prev = np.zeros((model.H,1))
    for i in range(len(X_test)):
        x_seq = X_test[i]
        y_seq = y_test[i]
        
        loss, _, h_prev, cache = forward_prop(x_seq, y_seq, h_prev, ind_dict, model)
        total_loss += loss
        _, _, p_cache = cache
        # Count correct predictions
        for t in range(len(x_seq)):
            p_t = p_cache[t]
            pred_idx = np.argmax(p_t)
            total_correct += pred_idx == y_seq[t]
            total_words += len(x_seq)
    
    avg_loss = total_loss / len(X_test)
    accuracy_score = total_correct / total_words
    
    print(f"Test loss: {avg_loss}, Test Accuracy: {accuracy_score}")

In [261]:
RNN_test(model, X_test, y_test)

Test loss: [61.39573813], Test Accuracy: 0.0059914978744686175


In [None]:
!uv pip install --pre torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/nightly/cpu
!uv pip install scikit-learn pandas

[2mUsing Python 3.12.9 environment at: /Users/ElliotPhua/miniconda3[0m
[2mAudited [1m3 packages[0m [2min 13ms[0m[0m
[2mUsing Python 3.12.9 environment at: /Users/ElliotPhua/miniconda3[0m
[2mAudited [1m2 packages[0m [2min 9ms[0m[0m
[2mUsing Python 3.12.9 environment at: /Users/ElliotPhua/miniconda3[0m
[2mAudited [1m2 packages[0m [2min 9ms[0m[0m


In [None]:
# Using pytorch
import torch

if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print(x)
else: print("MPS device not found.")

tensor([1.], device='mps:0')


In [None]:
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
import pandas as pd