# Convolutional Neural Network

In [None]:
import numpy as np
import tensorflow as tf

## Layer base class

In [None]:
class Layer:
    """Base class for all layers: stores configuration and shared activations.

    The default layer is an identity: ``init`` makes the output shape equal to
    the input shape and ``predict`` returns its input unchanged.
    """

    def __init__(self, activation=None, input_shape=None):
        """Store the activation name and the (optional) expected input shape."""
        self.activation = activation
        self.input_shape = input_shape

    def init(self, input_shape):
        """Record ``input_shape`` and set ``output_shape`` to the same value.

        Subclasses call this first and then overwrite ``output_shape``.
        """
        self.input_shape = input_shape
        self.output_shape = input_shape

    def predict(self, x):
        """Return ``x`` unchanged (identity forward pass)."""
        return x

    def relu(self, x):
        """Return ``x`` with every element that is not greater than zero set to zero."""
        zeros = np.zeros_like(x)
        return np.where(x > zeros, x, zeros)

    def softmax(self, x):
        """Return the softmax of ``x``, normalised over ALL of its elements.

        The maximum is subtracted before exponentiating. This leaves the result
        mathematically unchanged but keeps ``np.exp`` from overflowing to
        ``inf`` (and the output from becoming ``nan``) for large inputs.
        """
        x = np.asarray(x)
        if x.size == 0:
            # np.max would raise on an empty array; an empty input simply
            # yields an empty output.
            return np.exp(x)
        exp = np.exp(x - np.max(x))
        return exp / np.sum(exp)

## Dense layer

In [None]:
class Dense(Layer):
    """Fully connected layer computing ``activation(x . weights + biases)``."""

    def __init__(self, units, activation="relu", input_shape=None):
        """Remember the number of output units; the rest is handled by ``Layer``."""
        self.units = units
        super().__init__(activation=activation, input_shape=input_shape)

    def init(self, input_shape):
        """Allocate zero-filled parameters for an input of ``input_shape[0]`` features."""
        super().init(input_shape)
        n_inputs = input_shape[0]
        self.biases = np.zeros(self.units)
        self.weights = np.zeros((n_inputs, self.units))
        self.output_shape = (self.units, )

    def predict(self, x):
        """Apply the affine map to ``x`` followed by the configured activation.

        Only ``"relu"`` and ``"softmax"`` are recognised; any other activation
        value leaves the affine output untouched.
        """
        pre_activation = np.dot(x, self.weights) + self.biases

        if self.activation == "relu":
            return self.relu(pre_activation)
        if self.activation == "softmax":
            return self.softmax(pre_activation)
        return pre_activation

## Convolution layer

In [None]:
class Conv2D(Layer):
    """2-D convolution layer with stride 1 and zero padding.

    The input is padded so that the output has the same height and width as
    the input. Inputs and outputs are laid out as (rows, cols, channels).
    """

    def __init__(self, filters, kernel_size, activation="relu", input_shape=None):
        """Store the number of filters and the ``(rows, cols)`` kernel size."""
        super().__init__(activation=activation, input_shape=input_shape)
        self.filters = filters
        self.kernel_size = kernel_size

    def init(self, input_shape):
        """Allocate zero-filled weights/biases and derive output shape and padding."""
        super().init(input_shape)
        self.weights = np.zeros((self.kernel_size[0], self.kernel_size[1], input_shape[2], self.filters))
        self.biases = np.zeros(self.filters)
        self.output_shape = (input_shape[0], input_shape[1], self.filters)
        # Padding added before the first row / column.
        self.padding_number = ((self.kernel_size[0] - 1) // 2, (self.kernel_size[1] - 1) // 2)

    def zero_pad(self, image):
        """Return ``image`` surrounded by zeros as a float64 array.

        Each spatial axis receives ``kernel - 1`` padding in total, so that a
        stride-1 convolution yields exactly the input height and width. For odd
        kernels the padding is symmetric; for even kernels the extra row/column
        goes at the end. The channel axis is not padded.
        """
        rows_before, cols_before = self.padding_number[0], self.padding_number[1]
        rows_after = self.kernel_size[0] - 1 - rows_before
        cols_after = self.kernel_size[1] - 1 - cols_before
        return np.pad(
            np.asarray(image, dtype=np.float64),
            ((rows_before, rows_after), (cols_before, cols_after), (0, 0)),
            mode="constant",
            constant_values=0,
        )

    def convolve(self, image, kernel):
        """Slide ``kernel`` over the padded single-channel ``image`` with stride 1.

        Each output element is the sum of the element-wise product of the
        kernel and the window under it (the kernel is not flipped).
        """
        out_rows, out_cols = self.output_shape[0], self.output_shape[1]
        kernel_rows, kernel_cols = self.kernel_size[0], self.kernel_size[1]
        convoluted = np.zeros((out_rows, out_cols))
        for row_idx in range(out_rows):
            row_end = row_idx + kernel_rows
            for col_idx in range(out_cols):
                receptive_field = image[row_idx:row_end, col_idx:col_idx + kernel_cols]
                convoluted[row_idx, col_idx] = np.sum(receptive_field * kernel)
        return convoluted

    def predict(self, x):
        """Compute one feature map per filter and apply the activation.

        A feature map is the sum over input channels of the per-channel
        convolutions plus the filter's bias. Only ``"relu"`` is applied; any
        other activation value leaves the feature maps untouched.
        """
        x_padded = self.zero_pad(x)

        y = np.zeros(self.output_shape)

        for filter_idx in range(self.filters):
            feature_map = np.zeros((self.output_shape[0], self.output_shape[1]))
            for channel_idx in range(x_padded.shape[2]):
                feature_map += self.convolve(x_padded[:, :, channel_idx], self.weights[:, :, channel_idx, filter_idx])
            feature_map += self.biases[filter_idx]
            y[:, :, filter_idx] = feature_map

        if self.activation == "relu":
            y = self.relu(y)

        return y

## Max-pooling layer

In [None]:
class MaxPooling2D(Layer):
    """Max-pooling layer over the two spatial axes of a (rows, cols, channels) input."""

    def __init__(self, pool_size, strides, input_shape=None):
        """Store the ``(rows, cols)`` window size and the ``(rows, cols)`` strides."""
        super().__init__(input_shape=input_shape)
        self.pool_size = pool_size
        self.strides = strides

    def init(self, input_shape):
        """Derive the output shape: only windows that fit entirely are counted."""
        super().init(input_shape)
        self.output_shape = ((input_shape[0] - self.pool_size[0]) // self.strides[0] + 1, (input_shape[1] - self.pool_size[1]) // self.strides[1] + 1, input_shape[2])

    def predict(self, x):
        """Return the per-channel maximum of every pooling window of ``x``.

        Windows advance by ``strides`` (not by ``pool_size``) so that the
        windows visited agree with the ``output_shape`` computed in ``init``
        even when the stride differs from the window size.
        """
        y = np.zeros(self.output_shape)

        for row_idx in range(self.output_shape[0]):
            row_start = row_idx * self.strides[0]
            row_end = row_start + self.pool_size[0]
            for col_idx in range(self.output_shape[1]):
                col_start = col_idx * self.strides[1]
                col_end = col_start + self.pool_size[1]
                # Reduce over the two spatial axes, keeping one value per channel.
                y[row_idx, col_idx, :] = np.max(x[row_start:row_end, col_start:col_end, :], axis=(0, 1))

        return y

## Flatten layer

In [None]:
class Flatten(Layer):
    """Layer that reshapes its input into a one-dimensional vector."""

    def __init__(self, input_shape=None):
        """Forward the optional input shape to ``Layer``; no activation is set."""
        super().__init__(input_shape=input_shape)

    def init(self, input_shape):
        """Set the output shape to a 1-tuple holding the product of ``input_shape``."""
        super().init(input_shape)
        element_count = np.prod(np.array(input_shape))
        self.output_shape = (element_count, )

    def predict(self, x):
        """Return ``x`` reshaped to the flat output shape computed in ``init``."""
        flat_shape = self.output_shape
        return x.reshape(flat_shape)

## Sequential model

In [None]:
class Sequential:
    """Linear stack of layers evaluated one after another."""

    # Lower bound applied to predictions before taking the logarithm in the
    # cross-entropy loss, so that a predicted probability of exactly 0 cannot
    # produce -inf / nan.
    _LOG_EPSILON = 1e-12

    def __init__(self):
        """Create an empty model."""
        self.layers = []
        self.input_shape = ()

    def add(self, layer):
        """Append ``layer``; the first layer added defines the model's input shape."""
        self.layers.append(layer)
        if len(self.layers) == 1:
            self.input_shape = layer.input_shape

    def compile(self, optimizer="sgd", loss="categorical_crossentropy"):
        """Store the optimizer/loss names and initialise every layer in order.

        Each layer's ``init`` receives the ``output_shape`` of the layer before
        it (the model's input shape for the first layer).
        """
        self.optimizer = optimizer
        self.loss = loss

        shape = self.input_shape
        for layer in self.layers:
            layer.init(shape)
            shape = layer.output_shape

    def predict(self, xs):
        """Run every sample of ``xs`` through all layers, one sample at a time.

        Returns an array with one row per sample and ``output_shape[0]`` of the
        last layer columns.
        """
        ys = np.empty((xs.shape[0], self.layers[-1].output_shape[0]))

        for i, x in enumerate(xs):
            y = x
            for layer in self.layers:
                y = layer.predict(y)
            ys[i, :] = y

        return ys

    def error(self, xs, labels):
        """Return the total loss of the model's predictions for ``xs``.

        For ``"categorical_crossentropy"`` this is ``-sum(labels * log(ys))``
        over all samples and classes, with predictions clipped from below to
        ``_LOG_EPSILON`` so the result is always finite. Any other loss name
        yields 0.
        """
        ys = self.predict(xs)

        if self.loss == "categorical_crossentropy":
            safe_ys = np.clip(ys, self._LOG_EPSILON, None)
            return -np.sum(labels * np.log(safe_ys))

        return 0

## Build the model

In [None]:
# Two convolution + max-pooling stages followed by a two-layer dense classifier.
model = Sequential()
model.add(Conv2D(filters=32, kernel_size=(5, 5), activation="relu", input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Conv2D(filters=64, kernel_size=(5, 5), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
# Flatten the feature maps into a vector for the dense layers.
model.add(Flatten())
model.add(Dense(units=1024, activation="relu"))
# Ten output units with softmax, one per class.
model.add(Dense(units=10, activation="softmax"))

## Compile the model

In [None]:
# Initialise every layer's parameters and shapes from the first layer's input shape.
model.compile(loss="categorical_crossentropy", optimizer="sgd")

## Load the dataset

In [None]:
# Load MNIST as flat float32 images and int32 class labels.
# `tf.contrib` only exists in TensorFlow 1.x; on TensorFlow 2.x accessing it
# raises AttributeError, so fall back to the Keras dataset loader and convert
# its uint8 (N, 28, 28) images to the same flat float32 layout.
try:
    mnist = tf.contrib.learn.datasets.load_dataset("mnist")
except AttributeError:
    (train_data, train_labels), (test_data, test_labels) = tf.keras.datasets.mnist.load_data()
    train_data = train_data.reshape((-1, 28 * 28)).astype(np.float32) / 255.0
    test_data = test_data.reshape((-1, 28 * 28)).astype(np.float32) / 255.0
    train_labels = np.asarray(train_labels, dtype=np.int32)
    test_labels = np.asarray(test_labels, dtype=np.int32)
else:
    train_data = mnist.train.images
    train_labels = np.asarray(mnist.train.labels, dtype=np.int32)
    test_data = mnist.test.images
    test_labels = np.asarray(mnist.test.labels, dtype=np.int32)

In [None]:
# Reshape the images to (N, 28, 28, 1) and one-hot encode the labels.
# Row k of the identity matrix is the one-hot vector for class k, so indexing
# it with the label array encodes every label at once. Labels are expected to
# be integers in [0, 10).
one_hot_rows = np.eye(10, dtype=int)
train_data = train_data.reshape((-1, 28, 28, 1))
train_labels = one_hot_rows[train_labels]
test_data = test_data.reshape((-1, 28, 28, 1))
test_labels = one_hot_rows[test_labels]

## TODO: Train the model