In [35]:
import numpy as np
import time
from typing import List

def cross_entropy_derivative(y_true, y_pred):
    epsilon = 1e-9  # Small value to prevent division by zero
    return - (y_true / (y_pred + epsilon)) / y_true.shape[0]

def cross_entropy(y_true, y_pred, epsilon=1e-12):
    """
    y_true: (batch_size, num_classes) - one-hot
    y_pred: (batch_size, num_classes) - output of softmax
    """
    y_pred = np.clip(y_pred, epsilon, 1. - epsilon) 
    loss = -np.sum(y_true * np.log(y_pred), axis=1) 
    return np.mean(loss) 
def linear(Z):
    return Z

def linear_derivative(Z):
    return np.ones_like(Z)


def relu(Z):
    return np.maximum(0, Z)


def relu_derivative(Z):
    return np.where(Z > 0, 1, 0)


def leaky_relu(Z, alpha=0.01):
    return np.where(Z > 0, Z, alpha * Z)


def leaky_relu_derivative(Z, alpha=0.01):
    return np.where(Z > 0, 1, alpha)


def softmax(Z):
    expZ = np.exp(Z - np.max(Z, axis=1, keepdims=True)) 
    return expZ / np.sum(expZ, axis=1, keepdims=True)


def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_backward(x, grad_output):
    sig = sigmoid(x)
    return grad_output * sig * (1 - sig)

def softmax_derivative(s): 
    """
    s: shape=(num_class)
    """
    s = s.reshape(-1, 1)  # convert to vector column
    return np.diagflat(s) - np.dot(s, s.T)  # Jacobian matrix

def softmax_derivative_from_output_2d(softmax_output):
    """
    Calculate Jacobian of Softmax for each sample in batch.
    
    softmax_output: numpy array (shape: [batch_size, num_classes])
    Output: numpy array (shape: [batch_size, num_classes, num_classes])
    """
    batch_size, num_classes = softmax_output.shape
    jacobian_matrices = np.zeros((batch_size, num_classes, num_classes))

    for i in range(batch_size): 
        s = softmax_output[i, :].reshape(-1, 1) 
        jacobian_matrices[i] = np.diagflat(s) - np.dot(s, s.T)

    return jacobian_matrices
class Layer:
    def forward(self,inputs):
        raise NotImplementedError()
    def backward(self,grad_outputs):
        raise NotImplementedError()
    def parameters(self):
        raise NotImplementedError()

class Dense(Layer):
    def __init__(self,output_dim,activation = None):
        super().__init__()
        if activation == 'relu':
            self.activation = relu
            self.backward_activation = relu_derivative
        elif activation == 'leaky_relu':
            self.activation = leaky_relu
            self.backward_activation = leaky_relu_derivative
        elif activation == 'softmax':
            self.activation = softmax
            self.backward_activation = softmax_derivative_from_output_2d
        else:
            self.activation = linear
            self.backward_activation = linear_derivative
            
        self.output_dim = output_dim
        self.weights = None
    def init_weight(self,input_dim):
        self.weights = np.random.randn(input_dim,self.output_dim)* np.sqrt(2/input_dim)
        self.bias = np.zeros((1,self.output_dim))
    def forward(self, inputs):
        '''
        inputs shape(m,n)
        weight shape(n,k)
        outputs shape(m,k)
        '''

        if self.weights is None:
            input_dim = inputs.shape[1]
            self.init_weight(input_dim=input_dim)
        self.inputs = inputs
        self.linear_outputs = self.inputs @ self.weights + self.bias
        self.outputs = self.activation(self.linear_outputs)
        return self.outputs
    def __call__(self, inputs):
        self.id = int(time.time())
        return self.forward(inputs)
    def backward(self, grad_outputs):
        
        if self.activation == softmax:
            pass
            # # For softmax, we need to handle the Jacobian properly
            # da_dz = self.backward_activation(self.outputs)  # (batch_size, output_dim, output_dim)
            # dl_dz = np.zeros_like(grad_outputs)  # (batch_size, output_dim)
            
            # for i in range(batch_size):
            #     dl_dz[i, :] = da_dz[i] @ grad_outputs[i, :]
            # grad_outputs = dl_dz
        else:
            # For other activations
            grad_outputs = grad_outputs * self.backward_activation(self.linear_outputs)  # (batch_size, output_dim)

        # Calculate gradients
        self.grad_weights = self.inputs.T @ grad_outputs  # (input_dim, output_dim)
        self.grad_bias = np.sum(grad_outputs, axis=0, keepdims=True)  # (1, output_dim)
        # Calculate gradient for previous layer
        grad_inputs = grad_outputs @ self.weights.T  # (batch_size, input_dim) because dl2 = da1 @ weight => dl2/da  = weight.T
        return grad_inputs
    
    def parameters(self):
        return [(self.weights, self.grad_weights), (self.bias, self.grad_bias)]

In [None]:
class Model:
    def fit(self, inputs, outputs, epochs = 1, batch_size=32, learning_rate=1e-4, loss=None):
        
        steps = inputs.shape[0]// batch_size
        for epoch in range(epochs):
            total_loss = 0
            steps_per_epoch = 0
            for i in range(0, inputs.shape[0], batch_size):
                inputs_batch = inputs[i:i+batch_size, :] 
                outputs_batch = outputs[i:i+batch_size, :]   
                
                # Forward pass
                pred_outputs = self.call(inputs_batch)
                
                loss_val = cross_entropy(outputs_batch, pred_outputs)
                total_loss += loss_val
                steps_per_epoch += 1
                # Backward pass
                dl_da = cross_entropy_derivative(outputs_batch, pred_outputs)
                layers = self._all_layer()
                for layer in reversed(layers):
                    dl_da = layer.backward(dl_da)
                for layer in layers:
                    for param, grad in layer.parameters():
                        np.clip(grad, -1, 1, out=grad)
                        param[:]-= learning_rate * grad
 
            avg_loss = total_loss / steps
            print(f'Epoch: {epoch+1}/{epochs} ==========================> {steps}/{steps} steps. loss {avg_loss}')

     
    def call(self,inputs):
        raise NotImplementedError()
    def __call__(self, inputs):
        return self.call(inputs)
    def _all_layer(self):
        layers = []
        for attr in self.__dict__.values():
            if isinstance(attr,Layer):
                layers.append(attr)
        layers.sort(key=lambda l:l.id)
        ids = [l.id for l in layers]
        print(ids)  # để xem ID sau sort
        return
        return layers  
        

In [47]:
class MyModel(Model):
    def __init__(self):
        super().__init__()
        self.layer1 = Dense(128,'relu')
        self.layer2 = Dense(10,'softmax')
    def call(self, inputs):
        x = self.layer1(inputs)
        x = self.layer2(x)
        return x
    

In [48]:
a = MyModel()
x = np.random.randn(10,20)
y = np.random.randn(10,10)
a.fit(x,y)



  avg_loss = total_loss / steps
