In [None]:
import cupy as np
import struct
import gzip
import os

def load_mnist_images(filename):
    with gzip.open(filename, 'rb') as f:
        magic, num_images, num_rows, num_cols = struct.unpack('>IIII', f.read(16))
        data = np.frombuffer(f.read(), dtype=np.uint8)
        
        images = data.reshape(num_images, 1, num_rows, num_cols).astype(np.float32)
        images /= 255.0
        
        return images

def load_mnist_labels(filename):
    with gzip.open(filename, 'rb') as f:
        magic, num_items = struct.unpack('>II', f.read(8))
        labels = np.frombuffer(f.read(), dtype=np.uint8)
        
        return labels

# X_train = load_mnist_images('train-images-idx3-ubyte.gz')
# Y_train = load_mnist_labels('train-labels-idx1-ubyte.gz')

# print(f"Training Image Shape: {X_train.shape}")
# print(f"Training Label Shape: {Y_train.shape}")

In [None]:
def relu(x):
    """Our simple non-linear activation function."""
    return np.maximum(0, x)

def softmax(x):
    """Converts raw scores (logits) into probabilities that sum to 1."""
    # To prevent overflow during exponentiation, subtract the max value
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=0)

def cross_entropy_loss(probabilities, true_labels):
    """
    Calculates the Cross-Entropy Loss (error) for the current batch.
    
    Args:
        probabilities: (Batch_Size, 10) array of predicted probabilities.
        true_labels: (Batch_Size,) array of integer labels (0-9).
    """
    m = true_labels.shape[0] # Batch size
    
    # The one-hot encoding is not strictly needed for the loss calculation 
    # but is useful for understanding:
    # one_hot_labels = np.zeros((m, 10))
    # one_hot_labels[np.arange(m), true_labels] = 1

    # Get the predicted probability for the *correct* class for each image in the batch.
    # This uses the 1D 'true_labels' array to index the probabilities array.
    # The typo 'true_hot_labels' has been fixed to 'true_labels'.
    log_probs = -np.log(probabilities[np.arange(m), true_labels] + 1e-9) 
    
    # Return the average loss
    return np.sum(log_probs) / m

def manual_conv2d(image, filters, bias, stride=1, padding=0):
    """
    Manually performs the 2D convolution operation (Forward Pass).
    Note: Simplified for single-image batch, no padding implemented here.
    """
    C_out, C_in, F_h, F_w = filters.shape
    C_in, H_in, W_in = image.shape

    # Calculate Output Dimensions: H_out = (I - F + 2P)/S + 1
    H_out = (H_in - F_h + 2 * padding) // stride + 1
    W_out = (W_in - F_w + 2 * padding) // stride + 1
    
    output = np.zeros((C_out, H_out, W_out), dtype=np.float32)

    for k in range(C_out):   # Loop over 5 Filters
        for i in range(H_out): # Loop over Output Height (e.g., 26 times)
            for j in range(W_out): # Loop over Output Width (e.g., 26 times)
                
                # Define the slice of the input image that the filter sees
                h_start, h_end = i * stride, i * stride + F_h
                w_start, w_end = j * stride, j * stride + F_w
                input_patch = image[:, h_start:h_end, w_start:w_end]

                # Core Operation: Sum(Multiplication) + Bias
                output[k, i, j] = np.sum(input_patch * filters[k]) + bias[k]

    return output

def manual_max_pool(image, pool_size=2, stride=2):
    """Manually performs the Max Pooling operation."""
    C, H_in, W_in = image.shape
    
    # Calculate Output Dimensions: H_out = I / S (for P=0, F=S)
    H_out = H_in // stride
    W_out = W_in // stride
    
    output = np.zeros((C, H_out, W_out), dtype=np.float32)

    for c in range(C):
        for i in range(H_out):
            for j in range(W_out):
                
                h_start, h_end = i * stride, i * stride + pool_size
                w_start, w_end = j * stride, j * stride + pool_size
                
                input_patch = image[c, h_start:h_end, w_start:w_end]
                
                # Core Operation: Find the Maximum value
                output[c, i, j] = np.max(input_patch)
                
    return output

In [None]:
def init_params():
    """Initializes all weights and biases (W1, W2, W3, W4) with small random values."""
    # We use a simple small uniform distribution for initialization
    def initialize(shape):
        # Using Xavier/Glorot initiation for a slightly better start
        limit = np.sqrt(6 / (shape[0] + shape[1]))
        return np.random.uniform(-limit, limit, size=shape).astype(np.float32)

    params = {}
    # Layer 1: Conv (5 filters, 1 channel in, 3x3 kernel)
    params['W1'] = initialize((5, 1, 3, 3))
    params['b1'] = np.zeros(5, dtype=np.float32)
    
    # Layer 2: Conv (10 filters, 5 channels in, 3x3 kernel)
    params['W2'] = initialize((10, 5, 3, 3))
    params['b2'] = np.zeros(10, dtype=np.float32)

    # Layer 3: FC (Input 250 features, Output 100 neurons)
    params['W3'] = initialize((100, 250)) 
    params['b3'] = np.zeros(100, dtype=np.float32)

    # Layer 4: FC (Output 10 classes)
    params['W4'] = initialize((10, 100))
    params['b4'] = np.zeros(10, dtype=np.float32)

    return params

def forward_pass(image, params):
    """Defines the full architecture: Conv1 -> Pool1 -> Conv2 -> Pool2 -> FC."""
    
    # Layer 1: Conv -> ReLU -> Pool
    conv1_output = manual_conv2d(image, params['W1'], params['b1'])
    relu1_output = relu(conv1_output)
    pool1_output = manual_max_pool(relu1_output) # Output: 5 x 13 x 13

    # Layer 2: Conv -> ReLU -> Pool
    conv2_output = manual_conv2d(pool1_output, params['W2'], params['b2'])
    relu2_output = relu(conv2_output)
    pool2_output = manual_max_pool(relu2_output) # Output: 10 x 5 x 5

    # Layer 3: Flatten (10 x 5 x 5 = 250 features)
    flattened = pool2_output.flatten() # Vector of 250 features

    # Layer 4: FC 1 -> ReLU
    fc1_output = np.dot(params['W3'], flattened) + params['b3']
    relu3_output = relu(fc1_output)

    # Layer 5: FC 2 (Output 10 neurons)
    fc2_output = np.dot(params['W4'], relu3_output) + params['b4']
    
    # Final Activation: Softmax to get probabilities (p)
    probabilities = softmax(fc2_output)
    
    # Storing outputs for the Backward Pass (Chain Rule)
    # The dictionary no longer uses the invalid '...' placeholder.
    return probabilities, {
        'relu1_output': relu1_output, 
        'relu2_output': relu2_output,
        'relu3_output': relu3_output,
        'flattened': flattened, 
        'pool1_output': pool1_output,
        'pool2_output': pool2_output
    }

In [None]:
X_train = load_mnist_images('MNIST/raw/train-images-idx3-ubyte.gz')
Y_train = load_mnist_labels('MNIST/raw/train-labels-idx1-ubyte.gz')

params = init_params()
LEARNING_RATE = 0.0001
BATCH_SIZE = 64
EPOCHS = 5

print("Starting manual training loop...")

for epoch in range(EPOCHS):
    # For each epoch, iterate through the training data in batches
    for i in range(0, X_train.shape[0], BATCH_SIZE):
        X_batch = X_train[i:i + BATCH_SIZE]
        Y_batch = Y_train[i:i + BATCH_SIZE]
        
        # 1. FORWARD PASS
        # Note: This forward_pass function needs to be adapted for batches 
        # (it currently only handles a single image).
        # We'll use the first image for demonstration:
        probabilities, cache = forward_pass(X_train[0], params)
        
        # 2. LOSS CALCULATION
        loss = cross_entropy_loss(probabilities.reshape(1, -1), np.array([Y_train[0]]))
        
        # 3. BACKWARD PASS (The complex manual step we didn't code)
        # gradients = manual_backward_pass(loss, cache, params)
        
        # 4. GRADIENT DESCENT (Optimization)
        # for key in params:
        #     params[key] -= LEARNING_RATE * gradients[key]

    print(f"Epoch {epoch+1}: Loss = {loss:.4f}")