In [372]:
import numpy as np
from tqdm import trange

# Layers

In [373]:
class Layer:
    """Common interface for the building blocks of the network.

    Subclasses override ``forward`` and ``backward``; ``step`` only needs to
    be overridden by layers that own trainable parameters.
    """

    def __init__(self):
        # Cache slots for the most recent forward pass; empty until then.
        self.input_ = self.output = None

    def forward(self, input_: np.ndarray):
        """Compute the layer output for ``input_`` (must be overridden)."""
        raise NotImplementedError('This method should be implemented')

    def backward(self, upstream_grad: np.ndarray):
        """Propagate ``upstream_grad`` through the layer (must be overridden)."""
        raise NotImplementedError('This method should be implemented')

    def step(self, lr):
        """Apply a parameter update with learning rate ``lr``.

        The base implementation does nothing, which is what parameter-free
        layers need.
        """
        return None

### Linear layer 

In [374]:
class Linear(Layer):
    """Fully connected layer computing ``input_ @ w + b``, trained with Adam.

    Attributes:
        w: Weight matrix of shape ``(features_in, features_out)``.
        b: Bias row vector of shape ``(1, features_out)``.
        gradw, gradb: Gradients stored by the latest ``backward`` call
            (``None`` until then).
        mw, vw, mb, vb: Adam first/second moment estimates for ``w`` and ``b``.
        t: Number of ``step`` calls performed so far (used for bias correction).
    """

    def __init__(self, features_in, features_out, beta1=0.9, beta2=0.999, epsilon=1e-8):
        """Create the layer.

        Args:
            features_in: Number of input features (rows of ``w``).
            features_out: Number of output features (columns of ``w``).
            beta1: Decay rate of the first-moment (mean) estimate.
            beta2: Decay rate of the second-moment (squared gradient) estimate.
            epsilon: Constant added to the denominator of the Adam update.
        """
        # Make sure the ``input_``/``output`` cache slots of the base class
        # exist (as ``None``) before the first forward pass.
        super().__init__()

        # Standard-normal weights scaled by sqrt(2 / features_in) (He-style
        # scaling); biases are standard-normal values scaled by 0.1.
        self.w = np.random.randn(features_in, features_out) * np.sqrt(2.0 / features_in)
        self.b = np.random.randn(1, features_out) * 0.1

        self.gradw = None
        self.gradb = None

        # Adam state: moment estimates start at zero, step counter at zero.
        self.mw = np.zeros_like(self.w)
        self.vw = np.zeros_like(self.w)
        self.mb = np.zeros_like(self.b)
        self.vb = np.zeros_like(self.b)
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.t = 0

    def forward(self, input_: np.ndarray):
        """Return ``input_ @ w + b`` and cache ``input_`` for ``backward``."""
        self.input_ = input_
        self.output = (input_ @ self.w) + self.b

        return self.output

    def backward(self, upstream_grad: np.ndarray):
        """Store parameter gradients and return the gradient w.r.t. the input.

        ``gradw`` and ``gradb`` are overwritten (not accumulated) on each call.
        Requires a preceding ``forward`` call, whose input is reused here.
        """
        self.gradw = self.input_.T @ upstream_grad
        # The bias is broadcast over rows in ``forward``, so its gradient is
        # the sum of the upstream gradient over axis 0.
        self.gradb = np.sum(upstream_grad, axis=0, keepdims=True)

        downstream_grad = upstream_grad @ self.w.T
        return downstream_grad

    def _adam_update(self, param, grad, m, v, lr):
        """Update ``param`` in place with one Adam step; return the new moments."""
        m = self.beta1 * m + (1 - self.beta1) * grad
        v = self.beta2 * v + (1 - self.beta2) * (grad ** 2)

        # Bias-corrected estimates compensate for the zero initialisation.
        m_hat = m / (1 - self.beta1 ** self.t)
        v_hat = v / (1 - self.beta2 ** self.t)

        # In-place update: ``param`` is the very array stored on the layer.
        param -= lr * m_hat / (np.sqrt(v_hat) + self.epsilon)
        return m, v

    def step(self, lr=0.1):
        """Apply one Adam optimization step to ``w`` and ``b``.

        Uses the gradients stored by the latest ``backward`` call, so
        ``backward`` must have been called at least once.

        Args:
            lr: Learning rate of the update.
        """
        self.t += 1

        self.mw, self.vw = self._adam_update(self.w, self.gradw, self.mw, self.vw, lr)
        self.mb, self.vb = self._adam_update(self.b, self.gradb, self.mb, self.vb, lr)

    def __str__(self):
        return 'I am linear!'

### Sigmoid layer

In [375]:
class Sigmoid(Layer):
    """Element-wise logistic activation ``1 / (1 + exp(-x))``."""

    def forward(self, input_: np.ndarray):
        """Return the sigmoid of ``input_`` and cache it for ``backward``.

        The value is computed from ``exp(-|x|)``, which lies in ``(0, 1]`` and
        therefore cannot overflow. The direct formula evaluates ``exp(-x)``,
        which overflows (with a RuntimeWarning) for large negative ``x``.
        """
        decay = np.exp(-np.abs(input_))
        # x >= 0: 1 / (1 + exp(-x));  x < 0: exp(x) / (1 + exp(x)).
        # Both branches are the same function written without large exponents.
        self.output = np.where(input_ >= 0, 1 / (1 + decay), decay / (1 + decay))
        return self.output

    def backward(self, upstream_grad: np.ndarray):
        """Scale ``upstream_grad`` by the derivative ``s * (1 - s)``.

        ``s`` is the output cached by the latest ``forward`` call.
        """
        downstream_grad = upstream_grad * (self.output * (1 - self.output))
        return downstream_grad

    def __str__(self):
        return 'I am sigmoid!'

### ReLU layer

In [376]:
class ReLU(Layer):
    """Rectified linear unit: keeps positive entries and zeroes the rest."""

    def forward(self, input_: np.ndarray):
        """Return ``max(0, input_)`` element-wise and cache the input."""
        self.input_ = input_
        rectified = np.maximum(0, input_)
        self.output = rectified
        return rectified

    def backward(self, upstream_grad: np.ndarray):
        """Let the gradient through only where the cached input was positive."""
        positive_mask = self.input_ > 0
        # Cast the boolean mask to the gradient's dtype before multiplying.
        gate = positive_mask.astype(upstream_grad.dtype)
        return upstream_grad * gate

    def __str__(self):
        return 'I am ReLU'

### Softmax Layer

In [377]:
class Softmax:
    """Placeholder for a softmax layer; nothing is implemented yet.

    NOTE(review): unlike the other layers this class does not derive from
    ``Layer``, and its methods take no input and return ``None``.
    """

    def __init__(self):
        """Create the placeholder; no state is set up."""

    def forward(self):
        """Not implemented yet: does nothing and returns ``None``."""
        return None

    def backward(self):
        """Not implemented yet: does nothing and returns ``None``."""
        return None

# Loss classes

### Mean Squared Error

In [378]:
class MSE(Layer):
    """Mean squared error loss."""

    def forward(self, y_pred: np.ndarray, y_true: np.ndarray):
        """Return the mean of the squared differences ``(y_pred - y_true) ** 2``.

        ``y_pred`` is cached so that ``backward`` can reuse it.
        """
        self.y_pred = y_pred
        residual = y_pred - y_true
        self.output = np.mean(residual ** 2)
        return self.output

    def backward(self, y_true: np.ndarray):
        """Return the gradient of the loss w.r.t. the cached predictions.

        The gradient is ``2 * (y_pred - y_true) / n``, where ``n`` is the
        number of elements in the cached predictions.
        """
        residual = self.y_pred - y_true
        element_count = self.y_pred.size
        return 2 * residual / element_count

    def __str__(self):
        return 'I am MSE'

### Cross entropy

In [379]:
class CrossEntropy(Layer):
    """Placeholder for a cross-entropy loss; nothing is implemented yet."""

    def __init__(self):
        """Create the placeholder.

        The base-class initialiser is not invoked, so no attributes are set.
        """

    def forward(self):
        """Not implemented yet: does nothing and returns ``None``."""
        return None

    def backward(self):
        """Not implemented yet: does nothing and returns ``None``."""
        return None

# Multi Layer Perceptron

In [380]:
class MLP:
    """Sequential stack of layers trained against a loss object.

    Attributes:
        layers: Layers applied in order by ``forward``.
        loss_method: Loss object providing ``forward(y_pred, y_true)`` and
            ``backward(y_true)``.
        lr: Learning rate handed to every layer's ``step``.
        outputs: Result of the most recent ``forward`` call.
    """

    def __init__(self, layers: list[Layer], loss_method: Layer, lr=0.01):
        self.layers = layers
        self.loss_method = loss_method
        self.lr = lr
        self.outputs = None

    @staticmethod
    def _align_targets(y, n_samples):
        """Return targets as a 2-D array with one row per sample.

        Targets that already have shape ``(n_samples, n_outputs)`` are kept
        as they are, so multi-output targets are not flattened. Everything
        else (e.g. a 1-D vector) becomes a single column.
        """
        y = np.asarray(y)
        if y.ndim == 2 and y.shape[0] == n_samples:
            return y
        return y.reshape(-1, 1)

    def forward(self, input_: np.ndarray):
        """Feed ``input_`` through every layer in order and return the result."""
        self.outputs = input_
        for layer in self.layers:
            self.outputs = layer.forward(self.outputs)

        return self.outputs

    def backward(self, y_true: np.ndarray):
        """Back-propagate the loss gradient through the layers, last to first.

        The loss object's ``forward`` must have been called beforehand, since
        its ``backward`` only receives the targets.
        """
        upstream_grad = self.loss_method.backward(y_true)
        for layer in reversed(self.layers):
            upstream_grad = layer.backward(upstream_grad)

    def update_weigths(self):
        """Ask every layer to apply its parameter update with ``self.lr``."""
        for layer in self.layers:
            layer.step(self.lr)

    def evalate(self, input_: np.ndarray, y: np.ndarray):
        """Return the loss of the network's predictions for ``input_`` against ``y``."""
        predicts = self.forward(input_)
        targets = self._align_targets(y, input_.shape[0])
        return self.loss_method.forward(predicts, targets)

    def train(self, input_: np.ndarray, y: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, epoches=100, batch_size=1):
        """Train on randomly drawn mini-batches.

        Each of the ``epoches`` iterations performs exactly one update on a
        single batch of ``batch_size`` rows drawn at random (with
        replacement), then evaluates the loss on the validation data.

        Args:
            input_: Training inputs, one sample per row.
            y: Training targets, 1-D or ``(n_samples, n_outputs)``.
            X_val: Validation inputs.
            y_val: Validation targets.
            epoches: Number of single-batch update iterations.
            batch_size: Number of rows drawn per iteration.

        Returns:
            Tuple ``(train_losses, val_losses)`` with one entry per iteration.
        """
        n_samples = input_.shape[0]
        # Keep the shuffle as an index array: composing it with the batch
        # indices selects the same rows as indexing a shuffled copy of the
        # data, without copying the whole data set.
        shuffled_order = np.random.permutation(n_samples)
        targets = self._align_targets(y, n_samples)

        train_losses = []
        val_losses = []

        for _ in trange(epoches):
            batch_rows = shuffled_order[np.random.choice(n_samples, size=batch_size)]
            batch_data_x = input_[batch_rows]
            batch_data_y = targets[batch_rows]

            predicts = self.forward(batch_data_x)
            train_losses.append(self.loss_method.forward(predicts, batch_data_y))

            self.backward(batch_data_y)
            self.update_weigths()

            val_losses.append(self.evalate(X_val, y_val))

        return train_losses, val_losses

    def predict(self, input_: np.ndarray):
        """Return the network output for ``input_`` (alias of ``forward``)."""
        return self.forward(input_)