LeNet5,, using original arhcitecture so even though ReLU and Maxpooling are better than Sigmoid and Averagepooling, we use the original functions. Instead of using the original LeNet uniform [-0.05, 0.05] initialization I am going to use Xavier initialization. 

Not going to implement stride in the convolutional function as it is not needed.

| Layer      | Type                 | Parameters                             | Output Shape (input 28x28) |
| ---------- | -------------------- | -------------------------------------- | -------------------------- |
| **C1**     | Convolution          | 6 filters, 5×5 kernel, stride=1, pad=2 | (6, 28, 28)                |
|            | Activation (Sigmoid) |                                        | (6, 28, 28)                |
| **S2**     | Average Pooling      | 2×2 window, stride=2                   | (6, 14, 14)                |
| **C3**     | Convolution          | 16 filters, 5×5 kernel, stride=1       | (16, 10, 10)               |
|            | Activation (Sigmoid) |                                        | (16, 10, 10)               |
| **S4**     | Average Pooling      | 2×2 window, stride=2                   | (16, 5, 5)                 |
| **C5**     | Convolution          | 120 filters, 5×5 kernel, stride=1      | (120, 1, 1)                |
|            | Activation (Sigmoid) |                                        | (120,)                     |
| **F6**     | Fully Connected      | 120 → 84                               | (84,)                      |
|            | Activation (Sigmoid) |                                        | (84,)                      |
| **Output** | Fully Connected      | 84 → 10                                | (10,)                      |

![Architecture](figures/Architecture.png)


In [1]:
import numpy as np
from sklearn.datasets import fetch_openml

mnist = fetch_openml('mnist_784', version=1, as_frame=False)

X = mnist['data']       # Shape: (70000, 784)
y = mnist['target']     # Shape: (70000,)

X = X / 255.0           # Normalize pixel values to [0, 1]
y = y.astype(np.int32)  # Convert labels to integers

X = X.reshape(-1, 1, 28, 28) # Reshape for CNN

In [2]:
# Split into train/test (60k train, 10k test)
X_train, X_test = X[:60000], X[60000:]
y_train, y_test = y[:60000], y[60000:]

print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")

X_train shape: (60000, 1, 28, 28)
X_test shape: (10000, 1, 28, 28)
y_train shape: (60000,)
y_test shape: (10000,)


In [3]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [21]:
def sigmoid_deriv(x):
    return sigmoid(x) * (1 - sigmoid(x))

In [4]:
def softmax(Z: np.ndarray) -> np.ndarray:
    """
    Softmax function to convert logits (Z) into probabilities.
    Works element-wise across classes.
    """
    Z = Z - np.max(Z, axis=1, keepdims=True)
    exp_Z = np.exp(Z)
    return exp_Z / np.sum(exp_Z, axis=1, keepdims=True)

In [5]:
def CrossEntropy(yhat: np.ndarray, y: np.ndarray, eps: float = 1e-15) -> float:
    """
    Computes the mean Cross-Entropy loss for multi-class classification.

    Parameters:
    - yhat (np.ndarray): Predicted probabilities with shape (batch_size, num_classes)
    - y (np.ndarray): One-hot encoded true labels with shape (batch_size, num_classes)
    - eps (float): Small value to prevent log(0)

    Returns:
    - float: Mean cross-entropy loss over the batch
    """
    yhat = np.clip(yhat, eps, 1 - eps)
    return -np.mean(np.sum(y * np.log(yhat), axis=1))

In [6]:
def padding(X, p):
    batch, channel, height, width = X.shape
    Y = np.zeros(shape=(batch, channel, height+ p * 2, width + p * 2))
    Y[:, :, p:p+height, p:p+width] = X
    return Y

This is going to get extremely ugly. I will try and explain what is going on but that for loop is tricky to simplify.

In [12]:
def Convolution(X, p, in_channels, out_channels, K, bias):
    """
    Performs a convolution over multi-channel input with multiple output channels.
    
    Arguments:
    - X: input tensor of shape (batch, in_channels, height, width)
    - padding: number of padding pixels on each side
    - in_channels: number of input channels
    - out_channels: number of output channels (number of filters)
    - K: Kernel
    - bias: Bias term added to each output channel
    
    Returns:
    - Y: output tensor of shape (batch, out_channels, output_height, output_width)
    - K: The kernel to be reused
    - b: The bias to be reused
    """

    # Pad X
    if p > 0:
        X = padding(X, p)
    batch, channel, height, width = X.shape

    # Compute output size (assuming stride=1)
    output_height = height - K.shape[2] + 1
    output_width  = width - K.shape[3] + 1
    Y = np.zeros(shape=(batch, out_channels, output_height, output_width))

    """
    First loop attempt. Worked but was really slow.

    for b in range(batch):
        for out_ch in range(out_channels):
            for in_ch in range(in_channels):
                for h in range(output_height):
                    for w in range(output_width):
                        Y[b, out_ch, h, w] += np.sum(
                            X[b, in_ch, h: h + K.shape[2], w: w + K.shape[3]] * 
                            K[out_ch, in_ch]
            Y[b, out_ch, :, :] += bias[out_ch]
    """

    for b in range(batch):
        for out_ch in range(out_channels):
            accum = np.zeros((output_height, output_width))
            # Sum over each input channel
            for in_ch in range(in_channels):
                # This uses a 2D sliding window, no inner h,w loop
                for i in range(K.shape[2]):
                    for j in range(K.shape[3]):
                        accum += X[b, in_ch, i:i+output_height, j:j+output_width] * K[out_ch, in_ch, i, j]
            # Add bias
            Y[b, out_ch] = accum + bias[out_ch]

    return Y, K, bias

In [23]:
def ConvBackward(dY, X, K):
    """
    Robust backprop for a convolution layer.
    """
    batch, in_channels, height, width = X.shape
    out_channels, _, kH, kW = K.shape
    out_height, out_width = dY.shape[2], dY.shape[3]

    dK = np.zeros_like(K)
    db = np.sum(dY, axis=(0,2,3)) / batch
    dX = np.zeros_like(X)

    for b in range(batch):
        for out_ch in range(out_channels):
            for in_ch in range(in_channels):
                for i in range(kH):
                    for j in range(kW):
                        h_end = i + out_height
                        w_end = j + out_width
                        # clip if we overshoot the input dims
                        if h_end > height or w_end > width:
                            h_end = min(h_end, height)
                            w_end = min(w_end, width)
                            slice_h = h_end - i
                            slice_w = w_end - j
                            # also slice dY to match
                            dY_slice = dY[b, out_ch, :slice_h, :slice_w]
                            X_slice = X[b, in_ch, i:h_end, j:w_end]
                        else:
                            X_slice = X[b, in_ch, i:h_end, j:w_end]
                            dY_slice = dY[b, out_ch]

                        # gradient w.r.t. kernel weights
                        dK[out_ch, in_ch, i, j] += np.sum(X_slice * dY_slice) / batch

                        # gradient w.r.t. input
                        dX[b, in_ch, i:h_end, j:w_end] += K[out_ch, in_ch, i, j] * dY_slice
    return dX, dK, db


In [13]:
def Pooling(X, kernel_size, stride):

    batch, channel, height, width = X.shape
    output_height = (height - kernel_size[0]) // stride + 1
    output_width  = (width - kernel_size[1]) // stride + 1
    Y = np.zeros(shape=(batch, channel, output_height, output_width))

    """
    First attempt, too slow.
    for b in range(batch):
        for ch in range(channel):
            for h in range(output_height):
                for w in range(output_width):
                    # Maxpooling is better but I am sticking to the default architecture aside from the Xavier Initialization
                    # Calculated this from a piece of paper, helped me understand everything a lot better.
                    Y[b, ch, h, w] = np.mean(X[b, ch, 
                                               (h * stride): (h * stride) + kernel_size[0],
                                               (w * stride): (w * stride) + kernel_size[1]])
    """
    window_sum = np.zeros((batch, channel, output_height, output_width))
    for i in range(kernel_size[0]):
        for j in range(kernel_size[1]):
            window_sum += X[:, :, 
                            i:i+output_height*stride:stride, 
                            j:j+output_width*stride:stride]
    Y = window_sum / (kernel_size[0] * kernel_size[1])

    return Y

In [18]:
def PoolingBackward(dY, X, kernel_size, stride):
    batch, channel, height, width = X.shape
    output_height, output_width = dY.shape[2], dY.shape[3]
    kH, kW = kernel_size
    dX = np.zeros_like(X)

    for b in range(batch):
        for ch in range(channel):
            for h in range(output_height):
                for w in range(output_width):
                    dX[b, ch,
                       (h * stride):(h * stride) + kH,
                       (w * stride):(w * stride) + kW] += dY[b, ch, h, w] / (kH * kW)
    return dX

In [9]:
def xavier_init(size, n_in, n_out):
    limit = np.sqrt(6 / (n_in + n_out))
    return np.random.uniform(-limit, limit, size=size)

In [10]:
# n_in = in_channels * kernel_size[0] * kernel_size[1]
# n_out = out_channels * kernel_size[0] * kernel_size[1]

c1_kernel = xavier_init(size=(6, 1, 5, 5),n_in=25, n_out=150)
c1_bias = np.zeros(6)

c3_kernel = xavier_init(size=(16, 6, 5, 5),n_in=150, n_out=400)
c3_bias = np.zeros(16)

c5_kernel = xavier_init(size=(120, 16, 5, 5),n_in=400, n_out=3000)
c5_bias = np.zeros(120)

f6_weights = xavier_init((84, 120), n_in=120, n_out=84)
f6_bias = np.zeros(84)

out_weights = xavier_init((10, 84), n_in=84, n_out=10)
out_bias = np.zeros(10)

In [None]:
batch_size = 4
epochs = 5
lr = 0.01

for epoch in range(epochs):
    total_loss = 0    # Sum of losses for this epoch
    correct = 0       # Count of correct predictions

    # Shuffle data each epoch
    perm = np.random.permutation(X_train.shape[0])
    X_train = X_train[perm]
    y_train = y_train[perm]
    
    for batch in range(0, X_train.shape[0], 4):
        print(batch)
        X_train_batch = X_train[batch: batch + batch_size]  # (batch_size, 784)
        y_train_batch = y_train[batch: batch + batch_size]  # (batch_size,)
        batch_len = X_train_batch.shape[0]  # Actual batch size (handles last batch)

        # Forward Pass
        c1, c1_kernel, c1_bias = Convolution(X_train_batch,
                                             p=2, 
                                             in_channels=1, 
                                             out_channels=6,
                                             K=c1_kernel,
                                             bias=c1_bias)
        a1 = sigmoid(c1)
        s2 = Pooling(a1, kernel_size=(2, 2), stride=2)
        c3, c3_kernel, c3_bias = Convolution(s2,
                                             p=0,
                                             in_channels=6,
                                             out_channels=16,
                                             K=c3_kernel,
                                             bias=c3_bias
                                             )
        a3 = sigmoid(c3)
        s4 = Pooling(a3, kernel_size=(2, 2), stride=2)
        c5, c5_kernel, c5_bias = Convolution(s4,
                                             p=0,
                                             in_channels=16,
                                             out_channels=120,
                                             K=c5_kernel,
                                             bias=c5_bias
                                             )
        a5 = sigmoid(c5).reshape(4, 120)
        f6 = np.dot(a5, f6_weights.T) + f6_bias
        a6 = sigmoid(f6)
        output = np.dot(a6, out_weights.T) + out_bias
        yhat = softmax(output)
        # Onehot Encoding Labels
        y_onehot = np.zeros((batch_len, 10))
        y_onehot[np.arange(batch_len), y_train_batch] = 1   # Turn class indices into one-hot vectors

        # Loss and Accuracy
        loss = CrossEntropy(yhat, y_onehot)                 # Average loss over batch
        total_loss += loss

        preds = np.argmax(yhat, axis=1)                     # Class prediction per sample
        correct += np.sum(preds == y_train_batch)           # Tally correct predictions

        dz_output = yhat - y_onehot
        # Grad for output layer
        dw_out = np.dot(dz_output.T, a6) / batch_len  # shape (10, 84)
        db_out = dz_output.sum(axis=0) / batch_len    # shape (10,)

        # Propagate to hidden layer
        da6 = np.dot(dz_output, out_weights)          # (batch, 84)
        dz6 = da6 * sigmoid_deriv(f6)                 # (batch, 84)

        # Grad for F6 layer
        dw_f6 = np.dot(dz6.T, a5) / batch_len         # shape (84, 120)
        db_f6 = dz6.sum(axis=0) / batch_len           # shape (84,)

        # Propagate to C5 output
        da5 = np.dot(dz6, f6_weights)                 # (batch, 120)
        da5 = da5.reshape(batch_len, 120, 1, 1)       # for compatibility with conv

        # Propagate through C5 layer
        ds4, dK5, db5 = ConvBackward(da5, s4, c5_kernel)

        # Propagate through S4
        da3 = PoolingBackward(ds4, a3, (2,2), 2)

        #Propagate through C3
        ds2, dK3, db3 = ConvBackward(da3 * sigmoid_deriv(c3), s2, c3_kernel)

        #Propagate through S2
        da1 = PoolingBackward(ds2, a1, (2,2), 2)

        #Propagate through C1
        _, dK1, db1 = ConvBackward(da1 * sigmoid_deriv(c1), X_train_batch, c1_kernel)

        # Adjusting Parameters
        out_weights -= lr * dw_out
        out_bias -= lr * db_out
        f6_weights -= lr * dw_f6
        f6_bias -= lr * db_f6
        c5_kernel -= lr * dK5
        c5_bias -= lr * db5
        c3_kernel -= lr * dK3
        c3_bias -= lr * db3
        c1_kernel -= lr * dK1
        c1_bias -= lr * db1

    # Epoch Results
    acc = correct / X_train.shape[0]
    print(f"Epoch {epoch+1} | Loss: {total_loss / X_train.shape[0]:.4f} | Accuracy: {acc:.4f}")
        

0
4
8
12
16
20


KeyboardInterrupt: 