In [2]:
import numpy as np

class ScratchSimpleRNNClassifier:
    """
    Simple Recurrent Neural Network Classifier from scratch.

    A single tanh recurrent layer is unrolled over the sequence axis; the
    hidden state after the last time step feeds a fully connected softmax
    layer. Training uses mini-batch gradient descent with backpropagation
    through time (BPTT).

    Parameters
    ----------
    n_nodes : int
        Number of nodes (neurons) in the RNN hidden layer.
    n_output : int
        Number of output classes.
    n_epochs : int
        Number of training epochs.
    learning_rate : float
        Learning rate for weight updates (alpha).
    batch_size : int
        Number of samples per batch during training.
    sigma : float
        Standard deviation for weight initialization (Gaussian distribution).
    verbose : bool
        If True, print loss during training.

    Attributes
    ----------
    Wx : ndarray, shape (n_features, n_nodes), or None before initialization
        Input-to-hidden weights.
    Wh : ndarray, shape (n_nodes, n_nodes), or None before initialization
        Hidden-to-hidden (recurrent) weights.
    B : ndarray, shape (n_nodes,), or None before initialization
        Hidden layer bias.
    Wy : ndarray, shape (n_nodes, n_output), or None before initialization
        Hidden-to-output weights.
    By : ndarray, shape (n_output,), or None before initialization
        Output layer bias.
    loss_history : list of float
        Average cross-entropy loss of every epoch; ``fit`` appends to it and
        never clears it, so it accumulates across repeated ``fit`` calls.
    """
    def __init__(self, n_nodes=50, n_output=1, n_epochs=10, learning_rate=0.01,
                 batch_size=20, sigma=0.01, verbose=False):
        self.n_nodes = n_nodes
        self.n_output = n_output
        self.n_epochs = n_epochs
        self.lr = learning_rate
        self.batch_size = batch_size
        self.sigma = sigma
        self.verbose = verbose

        # Parameters stay None until fit() initializes them or the caller
        # assigns them manually.
        self.Wx = None
        self.Wh = None
        self.B = None
        self.Wy = None
        self.By = None

        self.loss_history = []

    def _tanh(self, a):
        """Tanh activation function."""
        return np.tanh(a)

    def _softmax(self, a):
        """Softmax activation function (applied along the last axis)."""
        # Subtract the row-wise maximum so np.exp cannot overflow; this does
        # not change the result of the softmax.
        a = a - np.max(a, axis=-1, keepdims=True)
        exp_a = np.exp(a)
        return exp_a / np.sum(exp_a, axis=-1, keepdims=True)

    def _cross_entropy_loss(self, y_pred_proba, y_true_one_hot):
        """Mean cross-entropy loss over the samples of a batch."""
        # Avoid log(0)
        epsilon = 1e-12
        y_pred_proba = np.clip(y_pred_proba, epsilon, 1. - epsilon)
        log_likelihood = -np.sum(y_true_one_hot * np.log(y_pred_proba), axis=1)
        return np.mean(log_likelihood)

    def _initialize_weights(self, n_features):
        """Initialize weights using Gaussian distribution, biases with zeros."""
        self.Wx = self.sigma * np.random.randn(n_features, self.n_nodes)
        self.Wh = self.sigma * np.random.randn(self.n_nodes, self.n_nodes)
        self.B = np.zeros(self.n_nodes)
        self.Wy = self.sigma * np.random.randn(self.n_nodes, self.n_output)
        self.By = np.zeros(self.n_output)

    def _forward_propagation(self, X_batch):
        """
        Perform forward propagation for one batch.

        Parameters
        ----------
        X_batch : ndarray, shape (batch_size, n_sequences, n_features)
            Input data for the batch.

        Returns
        -------
        h_states : ndarray, shape (batch_size, n_sequences + 1, n_nodes)
            Hidden states at each time step (including initial h0).
        a_states : ndarray, shape (batch_size, n_sequences, n_nodes)
            Pre-activation states at each time step.
        final_h : ndarray, shape (batch_size, n_nodes)
            Hidden state after the last sequence.
        output_a : ndarray, shape (batch_size, n_output)
             Pre-activation state of the output layer.
        y_pred_proba : ndarray, shape (batch_size, n_output)
            Predicted probabilities after softmax.
        """
        batch_size, n_sequences, _ = X_batch.shape

        # Initial hidden state h0 is all zeros.
        h_prev = np.zeros((batch_size, self.n_nodes))

        # Buffers for the states that backpropagation needs later.
        h_states = np.zeros((batch_size, n_sequences + 1, self.n_nodes))
        a_states = np.zeros((batch_size, n_sequences, self.n_nodes))
        h_states[:, 0, :] = h_prev  # Store initial h0

        # RNN forward pass through time
        for t in range(n_sequences):
            xt = X_batch[:, t, :]  # Input at time t (batch_size, n_features)
            # Calculate pre-activation state 'a' at time t
            at = xt @ self.Wx + h_prev @ self.Wh + self.B
            # Calculate hidden state 'h' at time t using tanh
            ht = self._tanh(at)

            # Store states
            a_states[:, t, :] = at
            h_states[:, t + 1, :] = ht

            # Update previous hidden state for next iteration
            h_prev = ht

        # Fully connected output layer, fed by the last hidden state only
        final_h = h_states[:, -1, :]
        output_a = final_h @ self.Wy + self.By

        # Apply softmax activation for classification output
        y_pred_proba = self._softmax(output_a)

        return h_states, a_states, final_h, output_a, y_pred_proba

    def _backward_propagation(self, X_batch, y_true_one_hot, h_states, a_states, y_pred_proba):
        """
        Perform backward propagation (BPTT) for one batch.

        The returned gradients are summed over the batch (not averaged);
        ``fit`` divides by the batch size when applying them.

        Parameters
        ----------
        X_batch : ndarray, shape (batch_size, n_sequences, n_features)
            Input data for the batch.
        y_true_one_hot : ndarray, shape (batch_size, n_output)
            True labels in one-hot encoded format.
        h_states : ndarray, shape (batch_size, n_sequences + 1, n_nodes)
            Hidden states from forward pass.
        a_states : ndarray, shape (batch_size, n_sequences, n_nodes)
            Pre-activation states from forward pass. Accepted for interface
            compatibility but not read: the tanh derivative is computed from
            the hidden states instead.
        y_pred_proba : ndarray, shape (batch_size, n_output)
            Predicted probabilities from forward pass.

        Returns
        -------
        dWx : ndarray, shape (n_features, n_nodes)
            Gradient of loss w.r.t. Wx.
        dWh : ndarray, shape (n_nodes, n_nodes)
            Gradient of loss w.r.t. Wh.
        dB : ndarray, shape (n_nodes,)
            Gradient of loss w.r.t. B.
        dWy : ndarray, shape (n_nodes, n_output)
            Gradient of loss w.r.t. Wy.
        dBy : ndarray, shape (n_output,)
            Gradient of loss w.r.t. By.
        """
        _, n_sequences, _ = X_batch.shape

        # Accumulators for the recurrent-layer gradients. They are forced to
        # float so that the in-place "+=" below also works when a parameter
        # was assigned as an integer array.
        dWx = np.zeros_like(self.Wx, dtype=float)
        dWh = np.zeros_like(self.Wh, dtype=float)
        dB = np.zeros_like(self.B, dtype=float)

        # Gradient of softmax + cross-entropy w.r.t. the output pre-activation
        delta_out = y_pred_proba - y_true_one_hot

        # Gradient of loss w.r.t. Wy (dL/dWy)
        final_h = h_states[:, -1, :]
        dWy = final_h.T @ delta_out

        # Gradient of loss w.r.t. By (dL/dBy)
        dBy = np.sum(delta_out, axis=0)

        # Gradient flowing from the output layer into the last hidden state
        dh_next = delta_out @ self.Wy.T

        # Backpropagation Through Time (BPTT)
        for t in reversed(range(n_sequences)):
            ht = h_states[:, t + 1, :]
            # d tanh(a)/da = 1 - tanh(a)^2 = 1 - h^2
            tanh_derivative = 1 - ht**2

            # dL/da_t
            delta_a = dh_next * tanh_derivative

            # dL/dB_t = sum over the batch of dL/da_t
            dB += np.sum(delta_a, axis=0)

            # dL/dWx_t = x_t.T @ (dL/da_t)
            xt = X_batch[:, t, :]
            dWx += xt.T @ delta_a

            # dL/dWh_t = h_{t-1}.T @ (dL/da_t)
            h_prev = h_states[:, t, :]
            dWh += h_prev.T @ delta_a

            # dL/dh_{t-1} = (dL/da_t) @ Wh.T
            # (batch_size, n_nodes) @ (n_nodes, n_nodes) -> (batch_size, n_nodes)
            dh_next = delta_a @ self.Wh.T

        return dWx, dWh, dB, dWy, dBy

    def fit(self, X, y):
        """
        Train the RNN classifier.

        Weights are initialized only if ``Wx`` is still None, so parameters
        assigned beforehand (or learned by an earlier ``fit`` call) are used
        as the starting point.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_sequences, n_features)
            Training data.
        y : ndarray, shape (n_samples,)
            Target labels (integers from 0 to n_output-1).
        """
        n_samples, _, n_features = X.shape

        # Initialize weights
        if self.Wx is None:
            self._initialize_weights(n_features)

        # One-hot encode target labels
        y_true_one_hot = np.eye(self.n_output)[y]

        # Training loop
        for epoch in range(self.n_epochs):
            epoch_loss = 0
            # Reshuffle the samples every epoch before cutting mini-batches
            permutation = np.random.permutation(n_samples)
            X_shuffled = X[permutation]
            y_true_one_hot_shuffled = y_true_one_hot[permutation]

            for i in range(0, n_samples, self.batch_size):
                X_batch = X_shuffled[i : i + self.batch_size]
                y_batch_one_hot = y_true_one_hot_shuffled[i : i + self.batch_size]
                # The last batch may be smaller than self.batch_size
                current_batch_size = X_batch.shape[0]

                # Forward propagation
                h_states, a_states, _, _, y_pred_proba = self._forward_propagation(X_batch)

                # Calculate loss
                loss = self._cross_entropy_loss(y_pred_proba, y_batch_one_hot)
                epoch_loss += loss * current_batch_size  # Accumulate total loss for epoch avg

                # Backward propagation
                dWx, dWh, dB, dWy, dBy = self._backward_propagation(
                    X_batch, y_batch_one_hot, h_states, a_states, y_pred_proba
                )

                # Update weights and biases. The attributes are rebound rather
                # than updated in place: an in-place "-=" would raise a casting
                # error on integer-typed parameters and would silently modify
                # arrays the caller assigned to the model.
                self.Wx = self.Wx - self.lr * dWx / current_batch_size
                self.Wh = self.Wh - self.lr * dWh / current_batch_size
                self.B = self.B - self.lr * dB / current_batch_size
                self.Wy = self.Wy - self.lr * dWy / current_batch_size
                self.By = self.By - self.lr * dBy / current_batch_size

            # Calculate average loss for the epoch
            average_epoch_loss = epoch_loss / n_samples
            self.loss_history.append(average_epoch_loss)

            if self.verbose:
                print(f"Epoch {epoch+1}/{self.n_epochs}, Loss: {average_epoch_loss:.4f}")

    def predict_proba(self, X):
        """
        Predict class probabilities for input samples.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_sequences, n_features)
            Input data.

        Returns
        -------
        y_pred_proba : ndarray, shape (n_samples, n_output)
            Predicted probabilities.
        """
        # Only the softmax output of the forward pass is needed here
        _, _, _, _, y_pred_proba = self._forward_propagation(X)
        return y_pred_proba

    def predict(self, X):
        """
        Predict class labels for input samples.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_sequences, n_features)
            Input data.

        Returns
        -------
        y_pred : ndarray, shape (n_samples,)
            Predicted class labels.
        """
        y_pred_proba = self.predict_proba(X)
        # The predicted label is the class with the highest probability
        y_pred = np.argmax(y_pred_proba, axis=1)
        return y_pred

# Verification of the forward pass on a tiny, fully specified example:
# the recurrence is evaluated by hand step by step, then compared with
# ScratchSimpleRNNClassifier._forward_propagation using the same weights.
print("--- Verifying Forward Propagation with Small Array ---")

# Fixed input (1 sample, 3 time steps, 2 features) and fixed parameters
x_small = np.array([[[1, 2], [2, 3], [3, 4]]]) / 100
w_x_small = np.array([[1, 3, 5, 7], [3, 5, 7, 8]]) / 100
w_h_small = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]]) / 100
b_small = np.array([1, 1, 1, 1])

# Dimensions are read off the arrays themselves
batch_size_small, n_sequences_small, n_features_small = x_small.shape
n_nodes_small = w_x_small.shape[1]

# h0 is all zeros; h_states_manual keeps h0 followed by every later state
h_prev_small = np.zeros((batch_size_small, n_nodes_small))
h_states_manual = np.zeros((batch_size_small, n_sequences_small + 1, n_nodes_small))
h_states_manual[:, 0, :] = h_prev_small

print("Manual Calculation Steps:")

for t in range(n_sequences_small):
    xt_small = x_small[:, t, :]

    # a_t = x_t @ Wx + h_{t-1} @ Wh + B, then h_t = tanh(a_t)
    term1 = xt_small @ w_x_small
    term2 = h_prev_small @ w_h_small
    a_t_small = term1 + term2 + b_small
    h_t_small = np.tanh(a_t_small)

    # Report the step; h_prev_small still holds the state from before it
    print(
        f"\nTime step t={t}",
        f"  Input x_t: {xt_small}",
        f"  Previous h_{t-1}: {h_prev_small}",
        f"  x_t @ Wx = {term1}",
        f"  h_{t-1} @ Wh = {term2}",
        f"  Bias B = {b_small}",
        f"  Pre-activation a_t: {a_t_small}",
        f"  Activation h_t = tanh(a_t): {h_t_small}",
        sep="\n",
    )

    # Record the new state and carry it into the next step
    h_states_manual[:, t + 1, :] = h_t_small
    h_prev_small = h_t_small


final_h_manual = h_prev_small
print("\n--- Final Hidden State (Manual Calculation) ---")
print(final_h_manual)

expected_h = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]])
print("\n--- Expected Output from Text ---")
print(expected_h)

print("\n--- Verifying with Class Method (using fixed weights) ---")
# Build a classifier and overwrite its parameters with the fixed values
rnn_verifier = ScratchSimpleRNNClassifier(n_nodes=n_nodes_small, n_output=1)
rnn_verifier.Wx = w_x_small
rnn_verifier.Wh = w_h_small
rnn_verifier.B = b_small
# The output layer has no influence on the hidden states, so zeros suffice
rnn_verifier.Wy = np.zeros((n_nodes_small, 1))
rnn_verifier.By = np.zeros(1)

# Run the class implementation on the same input
h_states_class, a_states_class, final_h_class, _, _ = rnn_verifier._forward_propagation(x_small)

print("\n--- Final Hidden State (Class Method _forward_propagation) ---")
print(final_h_class)

# Both comparisons should report True
print("\n--- Comparison ---")
print(f"Match: {np.allclose(final_h_manual, final_h_class)}")
print(f"Match with Expected: {np.allclose(final_h_class, expected_h)}")

--- Verifying Forward Propagation with Small Array ---
Manual Calculation Steps:

Time step t=0
  Input x_t: [[0.01 0.02]]
  Previous h_-1: [[0. 0. 0. 0.]]
  x_t @ Wx = [[0.0007 0.0013 0.0019 0.0023]]
  h_-1 @ Wh = [[0. 0. 0. 0.]]
  Bias B = [1 1 1 1]
  Pre-activation a_t: [[1.0007 1.0013 1.0019 1.0023]]
  Activation h_t = tanh(a_t): [[0.76188798 0.76213958 0.76239095 0.76255841]]

Time step t=1
  Input x_t: [[0.02 0.03]]
  Previous h_0: [[0.76188798 0.76213958 0.76239095 0.76255841]]
  x_t @ Wx = [[0.0011 0.0021 0.0031 0.0038]]
  h_0 @ Wh = [[0.07623574 0.13721527 0.19819481 0.25155044]]
  Bias B = [1 1 1 1]
  Pre-activation a_t: [[1.07733574 1.13931527 1.20129481 1.25535044]]
  Activation h_t = tanh(a_t): [[0.792209   0.8141834  0.83404912 0.84977719]]

Time step t=2
  Input x_t: [[0.03 0.04]]
  Previous h_1: [[0.792209   0.8141834  0.83404912 0.84977719]]
  x_t @ Wx = [[0.0015 0.0029 0.0043 0.0053]]
  h_1 @ Wh = [[0.08321832 0.14902269 0.21482707 0.27229095]]
  Bias B = [1 1 1 1]
  