In [44]:
import numpy as np
import random
import math

FILE = "fashion-mnist_train.csv"
with open(f"../../data/{FILE}") as f:
    examples = f.read().strip().split("\n")[1:]

In [6]:
random.seed(0)
train, valid, test = [], [], []
valid_cutoff = .8
test_cutoff = .9

for example in examples:
    label, pixels = example.split(",", 1)
    label = int(label)
    if label > 4:
        continue
    pixels = [int(p) for p in pixels.split(",")]
    data = np.asarray(pixels, dtype="int32")
    data = data.reshape((1,28,28))
    # Scale values from -1 to 1
    data = ((data / 255) - .5) / .5
    # Move channel axis to first axis
    #data = np.moveaxis(data, -1, 0)
    target = np.zeros((1, 5))
    target[0,label] = 1
    row = (data, target, )

    split = random.random()
    if split > test_cutoff:
        test.append(row)
    elif split > valid_cutoff:
        valid.append(row)
    else:
        train.append(row)

In [7]:
def log_loss(predicted, actual):
    tol = 1e-6
    cross_entropy = actual * np.log(predicted + tol)
    return -np.sum(cross_entropy)

def log_loss_grad(predicted, actual):
    return predicted - actual

def softmax(preds):
    tol = 1e-6
    preds = np.exp(preds - np.max(preds))
    return preds / (np.sum(preds) - tol)

In [8]:
def init_layers(layer_defs):
    layers = []
    for i in range(1, len(layer_defs)):
        if "input_units" in layer_defs[i]:
            last_units = layer_defs[i]["input_units"]
        else:
            last_units = layer_defs[i-1]["units"]

        biases = np.ones((1,layer_defs[i]["units"]))
        if layer_defs[i]["type"] == "cnn":
            np.random.seed(0)
            weights = np.random.rand(layer_defs[i-1]["units"], layer_defs[i]["units"], layer_defs[i]["kernel_size"], layer_defs[i]["kernel_size"])
            weights = weights / 10
        else:
            np.random.seed(0)
            weights = np.random.rand(last_units, layer_defs[i]["units"])
            weights = weights / 5 - .1

        layers.append([
            weights,
            biases,
            layer_defs[i]["type"]
        ])
    return layers

In [9]:
from skimage.util import view_as_windows

def unroll_image_manual(image, kernel_x, kernel_y):
    x_size = (image.shape[0] - (kernel_x - 1))
    y_size = (image.shape[1] - (kernel_y - 1))
    rows =  x_size * y_size
    unrolled = np.zeros((rows, kernel_x * kernel_y))
    for x in range(0, x_size):
        for y in range(0, y_size):
            unrolled[y + (x * y_size),:] = image[x:(x+kernel_x),y:(y+kernel_y)].reshape((1,kernel_x * kernel_y))
    return unrolled

def unroll_image(image, kernel_x, kernel_y):
    unrolled = view_as_windows(image, (kernel_x, kernel_y))
    x_size = (image.shape[0] - (kernel_x - 1))
    y_size = (image.shape[1] - (kernel_y - 1))
    rows =  x_size * y_size
    return unrolled.reshape(rows, kernel_x * kernel_y)

def convolve(image, kernel):
    return np.matmul(image, kernel.reshape(math.prod(kernel.shape), 1))

In [25]:
def forward(batch, layers):
    hidden = [batch.copy()]
    for i in range(len(layers)):
        if layers[i][2] == "cnn":
            channels, next_channels, kernel_x, kernel_y = layers[i][0].shape

            new_x = batch.shape[1] - (kernel_x - 1)
            new_y = batch.shape[2] - (kernel_y - 1)
            next_batch = np.zeros((next_channels, new_x , new_y))
            for channel in range(channels):
                unrolled = unroll_image(batch[channel,:], kernel_x, kernel_y)
                for next_channel in range(next_channels):
                    kernel = layers[i][0][channel, next_channel, :]
                    mult = convolve(unrolled, kernel).reshape(new_x, new_y)
                    next_batch[next_channel,:] += mult
            next_batch /= batch.shape[0]

            hidden.append(next_batch.copy())
            next_batch = np.maximum(next_batch, 0)
            batch = next_batch
        else:
            if layers[i-1][2] == "cnn" or i == 0:
                batch = batch.reshape(1, math.prod(batch.shape))
            batch = np.matmul(batch, layers[i][0]) + layers[i][1]
            hidden.append(batch.copy())
            if i < len(layers) - 1:
                batch = np.maximum(batch, 0)

    return batch, hidden

In [40]:
def backward(layers, hidden, grad, lr, verbose=False):
    for i in range(len(layers)-1, -1, -1):
        print(f"Layer {i}") if verbose else None

        if layers[i][2] == "cnn":
            grad = grad.reshape(hidden[i+1].shape)
            if i != len(layers) - 1:
                grad = np.multiply(grad, np.heaviside(hidden[i+1], 0))
            _, kernel_x, kernel_y = grad.shape
            print(f"Grad shape {grad.shape}") if verbose else None

            new_grad = np.zeros(hidden[i].shape)
            for channel in range(hidden[i].shape[0]):
                # With multi-channel output, you need to loop across the output grads to link to input channel kernels
                # Each kernel gets a unique update
                flat_input = unroll_image(hidden[i][channel,:], kernel_x, kernel_y)
                for next_channel in range(hidden[i+1].shape[0]):
                    # Kernel update
                    channel_grad = grad[next_channel,:]
                    k_grad = convolve(flat_input, channel_grad).reshape(layers[i][0].shape[2], layers[i][0].shape[3])
                    grad_norm = math.prod(channel_grad.shape)
                    layers[i][0][channel,next_channel,:] -= (k_grad * lr) / grad_norm
                    print(f"k_grad: {k_grad.shape}") if verbose else None
            for next_channel in range(hidden[i+1].shape[0]):
                channel_grad = grad[next_channel,:]
                kernel_x = (layers[i][0][0,next_channel,:].shape[0])
                kernel_y = (layers[i][0][0,next_channel,:].shape[1])
                padded_grad = np.pad(channel_grad, ((kernel_x - 1, kernel_x - 1),  (kernel_y - 1, kernel_y - 1)))
                flat_padded = unroll_image(padded_grad, kernel_x, kernel_y)
                for channel in range(hidden[i].shape[0]):
                    # Grad to lower layer
                    flipped_kernel = np.flip(layers[i][0][channel,next_channel,:], axis=[0,1])
                    updated_grad = convolve(flat_padded, flipped_kernel).reshape(hidden[i].shape[1], hidden[i].shape[2])
                    # Since we're multiplying each input by multiple kernel values, reduce the gradient accordingly
                    # This will reduce the edges more than necessary (they contribute to fewer output values), but is simple to implement
                    new_grad[channel, :] += updated_grad / math.prod(flipped_kernel.shape)
            grad = new_grad
        else:
            if i != len(layers) - 1:
                grad = np.multiply(grad, np.heaviside(hidden[i+1], 0))
            grad = grad.T
            print(f"starting grad: {grad.shape}") if verbose else None
            w_grad = np.matmul(grad, hidden[i].reshape(1, math.prod(hidden[i].shape))).T
            print(f"w_grad: {w_grad.shape}") if verbose else None
            b_grad = grad.T

            layers[i][0] -= w_grad * lr
            layers[i][1] -= b_grad * lr

            grad = np.matmul(layers[i][0], grad).T
            print(f"ending grad: {grad.shape}") if verbose else None
    return layers

In [42]:
layer_defs = [
    {"type": "input", "units": 1},
    {"type": "cnn", "input_units": 1, "units": 1, "kernel_size": 3},
    {"type": "dense", "input_units": 26 ** 2, "units": 5}
]
lr = 5e-3
epochs = 5

layers = init_layers(layer_defs)
for epoch in range(epochs):
    epoch_loss = 0
    for i, img in enumerate(train):
        image, target = img
        batch, hidden = forward(image, layers)

        grad = log_loss_grad(softmax(batch), target)
        epoch_loss += log_loss(softmax(batch), target)
        layers = backward(layers, hidden, grad, lr)

    print(f"Epoch {epoch} loss: {epoch_loss / len(train)}")
    match = np.zeros(len(valid))
    for i, img in enumerate(valid):
        image, target = img
        valid_pred, _ = forward(image, layers)
        match[i] = np.argmax(valid_pred) == np.argmax(target)

    print(f"Epoch {epoch} valid accuracy: {match.sum() / match.shape[0]}")

Epoch 0 loss: 0.7330912743388215
Epoch 0 valid accuracy: 0.7805611222444889
Epoch 1 loss: 0.596589137086033
Epoch 1 valid accuracy: 0.8072812291249165
Epoch 2 loss: 0.5335671947706974
Epoch 2 valid accuracy: 0.8183032732130928
Epoch 3 loss: 0.5067247472491165
Epoch 3 valid accuracy: 0.8299933199732799
Epoch 4 loss: 0.4908787078802125
Epoch 4 valid accuracy: 0.8350033400133601


In [54]:
test_preds = []
match = np.zeros(len(test))
for i, img in enumerate(test):
    image, target = img
    test_pred, _ = forward(image, layers)
    test_preds.append(softmax(test_pred))
    match[i] = np.argmax(test_pred) == np.argmax(target)

match.sum() / match.shape[0]

0.8531147540983607