# MLP like PyTorch

In the notebook [3-1-mlp](3-1-mlp.ipynb), 
we built a single dense layer that contained its 
own activation function, which made backpropagation easier to explain.
Usually, however, the Dense layer and the Activation function layer are separate. 
In this notebook, we separate the Dense layer from the Activation function layer.

<div style="text-align: center; background-color: black">
<img src="../images/mlp-2.png" alt="deep neural network" width="400">
</div>

In this notebook we do not explain the mathematics again because, 
in essence, it is the same as in the previous notebook.

In [1]:
import torch
from torch import nn

from platform import python_version
python_version(), torch.__version__

('3.12.6', '2.5.1+cu124')

In [2]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [3]:
# Make float64 the default floating-point dtype for tensors created from here on.
torch.set_default_dtype(torch.float64)

In [4]:
def add_to_class(Class):
    """Register functions as methods in created class.

    Meant to be used as a decorator factory: ``@add_to_class(SomeClass)``
    attaches the decorated object to ``SomeClass`` under its own ``__name__``.

    Args:
        Class: class that receives the new attribute.

    Return:
        wrapper: decorator that performs the registration and hands the
            decorated object back unchanged.
    """
    def wrapper(obj):
        setattr(Class, obj.__name__, obj)
        # Return the object so the decorated name is not rebound to None.
        return obj
    return wrapper

# Dataset

## create dataset

$$
\begin{aligned}
\mathbf{X} &\in \mathbb{R}^{m \times n} \\
\mathbf{Y} &\in \mathbb{R}^{m \times n_{\text{o}}}
\end{aligned}
$$

In [5]:
from sklearn.datasets import make_classification

M: int = 10_100 # number of samples
N: int = 5 # number of input features
CLASSES: int = 3 # number of output classes
SEED: int = 0 # fixed seed so the generated dataset is the same on every run

# Synthetic classification problem with N - 1 informative features and no
# redundant ones; random_state pins the generator so reruns give the same data.
X, Y = make_classification(
    n_samples=M, 
    n_features=N, 
    n_classes=CLASSES, 
    n_informative=N - 1, 
    n_redundant=0,
    random_state=SEED
)

print(X.shape)
print(Y.shape)

(10100, 5)
(10100,)


## one hot encoding

In [6]:
# One-hot encode the integer class labels into a (samples, CLASSES) matrix.
# The targets take the default floating-point dtype (configured earlier with
# torch.set_default_dtype) so they match the dtype of the model outputs
# instead of being pinned to float32.
Y_hat = nn.functional.one_hot(
    torch.tensor(Y, device=device).long(), 
    CLASSES
).to(torch.get_default_dtype())
Y_hat.shape

torch.Size([10100, 3])

## split dataset into train and valid

In [7]:
# The first 100 rows form the training set; every remaining row is used for validation.
X_train, X_valid = (
    torch.tensor(rows, device=device) for rows in (X[:100], X[100:])
)
X_train.shape, X_valid.shape

(torch.Size([100, 5]), torch.Size([10000, 5]))

In [8]:
# Split the one-hot targets at the same row index used for the features.
Y_train = Y_hat[:100]
Y_valid = Y_hat[100:]
Y_train.shape, Y_valid.shape

(torch.Size([100, 3]), torch.Size([10000, 3]))

## delete raw dataset

In [9]:
# Drop the unsplit data; the train/valid tensors created above replace it.
del X, Y, Y_hat

# Model and layers

## layers

In [10]:
class Layer:
    """Common base class for every layer of the scratch model."""

    # Layers that own parameters set this to True (see Dense).
    is_trainable: bool = False

### dense or fully connected layer

In [11]:
class Dense(Layer):
    """Fully connected layer: computes Z = XW + b with no activation."""

    def __init__(self, units: int):
        """
        Args:
            units: number of output features of the layer.
        """
        self.units = units
        self.is_trainable = True

    def set_params(self, w: torch.Tensor, b: torch.Tensor) -> None:
        """
        Overwrite the parameters in place with externally supplied values.

        Args:
            w: weight tensor; its transpose is copied into self.w, so it is
                expected in (units, n_features) layout.
            b: bias tensor copied into self.b.
        """
        # copy_ duplicates the values itself; detaching is enough to keep the
        # source's autograd graph out of the scratch parameters.
        self.w.copy_(w.T.detach())
        self.b.copy_(b.detach())

    def construct(self, x: torch.Tensor) -> torch.Tensor:
        """
        Initialize the parameters
        self.w := tensor (n_features, units).
        self.b := tensor (units).

        The parameters are created on the device and with the dtype of x,
        so the product in forward never mixes devices or dtypes.
        
        Args:
            x: input tensor of shape (m_samples, n_features).
        
        Return:
            z: out tensor of shape (m_samples, units).
        """
        n_features = x.shape[-1]
        self.w = torch.randn(
            n_features, self.units, device=x.device, dtype=x.dtype
        )
        self.b = torch.randn(self.units, device=x.device, dtype=x.dtype)
        return self.forward(x)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Compute weighted sum Z = XW+b.
        
        Args:
            x: input tensor of shape (m_samples, n_features).
            
        Return:
            z: out tensor of shape (m_samples, units).
        """
        return torch.matmul(x, self.w) + self.b

    def __forward__(self, x: torch.Tensor) -> torch.Tensor:
        """Forward propagation for training step."""
        # Keep a copy of the input; backward needs it for the weight derivative.
        self.input = x.clone()
        return self.forward(x)
    
    def backward(self, delta, lr: float) -> torch.Tensor:
        """
        Update the parameters and propagate the gradient to the input.

        Args:
            delta: gradient of the loss with respect to the layer output,
                tensor of shape (m_samples, units).
            lr: learning rate.

        Return:
            delta: gradient of the loss with respect to the layer input,
                tensor of shape (m_samples, n_features).
        """
        # bias der and update
        self.b -= lr * torch.sum(delta, axis=0)
        # weight derivative (update weight after compute input der)
        w_der = torch.matmul(self.input.T, delta)
        # input derivative, computed with the weights used in the forward pass
        delta = torch.matmul(delta, self.w.T)
        # weight update
        self.w -= lr * w_der
        return delta

### activation functions

#### ReLU

In [12]:
class Relu(Layer):
    """Rectified linear unit activation: a = max(z, 0), applied elementwise."""

    def construct(self, z: torch.Tensor) -> torch.Tensor:
        """Nothing to initialize; simply run the activation."""
        return self.forward(z)

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """Return the elementwise maximum between z and zero."""
        floor = torch.zeros_like(z)
        return torch.maximum(z, floor)

    def __forward__(self, z: torch.Tensor) -> torch.Tensor:
        """Forward pass for training: the activation is cached for backward."""
        activation = self.forward(z)
        self.a = activation
        return activation

    def backward(self, delta, lr: float):
        """Let the gradient through only where the cached activation is positive."""
        active = (self.a > 0).long()
        return delta * active

#### Sigmoid

In [13]:
class Sigmoid(Layer):
    """Logistic activation: a = 1 / (1 + exp(-z)), applied elementwise."""

    def construct(self, z: torch.Tensor) -> torch.Tensor:
        """Nothing to initialize; simply run the activation."""
        return self.forward(z)

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """Return the logistic function of z."""
        denominator = 1 + torch.exp(-z)
        return 1 / denominator

    def __forward__(self, z: torch.Tensor) -> torch.Tensor:
        """Forward pass for training: the activation is cached for backward."""
        activation = self.forward(z)
        self.a = activation
        return activation

    def backward(self, delta, lr: float):
        """Scale the incoming gradient by the local derivative a * (1 - a)."""
        local_der = self.a * (1 - self.a)
        return delta * local_der

#### Tanh

In [14]:
class Tanh(Layer):
    """Hyperbolic tangent activation, applied elementwise."""

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """
        Compute tanh(z) = (1 - exp(-2z)) / (1 + exp(-2z)) in a stable way.

        Args:
            z: input tensor.

        Return:
            a: tensor with the same shape as z and values in [-1, 1].
        """
        #return torch.tanh(z)
        # Evaluating exp(-2z) directly overflows to inf for very negative z
        # and yields inf / inf = NaN. exp(-2|z|) never exceeds 1, and the
        # sign is restored afterwards because tanh is an odd function.
        exp = torch.exp(-2 * torch.abs(z))
        return torch.sign(z) * (1 - exp) / (1 + exp)
    
    def __forward__(self, z: torch.Tensor) -> torch.Tensor:
        """Forward pass for training: the activation is cached for backward."""
        self.a = self.forward(z)
        return self.a
    
    def construct(self, z: torch.Tensor) -> torch.Tensor:
        """Nothing to initialize; simply run the activation."""
        return self.forward(z)

    def backward(self, delta, lr: float):
        """Scale the incoming gradient by the local derivative 1 - a^2."""
        return delta * (1 - self.a**2)

#### Softmax

In [15]:
class Softmax(Layer):
    """Softmax activation computed along dimension 1 (one distribution per row)."""

    def construct(self, z: torch.Tensor) -> torch.Tensor:
        """Nothing to initialize; simply run the activation."""
        return self.forward(z)

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """Return exp(z) normalized per row, shifting by the row maximum first."""
        row_max = torch.max(z, dim=1, keepdim=True).values
        exp = torch.exp(z - row_max)
        return exp / exp.sum(dim=1, keepdim=True)

    def __forward__(self, z: torch.Tensor) -> torch.Tensor:
        """Forward pass for training: the activation is cached for backward."""
        activation = self.forward(z)
        self.a = activation
        return activation

    def backward(self, delta, lr: float):
        """Propagate delta through the softmax using the cached activation."""
        weighted = (delta * self.a).sum(dim=1, keepdim=True)
        return self.a * (delta - weighted)

### input layer

In [16]:
class InputLayer(Layer):
    """First entry of the layer list; produces a random sample with the input shape."""

    def __init__(self, n_input_features: int):
        """
        Args:
            n_input_features: number of features of the model input.
        """
        self.n = n_input_features
        self.m = 10  # number of rows of the sample returned by construct

    def construct(self) -> torch.Tensor:
        """Return a random tensor of shape (self.m, self.n) on the global device."""
        shape = (self.m, self.n)
        return torch.randn(shape, device=device)

## loss function

In [17]:
class Losses:
    """Common base class for the loss functions of the scratch model."""

### MSE

In [18]:
class MSE(Losses):
    """Mean squared error averaged over every element of the target."""

    def __call__(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> float:
        """Shortcut for loss(y_pred, y_true)."""
        return self.loss(y_pred, y_true)

    def loss(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> float:
        """Return the mean of the squared differences as a Python float."""
        residual = y_pred - y_true
        return (residual ** 2).mean().item()

    def backward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the derivative of the loss with respect to y_pred."""
        residual = y_pred - y_true
        n_elements = y_true.numel()
        return 2 * residual / n_elements

## scratch model

In [19]:
class Model:
    """Sequential network made of scratch layers, trained with mini-batch gradient descent."""

    def __init__(self, layers: list[Layer], loss_f: Losses = None):
        """
        Store the layers and initialize their parameters.

        Args:
            layers: input layer followed by the layers of the network.
            loss_f: loss function; MSE is used when it is None.
        """
        self.layers = layers[1:] # do not save the input layer
        self.loss_f = loss_f if loss_f is not None else MSE()

        # push a sample through the stack so each layer builds its parameters
        sample = layers[0].construct()
        for layer in self.layers:
            sample = layer.construct(sample)

    def copy_parameters(self, parameters) -> None:
        """
        Load external parameters into the trainable layers.

        Args:
            parameters: callable returning an iterable of tensors, consumed
                two at a time (weight, then bias) per trainable layer.
        """
        params = list(parameters())
        cursor = 0
        for layer in self.layers:
            if not layer.is_trainable:
                continue
            layer.set_params(params[cursor], params[cursor + 1])
            cursor += 2

    def predict(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward propagation.
        
        Args:
            x: tensor of shape (m_samples, n_input_features).
            
        Return:
            y_pred: tensor of shape (m_samples, n_out_features).
        """
        signal = x
        for layer in self.layers:
            signal = layer.forward(signal)
        return signal

    def __forward__(self, x: torch.Tensor) -> torch.Tensor:
        """Forward propagation for a training step (layers cache what backward needs)."""
        signal = x
        for layer in self.layers:
            signal = layer.__forward__(signal)
        return signal
    
    def evaluate(self, x: torch.Tensor, y: torch.Tensor) -> float:
        """
        Evaluate the model between input x and target y.
        
        Args:
            x: tensor (m_samples, n_input_features).
            y: target tensor (m_samples, n_out_features).
            
        Return:
            loss: error between y_pred and target y.
        """
        return self.loss_f(self.predict(x), y)
    
    def update(self, y_pred: torch.Tensor, y_true: torch.Tensor, lr: float) -> None:
        """
        Backpropagate the loss gradient from the last layer to the first.

        Args:
            y_pred: output of __forward__ for the current batch.
            y_true: target of the current batch.
            lr: learning rate handed to every layer.
        """
        grad = self.loss_f.backward(y_pred, y_true)
        for layer in self.layers[::-1]:
            grad = layer.backward(grad, lr)

    def fit(self, x_train: torch.Tensor, y_train: torch.Tensor, 
        epochs: int, lr: float, batch_size: int, 
        x_valid: torch.Tensor, y_valid: torch.Tensor):
        """
        Train the model and print the train / valid loss of every epoch.

        Args:
            x_train: training inputs.
            y_train: training targets.
            epochs: number of passes over the training set.
            lr: learning rate.
            batch_size: number of samples per update step.
            x_valid: validation inputs.
            y_valid: validation targets.
        """
        n_samples = len(y_train)
        for epoch in range(epochs):
            batch_losses = [] # train loss of every batch
            for start in range(0, n_samples, batch_size):
                stop = start + batch_size
                x_batch = x_train[start:stop]
                y_batch = y_train[start:stop]

                y_pred = self.__forward__(x_batch)
                batch_losses.append(self.loss_f(y_pred, y_batch))

                self.update(y_pred, y_batch, lr)

            loss_t = sum(batch_losses) / len(batch_losses)
            loss_v = self.evaluate(x_valid, y_valid) # valid loss
            print('Epoch: {} - L: {:.4f} - L_v {:.4f}'.format(epoch, loss_t, loss_v))

# Torch sequential

In [20]:
class TorchSequential(nn.Module):
    """Chain of torch modules exposing the same fit / evaluate interface as the scratch model."""

    def __init__(self, layers: list[nn.Module], loss_fn=None):
        """
        Args:
            layers: modules applied in order by forward.
            loss_fn: loss function; nn.MSELoss is used when it is None.
        """
        super().__init__()
        self.layers = nn.ModuleList(layers)
        # move every registered layer to the global device
        self.layers.to(device)
        self.loss_fn = nn.MSELoss() if loss_fn is None else loss_fn
        self.eval()

    def forward(self, x):
        """Apply every layer in order to a copy of x."""
        signal = x.clone()
        for module in self.layers:
            signal = module(signal)
        return signal

    def evaluate(self, x, y):
        """Return the loss between the prediction for x and y, without tracking gradients."""
        self.eval()
        with torch.no_grad():
            loss = self.loss_fn(self(x), y)
        return loss.item()

    def fit(self, x: torch.Tensor, y: torch.Tensor, 
            epochs: int, lr: float, batch_size: int, 
            x_valid: torch.Tensor, y_valid: torch.Tensor):
        """
        Train with SGD (no momentum) and print the train / valid loss of every epoch.

        Args:
            x: training inputs.
            y: training targets.
            epochs: number of passes over the training set.
            lr: learning rate.
            batch_size: number of samples per update step.
            x_valid: validation inputs.
            y_valid: validation targets.
        """
        optimizer = torch.optim.SGD(self.parameters(), lr=lr, momentum=0.0)
        n_samples = len(y)
        for epoch in range(epochs):
            batch_losses = []
            for start in range(0, n_samples, batch_size):
                stop = start + batch_size
                optimizer.zero_grad()

                loss = self.loss_fn(self(x[start:stop]), y[start:stop])
                batch_losses.append(loss.item())

                loss.backward()
                optimizer.step()
            loss_t = sum(batch_losses) / len(batch_losses)
            loss_v = self.evaluate(x_valid, y_valid)
            print('Epoch: {} - L: {:.4f} - L_v {:.4f}'.format(epoch, loss_t, loss_v))

In [21]:
# Reference network built from torch.nn layers: three Linear layers of 32 units
# (followed by Tanh, Sigmoid and ReLU) and a Linear + Softmax output over CLASSES.
torch_model = TorchSequential([
    nn.Linear(N, 32), nn.Tanh(),
    nn.Linear(32, 32), nn.Sigmoid(),
    nn.Linear(32, 32), nn.ReLU(),
    nn.Linear(32, CLASSES), nn.Softmax(dim=1)
])

# Scratch vs Sequential

## scratch model

In [22]:
# Scratch network with the same layer sizes and activations as torch_model;
# the leading InputLayer only provides the input shape used to build the parameters.
model = Model([
    InputLayer(N),
    Dense(32), Tanh(),
    Dense(32), Sigmoid(),
    Dense(32), Relu(),
    Dense(CLASSES), Softmax()
])

## evals

### mape

In [23]:
import os
import sys
# Add the parent directory to sys.path (once) so the `tools` import below
# can be resolved from there.
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

# NOTE(review): torch_mape is presumably a mean absolute percentage error -- confirm in tools/torch_metrics.
from tools.torch_metrics import torch_mape as mape

### predict

In [24]:
# Compare the predictions of both models on the validation inputs while each
# still has its own randomly initialized parameters.
mape(
    model.predict(X_valid),
    torch_model(X_valid)
)

1.2527793118124328

### copy parameters

In [25]:
# Load the torch model's parameters into the scratch model. The bound method
# itself is passed (not its result) because copy_parameters calls it.
model.copy_parameters(torch_model.parameters)

### predict after copy parameters

In [26]:
# Same comparison as before, now that both models share the same parameters.
mape(
    model.predict(X_valid),
    torch_model(X_valid)
)

1.9964730477431362e-17

### loss

In [27]:
# Compare the validation loss reported by both models (each evaluate returns a Python float).
mape(
    model.evaluate(X_valid, Y_valid),
    torch_model.evaluate(X_valid, Y_valid)
)

1.243111041594777e-16

### train

In [28]:
LR: float = 0.08 # learning rate
EPOCHS: int = 32 # number of passes over the training set
BATCH_SIZE: int = len(Y_train) // 3 # a third of the training samples, rounded down

In [29]:
# Train the torch model; the targets are cast with .double() so they are
# float64 when they reach the loss.
torch_model.fit(
    X_train, Y_train.double(), 
    EPOCHS, LR, BATCH_SIZE, 
    X_valid, Y_valid.double()
)

Epoch: 0 - L: 0.2272 - L_v 0.2223
Epoch: 1 - L: 0.2219 - L_v 0.2222
Epoch: 2 - L: 0.2179 - L_v 0.2227
Epoch: 3 - L: 0.2146 - L_v 0.2235
Epoch: 4 - L: 0.2120 - L_v 0.2246
Epoch: 5 - L: 0.2100 - L_v 0.2259
Epoch: 6 - L: 0.2084 - L_v 0.2272
Epoch: 7 - L: 0.2072 - L_v 0.2285
Epoch: 8 - L: 0.2063 - L_v 0.2297
Epoch: 9 - L: 0.2055 - L_v 0.2310
Epoch: 10 - L: 0.2049 - L_v 0.2320
Epoch: 11 - L: 0.2045 - L_v 0.2330
Epoch: 12 - L: 0.2041 - L_v 0.2339
Epoch: 13 - L: 0.2038 - L_v 0.2346
Epoch: 14 - L: 0.2035 - L_v 0.2353
Epoch: 15 - L: 0.2033 - L_v 0.2359
Epoch: 16 - L: 0.2031 - L_v 0.2363
Epoch: 17 - L: 0.2029 - L_v 0.2368
Epoch: 18 - L: 0.2026 - L_v 0.2371
Epoch: 19 - L: 0.2024 - L_v 0.2373
Epoch: 20 - L: 0.2022 - L_v 0.2375
Epoch: 21 - L: 0.2020 - L_v 0.2377
Epoch: 22 - L: 0.2018 - L_v 0.2378
Epoch: 23 - L: 0.2017 - L_v 0.2379
Epoch: 24 - L: 0.2015 - L_v 0.2380
Epoch: 25 - L: 0.2013 - L_v 0.2381
Epoch: 26 - L: 0.2012 - L_v 0.2382
Epoch: 27 - L: 0.2010 - L_v 0.2381
Epoch: 28 - L: 0.2008 - L_v 0.

In [30]:
# Train the scratch model with the same data split and hyperparameters as the torch model.
model.fit(
    X_train, Y_train, 
    EPOCHS, LR, BATCH_SIZE, 
    X_valid, Y_valid
)

Epoch: 0 - L: 0.2272 - L_v 0.2223
Epoch: 1 - L: 0.2219 - L_v 0.2222
Epoch: 2 - L: 0.2179 - L_v 0.2227
Epoch: 3 - L: 0.2146 - L_v 0.2235
Epoch: 4 - L: 0.2120 - L_v 0.2246
Epoch: 5 - L: 0.2100 - L_v 0.2259
Epoch: 6 - L: 0.2084 - L_v 0.2272
Epoch: 7 - L: 0.2072 - L_v 0.2285
Epoch: 8 - L: 0.2063 - L_v 0.2297
Epoch: 9 - L: 0.2055 - L_v 0.2310
Epoch: 10 - L: 0.2049 - L_v 0.2320
Epoch: 11 - L: 0.2045 - L_v 0.2330
Epoch: 12 - L: 0.2041 - L_v 0.2339
Epoch: 13 - L: 0.2038 - L_v 0.2346
Epoch: 14 - L: 0.2035 - L_v 0.2353
Epoch: 15 - L: 0.2033 - L_v 0.2359
Epoch: 16 - L: 0.2031 - L_v 0.2363
Epoch: 17 - L: 0.2029 - L_v 0.2368
Epoch: 18 - L: 0.2026 - L_v 0.2371
Epoch: 19 - L: 0.2024 - L_v 0.2373
Epoch: 20 - L: 0.2022 - L_v 0.2375
Epoch: 21 - L: 0.2020 - L_v 0.2377
Epoch: 22 - L: 0.2018 - L_v 0.2378
Epoch: 23 - L: 0.2017 - L_v 0.2379
Epoch: 24 - L: 0.2015 - L_v 0.2380
Epoch: 25 - L: 0.2013 - L_v 0.2381
Epoch: 26 - L: 0.2012 - L_v 0.2382
Epoch: 27 - L: 0.2010 - L_v 0.2381
Epoch: 28 - L: 0.2008 - L_v 0.

#### predict after train

In [31]:
# Compare the predictions of both models on the validation inputs after training.
mape(
    model.predict(X_valid),
    torch_model(X_valid)
)

6.055008869305907e-17

#### bias

In [32]:
# Compare the bias of every trainable scratch layer with the torch layer at the same index.
for k, layer in enumerate(model.layers):
    if layer.is_trainable:
        print(f'layer #{k}')
        print(mape(layer.b, torch_model.layers[k].bias))

layer #0
4.424587785904014e-18
layer #2
4.70377332506446e-17
layer #4
3.951782772272695e-17
layer #6
2.8938960980879306e-16


#### weight

In [33]:
# Compare the weights of every trainable scratch layer with the torch layer at the
# same index; the torch weight is transposed to match the layout of the scratch one.
for k, layer in enumerate(model.layers):
    if layer.is_trainable:
        print(f'layer #{k}')
        print(mape(layer.w, torch_model.layers[k].weight.T))

layer #0
1.9099847960228522e-17
layer #2
1.1944783338317023e-16
layer #4
8.598821318003327e-17
layer #6
8.624499224120739e-17
