In [25]:
import numpy as np
from sklearn.datasets import fetch_openml  

In [26]:
import sys
sys.path.append(r"C:\Users\kanin\Desktop\CV\Neural network")
import core.nn as nn
import core.optim as optim
from core.losses import MSE, CrossEntropy
from core.utils import accuracy

In [27]:
def dataset(loader_fn, train_num, test_num, rng=None):
    """Build a class-balanced, shuffled train/test split.

    For every distinct label the samples of that class are shuffled; the
    first ``train_num`` of them go to the training set and the following
    ``test_num`` go to the test set, so the two sets never share a sample.
    Each set is then shuffled once more so the classes are interleaved.

    Parameters
    ----------
    loader_fn : tuple or callable
        Either an ``(X, y)`` pair, or a zero-argument callable returning such
        a pair. Both members are converted with ``np.asarray`` and must have
        the same length along their first axis.
    train_num : int
        Number of training samples taken per class.
    test_num : int
        Number of test samples taken per class.
    rng : None, int or numpy.random.Generator, optional
        Source of randomness. ``None`` (the default) draws from the global
        ``np.random`` state, so ``np.random.seed`` keeps working. An int
        seeds a fresh generator; a ``Generator`` instance is used as is.

    Returns
    -------
    X_train, y_train, X_test, y_test : numpy.ndarray

    Notes
    -----
    Slicing is not bounds-checked: a class with fewer than
    ``train_num + test_num`` samples simply contributes fewer rows, so the
    result is only balanced when every class is large enough.
    """
    data_x, data_y = loader_fn() if callable(loader_fn) else loader_fn
    data_x = np.asarray(data_x)
    data_y = np.asarray(data_y)

    # Pick the permutation routine once; both share the same call signature.
    if rng is None:
        permute = np.random.permutation
    else:
        permute = np.random.default_rng(rng).permutation

    train_x_list, train_y_list = [], []
    test_x_list, test_y_list = [], []

    for cls in np.unique(data_y):
        # Shuffle this class's row indices, then cut disjoint train/test slices.
        cls_indices = permute(np.where(data_y == cls)[0])
        train_idx = cls_indices[:train_num]
        test_idx = cls_indices[train_num:train_num + test_num]

        train_x_list.append(data_x[train_idx])
        train_y_list.append(data_y[train_idx])
        test_x_list.append(data_x[test_idx])
        test_y_list.append(data_y[test_idx])

    X_train = np.concatenate(train_x_list)
    y_train = np.concatenate(train_y_list)
    X_test = np.concatenate(test_x_list)
    y_test = np.concatenate(test_y_list)

    # The concatenated arrays are grouped by class; shuffle to interleave them.
    train_perm = permute(len(X_train))
    X_train, y_train = X_train[train_perm], y_train[train_perm]

    test_perm = permute(len(X_test))
    X_test, y_test = X_test[test_perm], y_test[test_perm]

    return X_train, y_train, X_test, y_test

In [28]:
# Fetch the OpenML dataset named "mnist_784"; the next cell reads its "data" and "target" entries.
data = fetch_openml("mnist_784")

In [29]:
# Load and preprocess the MNIST dataset.
# Pixel values are divided by 255.0 so they lie in the range 0-1 (grayscale images are originally 0-255).
# Labels are cast with astype('int16') so they are integers rather than the type OpenML delivers.
# train_num / test_num are per-class counts (see `dataset`): 1000 training and 100 test samples for each label.

X_train, y_train, X_test, y_test = dataset(
    (
        np.asarray(data["data"].values) / 255.0,                 # Normalize input images
        np.asarray(data["target"].values.astype('int16'))        # Convert labels to integers
    ),
    train_num=1000,
    test_num=100
)


In [30]:
# Reshape each sample to a single-channel 28x28 image, i.e. the (N, C, H, W) layout the Conv2d / MaxPool2d layers below document.
X_train, X_test = X_train.reshape(-1, 1, 28, 28), X_test.reshape(-1, 1, 28, 28)

In [31]:
class MaxPool2d(nn.Module):
    """Max pooling over the two trailing axes of an (N, C, H, W) array.

    ``pool_size`` and ``stride`` may each be an int (used for both axes) or a
    ``(height, width)`` pair. When ``stride`` is omitted it equals the pool
    size, so windows do not overlap.
    """

    def __init__(self, pool_size=(2,2), stride=None):
        super().__init__()
        self.pool_size = (pool_size, pool_size) if isinstance(pool_size, int) else pool_size

        if stride is None:
            # Default: step by one full window.
            stride = self.pool_size
        elif isinstance(stride, int):
            stride = (stride, stride)
        self.stride = stride

        # Filled in by forward(); backward() reads both.
        self.x = None
        self.argmax = None

    def forward(self, x):
        """Return the per-window maximum of ``x``.

        x: (N, C, H, W)
        returns: (N, C, OH, OW) with OH = (H - KH) // SH + 1, OW likewise.
        """
        self.x = x
        batch, channels, height, width = x.shape
        win_h, win_w = self.pool_size
        step_h, step_w = self.stride

        out_h = (height - win_h) // step_h + 1
        out_w = (width - win_w) // step_w + 1

        # Read-only strided view exposing every pooling window without copying:
        # axes 2/3 walk over window origins, axes 4/5 walk inside one window.
        bytes_n, bytes_c, bytes_h, bytes_w = x.strides
        view = np.lib.stride_tricks.as_strided(
            x,
            shape=(batch, channels, out_h, out_w, win_h, win_w),
            strides=(bytes_n, bytes_c, step_h * bytes_h, step_w * bytes_w, bytes_h, bytes_w),
            writeable=False,
        )

        # Collapse each window to one axis so max/argmax run over a single dimension.
        flat_windows = view.reshape(batch, channels, out_h, out_w, win_h * win_w)

        # Position of the winner inside each flattened window, kept for backward().
        self.argmax = flat_windows.argmax(axis=4)

        return flat_windows.max(axis=4)

    def backward(self, grad_output):
        """Route each upstream gradient to the input element that won its window.

        grad_output: (N, C, OH, OW)
        returns: (N, C, H, W), zero everywhere except at the recorded maxima.
        """
        x = self.x
        batch, channels, _, _ = x.shape
        _, win_w = self.pool_size
        step_h, step_w = self.stride
        out_h, out_w = grad_output.shape[2:]

        dx = np.zeros_like(x)

        # Index helpers that broadcast to (N, C); identical for every window.
        sample_idx = np.arange(batch)[:, None]
        channel_idx = np.arange(channels)

        for out_row in range(out_h):
            for out_col in range(out_w):
                winner = self.argmax[:, :, out_row, out_col]  # (N, C), values in [0, KH*KW)

                # Unflatten the in-window offset and shift it by the window origin.
                in_row = out_row * step_h + winner // win_w
                in_col = out_col * step_w + winner % win_w

                # One distinct target per (n, c) here; overlapping windows add up across iterations.
                dx[sample_idx, channel_idx, in_row, in_col] += grad_output[:, :, out_row, out_col]

        return dx


In [32]:
def im2col(input_data, kernel_size, stride, padding):
    """Unfold sliding windows of a 4-D array into rows of a 2-D matrix.

    Parameters
    ----------
    input_data : numpy.ndarray, shape (B, C, H, W)
    kernel_size : (KH, KW)
        Window height and width.
    stride : (SH, SW)
        Step between consecutive windows along height and width.
    padding : (PH, PW)
        Zero padding added symmetrically to the height and width axes before
        the windows are extracted. ``(0, 0)`` leaves the input untouched.

    Returns
    -------
    cols : numpy.ndarray, shape (B*OH*OW, C*KH*KW)
        One row per (sample, output row, output column); each row holds the
        window flattened in (C, KH, KW) order.
    OH, OW : int
        Number of window positions along the (padded) height and width.

    Raises
    ------
    ValueError
        If the kernel does not fit inside the (padded) input even once.
    """
    KH, KW = kernel_size
    SH, SW = stride
    PH, PW = padding

    # Honour the padding argument so this function agrees with col2im,
    # which assumes windows were taken from the padded array.
    if PH > 0 or PW > 0:
        input_data = np.pad(
            input_data, ((0, 0), (0, 0), (PH, PH), (PW, PW)), mode='constant'
        )

    B, C, H_p, W_p = input_data.shape
    OH = (H_p - KH) // SH + 1
    OW = (W_p - KW) // SW + 1
    if OH <= 0 or OW <= 0:
        raise ValueError(
            f"kernel {(KH, KW)} does not fit in padded input of size {(H_p, W_p)}"
        )

    col = np.empty((B, OH, OW, C, KH, KW), dtype=input_data.dtype)
    for y in range(OH):
        y_min = y * SH
        for x in range(OW):
            x_min = x * SW
            # Copy the window at this output position for all samples and channels.
            col[:, y, x] = input_data[:, :, y_min:y_min + KH, x_min:x_min + KW]

    return col.reshape(B * OH * OW, -1), OH, OW


def col2im(cols, input_shape, kernel_size, stride, padding, OH, OW):
    """Fold a patch matrix back into an image, summing overlapping patches.

    cols: (B*OH*OW, C*KH*KW), one flattened (C, KH, KW) patch per row
    input_shape: (B, C, H, W) of the image before padding
    kernel_size / stride / padding: (height, width) pairs
    OH, OW: number of patch positions along height and width

    Returns an array of shape (B, C, H, W): patches are accumulated on a
    zero canvas of the padded size and the padding border is cropped off.
    """
    batch, channels, height, width = input_shape
    k_h, k_w = kernel_size
    s_h, s_w = stride
    p_h, p_w = padding

    padded_h = height + 2 * p_h
    padded_w = width + 2 * p_w

    # Keep the row layout: axis 1/2 select the patch position, the rest is the patch.
    patches = cols.reshape(batch, OH, OW, channels, k_h, k_w)
    canvas = np.zeros((batch, channels, padded_h, padded_w), dtype=cols.dtype)

    # Visit patch positions row by row and add each patch where it came from.
    for position in range(OH * OW):
        row, col = divmod(position, OW)
        top = row * s_h
        left = col * s_w
        canvas[:, :, top:top + k_h, left:left + k_w] += patches[:, row, col]

    if p_h <= 0 and p_w <= 0:
        return canvas
    # Strip the padding border.
    return canvas[:, :, p_h:padded_h - p_h, p_w:padded_w - p_w]

class Conv2d(nn.Module):
    """2-D convolution layer computed as a matrix product over im2col patches.

    Inputs and outputs use the (B, C, H, W) layout. The kernel is applied
    without flipping (cross-correlation).

    Parameters
    ----------
    in_channels, out_channels : int
    kernel_size, stride, padding : int or tuple
        A tuple is used as ``(height, width)``; any other value is duplicated
        for both axes.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=(1, 1), padding=(0, 0)):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size)
        self.stride = stride if isinstance(stride, tuple) else (stride, stride)
        self.padding = padding if isinstance(padding, tuple) else (padding, padding)
        KH, KW = self.kernel_size

        # Uniform init in +/- 1/sqrt(fan_in), with fan_in = in_channels * KH * KW.
        limit = 1 / np.sqrt(in_channels * KH * KW)
        self.weights = nn.Parameter(np.random.uniform(-limit, limit, (out_channels, in_channels, KH, KW)))
        self.biases = nn.Parameter(np.zeros(out_channels))

    def forward(self, x):
        """Convolve ``x`` (B, C, H, W) and return (B, out_channels, OH, OW).

        Caches the input, its padded version and the patch matrix for
        ``backward``.
        """
        self.x = x  # original unpadded input

        # Pad explicitly here, so im2col/col2im are always called with (0, 0).
        PH, PW = self.padding
        if PH > 0 or PW > 0:
            self.x_padded = np.pad(x, ((0, 0), (0, 0), (PH, PH), (PW, PW)), mode='constant')
        else:
            self.x_padded = x

        self.cols, self.OH, self.OW = im2col(
            self.x_padded, self.kernel_size, self.stride, (0, 0)  # already padded above
        )

        # (C*KH*KW, OC): each column is one flattened filter.
        W_col = self.weights.data.reshape(self.out_channels, -1).T
        out = self.cols @ W_col
        out += self.biases.data
        # Rows are ordered (B, OH, OW); move the channel axis to position 1.
        out = out.reshape(x.shape[0], self.OH, self.OW, self.out_channels).transpose(0, 3, 1, 2)
        return out

    def backward(self, d_out):
        """Accumulate parameter gradients and return the gradient w.r.t. the input.

        d_out: (B, out_channels, OH, OW)
        returns: array with the shape of the unpadded forward input.
        """
        B, OC, OH, OW = d_out.shape
        # Same row ordering as self.cols: (B*OH*OW, OC).
        d_out_flat = d_out.transpose(0, 2, 3, 1).reshape(-1, OC)

        self.weights.grad += (d_out_flat.T @ self.cols).reshape(self.weights.data.shape)
        self.biases.grad += d_out_flat.sum(axis=0)

        # forward computes out = cols @ W.reshape(OC, -1).T, hence
        # d_cols = d_out @ W.reshape(OC, -1). The kernel must NOT be flipped:
        # col2im already scatters every patch gradient back to where the patch
        # was read from. (Flipping belongs to the "full convolution" formulation.)
        d_cols = d_out_flat @ self.weights.data.reshape(OC, -1)  # (B*OH*OW, IC*KH*KW)

        dx_padded = col2im(
            d_cols,
            self.x_padded.shape,
            self.kernel_size,
            self.stride,
            (0, 0),
            self.OH,
            self.OW
        )

        # Remove padding before returning. Explicit end indices are used because
        # a slice like `0:-0` is empty, which broke the case where only one of
        # PH / PW is zero.
        PH, PW = self.padding
        if PH > 0 or PW > 0:
            H, W = self.x.shape[2:]
            return dx_padded[:, :, PH:PH + H, PW:PW + W]
        return dx_padded


In [33]:
class Flatten(nn.Module):
    """Collapse every axis after the first into one, and undo it on the way back."""

    def forward(self, x):
        """Return ``x`` reshaped to (x.shape[0], -1), remembering the original shape."""
        original_shape = x.shape
        self.input_shape = original_shape
        leading = original_shape[0]
        return x.reshape(leading, -1)

    def backward(self, grad_output):
        """Reshape the upstream gradient to the shape seen by the last forward call."""
        restored = grad_output.reshape(self.input_shape)
        return restored


In [34]:
# Baseline network without pooling: conv -> ReLU -> flatten -> two linear layers.
# NOTE: the training cell rebinds `model` to `model_2`, so this network is not the one trained there.
model = nn.Sequential([
    Conv2d(in_channels=1, out_channels=4, kernel_size=3),   # 28x28 input -> 26x26 (3x3 kernel, default stride 1, no padding)
    nn.ReLU(),
    Flatten(),                                              # 4 * 26 * 26 = 2704
    nn.Linear(2704, 32),
    nn.ReLU(),
    nn.Linear(32, 10),                                      # 10 outputs, presumably one per digit class
])


In [35]:
# Same layout as the baseline, with a 2x2 max-pool after the convolution to shrink the flattened feature vector.
model_2 = nn.Sequential([
    Conv2d(in_channels=1, out_channels=4, kernel_size=3),   # 28x28 input -> 26x26 (3x3 kernel, default stride 1, no padding)
    nn.ReLU(),

    MaxPool2d(pool_size=(2,2), stride=(2,2)),       # 26→13

    Flatten(),                                      # 4 * 13 * 13 = 676
    nn.Linear(676, 32),
    nn.ReLU(),
    nn.Linear(32, 10),                              # 10 outputs, presumably one per digit class
])


In [36]:
# Train the pooled network: from here on `model` refers to `model_2`.
model = model_2

loss_fn = CrossEntropy()

# Hyperparameters
epochs = 15
batch_size = 32
initial_lr = 0.01

optimizer = optim.SGD(model.parameters(), lr=initial_lr)

for epoch in range(epochs):
    # Mini-batches are consecutive slices of X_train; the data is not reshuffled between epochs.
    for i in range(0, X_train.shape[0], batch_size):
        x_batch = X_train[i:i+batch_size]
        y_batch = y_train[i:i+batch_size]

        logits = model.forward(x_batch)
        # The returned loss value is not used in this cell.
        # NOTE(review): presumably forward() caches what backward() needs; confirm in core.losses.
        loss = loss_fn.forward(logits, y_batch)
        grad_output = loss_fn.backward()
        model.backward(grad_output)
        optimizer.step()
        # Reset gradients after the update; Conv2d.backward accumulates into .grad with `+=`.
        optimizer.zero_grad()

    # End-of-epoch evaluation: each full split goes through the model in a single forward pass.
    logits_train = model.forward(X_train)
    train_loss = loss_fn.forward(logits_train, y_train)
    train_acc = accuracy(logits_train, y_train)

    logits_test = model.forward(X_test)
    test_loss = loss_fn.forward(logits_test, y_test)
    test_acc = accuracy(logits_test, y_test)

    print(f"Epoch {epoch+1} Summary: "
          f"Train Acc={train_acc:.4f}, Train Loss={train_loss:.4f}, "
          f"Test Acc={test_acc:.4f}, Test Loss={test_loss:.4f}")

Epoch 1 Summary: Train Acc=0.1716, Train Loss=2.3352, Test Acc=0.1640, Test Loss=2.3356
Epoch 2 Summary: Train Acc=0.2437, Train Loss=2.2784, Test Acc=0.2290, Test Loss=2.2785
Epoch 3 Summary: Train Acc=0.5043, Train Loss=2.2247, Test Acc=0.4790, Test Loss=2.2242
Epoch 4 Summary: Train Acc=0.6115, Train Loss=2.0191, Test Acc=0.6120, Test Loss=2.0170
Epoch 5 Summary: Train Acc=0.6733, Train Loss=1.4577, Test Acc=0.6850, Test Loss=1.4359
Epoch 6 Summary: Train Acc=0.7530, Train Loss=1.0838, Test Acc=0.7600, Test Loss=1.0811
Epoch 7 Summary: Train Acc=0.7947, Train Loss=0.9066, Test Acc=0.8010, Test Loss=0.9337
Epoch 8 Summary: Train Acc=0.8272, Train Loss=0.7845, Test Acc=0.8300, Test Loss=0.8106
Epoch 9 Summary: Train Acc=0.8540, Train Loss=0.7077, Test Acc=0.8550, Test Loss=0.7240
Epoch 10 Summary: Train Acc=0.8703, Train Loss=0.6407, Test Acc=0.8660, Test Loss=0.6134
Epoch 11 Summary: Train Acc=0.8868, Train Loss=0.5895, Test Acc=0.8830, Test Loss=0.5434
Epoch 12 Summary: Train Acc=0.