In [None]:
import numpy as np

In [None]:
def softmax(X: np.ndarray, dim: int = None, dL: np.ndarray = None) -> np.ndarray:
    """_summary_

    Args:
        X (np.ndarray): _description_
        dim (int, optional): _description_. Defaults to None.
        dL (np.ndarray, optional): _description_. Defaults to None.

    Returns:
        np.ndarray: _description_
    """
    # is dL is not None, the function is called to perform the backprop pass
    if dL is not None:
        # compute backpropagated loss
        e = np.exp(X - np.max(X))
        s = e / np.sum(e)
        return np.sum(dL * s) - s * np.sum(dL * s)
    else:
        return np.exp(X - np.max(X)) / np.sum(np.exp(X - np.max(X)))


In [None]:
def add(x: np.ndarray, b: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    """
    This function executes the addition operation 
    """    
    if dL is not None:
        # this operation has no effect on the gradient
        return dL, dL
    else:
        # compute addition
        return x + b

In [None]:
def dot(b: np.ndarray, x: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    """Compute dot product or backpropagates the loss.
    
    \\text{dot}(x, b) = x \cdot b
    
    Args:
        x (np.ndarray): input array
        b (np.ndarray): bias array
        dL (np.ndarray, optional): backpropagated loss. Defaults to None.
        
    Returns:
        np.ndarray: dot product or backpropagated loss
    """
    if dL is not None:
        return np.dot(dL, x.T), np.dot(b.T, dL)
    else:
        # compute dot product
        return np.dot(b, x)

In [None]:
def batch_dot(x: np.ndarray, b: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    if dL is not None:
        # compute backpropagated loss
        return dL * b
    else:
        # compute batch dot product
        return np.sum(x * b, axis=1)
    

In [None]:
def relu(X: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    """
    compute the RELU operation over (X)
    """
    if dL is not None:
        # the gradient of relu is simply
        return (X > 0) * dL  
    else:
        # compute ReLU
        return np.maximum(X, 0)

In [None]:
def matmul(X: np.ndarray, W: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    """Compute matrix multiplication or backpropagates the loss.
    
    \\text{matmul}(X, W) = WX
    
    Args:
        X (np.ndarray): input array
        W (np.ndarray): weight array
        dL (np.ndarray, optional): backpropagated loss. Defaults to None.
        
    Returns:
        np.ndarray: matrix multiplication or backpropagated loss
    """
    if dL is not None:
        # compute backpropagated loss
        return np.dot(dL, W.T), np.dot(X.T, dL)
    else:
        # compute matrix multiplication
        return np.dot(X, W)

In [None]:
def bce(X: np.ndarray, y: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    """Compute binary cross entropy or backpropagates the loss.
    
    \\text{bce}(X, y) = -y \log(X) - (1 - y) \log(1 - X)
    
    Args:
        X (np.ndarray): input array
        y (np.ndarray): target array
        dL (np.ndarray, optional): backpropagated loss. Defaults to None.
        
    Returns:
        np.ndarray: binary cross entropy or backpropagated loss
    """
    if dL is not None:
        # compute backpropagated loss
        return -y / (X + (np.finfo(float).eps)) + (1 - y) / (1 - X + np.finfo(float).eps)
    else:
        # compute binary cross entropy
        return -np.mean(y * np.log(X) + (1 - y) * np.log(1 - X))

In [None]:
def conv(
    X: np.ndarray, 
    k: np.ndarray, 
    dL: np.ndarray = None
) -> np.ndarray:
    if dL is not None:
        # Compute backpropagated loss over kernel and input image
        dLdX = np.zeros_like(X)
        dLdK = np.zeros_like(k)

        for i in range(X.shape[0]):
            for j in range(k.shape[0]):
                for h in range(X.shape[2] - k.shape[2] + 1):
                    for w in range(X.shape[3] - k.shape[3] + 1):
                        dLdX[i, :, h:h + k.shape[2], w:w + k.shape[3]] += dL[i, j, h, w] * k[j, :, :, :]
                        dLdK[j, :, :, :] += dL[i, j, h, w] * X[i, :, h:h + k.shape[2], w:w + k.shape[3]]
        
        return dLdX, dLdK

    else:
        output = np.zeros(
            (X.shape[0], k.shape[0], X.shape[2] - k.shape[2] + 1, X.shape[3] - k.shape[3] + 1)
        )
        for i in range(X.shape[0]):
            for j in range(k.shape[0]):
                for h in range(X.shape[2] - k.shape[2] + 1):
                    for w in range(X.shape[3] - k.shape[3] + 1):
                        output[i, j, h, w] = np.sum(X[i, :, h:h + k.shape[2], w:w + k.shape[3]] * k[j, :, :, :])        
        return output


In [None]:
def flatten(X: np.ndarray, dL: np.ndarray = None) -> np.ndarray:
    batch_size = X.shape[0]
    
    if dL is not None:
        return dL.reshape(X.shape)
    else:
        return X.reshape(batch_size, -1)

In [None]:
from tqdm import tqdm

def minibatch_gd_mnist(X, y, 
                    k1: np.ndarray = None, 
                    k2: np.ndarray = None,
                    k3: np.ndarray = None, 
                    w: np.ndarray = None,
                    epochs=10, 
                    batch_size=32,
                    lr=0.01):
    
    # set the parameters
    k1 = k1 if k1 is not None else np.random.randn(16, 1, 3, 3)
    k2 = k2 if k2 is not None else np.random.randn(32, 16, 3, 3)
    k3 = k3 if k3 is not None else np.random.randn(10, 32, 3, 3)

    w = w if w is not None else np.random.randn(4840, 10)
    
    for n in range(epochs):
        for i in tqdm(range(0, len(X), batch_size)):
            y_ = y[i:i + batch_size]
            x = X[i:i + batch_size]
            
            conv1 = conv(x, k1)
            relu1 = relu(conv1)
            conv2 = conv(relu1, k2)
            relu2 = relu(conv2)
            conv3 = conv(relu2, k3)
            relu3 = relu(conv3)
            
            # flatten
            x_fl = flatten(relu3)
            
            # dense
            x_mm = matmul(x_fl, w)
            
            # softmax
            y_hat = softmax(x_mm, dim=1)
            
            # loss
            loss = bce(y_hat, y_)
            
            # backpropagation
            dL = bce(y_hat, y_, dL=1)
            dL, dW = matmul(x_fl, w, dL=dL)
            dL = flatten(relu3, dL)
            dL = relu(conv3, dL)
            dL, dK_3 = conv(relu2, k3, dL=dL)
            dL = relu(conv2, dL)
            dL, dK_2 = conv(relu1, k2, dL=dL)
            dL = relu(conv1, dL)
            dL, dK_1 = conv(x, k1, dL=dL)
            
            # perform the gradient descent rule
            k1 -= lr * dK_1
            k2 -= lr * dK_2
            k3 -= lr * dK_3
            w -= lr * dW
            
        print(f'Epoch {n + 1} | Loss: {loss:.4f}')

    return k1, k2, k3, w



In [None]:


def eval(x, k1, k2, k3, w):
    conv1 = conv(x, k1)
    relu1 = relu(conv1)
    conv2 = conv(relu1, k2)
    relu2 = relu(conv2)
    conv3 = conv(relu2, k3)
    relu3 = relu(conv3)
    
    # flatten
    x = relu3.reshape(relu3.shape[0], -1)
    
    # dense
    x = matmul(x, w)
    
    # softmax
    y_hat = softmax(x, dim=1)
    
    return y_hat

In [None]:
import torch
def compare_with_torch():
    X = np.random.rand(1, 1, 28, 28)
    k = np.random.rand(1, 1, 3, 3)

    my_conv = conv(X, k)
    torch_conv = torch.nn.Conv2d(1, 1, 3, bias=False)
    torch_conv.weight.data = torch.tensor(k)
    torch_conv_out = torch_conv(torch.tensor(X))

    assert np.allclose(my_conv, torch_conv_out.detach().numpy())

    my_conv_dL = conv(X, k, dL=np.ones_like(my_conv))
    torch_conv_out.backward(torch.ones_like(torch_conv_out))

    assert np.allclose(my_conv_dL[1], torch_conv.weight.grad.detach().numpy())

    print("forward pass and backward pass validated !!")

In [None]:
compare_with_torch()

In [None]:
# first load the MNIST dataset.
import os
from torch.utils.data import Dataset
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
from torchvision import transforms as tr

mnist_train = MNIST(root=os.getcwd(), train=True, download=True, transform=tr.ToTensor())
mnist_test = MNIST(root=os.getcwd(), train=False, download=True, transform=tr.ToTensor())

In [None]:
X, y = np.asarray([mnist_train[i][0].numpy() for i in range(500)]), np.asarray([mnist_train[i][1] for i in range(500)])
X_test, y_test = np.asarray([mnist_test[i][0].numpy() for i in range(500)]), np.asarray([mnist_test[i][1] for i in range(500)])

In [None]:
X = X.reshape(-1, 1, 28, 28).astype(np.float32)
x_test = X_test.reshape(-1, 1, 28, 28).astype(np.float32)
y = y.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

In [128]:
params = minibatch_gd_mnist(X, y, batch_size=32, epochs=2, lr=0.01)

Epoch 1 | Loss: 40.6785
Epoch 2 | Loss: 20.4560


In [131]:
y_hat = eval(x_test, *params)
print(f'Accuracy: {np.mean(np.argmax(y_hat, axis=1) == y_test)}')

Accuracy: 0.3456
