In [1]:
import torch
import torch.nn as nn
import numpy as np
import struct as st

In [2]:
# Helper function to parse IDX files
def parse_idx(file_path):
    with open(file_path, 'rb') as file:
        magic = st.unpack('>I', file.read(4))[0]  # Magic number (4 bytes)
        num_items = st.unpack('>I', file.read(4))[0]  # Number of items (4 bytes)

        if magic == 2051:  # Magic number for images
            num_rows = st.unpack('>I', file.read(4))[0]
            num_cols = st.unpack('>I', file.read(4))[0]
            num_bytes = num_items * num_rows * num_cols
            data = np.frombuffer(file.read(num_bytes), dtype=np.uint8)
            return data.reshape(num_items, num_rows, num_cols)
        elif magic == 2049:  # Magic number for labels
            data = np.frombuffer(file.read(num_items), dtype=np.uint8)
            return data
        else:
            raise ValueError(f"Unknown magic number: {magic}")

# Parse the training data
x_train = parse_idx('../data/DigitData/train-images.idx3-ubyte')
y_train = torch.tensor(parse_idx('../data/DigitData/train-labels.idx1-ubyte'))

x_test = parse_idx('../data/DigitData/t10k-images.idx3-ubyte')
y_test = torch.tensor(parse_idx('../data/DigitData/t10k-labels.idx1-ubyte'))

# Reshape and scale down
p = x_train.shape[1]
x_train = x_train.reshape(x_train.shape[0], -1) / 255.0
x_test = x_test.reshape(x_test.shape[0], -1) / 255.0

In [3]:
# Normalize
train_mean = np.mean(x_train, axis=0)
train_std = np.std(x_train, axis=0) + 1e-12

x_train = (x_train - train_mean) / train_std
x_test = (x_test - train_mean) / train_std

x_train = torch.tensor(x_train.reshape(x_train.shape[0], 1, p, p), dtype=torch.float32, requires_grad=True) # Need (n, c, p, p) shape
x_test = torch.tensor(x_test.reshape(x_test.shape[0], 1, p, p), dtype=torch.float32, requires_grad=True)

In [15]:
# Build Convolutional block
class ConvBlock(nn.Module):

    def __init__(self, in_channels, out_channels, kernel_size, pool_size, kernel_stride = 1, pool_stride = 2):
        super().__init__()
        self.conv = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, padding=1, stride=kernel_stride, bias=False, dtype=torch.float32)
        self.batch_norm = nn.BatchNorm2d(out_channels, momentum=0.9, dtype=torch.float32)
        self.act = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=pool_size, stride=pool_stride)
    
    def forward(self, x):
        x = self.conv(x)
        x = self.batch_norm(x)
        x = self.act(x)
        x = self.pool(x)
        return x

class MLP(nn.Module):

    def __init__(self, size: tuple):
        super().__init__()
        layers = []
        for i in range(len(size) - 1):
            layers.append(nn.Linear(size[i], size[i+1], bias=True, dtype=torch.float32))
            if i != len(size) - 2:
                layers.append(nn.ReLU())
        self.seq = nn.Sequential(*layers)
    def forward(self, x):
        return self.seq(x)

class Model(nn.Module):

    def __init__(self, blocks):
        super().__init__()
        self.blocks = nn.ModuleList(blocks)
    
    def forward(self, x):
        for block in self.blocks:
            x = block(x)
        return x
    


In [16]:
# Instantiate model
model = Model([
    ConvBlock(in_channels=1, out_channels=8, kernel_size=(3, 3), pool_size=2, kernel_stride=1),
    ConvBlock(in_channels=8, out_channels=16, kernel_size=(3, 3), pool_size=2, kernel_stride=1),
    nn.Flatten(),
    MLP((784, 128, 10))  # adjust shape if needed
])

# Define loss
criterion = nn.CrossEntropyLoss()

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model.to(device)
x_train = x_train.to(device)
y_train = y_train.to(device)
x_test = x_test.to(device)
y_test = y_test.to(device)

In [29]:
# Hyper params
BATCH_SIZE = 30
STEPS = 1000
LR = 0.01

In [36]:
SEED = 42

for _ in range(STEPS):

    # Minibatch Construct
    ix = torch.randint(0, x_train.shape[0], (BATCH_SIZE,))

    # Forward Pass
    logits = model(x_train[ix])
    loss = nn.functional.cross_entropy(logits, y_train[ix])

    # Backward Pass
    model.zero_grad()
    loss.backward()

    lr = LR if _ < 1000 else LR * 0.1

    # Update
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    for p in model.parameters():
        p.data += -lr * p.grad
    
    if _ % 100 == 0:
        print(loss.item())

0.0037946244701743126
0.06675023585557938
0.11217992007732391
0.00029738497687503695
0.04412606731057167
0.0023515166249126196
0.0024233313743025064
0.00014870883023831993
0.008332587778568268
0.0024808584712445736


In [37]:
model.eval()
with torch.no_grad():
    logits = model(x_test)
    loss = nn.functional.cross_entropy(logits, y_test)
    accuracy = (logits.argmax(dim=1) == y_test).float().mean()

print(f"Loss: {loss.item():.4f}")
print(f"Accuracy: {accuracy.item() * 100:.4f}")

Loss: 328893.7188
Accuracy: 98.7700
