In [None]:
# import dependencies (the dataset itself is loaded in the next cell)

from tensorflow.keras.datasets import mnist
from progressbar import progressbar
import numpy as np
import multiprocessing as mp
from itertools import product

from layers.Conv2D import Conv2D
from layers.Pooling import Pooling
from layers.Dense import Dense
from layers.Flatten import Flatten

In [None]:
# import the data
(train_X, train_y), (test_X, test_y) = mnist.load_data()

# reduce the size of the dataset first, so the scaling below only touches the
# samples that are kept instead of allocating float copies of the full arrays
N_TRAIN, N_TEST = 100, 10
train_X, test_X = train_X[:N_TRAIN], test_X[:N_TEST]
train_y, test_y = train_y[:N_TRAIN], test_y[:N_TEST]

# scale the data (pixel values divided by 255; the division yields floats)
train_X, test_X = train_X / 255.0, test_X / 255.0

# need the fourth dimension to represent the number of channels
train_X = train_X.reshape(-1, 28, 28, 1)
test_X = test_X.reshape(-1, 28, 28, 1)

print('Train: X=%s, y=%s' % (train_X.shape, train_y.shape))
print('Test: X=%s, y=%s' % (test_X.shape, test_y.shape))

In [None]:
# define the model: start with the feature-extraction layers only
model = [
    Conv2D(32, 2, 1, 1),
    Pooling(2, 2, 'max'),
    Flatten(),
]

# determine the number of input features by pushing a one-image batch
# (leading axis of length 1) through the layers defined so far
dims = train_X[:1]
for layer in model:
    dims = layer.forward(dims)

# the first dense layer takes every non-batch value of the flattened output
model.extend([
    Dense(np.prod(dims.shape[1:]), 128),
    Dense(128, 10),
])

# continue the same forward pass through the newly added dense layers
for layer in model[3:]:
    dims = layer.forward(dims)

In [None]:
def predict(X, model):
    """Feed X through every layer of model and pick the top-scoring index.

    Args:
        X: input passed to the first layer's ``forward``.
        model: iterable of layer objects, each exposing ``forward(x)``;
            layers are applied in iteration order.

    Returns:
        ``np.argmax`` of the final layer output taken along axis 1, i.e. one
        index per row of that output.
    """
    activations = X
    for stage in model:
        activations = stage.forward(activations)
    return np.argmax(activations, axis=1)


def train(X, y, model, lr=0.01):
    """Run one forward/backward pass of the model on (X, y).

    The loss is the summed squared error between the raw output of the last
    layer and the one-hot encoded labels. The loss must be taken on the raw
    scores rather than on argmax class indices: argmax is not differentiable,
    and a gradient built from it would not have the shape of the last layer's
    output.

    Args:
        X: input batch passed to the first layer's ``forward``.
        y: either a 1-D array of integer class labels (one per sample), or an
            array already shaped like the model output (e.g. one-hot targets).
        model: sequence of layer objects exposing ``forward(x)`` and
            ``backward(grad, lr)``.
            # assumes the last layer outputs (n_samples, n_classes) -- confirm
            # against the Dense implementation
        lr: learning rate forwarded unchanged to every layer's ``backward``.

    Returns:
        The scalar loss for this pass (it is also printed).
    """
    # forward pass: keep the per-class scores, do NOT collapse them with argmax
    scores = X
    for layer in model:
        scores = layer.forward(scores)

    # build targets with the same shape as the scores
    labels = np.asarray(y)
    if labels.ndim == 1:
        targets = np.zeros_like(scores, dtype=float)
        # one 1.0 per row, at the column given by that sample's label
        targets[np.arange(labels.shape[0]), labels.astype(int)] = 1.0
    else:
        targets = labels.astype(float)

    # calculate loss
    residual = scores - targets
    loss = np.square(residual).sum()
    print('loss: %f' % loss)

    # backward pass: derivative of the summed squared error w.r.t. the scores
    grad = 2.0 * residual
    for layer in reversed(model):
        grad = layer.backward(grad, lr)

    return loss

In [None]:
# train the model: a single call to train() on the whole reduced training set,
# using the default learning rate
train(train_X, train_y, model)