In [None]:
import simplegrad as sg
from pprint import pprint

In [None]:
VAL_SPLIT = 0.05

EPOCHS = 20
BATCH_SIZE = 32

LEARNING_RATE = 0.01
MOMENTUM = 0.8
DAMPENING = 0.1

In [None]:
# !curl -L -o ~/Downloads/mnist-dataset.zip https://www.kaggle.com/api/v1/datasets/download/hojjatk/mnist-dataset
# !mkdir -p ~/Developer/simplegrad/datasets/mnist/
# !unzip ~/Downloads/mnist-dataset.zip -d ~/Developer/simplegrad/datasets/mnist/

In [None]:
import numpy as np

In [None]:
np.random.seed(42)

In [None]:
def parse_mnist(folder_path):
    parse = lambda file_path, offset, dtype: np.frombuffer(
        open(file_path, "rb").read(), dtype=dtype, offset=offset
    )
    x = (
        parse(f"{folder_path}/train-images.idx3-ubyte", 16, np.uint8)
        .reshape(-1, 1, 28, 28)
        .astype("float32")
        / 255.0
    )
    labels = parse(f"{folder_path}/train-labels.idx1-ubyte", 8, np.uint8)
    x_test = (
        parse(f"{folder_path}/t10k-images.idx3-ubyte", 16, np.uint8)
        .reshape(-1, 1, 28, 28)
        .astype("float32")
        / 255.0
    )
    labels_test = parse(f"{folder_path}/t10k-labels.idx1-ubyte", 8, np.uint8)
    return (x, labels), (x_test, labels_test)


def to_one_hot(labels, num_classes=10):
    one_hot = np.zeros((len(labels), num_classes), dtype="float32")
    one_hot[np.arange(len(labels)), labels] = 1.0
    return one_hot


# After parsing MNIST data
(x, labels), (x_test, labels_test) = parse_mnist("./datasets/mnist")
x_train = x[: int(len(x) * (1 - VAL_SPLIT))]
labels_train = labels[: int(len(labels) * (1 - VAL_SPLIT))]
x_val = x[int(len(x) * (1 - VAL_SPLIT)) :]
labels_val = labels[int(len(labels) * (1 - VAL_SPLIT)) :]

# Convert labels to one-hot
y_train = to_one_hot(labels_train)
y_val = to_one_hot(labels_val)
y_test = labels_test

print("x train:", x_train.shape, x_train.dtype)
print("y train:", y_train.shape, y_train.dtype)
print("x val:", x_val.shape, x_val.dtype)
print("y val:", y_val.shape, y_val.dtype)
print("x test:", x_test.shape, x_test.dtype)
print("y test:", y_test.shape, y_test.dtype)

In [None]:
import matplotlib.pyplot as plt

idx = 55234
print("One-hot:", y_train[idx])
print("Label:", np.argmax(y_train[idx]))
plt.imshow(x_train[idx][0], cmap="gray");

In [None]:
inter_ch, out_ch = 8, 16
kernel_size = 5

model = sg.nn.Sequential(
    sg.nn.Conv2d(in_channels=1, out_channels=inter_ch, kernel_size=kernel_size, stride=1),
    sg.nn.ReLU(),
    sg.nn.MaxPool2d(kernel_size=2, stride=2),
    sg.nn.Conv2d(in_channels=inter_ch, out_channels=out_ch, kernel_size=kernel_size, stride=1),
    sg.nn.ReLU(),
    sg.nn.MaxPool2d(kernel_size=2, stride=2),
    sg.nn.Flatten(start_dim=1, end_dim=-1),
    sg.nn.Linear(out_ch * 4 * 4, 10),
)

model.summary()

In [None]:
optim = sg.optim.SGD(model, lr=LEARNING_RATE, momentum=MOMENTUM, dampening=DAMPENING)
loss_fn = sg.nn.CELoss(dim=1, reduction='mean')  # dim=1 for class dimension

In [None]:
tracker = sg.vis.TrainingTracker(title="MNIST Classification")
tracker.register_metric('train_loss', sg.vis.LineVisualizer(color='blue',marker=None))
tracker.register_metric('val_loss', sg.vis.LineVisualizer(color='red', linestyle='--', marker='s'))

In [None]:
for epoch in range(EPOCHS):
    for i in range(0, len(x_train), BATCH_SIZE):
        x_batch = sg.Tensor(x_train[i : i + BATCH_SIZE, :, : , :], dtype='float32')
        y_batch = sg.Tensor(y_train[i : i + BATCH_SIZE], dtype='float32')
        logits = model(x_batch)
        loss = loss_fn(logits, y_batch)
        tracker.log(train_loss=loss.values.item())
        optim.zero_grad()
        loss.backward()
        optim.step()

    # Validation
    x_val_tensor = sg.Tensor(x_val, dtype='float32', comp_grad=False)
    y_val_tensor = sg.Tensor(labels_val, dtype='float32', comp_grad=False)
    val_logits = model(x_val_tensor)
    val_loss = loss_fn(val_logits, y_val_tensor)
    tracker.log(val_loss=val_loss.values.item())

In [None]:
tracker.plot(num_cols=1, cell_w=20, cell_h=6);

In [None]:
x = sg.Tensor(x_test, dtype='float32', comp_grad=False)
predictions = sg.flatten(sg.argmax(sg.softmax(model(x), dim=1), dim=1, dtype='int8'))

correct = 0
for i in range(len(y_test)):
    if predictions[0, i][0] == y_test[i]:  # retruns a tuple of vlue and a grad
        correct += 1

accuracy = correct / len(y_test)
print(f"Predictions: {correct}/{len(y_test)}")
print(f"Accuracy: {accuracy:.3%}")