In [1]:
import numpy as np
import functools

We try to keep the code torchlike

In [2]:
# class for other classes to subclass
class Module:

    def __init__(self, cls) -> None:
        functools.update_wrapper(self, cls)

    def __call__(self, *args):
        return self.forward(*args)

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}()'

In [3]:
class CELoss(Module):
    """ Cross entropy loss for multiclass prediction
    """
    def __init__(self) -> None:
        super().__init__(self)
    
    @staticmethod
    def forward(y_true: np.ndarray, y_pred: np.ndarray) -> float | np.ndarray:
        epsilon = 1e-12
        y_pred = np.clip(y_pred, epsilon, 1. - epsilon)
        
        batch_size = y_true.shape[0]
        loss = -np.sum(y_true * np.log(y_pred)) / batch_size
        
        return loss

        

In [4]:
class ReLU(Module):
    """ ReLU activation function
    """
    def __init__(self) -> None:
        super().__init__(self)
        self.x = None
    
    def forward(self, x: np.ndarray) -> np.ndarray:
        self.x = x
        return np.fmax(0, self.x)
    
    def backward(self, upstream_grad: np.ndarray) -> np.ndarray:
        local_grad = np.where(self.x > 0, 1, 0)
        return [], [], upstream_grad * local_grad

In [5]:
class Softmax(Module):
    """ Softmax activation function
    """
    def __init__(self) -> None:
        super().__init__(self)
    
    @staticmethod
    def forward(logits: np.ndarray) -> np.ndarray:
        exps = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        return exps / np.sum(exps, axis=1, keepdims=True)

In [6]:
class Linear(Module):
    """ Single linear layer
    """
    
    def __init__(self, input_size: int, output_size: int) -> None:
        super().__init__(self)
        
        self.W = np.random.randn(output_size, input_size) * 0.01 
        self.b = np.zeros((output_size,))
        
        self.input_size = input_size
        self.output_size = output_size
        
        self.z = None
        self.x = None
        
        self.W_grad = np.zeros_like(self.W)
        self.b_grad = np.zeros_like(self.b)
        
    def forward(self, x: np.ndarray) -> np.ndarray:
        self.x = x.copy()
        self.z = self.x @ self.W.T + self.b
        return self.z
    
    def backward(self, upstream_grad: np.ndarray) -> np.ndarray:
        # print(f'W {self.W.shape}, z: {self.z.shape}, up: {upstream_grad.shape}, x: {self.x.shape} for layer {str(self)}')
        self.W_grad += upstream_grad.T @ self.x
        self.b_grad += np.sum(upstream_grad, axis=0)
        return self.W_grad, self.b_grad, upstream_grad @ self.W

    def zero_grad(self):
        self.W_grad = np.zeros_like(self.W)
        self.b_grad = np.zeros_like(self.b)
    
    def parameters(self) -> list:
        return [self.W, self.b]
    
    def step(self, w_grad: np.ndarray, b_grad: np.ndarray, lr: float) -> None:
        self.W -= lr * w_grad 
        self.b -= lr * b_grad
    
    def __repr__(self) -> str:
        return f'Linear(inp={self.input_size}, out={self.output_size})'


In [7]:
class MLP(Module):
    """ Multi layer perceptron
    """
    
    def __init__(self, hidden_layer_sizes: list | tuple, input_size: int, output_size: int) -> None:
        super().__init__(self)
        
        self.layers = []
        self.layers.append(Linear(input_size=input_size, output_size=hidden_layer_sizes[0]))
        self.layers.append(ReLU())
        
        for i in range(len(hidden_layer_sizes)-1):
            self.layers.append(Linear(input_size=hidden_layer_sizes[i], output_size=hidden_layer_sizes[i+1]))
            self.layers.append(ReLU())
        
        self.layers.append(Linear(input_size=hidden_layer_sizes[-1], output_size=output_size))
        
        self.logits = None
    
    def forward(self, x: np.ndarray) -> np.ndarray:
        for layer in self.layers:
            x = layer(x)   
        self.logits = x.copy()
        return x
    
    def parameters(self) -> list:
        params = []
        for layer in self.layers[::-1]:
            if hasattr(layer, 'W'):
                params += layer.parameters()
        return params
    
    def compute_loss(self, y_true: np.ndarray, y_pred: np.ndarray, from_logits=True) -> float:
        if from_logits:
            y_pred = Softmax.forward(y_pred)
        return CELoss.forward(y_true, y_pred)
        
    def backward(self, y_true: np.ndarray) -> np.ndarray: 
        n = y_true.shape[0]
        upstream_grad = Softmax.forward(self.logits) - y_true
        upstream_grad /= n
        out_grads = []
        for layer in self.layers[::-1]:
            w_grad, b_grad, upstream_grad = layer.backward(upstream_grad)
            if len(w_grad) > 0:
                out_grads += [w_grad]
                out_grads += [b_grad]
        return out_grads
    
    def zero_grad(self) -> None:
        for layer in self.layers[::-1]:
            if hasattr(layer, 'W'):
                layer.zero_grad()
    
    def optimizer_step(self, grads: np.ndarray, lr: float) -> None:
        idx = 0 
        for layer in self.layers[::-1]:
            if hasattr(layer, 'W'):
                w_grad = grads[idx]
                b_grad = grads[idx+1]
                layer.step(w_grad, b_grad, lr)
                idx += 2
                
    def __repr__(self) -> str:
        return f'=======Layers=======\n{'\n'.join(str(layer) for layer in self.layers)}'
        

In [8]:
# testing environment
np.random.seed(42)

input_data = np.random.randn(32, 768)
y_true = np.zeros((32, 10))
labels = np.random.randint(0, 10, size=(32,))
y_true[np.arange(32), labels] = 1


mlp = MLP([1], 768, 10)
print(mlp)

y_pred = mlp.forward(input_data)

loss = mlp.compute_loss(y_true, y_pred)

grads = mlp.backward(y_true)
params = mlp.parameters()

print("")
print("Predicted output (logits):\n", y_pred.shape)
print("Loss:", loss)
print("Gradients after backward pass:")
for grad, param in zip(grads, params):
    print(grad.shape, param.shape)
    
mlp.optimizer_step(grads, lr=0.01)

Linear(inp=768, out=1)
ReLU()
Linear(inp=1, out=10)

Predicted output (logits):
 (32, 10)
Loss: 2.302304993926562
Gradients after backward pass:
(10, 1) (10, 1)
(10,) (10,)
(1, 768) (1, 768)
(1,) (1,)


In [9]:
import torch
import torch.nn as nn
import torch.optim as optim

np.random.seed(42)
torch.manual_seed(42)

class TorchMLP(nn.Module):
    def __init__(self, hidden_layer_sizes, input_size, output_size):
        super(TorchMLP, self).__init__()
        layers = []
        layers.append(nn.Linear(input_size, hidden_layer_sizes[0]))
        layers.append(nn.ReLU())
        
        for i in range(len(hidden_layer_sizes) - 1):
            layers.append(nn.Linear(hidden_layer_sizes[i], hidden_layer_sizes[i + 1]))
            layers.append(nn.ReLU())
        
        layers.append(nn.Linear(hidden_layer_sizes[-1], output_size))
        self.net = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.net(x)
    
def one_hot_encod(labels, num_classes):
    return np.eye(num_classes)[labels]

batch_size = 4
input_size = 10
output_size = 3
hidden_layers = [5, 5]

np_input = np.random.randn(batch_size, input_size).astype(np.float32)
np_labels = np.random.randint(0, output_size, batch_size)
one_hot_labels = one_hot_encod(np_labels, output_size)

torch_input = torch.tensor(np_input)
torch_labels = torch.tensor(np_labels)

custom_mlp = MLP(hidden_layer_sizes=hidden_layers, input_size=input_size, output_size=output_size)
torch_mlp = TorchMLP(hidden_layer_sizes=hidden_layers, input_size=input_size, output_size=output_size)

print(custom_mlp)

for layer_c, layer_t in zip(custom_mlp.layers, torch_mlp.net):
    if hasattr(layer_c, 'W') and hasattr(layer_t, 'weight'):
        layer_c.W = layer_t.weight.detach().numpy()
        layer_c.b = layer_t.bias.detach().numpy()

custom_preds = custom_mlp.forward(np_input)

torch_preds = torch_mlp(torch_input)

custom_loss = custom_mlp.compute_loss(one_hot_labels, custom_preds)
torch_loss_fn = nn.CrossEntropyLoss()
torch_loss = torch_loss_fn(torch_preds, torch_labels)

custom_mlp.zero_grad()
custom_grads = custom_mlp.backward(one_hot_labels)

torch_loss.backward()

torch_grads = [param.grad for param in torch_mlp.parameters()]

print(f'loss for custom: {custom_loss:.4f}, loss for torch: {torch_loss:.4f}')
print(f'custom: {custom_preds}\ntorch: {torch_preds}')

for c_grad, t_grad in zip(custom_grads[::-1], torch_grads):
    print(f'custom shape: {c_grad.shape}, torch shape: {t_grad.shape}')

def compare_gradients(c_grad, t_grad):
    print("Custom Gradient: ", c_grad.shape)
    print("Torch Gradient: ", t_grad.detach().numpy().shape)
    print(f"Allclose: {np.allclose(c_grad, t_grad)}")
    print("Difference: ", np.abs(c_grad - t_grad.detach().numpy()).mean(), '\n')

# compare gradients
compare_gradients(custom_grads[::-1][0], torch_grads[1])
compare_gradients(custom_grads[::-1][1], torch_grads[0])
compare_gradients(custom_grads[::-1][2], torch_grads[3])
compare_gradients(custom_grads[::-1][3], torch_grads[2])
compare_gradients(custom_grads[::-1][4], torch_grads[5])
compare_gradients(custom_grads[::-1][5], torch_grads[4])

Linear(inp=10, out=5)
ReLU()
Linear(inp=5, out=5)
ReLU()
Linear(inp=5, out=3)
loss for custom: 0.9984, loss for torch: 0.9984
custom: [[-0.2124254  -0.02882922 -0.27971405]
 [-0.21084696 -0.01194282 -0.23850769]
 [-0.2147279  -0.00732934 -0.2478374 ]
 [-0.32275128  0.11145934 -0.20302606]]
torch: tensor([[-0.2124, -0.0288, -0.2797],
        [-0.2108, -0.0119, -0.2385],
        [-0.2147, -0.0073, -0.2478],
        [-0.3228,  0.1115, -0.2030]], grad_fn=<AddmmBackward0>)
custom shape: (5,), torch shape: torch.Size([5, 10])
custom shape: (5, 10), torch shape: torch.Size([5])
custom shape: (5,), torch shape: torch.Size([5, 5])
custom shape: (5, 5), torch shape: torch.Size([5])
custom shape: (3,), torch shape: torch.Size([3, 5])
custom shape: (3, 5), torch shape: torch.Size([3])
Custom Gradient:  (5,)
Torch Gradient:  (5,)
Allclose: True
Difference:  3.7252904e-10 

Custom Gradient:  (5, 10)
Torch Gradient:  (5, 10)
Allclose: True
Difference:  6.146729e-10 

Custom Gradient:  (5,)
Torch Grad

In [10]:
# get data
from torchvision import datasets
import torchvision.transforms as transforms

def one_hot_encode(y, num_classes):
    one_hot = np.zeros((y.size, num_classes))
    one_hot[np.arange(y.size), y] = 1
    return one_hot

def mnist_to_numpy(is_train=True):
    dataset = datasets.MNIST(root='./data', train=is_train, download=True,
                             transform=transforms.ToTensor())
    
    data = []
    labels = []
    for image, label in dataset:
        flattened = image.numpy().flatten()
        data.append(flattened)
        labels.append(label)
    
    labels = np.array(labels)
    labels = one_hot_encode(labels, 10)
    return np.array(data), labels

X_train, y_train = mnist_to_numpy(is_train=True)
X_test, y_test = mnist_to_numpy(is_train=False)

print("Training data shape:", X_train.shape)
print("Test data shape:", X_test.shape)
print("Training labels shape:", y_train.shape)
print("Test labels shape:", y_test.shape)

def get_mini_batches(X, y, batch_size, num_iter):
    num_samples = X.shape[0]
    indices = np.arange(num_samples)
    
    for i in range(num_iter):
        if i % (num_samples // batch_size) == 0:
            np.random.shuffle(indices)
        
        start_idx = (i * batch_size) % num_samples
        batch_indices = indices[start_idx:start_idx + batch_size]
        
        yield X[batch_indices], y[batch_indices]

Training data shape: (60000, 784)
Test data shape: (10000, 784)
Training labels shape: (60000, 10)
Test labels shape: (10000, 10)


In [37]:
model = MLP([1000], 784, 10)

In [101]:
# training procedure 
batch_size = 256
num_iterations = 200

np.random.seed(42)

for i, (batch_X, batch_y) in enumerate(get_mini_batches(X_train, y_train, batch_size, num_iterations)):
    logits = model.forward(batch_X)
    model.zero_grad()
    
    loss = model.compute_loss(batch_y, logits)
    grads = model.backward(batch_y)
    
    lr = 0.1 if i < num_iterations // 2 else 0.01
    model.optimizer_step(grads, lr=lr)
    
    if i % 100 == 99: 
        print(f'Iteration {i+1} | loss: {loss:.4f}')

Iteration 100 | loss: 0.0546
Iteration 200 | loss: 0.1158


In [103]:
# get accuracy on test set
np.random.seed(42)

logits_test = model(X_test)
# print(f'logits shape: {logits_test.shape}')
y_preds = Softmax.forward(logits_test)
# print(f'logits shape: {y_preds.shape}')

pred_labels = np.argmax(y_preds, axis=1)
true_labels = np.argmax(y_test, axis=1)
# print(f'{pred_labels.shape}, {true_labels.shape}')


correct = np.sum(pred_labels == true_labels)
acc = (correct / len(pred_labels)) * 100
print(f'Accuracy: {acc}%')


Accuracy: 97.16%
