In [1]:
import numpy as np
from mnist import MNIST
from tqdm.notebook import tqdm
from tabulate import tabulate

In [2]:
eps = 1e-3

In [3]:
class Conv:
    __padding = ((1, 1), (1, 1), (0, 0))

    def __init__(self, num_filters=8, filter_size=3, learn_rate=0.005):
        self.num_filters = num_filters
        self.filter_size = filter_size
        self.filters = np.random.randn(num_filters, filter_size, filter_size) / filter_size ** 2
        self.last_image = None
        self.learn_rate = learn_rate
        self.G = np.zeros((num_filters, filter_size, filter_size), dtype=float)
        self.e = np.full((num_filters, filter_size, filter_size), eps)

    def __get_sub_image(self, image):
        h, w, _ = image.shape

        for i in range(h - self.filter_size + 1):
            for j in range(w - self.filter_size + 1):
                sub_image = image[i:(i + self.filter_size), j:(j + self.filter_size)]
                yield np.sum(sub_image, axis=2), i, j

    def forward(self, image):
        new_image = np.pad(image, self.__padding)

        h, w, _ = new_image.shape
        self.last_image = image
        result = np.zeros((h - self.filter_size + 1, w - self.filter_size + 1, self.num_filters))
        for sub_image, i, j in self.__get_sub_image(new_image):
            result[i][j] = np.sum(sub_image * self.filters, axis=(1, 2))
        return result

    def back_prop(self, d_loss_d_output):
        d_loss_d_filters = np.zeros(self.filters.shape)

        for sub_image, i, j in self.__get_sub_image(np.pad(self.last_image, self.__padding)):
            for f in range(self.num_filters):
                d_loss_d_filters[f] += d_loss_d_output[i][j][f] * sub_image

        d_loss_d_input = np.zeros(self.last_image.shape)
        for sub_image, i, j in self.__get_sub_image(np.pad(d_loss_d_output, self.__padding)):
            d_loss_d_input[i][j] = np.sum(sub_image * self.filters, axis=(0, 1, 2))

        self.G += d_loss_d_filters * d_loss_d_filters

        self.filters -= self.learn_rate / np.sqrt(self.G + self.e) * d_loss_d_filters

        return d_loss_d_input
    

In [4]:
class MaxPool:
    def __init__(self, pool_size=2):
        self.pool_size = pool_size
        self.last_input = None

    def __get_sub_image(self, input):
        h, w, _ = input.shape
        new_h = h // self.pool_size
        new_w = w // self.pool_size

        for i in range(new_h):
            for j in range(new_w):
                sub_image = input[(i * self.pool_size):((i + 1) * self.pool_size),
                            (j * self.pool_size):((j + 1) * self.pool_size)]
                yield sub_image, i, j

    def forward(self, input):
        self.last_input = input

        h, w, d = input.shape
        output = np.zeros((h // self.pool_size, w // self.pool_size, d))

        for sub_image, i, j in self.__get_sub_image(input):
            output[i][j] = np.amax(sub_image, axis=(0, 1))

        return output

    def back_prop(self, d_loss_d_output):
        d_loss_d_input = np.zeros(self.last_input.shape)

        for sub_image, i, j in self.__get_sub_image(self.last_input):
            h, w, f = sub_image.shape
            mx = np.amax(sub_image, axis=(0, 1))

            for i1 in range(h):
                for j1 in range(w):
                    for f1 in range(f):
                        if sub_image[i1][j1][f1] == mx[f1]:
                            d_loss_d_input[i * self.pool_size + i1][j * self.pool_size + j1, f1] = \
                                d_loss_d_output[i][j][f1]

        return d_loss_d_input
    

In [5]:
class SoftArgMax:
    def __init__(self, input_len, nodes, learn_rate=0.005):
        self.nodes = nodes
        self.weights = np.random.randn(input_len, nodes) / input_len
        self.bs = np.zeros(nodes)
        self.last_input_shape = None
        self.last_input = None
        self.last_totals = None
        self.learn_rate = learn_rate

        self.G_weights = np.zeros((input_len, nodes), dtype=float)
        self.e_weights = np.full((input_len, nodes), eps)
        self.G_b = np.zeros(nodes, dtype=float)
        self.e_b = np.full(nodes, eps)

    def forward(self, input):
        self.last_input_shape = input.shape

        input = input.flatten()
        self.last_input = input

        totals = np.dot(input, self.weights) + self.bs
        self.last_totals = totals
        exps = np.exp(totals)
        return exps / np.sum(exps, axis=0)

    def back_prop(self, d_loss_d_output):
        for i, gradient in enumerate(d_loss_d_output):
            if np.isclose(gradient, 0):
                continue

            t_exp = np.exp(self.last_totals)
            S = np.sum(t_exp)

            d_output_d_t = -t_exp[i] * t_exp / (S ** 2)
            d_output_d_t[i] = t_exp[i] * (S - t_exp[i]) / (S ** 2)

            d_t_d_w = self.last_input
            d_t_d_b = 1
            d_t_d_input = self.weights

            d_loss_d_t = gradient * d_output_d_t

            d_loss_d_w = d_t_d_w[np.newaxis].T @ d_loss_d_t[np.newaxis]
            d_loss_d_b = d_loss_d_t * d_t_d_b
            d_loss_d_input = d_t_d_input @ d_loss_d_t

            self.G_weights += d_loss_d_w * d_loss_d_w
            self.G_b += d_loss_d_b * d_loss_d_b

            self.weights -= self.learn_rate / np.sqrt(self.G_weights + self.e_weights) * d_loss_d_w
            self.bs -= self.learn_rate / np.sqrt(self.G_b + self.e_b) * d_loss_d_b

            return d_loss_d_input.reshape(self.last_input_shape)
        

In [6]:
class CNN:
    def __init__(self, layers, soft_arg_max_input_len):
        self.layers = layers
        self.soft_arg_max = SoftArgMax(soft_arg_max_input_len, 10)

    def forward(self, image, label):
        normalized_image = np.array(image).reshape((28, 28, 1)) / 255 - 0.5

        output = self.layers[0].forward(normalized_image)
        for i in range(1, len(self.layers)):
            output = self.layers[i].forward(output)

        output = self.soft_arg_max.forward(output)

        loss = -np.log(output[label])
        output_label = np.argmax(output)

        return output, loss, output_label

    def back_prop(self, gradient):
        gradient = self.soft_arg_max.back_prop(gradient)

        for j in range(len(self.layers) - 1, -1, -1):
            gradient = self.layers[j].back_prop(gradient)

    def fit(self, train_images, train_labels):
        permutation = np.random.permutation(len(train_images))
        images = np.array(train_images)[permutation]
        labels = np.array(train_labels)[permutation]

        for i in tqdm(range(len(images))):
            image, label = images[i], labels[i]
            output, loss, label_pred = self.forward(image, label)

            gradient = np.zeros(10)
            gradient[label] = -1 / output[label]

            self.back_prop(gradient)

In [7]:
def score(cnn, test_images, test_labels):
    correct = 0.0
    cm = [[0] * 10 for _ in range(10)]
    
    for i in tqdm(range(len(test_images))):
        image, label = test_images[i], test_labels[i]
        output, loss, label_pred = cnn.forward(image, label)
        cm[label][label_pred] += 1
        if label == label_pred:
            correct += 1

    accuracy = correct / len(test_images)
    return 1 - accuracy, cm

In [8]:
mnist_data = MNIST('data/mnist')

mnist_train_images, mnist_train_labels = mnist_data.load_training()
mnist_test_images, mnist_test_labels = mnist_data.load_testing()

In [9]:
best_error_rate = 1.0
best_architecture = []
best_input_len = -1

In [10]:
print("conv + pool")
conv_pool = CNN([Conv(), MaxPool()], 14 * 14 * 8)
conv_pool.fit(mnist_train_images, mnist_train_labels)
error_rate, cm = score(conv_pool, mnist_test_images, mnist_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

if error_rate < best_error_rate:
    best_error_rate = error_rate
    best_architecture = [Conv(), MaxPool()]
    best_input_len = 14 * 14 * 8

conv + pool


error rate = 0.089100

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
958     0    1    3    0    8    6    1    3    0
  0  1108    3    2    0    3    4    1   14    0
 12     6  908   14   12    3   20   19   29    9
  5     2   21  911    0   29    3   14   17    8
  1     2   10    1  880    4   15    5    8   56
 10     4    5   40    6  769   18    8   26    6
 14     3    4    2    6   17  906    3    3    0
  3    12   32    4    7    0    0  939    2   29
 12     5    9   37    7   35   10   10  833   16
 12     6    8   11   21   10    0   34   10  897
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

In [11]:
print("pool + conv")
pool_conv = CNN([MaxPool(), Conv()], 14 * 14 * 8)
pool_conv.fit(mnist_train_images, mnist_train_labels)
error_rate, cm = score(pool_conv, mnist_test_images, mnist_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

if error_rate < best_error_rate:
    best_error_rate = error_rate
    best_architecture = [MaxPool(), Conv()]
    best_input_len = 14 * 14 * 8

pool + conv


error rate = 0.094800

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
952     0    4    6    0    7    5    1    5    0
  0  1098    7    5    0    3    5    0   17    0
 12     5  905   17   15    2   17   20   30    9
  4     2   22  918    0   20    3   13   20    8
  1     4    5    1  900    3   13    4   10   41
 10     7    8   54   15  729   21   14   28    6
 13     5    9    2   17   14  892    3    3    0
  2    10   30    7   10    0    0  928    2   39
  6     7   18   31   12   20    9    9  845   17
 12     5    9   13   36    6    0   35    8  885
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

In [12]:
print("conv + conv")
conv2 = CNN([Conv(), Conv()], 28 * 28 * 8)
conv2.fit(mnist_train_images, mnist_train_labels)
error_rate, cm = score(conv2, mnist_test_images, mnist_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

if error_rate < best_error_rate:
    best_error_rate = error_rate
    best_architecture = [Conv(), Conv()]
    best_input_len = 28 * 28 * 8

conv + conv


error rate = 0.082600

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
960     0    2    2    0    8    5    1    2    0
  0  1094    4    2    1    2    4    1   27    0
  9     6  929   16    7    1   13   12   34    5
  3     0   29  906    0   28    1    9   28    6
  2     1    5    0  890    0   12    4   13   55
  8     2    7   34    9  758   14    7   44    9
 15     3    9    0   17   11  896    1    6    0
  3     6   40    5    8    0    1  925    2   38
  5     3    7   20    8   17    8    9  889    8
 13     4    2    8   21    8    0   13   13  927
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

In [13]:
print("conv + conv + pool")
conv2_pool = CNN([Conv(), Conv(), MaxPool()], 14 * 14 * 8)
conv2_pool.fit(mnist_train_images, mnist_train_labels)
error_rate, cm = score(conv2_pool, mnist_test_images, mnist_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

if error_rate < best_error_rate:
    best_error_rate = error_rate
    best_architecture = [Conv(), Conv(), MaxPool()]
    best_input_len = 14 * 14 * 8

conv + conv + pool


error rate = 0.084700

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
953     0    1    2    0   16    5    1    2    0
  0  1088    5    3    0    9    4    1   25    0
  9     2  926   11   13    5   13   10   36    7
  4     0   20  885    1   55    2    7   26   10
  1     1    6    1  902    6    8    2   15   40
  6     2    2   20    3  816   10    2   27    4
 12     2    8    0   12   33  885    1    5    0
  2     6   31    5    9    4    0  912    3   56
  7     2    4   14    8   57    7    3  856   16
 10     4    4   11   18    9    0   12   11  930
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

In [14]:
print("conv + conv + conv + pool")
conv3_pool = CNN([Conv(), Conv(), Conv(), MaxPool()], 14 * 14 * 8)
conv3_pool.fit(mnist_train_images, mnist_train_labels)
error_rate, cm = score(conv3_pool, mnist_test_images, mnist_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

if error_rate < best_error_rate:
    best_error_rate = error_rate
    best_architecture = [Conv(), Conv(), Conv(), MaxPool()]
    best_input_len = 14 * 14 * 8

conv + conv + conv + pool


error rate = 0.082600

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
960     0    2    2    0    7    4    1    4    0
  0  1114    4    2    0    2    3    1    9    0
  7    10  907   26   10    3    4   14   43    8
  4     1   17  938    0   17    1    9   18    5
  1     3   14    3  888    3    4    3   15   48
  7     4    6   46    4  749   12    5   51    8
 13     3   15    5   13   14  885    2    8    0
  1     8   29    9    5    0    0  918    3   55
  6     4    4   27    6   13    6    2  894   12
  7     9    3   11   22    5    0   16   15  921
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

In [15]:
print("conv + pool + conv + pool")
conv_pool_2 = CNN([Conv(), MaxPool(), Conv(), MaxPool()], 7 * 7 * 8)
conv_pool_2.fit(mnist_train_images, mnist_train_labels)
error_rate, cm = score(conv_pool_2, mnist_test_images, mnist_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

if error_rate < best_error_rate:
    best_error_rate = error_rate
    best_architecture = [Conv(), MaxPool(), Conv(), MaxPool()]
    best_input_len = 7 * 7 * 8

conv + pool + conv + pool


error rate = 0.065000

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
966     0    1    2    0    2    7    1    1    0
  0  1115    3    3    1    1    2    1    9    0
  8     3  943   21    9    1    8    9   25    5
  4     3   11  952    0   11    2    6   12    9
  1     2    8    1  918    1    9    2    6   34
 12     3    2   37    4  792   19    2   17    4
  9     2    5    3    7   10  919    1    2    0
  0     6   30   12    6    0    0  936    4   34
  7     5    7   25    7   11    8    2  888   14
 11     9    4   12   27    6    0   10    9  921
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

In [16]:
print('best architecture - ' + ' + '.join([layer.__class__.__name__ for layer in best_architecture]))
print('error rate = %f' % best_error_rate)

best architecture - Conv + MaxPool + Conv + MaxPool
error rate = 0.065000


In [17]:
fashion_data = MNIST('data/fashion')
fashion_train_images, fashion_train_labels = mnist_data.load_training()
fashion_test_images, fashion_test_labels = mnist_data.load_testing()

In [18]:
cnn = CNN(best_architecture, best_input_len)
cnn.fit(fashion_train_images, fashion_train_labels)
error_rate, cm = score(cnn, fashion_test_images, fashion_test_labels)

print("error rate = %f\n" % error_rate)
print(tabulate(cm))

HBox(children=(FloatProgress(value=0.0, max=60000.0), HTML(value='')))



error rate = 0.085400

---  ----  ---  ---  ---  ---  ---  ---  ---  ---
958     0    1    3    0    7    8    1    2    0
  0  1102    3    5    0    1    5    1   18    0
  9     3  898   32   17    2   16   21   26    8
  3     1   13  936    1   19    0   16   12    9
  1     2    5    1  893    1   13    4    8   54
  6     3    5   39    8  769   19    6   25   12
 12     3    4    5   13   26  889    2    4    0
  1     7   24    9    7    0    1  941    3   35
  5     5    8   44   10   26    7   16  843   10
  7     6    4   12   20    8    1   26    8  917
---  ----  ---  ---  ---  ---  ---  ---  ---  ---


HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))