In [36]:
import numpy as np
import torch
from torchvision import datasets, transforms

In [37]:
# Load and preprocess the data
# Both splits share one relative data directory so the notebook does not
# depend on a machine-specific absolute path.
data_root = './data'
transform = transforms.Compose([transforms.ToTensor()])
mnist_train = datasets.MNIST(root=data_root, train=True, download=True, transform=transform)
mnist_test = datasets.MNIST(root=data_root, train=False, download=True, transform=transform)

# The raw `.data` tensors are used directly, so scaling is done here by
# dividing by 255.0 (assumes raw pixel values in 0-255 -- confirm against the dataset).
# Images are reshaped to (N, 1, 28, 28): one channel per image.
X_train = mnist_train.data.numpy().reshape(-1, 1, 28, 28) / 255.0
y_train = mnist_train.targets.numpy()
X_test = mnist_test.data.numpy().reshape(-1, 1, 28, 28) / 255.0
y_test = mnist_test.targets.numpy()

In [40]:
import numpy as np

# Convolutional Layer
def conv2d_init(in_channels, out_channels, kernel_size, stride=1, padding=0):
    """Draw randomly initialised convolution filters.

    Samples a standard normal array of shape
    (out_channels, in_channels, kernel_size, kernel_size) and scales it by
    sqrt(2 / fan_in), where fan_in = in_channels * kernel_size * kernel_size.

    `stride` and `padding` are accepted but are not used by this function.
    """
    fan_in = in_channels * kernel_size * kernel_size
    scale = np.sqrt(2. / fan_in)
    filter_shape = (out_channels, in_channels, kernel_size, kernel_size)
    return np.random.randn(*filter_shape) * scale

def conv2d_forward(x, w, stride, padding):
    """Apply a 2D convolution (cross-correlation, no bias) to a batch of images.

    Args:
        x: input indexed as (batch, in_channel, row, col).
        w: filters indexed as (out_channel, in_channel, kernel_row, kernel_col).
        stride: step between neighbouring windows along both spatial axes.
        padding: number of zeros added on every side of both spatial axes.

    Returns:
        (out, x_padded): the output indexed as (batch, out_channel, row, col)
        and the zero-padded input, which the backward pass needs.
    """
    pad_spec = ((0, 0), (0, 0), (padding, padding), (padding, padding))
    x_padded = np.pad(x, pad_spec, mode='constant')

    batch, _, padded_h, padded_w = x_padded.shape
    n_filters, k_h, k_w = w.shape[0], w.shape[2], w.shape[3]
    out_h = (padded_h - k_h) // stride + 1
    out_w = (padded_w - k_w) // stride + 1
    out = np.zeros((batch, n_filters, out_h, out_w))

    # Leading axis lets the filters broadcast against every sample in the batch.
    filters = w[np.newaxis, :, :, :, :]
    for row in range(out_h):
        top = row * stride
        for col in range(out_w):
            left = col * stride
            # Extra axis after the batch axis broadcasts the patch over all filters.
            patch = x_padded[:, np.newaxis, :, top:top + k_h, left:left + k_w]
            out[:, :, row, col] = np.sum(patch * filters, axis=(2, 3, 4))

    return out, x_padded

def conv2d_backward(dout, x_padded, w, stride, padding):
    """Backpropagate through `conv2d_forward`.

    Args:
        dout: upstream gradient indexed as (batch, out_channel, out_row, out_col).
        x_padded: the zero-padded input returned by the forward pass.
        w: filters indexed as (out_channel, in_channel, kernel_row, kernel_col).
        stride: stride used in the forward pass.
        padding: padding used in the forward pass (may be 0).

    Returns:
        (dx, dw): gradient with respect to the *unpadded* input and gradient
        with respect to the filters (same shape as `w`).
    """
    _, _, out_h, out_w = dout.shape
    k_h, k_w = w.shape[2], w.shape[3]
    dx_padded = np.zeros_like(x_padded)
    dw = np.zeros_like(w)

    for i in range(out_h):
        rows = slice(i * stride, i * stride + k_h)
        for j in range(out_w):
            cols = slice(j * stride, j * stride + k_w)
            x_slice = x_padded[:, :, rows, cols]
            grad_ij = dout[:, :, i, j]  # (batch, out_channels)
            # Sum over output channels: every filter contributes to the input patch.
            dx_padded[:, :, rows, cols] += np.tensordot(grad_ij, w, axes=([1], [0]))
            # Sum over the batch: every sample contributes to every filter.
            dw += np.tensordot(grad_ij, x_slice, axes=([0], [0]))

    # Strip the padding with explicit end indices; `padding:-padding` would
    # yield an empty array when padding == 0.
    padded_h, padded_w = dx_padded.shape[2], dx_padded.shape[3]
    dx = dx_padded[:, :, padding:padded_h - padding, padding:padded_w - padding]
    return dx, dw

# ReLU Activation
def relu_forward(x):
    """Element-wise rectifier: returns the larger of 0 and each entry of `x`."""
    floor = 0
    return np.maximum(floor, x)

def relu_backward(dout, x):
    """Gradient of the rectifier.

    Passes `dout` through wherever the forward input `x` was strictly
    positive and zeroes it everywhere else.
    """
    active = x > 0
    return dout * active

# Max Pooling
def maxpool_forward(x, kernel_size, stride):
    """Max-pool each channel of a batch of feature maps.

    Args:
        x: input indexed as (batch, channel, row, col).
        kernel_size: side length of the square pooling window.
        stride: step between neighbouring windows.

    Returns:
        Array indexed as (batch, channel, pooled_row, pooled_col) holding the
        maximum of every window.
    """
    batch, channels, in_h, in_w = x.shape
    pooled_h = (in_h - kernel_size) // stride + 1
    pooled_w = (in_w - kernel_size) // stride + 1
    pooled = np.zeros((batch, channels, pooled_h, pooled_w))

    for row in range(pooled_h):
        top = row * stride
        for col in range(pooled_w):
            left = col * stride
            region = x[:, :, top:top + kernel_size, left:left + kernel_size]
            pooled[:, :, row, col] = np.max(region, axis=(2, 3))

    return pooled

def maxpool_backward(dout, x, kernel_size, stride):
    """Route the pooled gradient back to the positions that held the maxima.

    Args:
        dout: upstream gradient indexed as (batch, channel, pooled_row, pooled_col).
        x: the input that was given to `maxpool_forward`.
        kernel_size: side length of the pooling window.
        stride: step between neighbouring windows.

    Returns:
        Gradient with the same shape as `x`. Every position equal to its
        window's maximum receives the full upstream value, so tied maxima
        each get the gradient.
    """
    _, _, pooled_h, pooled_w = dout.shape
    grad_x = np.zeros_like(x)

    for row in range(pooled_h):
        top = row * stride
        for col in range(pooled_w):
            left = col * stride
            region = x[:, :, top:top + kernel_size, left:left + kernel_size]
            region_max = np.max(region, axis=(2, 3), keepdims=True)
            is_max = region == region_max
            upstream = dout[:, :, row, col][:, :, np.newaxis, np.newaxis]
            grad_x[:, :, top:top + kernel_size, left:left + kernel_size] += is_max * upstream

    return grad_x

# Linear Layer
def linear_init(in_features, out_features):
    """Draw a randomly initialised weight matrix of shape (out_features, in_features).

    Entries are standard normal samples scaled by sqrt(2 / in_features).
    """
    scale = np.sqrt(2. / in_features)
    weight = np.random.randn(out_features, in_features)
    return weight * scale

def linear_forward(x, w):
    """Bias-free linear layer: returns the matrix product `w @ x`.

    With `w` of shape (out_features, in_features), `x` is expected to be laid
    out as (in_features, batch) so the product is (out_features, batch).
    """
    return np.matmul(w, x)

def linear_backward(dout, x, w):
    """Gradients of `linear_forward` (`w @ x`).

    Args:
        dout: upstream gradient, same shape as the layer output.
        x: the input that was given to the layer.
        w: the layer's weight matrix.

    Returns:
        (dx, dw): `w.T @ dout` for the input and `dout @ x.T` for the weights.
    """
    grad_w = dout @ x.T
    grad_x = w.T @ dout
    return grad_x, grad_w

# Softmax and Cross-Entropy Loss
def softmax(x):
    """Softmax along axis 0, so each column of a 2D input sums to 1.

    The per-column maximum is subtracted before exponentiating so that large
    inputs do not overflow.
    """
    shifted = x - np.max(x, axis=0, keepdims=True)
    exps = np.exp(shifted)
    totals = np.sum(exps, axis=0, keepdims=True)
    return exps / totals

def cross_entropy_loss(y_pred, y_true):
    """Mean softmax cross-entropy of a batch of logits.

    Args:
        y_pred: logits laid out as (num_classes, batch) -- one column per sample.
        y_true: integer class index for each sample (length `batch`).

    Returns:
        The negative log-probability of the true class, averaged over the batch.

    The log-probabilities are computed directly as
    `logit - logsumexp(logits)` instead of `log(softmax(logits))`, so a
    true-class probability that underflows to 0 gives a large finite loss
    rather than `inf`.
    """
    m = y_pred.shape[1]
    shifted = y_pred - np.max(y_pred, axis=0, keepdims=True)
    log_norm = np.log(np.sum(np.exp(shifted), axis=0, keepdims=True))
    log_probs = shifted - log_norm
    # Pick, for every column, the log-probability of that sample's true class.
    log_likelihood = -log_probs[y_true, np.arange(m)]
    return np.sum(log_likelihood) / m

# Neural Network
def init_neural_net():
    """Create the network's weight dictionary.

    Keys:
        'conv1': filters from `conv2d_init(1, 32, 3, padding=1)`.
        'fc1':   matrix from `linear_init(32 * 14 * 14, 128)`.
        'fc2':   matrix from `linear_init(128, 10)`.
    """
    weights = {}
    weights['conv1'] = conv2d_init(1, 32, 3, padding=1)
    weights['fc1'] = linear_init(32 * 14 * 14, 128)
    weights['fc2'] = linear_init(128, 10)
    return weights

def forward(x, weights):
    """Run the network: conv -> ReLU -> 2x2 max-pool -> fc1 -> ReLU -> fc2.

    Args:
        x: batch of inputs passed to `conv2d_forward`.
        weights: dict with keys 'conv1', 'fc1' and 'fc2'.

    Returns:
        (logits, cache): the output of the last linear layer and a tuple
        (x_padded, conv1, relu1, pool1, fc1_input, fc1, relu2) of
        intermediates consumed by `backward`.
    """
    conv_out, x_padded = conv2d_forward(x, weights['conv1'], stride=1, padding=1)
    conv_act = relu_forward(conv_out)
    pooled = maxpool_forward(conv_act, kernel_size=2, stride=2)

    # Flatten each sample, then transpose so samples become columns.
    batch = pooled.shape[0]
    flat = pooled.reshape(batch, -1).T

    hidden_pre = linear_forward(flat, weights['fc1'])
    hidden_act = relu_forward(hidden_pre)
    logits = linear_forward(hidden_act, weights['fc2'])

    cache = (x_padded, conv_out, conv_act, pooled, flat, hidden_pre, hidden_act)
    return logits, cache

def backward(dout, weights, cache):
    """Backpropagate the loss gradient through the network.

    Args:
        dout: gradient of the loss with respect to the logits returned by `forward`.
        weights: dict with keys 'conv1', 'fc1' and 'fc2'.
        cache: the tuple of intermediates returned by `forward`.

    Returns:
        (conv_grad, fc1_grad, fc2_grad): gradients for weights['conv1'],
        weights['fc1'] and weights['fc2'] respectively.
    """
    x_padded, conv1, relu1, pool1, fc1_input, fc1, relu2 = cache

    dx, fc2_grad = linear_backward(dout, relu2, weights['fc2'])
    dx = relu_backward(dx, fc1)
    dx, fc1_grad = linear_backward(dx, fc1_input, weights['fc1'])
    # Undo the flatten + transpose from `forward`. Using the cached pooled
    # shape (rather than a hard-coded one) keeps this valid for any batch size.
    dx = dx.T.reshape(pool1.shape)
    dx = maxpool_backward(dx, relu1, kernel_size=2, stride=2)
    dx = relu_backward(dx, conv1)
    # The gradient with respect to the network input is not needed by the caller.
    _, conv_grad = conv2d_backward(dx, x_padded, weights['conv1'], stride=1, padding=1)

    return conv_grad, fc1_grad, fc2_grad

def update_weights(weights, conv_grad, fc1_grad, fc2_grad, lr):
    """Take one gradient-descent step.

    Each of weights['conv1'], weights['fc1'] and weights['fc2'] is updated in
    place by subtracting `lr` times its gradient. The same dict is returned.
    """
    updates = (
        ('conv1', conv_grad),
        ('fc1', fc1_grad),
        ('fc2', fc2_grad),
    )
    for key, grad in updates:
        weights[key] -= lr * grad
    return weights

In [43]:
# Training parameters
batch_size = 8
epochs = 5
lr = 1e-3  # Further reduced learning rate
log_every = 8  # Print the loss once every `log_every` iterations
eval_batch_size = 256  # Test-set chunk size; bounds memory use during evaluation
seed = 0

# Seed NumPy so the random weight initialisation is reproducible across runs
np.random.seed(seed)

# Initialize the neural network
weights = init_neural_net()

# Training loop
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    for i in range(0, len(X_train), batch_size):
        batch_X = X_train[i:i+batch_size]
        batch_y = y_train[i:i+batch_size]
        n_samples = len(batch_y)

        # Forward pass
        y_pred, cache = forward(batch_X, weights)

        # Compute loss and gradients
        loss = cross_entropy_loss(y_pred, batch_y)

        # Compute softmax probabilities
        softmax_probs = softmax(y_pred)

        # Create one-hot encoded true labels (classes along axis 0, samples along axis 1)
        y_true = np.zeros_like(y_pred)
        y_true[batch_y, np.arange(n_samples)] = 1

        # Gradient of the mean cross-entropy with respect to the logits
        dout = (softmax_probs - y_true) / n_samples

        # Backward pass
        conv_grad, fc1_grad, fc2_grad = backward(dout, weights, cache)

        # Update weights
        weights = update_weights(weights, conv_grad, fc1_grad, fc2_grad, lr)

        # Log by iteration count so the cadence does not depend on batch_size
        iteration = i // batch_size
        if iteration % log_every == 0:
            print(f"Iter: {iteration} Loss: {loss}")

    # Evaluate on test set in chunks: a single forward pass over the whole
    # test set would materialise activations for every sample at once.
    total_loss = 0.0
    n_correct = 0
    for start in range(0, len(X_test), eval_batch_size):
        chunk_X = X_test[start:start+eval_batch_size]
        chunk_y = y_test[start:start+eval_batch_size]
        chunk_pred, _ = forward(chunk_X, weights)
        # Weight each chunk's mean loss by its size to recover the overall mean
        total_loss += cross_entropy_loss(chunk_pred, chunk_y) * len(chunk_y)
        n_correct += np.sum(np.argmax(chunk_pred, axis=0) == chunk_y)
    test_loss = total_loss / len(X_test)
    accuracy = n_correct / len(X_test)
    print(f"Epoch {epoch+1} - Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.4f}")

print("Training completed!")

Epoch 1/5
Iter: 0 Loss: 2.2550391329952015
Iter: 8 Loss: 2.2528827395562807
Iter: 16 Loss: 2.1885430698947284
Iter: 24 Loss: 2.180439342893292
Iter: 32 Loss: 2.161353204361432
Iter: 40 Loss: 1.8303605009516488
Iter: 48 Loss: 2.089944378459229
Iter: 56 Loss: 1.7376544584815936
Iter: 64 Loss: 2.167860412211517
Iter: 72 Loss: 1.8911323782390927
Iter: 80 Loss: 1.9841146376951972
Iter: 88 Loss: 1.8443487545389812
Iter: 96 Loss: 1.8257833455251125
Iter: 104 Loss: 1.667480792769398
Iter: 112 Loss: 1.847660729116419
Iter: 120 Loss: 1.6279789866556955
Iter: 128 Loss: 1.7849004907062522
Iter: 136 Loss: 1.3919290212004989
Iter: 144 Loss: 1.1411015059230274
Iter: 152 Loss: 1.9223132772909188
Iter: 160 Loss: 1.716764176848298
Iter: 168 Loss: 1.6172682701563927
Iter: 176 Loss: 1.5977699217614196
Iter: 184 Loss: 1.6243539041111101
Iter: 192 Loss: 1.4271120187400197


KeyboardInterrupt: 