## Vanilla RNN from Scratch (NumPy)

This cell implements a **minimal RNN** from scratch using only NumPy.  
It is broken down into several classes:

- **InputLayer**: handles one-hot encoded inputs and multiplies with input weights `U`  
- **HiddenLayer**: maintains hidden states (`a_t`) and applies `tanh` activation  
- **OutputLayer**: computes logits and applies `softmax` for next-character prediction  
- **VanillaRNN**: ties everything together, implementing:
  - forward pass  
  - BPTT (Backpropagation Through Time)  
  - parameter update (`step`)  

Additionally, helper functions are defined:
- `softmax`: normalization for logits  
- `one_hot`: converts character index into one-hot vector  
- `make_sequence_data`: generates synthetic training data (alphabet sequences); it is defined in the "Helper Functions" cell further below

**Goal:** Train an RNN to predict the next character in the A–Z alphabet sequence.

Source : https://medium.com/@thisislong/building-a-recurrent-neural-network-from-scratch-ba9b27a42856

In [None]:

import numpy as np
from typing import List, Tuple

def softmax(z: np.ndarray) -> np.ndarray:
    """Normalise ``z`` along axis 0 so that every column sums to 1.

    The maximum along axis 0 is subtracted before exponentiating. The shift
    cancels in the final ratio, so the result is unchanged, while ``np.exp``
    only ever sees non-positive arguments.
    """
    column_peak = np.max(z, axis=0, keepdims=True)
    unnormalised = np.exp(z - column_peak)
    return unnormalised / np.sum(unnormalised, axis=0, keepdims=True)

def one_hot(idx: int, vocab_size: int) -> np.ndarray:
    """Encode ``idx`` as a float32 column vector of shape ``(vocab_size, 1)``.

    Every entry is 0.0 except row ``idx``, which is set to 1.0.
    """
    column = np.zeros(shape=(vocab_size, 1), dtype=np.float32)
    column[idx, 0] = 1.0
    return column

class InputLayer:
    """Input-to-hidden projection ``U @ x_t + b`` plus a cache of the inputs seen.

    ``U`` has shape ``(hidden_size, input_size)`` and ``b`` has shape
    ``(hidden_size, 1)``; both are float32. ``dU`` and ``db`` are gradient
    accumulators with the same shapes. ``inputs`` holds the input vector for
    each time step of the sequence currently being processed.
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42):
        """Draw ``U`` from N(0, 0.1) with a seeded generator; start ``b`` at zero."""
        generator = np.random.default_rng(seed)
        drawn = generator.normal(0, 0.1, size=(hidden_size, input_size))
        self.U = drawn.astype(np.float32)
        self.b = np.zeros((hidden_size, 1), dtype=np.float32)
        # Gradient buffers mirror the shape and dtype of their parameters.
        self.dU = np.zeros_like(self.U)
        self.db = np.zeros_like(self.b)
        # Filled by the caller, one entry per time step.
        self.inputs: List[np.ndarray] = []

    def get_input(self, t: int) -> np.ndarray:
        """Return the cached input for time step ``t``."""
        x_t = self.inputs[t]
        return x_t

    def weighted_sum(self, t: int) -> np.ndarray:
        """Return ``U @ x_t + b`` for the cached input at step ``t``."""
        projected = self.U @ self.get_input(t)
        return projected + self.b

    def zero_cache(self):
        """Forget the cached inputs by binding a fresh empty list."""
        self.inputs = list()

    def zero_grads(self):
        """Reset both gradient accumulators to zero in place."""
        for grad in (self.dU, self.db):
            grad.fill(0.0)

    def accumulate_grads(self, delta_s_t: np.ndarray, t: int):
        """Add the step-``t`` contribution of ``delta_s_t`` to ``dU`` and ``db``."""
        x_t = self.get_input(t)
        self.dU += delta_s_t @ x_t.T
        self.db += delta_s_t

    def step(self, lr: float, clip: float = None):
        """Apply one gradient-descent update.

        When ``clip`` is not ``None`` each gradient is first clipped
        element-wise, in place, to ``[-clip, clip]``.
        """
        for param, grad in ((self.U, self.dU), (self.b, self.db)):
            if clip is not None:
                np.clip(grad, -clip, clip, out=grad)
            # In-place update: ``param`` is the very array stored on ``self``.
            param -= lr * grad

class HiddenLayer:
    """Recurrent weights, bias and tanh activation of the hidden state.

    ``W`` has shape ``(hidden_size, hidden_size)`` and ``b`` has shape
    ``(hidden_size, 1)``; both are float32. ``dW`` and ``db`` are the
    matching gradient accumulators. ``s`` and ``a`` cache, per time step,
    the pre-activation passed to ``forward_step`` and its ``tanh``.
    """

    def __init__(self, hidden_size: int, seed: int = 123):
        """Draw ``W`` from N(0, 0.1) with a seeded generator; start ``b`` at zero."""
        self.hidden_size = hidden_size
        generator = np.random.default_rng(seed)
        square = (hidden_size, hidden_size)
        self.W = generator.normal(0, 0.1, size=square).astype(np.float32)
        self.b = np.zeros((hidden_size, 1), dtype=np.float32)
        self.dW = np.zeros_like(self.W)
        self.db = np.zeros_like(self.b)
        self.a: List[np.ndarray] = []
        self.s: List[np.ndarray] = []

    def get_a(self, t: int) -> np.ndarray:
        """Return the cached activation for step ``t``.

        A negative ``t`` yields a zero column, which serves as the state
        preceding the first time step.
        """
        if t >= 0:
            return self.a[t]
        return np.zeros((self.hidden_size, 1), dtype=np.float32)

    def zero_cache(self):
        """Drop the cached pre-activations and activations."""
        self.a, self.s = [], []

    def zero_grads(self):
        """Reset both gradient accumulators to zero in place."""
        for grad in (self.dW, self.db):
            grad.fill(0.0)

    def forward_step(self, s_t: np.ndarray) -> np.ndarray:
        """Apply ``tanh`` to ``s_t``, cache both values and return the activation."""
        activated = np.tanh(s_t)
        self.s.append(s_t)
        self.a.append(activated)
        return activated

    def accumulate_grads(self, delta_s_t: np.ndarray, t: int):
        """Add the step-``t`` contribution of ``delta_s_t`` to ``dW`` and ``db``.

        ``dW`` uses the activation of step ``t - 1`` (zeros when ``t`` is 0).
        """
        previous_activation = self.get_a(t - 1)
        self.dW += delta_s_t @ previous_activation.T
        self.db += delta_s_t

    def step(self, lr: float, clip: float = None):
        """Apply one gradient-descent update.

        When ``clip`` is not ``None`` each gradient is first clipped
        element-wise, in place, to ``[-clip, clip]``.
        """
        for param, grad in ((self.W, self.dW), (self.b, self.db)):
            if clip is not None:
                np.clip(grad, -clip, clip, out=grad)
            # In-place update: ``param`` is the very array stored on ``self``.
            param -= lr * grad

class OutputLayer:
    """Hidden-to-output projection ``V @ a_t + c`` followed by ``softmax``.

    ``V`` has shape ``(output_size, hidden_size)`` and ``c`` has shape
    ``(output_size, 1)``; both are float32. ``dV`` and ``dc`` are the
    matching gradient accumulators. ``o`` and ``yhat`` cache, per time step,
    the logits and the softmax output.
    """

    def __init__(self, hidden_size: int, output_size: int, seed: int = 7):
        """Draw ``V`` from N(0, 0.1) with a seeded generator; start ``c`` at zero."""
        generator = np.random.default_rng(seed)
        drawn = generator.normal(0, 0.1, size=(output_size, hidden_size))
        self.V = drawn.astype(np.float32)
        self.c = np.zeros((output_size, 1), dtype=np.float32)
        self.dV = np.zeros_like(self.V)
        self.dc = np.zeros_like(self.c)
        self.o: List[np.ndarray] = []
        self.yhat: List[np.ndarray] = []

    def zero_cache(self):
        """Drop the cached logits and predictions."""
        self.o, self.yhat = [], []

    def zero_grads(self):
        """Reset both gradient accumulators to zero in place."""
        for grad in (self.dV, self.dc):
            grad.fill(0.0)

    def forward_step(self, a_t: np.ndarray):
        """Compute logits and softmax output for ``a_t``, cache and return both."""
        logits = self.V @ a_t + self.c
        probabilities = softmax(logits)
        self.o.append(logits)
        self.yhat.append(probabilities)
        return logits, probabilities

    def accumulate_grads(self, delta_o_t: np.ndarray, a_t: np.ndarray):
        """Add ``delta_o_t @ a_t.T`` to ``dV`` and ``delta_o_t`` to ``dc``."""
        self.dV += delta_o_t @ a_t.T
        self.dc += delta_o_t

    def step(self, lr: float, clip: float = None):
        """Apply one gradient-descent update.

        When ``clip`` is not ``None`` each gradient is first clipped
        element-wise, in place, to ``[-clip, clip]``.
        """
        for param, grad in ((self.V, self.dV), (self.c, self.dc)):
            if clip is not None:
                np.clip(grad, -clip, clip, out=grad)
            # In-place update: ``param`` is the very array stored on ``self``.
            param -= lr * grad

class VanillaRNN:
    """Single-layer RNN assembled from an input, a hidden and an output layer.

    Provides the forward pass over a sequence, backpropagation through time
    over the activations cached by that forward pass, and a parameter update
    with optional gradient clipping.
    """

    def __init__(self, input_size: int, hidden_size: int, output_size: int, lr: float = 0.1, clip: float = 5.0):
        """Build the three layers and store the learning rate and clip value."""
        self.lr = lr
        self.clip = clip
        self.input = InputLayer(input_size, hidden_size)
        self.hidden = HiddenLayer(hidden_size)
        self.output = OutputLayer(hidden_size, output_size)

    def zero_caches_and_grads(self):
        """Clear every layer's per-sequence cache, then every gradient buffer."""
        layers = (self.input, self.hidden, self.output)
        for layer in layers:
            layer.zero_cache()
        for layer in layers:
            layer.zero_grads()

    def forward(self, x_seq: List[np.ndarray]):
        """Run the network over ``x_seq`` and return the prediction for each step.

        Caches and gradients are reset first, so each call starts from a
        zero hidden state and empty accumulators.
        """
        self.zero_caches_and_grads()
        predictions = []
        for t, x_t in enumerate(x_seq):
            self.input.inputs.append(x_t)
            recurrent = self.hidden.W @ self.hidden.get_a(t - 1)
            pre_activation = recurrent + self.input.weighted_sum(t) + self.hidden.b
            hidden_state = self.hidden.forward_step(pre_activation)
            # forward_step returns (logits, probabilities); only the latter is kept.
            probabilities = self.output.forward_step(hidden_state)[1]
            predictions.append(probabilities)
        return predictions

    def bptt(self, y_seq: List[np.ndarray]):
        """Accumulate gradients for all layers by walking the sequence backwards.

        Uses the activations and predictions cached by the most recent
        ``forward`` call; ``y_seq`` supplies the target for each step.
        """
        hidden, output = self.hidden, self.output
        # Gradient flowing into a_t from step t + 1; zero past the last step.
        carried = np.zeros((hidden.hidden_size, 1), dtype=np.float32)
        for t in range(len(y_seq) - 1, -1, -1):
            a_t = hidden.a[t]
            delta_o_t = output.yhat[t] - y_seq[t]
            output.accumulate_grads(delta_o_t, a_t)
            delta_a_t = output.V.T @ delta_o_t + carried
            # Derivative of tanh written in terms of its output a_t.
            tanh_slope = 1.0 - np.square(a_t)
            ds_t = tanh_slope * delta_a_t
            self.input.accumulate_grads(ds_t, t)
            hidden.accumulate_grads(ds_t, t)
            carried = hidden.W.T @ ds_t

    def step(self):
        """Update the parameters of all three layers with ``lr`` and ``clip``."""
        for layer in (self.input, self.hidden, self.output):
            layer.step(self.lr, self.clip)

    def loss(self, yhat_seq: List[np.ndarray], y_seq: List[np.ndarray]) -> float:
        """Return the cross-entropy summed over every step of ``y_seq``.

        ``1e-12`` is added inside the logarithm so a zero probability does
        not produce ``-inf``.
        """
        total = 0
        for t, target in enumerate(y_seq):
            log_probabilities = np.log(yhat_seq[t] + 1e-12)
            total += -float(np.sum(target * log_probabilities))
        return total

    def predict_next(self, x_seq: List[np.ndarray]) -> int:
        """Return the argmax index of the prediction at the last step of ``x_seq``."""
        final_prediction = self.forward(x_seq)[-1]
        return int(np.argmax(final_prediction))


## Helper Functions

This cell defines utility functions:

- `one_hot`: returns a one-hot encoded column vector for a given index.  
- `make_sequence_data`: generates synthetic sequential data:
  - Randomly selects a starting index `k` in the alphabet.  
  - Creates an input sequence of length `seq_len`.  
  - The target sequence is shifted by one character.  

For example, input: `["A","B","C","D"]` → target: `["B","C","D","E"]`.


In [None]:
def one_hot(idx: int, vocab_size: int) -> np.ndarray:
    """Return a ``(vocab_size, 1)`` float32 column with 1.0 at row ``idx`` and 0.0 elsewhere."""
    v = np.zeros((vocab_size, 1), dtype=np.float32)
    v[idx, 0] = 1.0
    return v


def make_sequence_data(seq_len: int = 5, num_samples: int = 200, alphabet=None, seed: int = 0):
    """Generate next-symbol training pairs from a cyclic alphabet.

    For each sample a random start position is drawn, ``seq_len + 1``
    consecutive symbols are read (wrapping around the end of the alphabet),
    and the run is split into an input sequence and the same sequence
    shifted by one symbol.

    Args:
        seq_len: Number of time steps in each input/target sequence.
        num_samples: Number of ``(xs, ys)`` pairs to generate.
        alphabet: Optional iterable of distinct symbols. When ``None`` the
            module-level ``vocab``, ``v2i`` and ``V`` are used, so they must
            already be defined at call time.
        seed: Seed for the random generator that picks start positions.

    Returns:
        A list of ``(xs, ys)`` tuples. ``xs`` and ``ys`` are lists of
        ``seq_len`` one-hot column vectors, and ``ys[t]`` encodes the symbol
        that follows ``xs[t]``.
    """
    if alphabet is None:
        # Default path: rely on the notebook's global vocabulary.
        symbols, index_of, size = vocab, v2i, V
    else:
        symbols = list(alphabet)
        index_of = {ch: i for i, ch in enumerate(symbols)}
        size = len(symbols)

    rng = np.random.default_rng(seed)
    dataset = []
    for _ in range(num_samples):
        k = int(rng.integers(0, size))
        # One extra symbol so inputs and targets can be offset by one step.
        letters = [symbols[(k + t) % size] for t in range(seq_len + 1)]
        xs = [one_hot(index_of[ch], size) for ch in letters[:-1]]
        ys = [one_hot(index_of[ch], size) for ch in letters[1:]]
        dataset.append((xs, ys))
    return dataset

## Preparing the Dataset

This cell sets up the dataset:

- Defines the alphabet `A–Z` and vocabulary mappings:  
  - `v2i`: character → index  
  - `i2v`: index → character  
- Defines vocabulary size `V`.  
- Generates the training dataset using `make_sequence_data` with 300 samples.  

Example of one training sample:
- Input (x): `["A","B","C","D","E","F"]`  
- Target (y): `["B","C","D","E","F","G"]`  

Each character is represented as a one-hot vector of size `V=26`.


In [None]:

# Vocabulary: the 26 upper-case letters and lookup tables in both directions.
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
vocab = [*alphabet]
V = len(vocab)
v2i = dict(zip(vocab, range(V)))   # character -> index
i2v = dict(enumerate(vocab))       # index -> character

seq_len = 6
train = make_sequence_data(seq_len=seq_len, num_samples=300)

# Each sample is a pair (xs, ys) of seq_len = 6 one-hot vectors each.
# Example: xs encodes A B C D E F  ->  ys encodes B C D E F G.
#
# Every vector is a one-hot column with V = 26 entries:
#   A -> [1, 0, 0, 0, ..., 0]
#   B -> [0, 1, 0, 0, ..., 0]
#   Z -> [0, 0, 0, 0, ..., 1]
print(train[0])


([array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [1.],
       [0.],
       [0.],
       [0.]], dtype=float32), array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [1.],
       [0.],
       [0.]], dtype=float32), array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
  

## Training and Testing the RNN

This cell trains the RNN model:

- Initializes a `VanillaRNN` with:
  - `input_size = V` (26)  
  - `hidden_size = 64`  
  - `output_size = V`  
- Trains for **100 epochs** using cross-entropy loss, printing the average loss every 10 epochs.  
- Performs stochastic gradient descent with gradient clipping (`clip=5.0`).  

At the end, it tests the model on the sequence `"ABCDEF"`,  
and predicts the **next character** (expected `"G"`).  


In [None]:
# Input and output sizes both equal the vocabulary size V; the hidden state has 64 units.
rnn = VanillaRNN(input_size=V, hidden_size=64, output_size=V, lr=0.1, clip=5.0)
for epoch in range(1, 101):
    # Visit the samples in a new order every epoch. This uses NumPy's global
    # RNG, which is not seeded here, so the order differs from run to run.
    np.random.shuffle(train)
    total = 0.0
    for xs, ys in train:
        yhat = rnn.forward(xs)
        # Use the model's own cross-entropy instead of repeating the formula inline.
        sample_loss = rnn.loss(yhat, ys)
        rnn.bptt(ys)
        rnn.step()
        total += sample_loss
    if epoch % 10 == 0:
        # Average loss per training sequence over this epoch.
        print(f"epoch={epoch} loss={total/len(train):.4f}")

# Quick check: feed a known prefix and print the predicted next character.
test_seq = "ABCDEF"
xs = [one_hot(v2i[ch], V) for ch in test_seq]
pred = rnn.predict_next(xs)
print("Input:", test_seq, " Pred next:", i2v[pred])

epoch=10 loss=0.0086
epoch=20 loss=0.0037
epoch=30 loss=0.0023
epoch=40 loss=0.0017
epoch=50 loss=0.0013
epoch=60 loss=0.0011
epoch=70 loss=0.0009
epoch=80 loss=0.0008
epoch=90 loss=0.0007
epoch=100 loss=0.0006
Input: ABCDEF  Pred next: G
