In [1]:
import math
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import KFold

In [5]:
class PerceptronLayer:
    """Fully connected layer with a sigmoid activation.

    Arrays are column-oriented: ``forward`` expects inputs of shape
    ``(n_inputs, batch)`` and returns activations of shape
    ``(n_neurons, batch)``.
    """

    def __init__(self, layer_id, n_inputs, n_neurons):
        """
        layer_id : the id of the layer
        n_inputs : number of inputs feeding each neuron
        n_neurons : number of neurons (outputs) in the layer
        """
        self.layer_id = layer_id
        self.n_inputs = n_inputs
        self.n_neurons = n_neurons

        # Vectorized weights and biases
        self.weights = np.random.randn(n_neurons, n_inputs) * 0.1
        self.biases = np.zeros((n_neurons, 1))

        # Values cached by forward() for use in backward()
        self.inputs = None
        self.z = None
        self.a = None

        # Gradients computed by backward() and consumed by update_weights()
        self.dW = np.zeros_like(self.weights)
        self.db = np.zeros_like(self.biases)

    def activation(self, z):
        # Sigmoid
        return 1 / (1 + np.exp(-z))

    def activation_derivative(self, z):
        # Derivative of the sigmoid, expressed through the sigmoid itself
        a = self.activation(z)
        return a * (1 - a)

    def forward(self, inputs):
        """Compute and cache the layer's activations for `inputs`."""
        self.inputs = inputs
        self.z = np.dot(self.weights, inputs) + self.biases
        self.a = self.activation(self.z)
        return self.a

    def backward(self, delta_a):
        """Backpropagate through the layer.

        delta_a : gradient of the loss with respect to this layer's output
                  (same shape as the array returned by the last forward()).

        Stores the weight/bias gradients in ``self.dW`` / ``self.db`` and
        returns the gradient of the loss with respect to the layer's inputs.

        Raises RuntimeError if forward() has not been called yet.
        """
        if self.z is None:
            raise RuntimeError("forward() must be called before backward()")

        # Chain rule through the sigmoid: dL/dz = dL/da * sigma'(z)
        delta_z = delta_a * self.activation_derivative(self.z)

        self.dW = np.dot(delta_z, self.inputs.T)
        # Sum over the batch (column) axis so db keeps the (n_neurons, 1) shape
        self.db = np.sum(delta_z, axis=1, keepdims=True)

        # Gradient flowing to the previous layer, using the pre-update weights
        return np.dot(self.weights.T, delta_z)

    def update_weights(self, learning_rate):
        """Apply one gradient-descent step using the gradients from backward()."""
        self.weights -= learning_rate * self.dW
        self.biases -= learning_rate * self.db

In [None]:
class RecurrentLayer:
    """Simple (Elman-style) recurrent layer with tanh activation.

    The recurrence is ``z_t = W_xh x_t + W_hh h_{t-1} + b`` and
    ``h_t = tanh(z_t)``. Vectors are columns of shape ``(n, 1)``.
    """

    def __init__(self, layer_id,n_neurons,n_inputs ):
        """
        layer_id : the id of the layer
        n_neurons : number of neurons in the layer (size of the hidden state)
        n_inputs : number of inputs per time step
        """
        self.layer_id = layer_id
        self.n_neurons = n_neurons
        self.n_inputs = n_inputs

        # Per-time-step caches filled by forward() and read by backward()
        self.z_history = []
        self.h_history = []
        self.x_history = []

        # Small zero-centred weights (same scheme as PerceptronLayer).
        # Uniform [0, 1) weights are all positive, which quickly saturates
        # tanh and makes the recurrent matrix strongly expansive.
        self.input_weights = np.random.randn(n_neurons, n_inputs) * 0.1
        self.recurrent_weights = np.random.randn(n_neurons, n_neurons) * 0.1
        self.biases = np.zeros((n_neurons, 1))

        # Gradient accumulators
        self.dW_xh = np.zeros_like(self.input_weights)
        self.dW_hh = np.zeros_like(self.recurrent_weights)
        self.db = np.zeros_like(self.biases)

        # Current hidden state, and the state the cached history started from
        self.h_prev = np.zeros((n_neurons, 1))
        self.h0 = self.h_prev

    def activation(self,z):
        # Tanh activation function
        return np.tanh(z)

    def activation_derivative(self,z):
        # Derivative of Tanh activation function
        h=self.activation(z)
        return 1 - h**2

    def forward_step(self, h_prev, x_t):
        """Advance one time step; returns the pre-activation and new hidden state."""
        z_t = np.dot(self.input_weights, x_t) + np.dot(self.recurrent_weights, h_prev) + self.biases
        h_t = self.activation(z_t)
        return z_t, h_t

    def forward(self, x_sequence):
        """
        x_sequence : input sequence of shape (n_inputs, sequence_length)

        Runs the recurrence over every column of `x_sequence`, starting from
        the current hidden state, and appends each step to the history lists.
        Returns (h_history, z_history) - the layer's own lists, not copies.
        """
        sequence_length = x_sequence.shape[1]

        if not self.h_history:
            # A fresh history starts here: remember the state preceding step 0,
            # because backward() needs it as h_{-1} and h_prev is overwritten below.
            self.h0 = self.h_prev

        h_t = self.h_prev

        for t in range(sequence_length):
            x_t = x_sequence[:, t].reshape(-1, 1)
            z_t, h_t = self.forward_step(h_t, x_t)

            self.z_history.append(z_t)
            self.h_history.append(h_t)
            self.x_history.append(x_t)

        self.h_prev = h_t

        return self.h_history, self.z_history

    def backward(self,delta_h_T):
        """Backpropagation through time over the cached history.

        delta_h_T : gradient of the loss with respect to the final hidden
                    state h_T, shape (n_neurons, 1). The loss is assumed to
                    depend on the sequence only through h_T.

        Overwrites ``dW_xh``, ``dW_hh`` and ``db`` with the gradients summed
        over all cached time steps. Returns None.
        """
        T=len(self.h_history)
        self.dW_xh.fill(0)
        self.dW_hh.fill(0)
        self.db.fill(0)

        # dL/dz_{t+1}; zero beyond the last step
        delta_z_next = np.zeros_like(delta_h_T)

        for t in range(T-1,-1,-1):
            z_t = self.z_history[t]
            # Hidden state that fed step t; step 0 was fed by the initial state
            h_prev = self.h_history[t-1] if t > 0 else self.h0
            x_t = self.x_history[t]

            # dL/dh_t = direct contribution (last step only) + W_hh^T dL/dz_{t+1}
            delta_output = delta_h_T if t == T-1 else np.zeros_like(delta_h_T)
            delta_recurrent = np.dot(self.recurrent_weights.T, delta_z_next)
            delta_h_t = delta_output + delta_recurrent

            # Through the tanh non-linearity: dL/dz_t
            delta_z_t = delta_h_t * self.activation_derivative(z_t)

            self.dW_hh += np.dot(delta_z_t, h_prev.T)
            self.dW_xh += np.dot(delta_z_t, x_t.T)
            self.db += delta_z_t

            # The previous step receives the pre-activation gradient
            delta_z_next = delta_z_t

    def update_weights(self, learning_rate):
        """Apply one gradient-descent step using the gradients from backward()."""
        self.input_weights -= learning_rate * self.dW_xh
        self.recurrent_weights -= learning_rate * self.dW_hh
        self.biases -= learning_rate * self.db

    def reset(self):
        """Zero the hidden state and clear the cached history."""
        self.h_prev = np.zeros((self.n_neurons, 1))
        self.h0 = self.h_prev
        self.z_history = []
        self.h_history = []
        self.x_history = []

    

In [None]:
class RecurrentNeuralNetwork:
    """A recurrent layer followed by a dense output layer.

    ``self.layers[0]`` is the recurrent layer and ``self.layers[1]`` is the
    output layer, which reads the recurrent layer's final hidden state.
    """

    def __init__(self, n_inputs, n_outputs, hidden_layers,recurrent_output_size):
        """
        n_inputs : number of input features per time step
        n_outputs : number of neurons in the output layer
        hidden_layers : accepted for interface compatibility; not used by the
                        current implementation (no extra hidden layers are built)
        recurrent_output_size : number of neurons in the recurrent layer
        """
        recurrent_layer = RecurrentLayer(layer_id=1, n_neurons=recurrent_output_size, n_inputs=n_inputs)

        # Output layer consumes the recurrent layer's hidden state
        output_layer = PerceptronLayer(layer_id=2, n_inputs=recurrent_output_size, n_neurons=n_outputs)

        self.layers = [recurrent_layer, output_layer]

    def forward_sequence(self, x_sequence):
        """Run `x_sequence` through the network from a freshly reset state.

        Returns the output layer's activation for the final hidden state.
        """
        recurrent_layer = self.layers[0]
        recurrent_layer.reset()
        h_history, z_history = recurrent_layer.forward(x_sequence)

        # Only the last hidden state is fed to the output layer
        h_T = h_history[-1]

        ffn_output_layer = self.layers[1]
        y_hat = ffn_output_layer.forward(h_T)
        return y_hat

    def backpropagation_through_time(self, x_sequence, target_y, learning_rate):
        """Perform one training step on a single sequence.

        x_sequence : input sequence passed to forward_sequence()
        target_y : target output; reshaped to the shape of the prediction
        learning_rate : step size passed to each layer's update_weights()

        Uses the loss ``0.5 * sum((y_hat - target_y) ** 2)`` and returns its
        value (computed before the weight update) as a float.

        Relies on the output layer exposing ``backward(delta)`` - taking the
        gradient of the loss w.r.t. its output and returning the gradient
        w.r.t. its input - and ``update_weights(learning_rate)``.
        """
        y_hat = self.forward_sequence(x_sequence)

        # Align the target with the prediction so that e.g. a (n,) target does
        # not silently broadcast against a (n, 1) prediction into (n, n).
        target = np.asarray(target_y, dtype=float).reshape(np.shape(y_hat))

        # Gradient of 0.5 * (y_hat - y)^2 w.r.t. y_hat is the plain error;
        # the squared error is the loss value, not the delta to backpropagate.
        delta_y_hat = y_hat - target
        loss = 0.5 * float(np.sum(delta_y_hat ** 2))

        ffn_output_layer = self.layers[1]
        delta_h_T = ffn_output_layer.backward(delta_y_hat)

        recurrent_layer = self.layers[0]
        recurrent_layer.backward(delta_h_T)

        # Update only after both backward passes, so every gradient is
        # computed with the weights that produced the forward pass.
        recurrent_layer.update_weights(learning_rate)
        ffn_output_layer.update_weights(learning_rate)

        return loss
        