# Setup

In [0]:
import math
import sklearn
from sklearn import datasets
import numpy as np
from scipy import signal
import skimage
import matplotlib.pyplot as plt

In [0]:
RANDOM_SEED = 42

np.random.seed(RANDOM_SEED)
np.set_printoptions(precision=8)

# Model

## Layers

In [0]:
# Base Class

class Layer:
  
  def __init__(self):
    self.input = None
    self.output = None
    
  def forward_propagation(self, input):
    raise NotImplementedError
    
  def backward_propagation(self, output_error, learning_rate):
    raise NotImplementedError

In [0]:
class FCLayer(Layer):
  
  def __init__(self, output_size):    
    self.output_size = output_size
    self.bias = np.random.rand(1, self.output_size) - 0.5
    
    self.weights = None
    
  def forward_propagation(self, input_data):
    if self.weights is None:
      self.initialize(input_data)
    
    self.input = input_data.reshape((1, -1))
    self.output = np.dot(self.input, self.weights) + self.bias
    
    return self.output
  
  def backward_propagation(self, output_error, learning_rate):
    input_error = np.dot(output_error, self.weights.T)
    weights_error = np.dot(self.input.T, output_error)
    
    self.weights -= learning_rate * weights_error
    self.bias -= learning_rate * output_error
    
    return input_error
  
  def initialize(self, input_data):        
    self.weights = np.random.rand(input_data.size, self.output_size) - 0.5

In [0]:
class ActivationLayer(Layer):
  
  def __init__(self, activation, activation_prime):
    self.activation = activation
    self.activation_prime = activation_prime
    
  def forward_propagation(self, input_data):
    self.input = input_data
    self.output = self.activation(self.input)
    
    return self.output
  
  def backward_propagation(self, output_error, learning_rate):
    return self.activation_prime(self.input, self.output) * output_error

In [0]:
class DropoutLayer(Layer):
  
  def __init__(self, p):
    self.p = p
    
  def forward_propagation(self, input_data):
    self.dropout = np.random.binomial(1, self.p, size=input_data.shape)
    return input_data * self.dropout
  
  def backward_propagation(self, output_error, learning_rate):
    return output_error * self.dropout

In [0]:
class ConvolutionalLayerSlow(Layer):
  
  def __init__(self, filters_count, filter_shape, padding=(0, 0), stride=1):
    self.filters_count = filters_count
    self.filter_shape = filter_shape
    self.stride = stride
    self.padding = (padding[0], padding[1], 0)
        
    self.input_shape = None
    self.input_depth = None

    self.output_shape = None
    self.weights = None
   
  def forward_propagation(self, input_data):
    
    if self.input_shape is None:
      self.initialize(input_data)
    
    self.input = np.pad(input_data, ((self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (self.padding[2], self.padding[2])), 'constant') 
    self.output = np.zeros(self.output_shape)

    for filter in range(self.filters_count):
      for channel in range(self.input_depth):
        row_iteration = 0

        for row in range(0, self.input_shape[0], self.stride):
          col_iteration = 0

          for col in range(0, self.input_shape[1], self.stride):        
            if row + self.filter_shape[0] >= self.input_shape[0] or col + self.filter_shape[1] >= self.input_shape[1]:
              continue

            self.output[row_iteration, col_iteration, filter] += np.sum(self.input[row : row + self.filter_shape[0], col + self.filter_shape[1], channel] * self.weights[filter])    

            col_iteration += 1

          row_iteration += 1

    return self.output
    
  def backward_propagation(self, output_error, learning_rate):
    in_error = np.zeros(self.input_shape)
    dWeights = np.zeros((self.filters_count, self.filter_shape[0], self.filter_shape[1]))
    dBias = np.zeros(self.input_depth)

    for filter in range(self.filters_count):
      for channel in range(self.input_depth):
      
        row_iteration = 0

        for row in range(output_error.shape[0]):     
          input_row_index = row * self.stride

          for col in range(output_error.shape[1]):        
            input_col_index = col * self.stride  

            if input_row_index + self.filter_shape[0] >= self.input_shape[0] or input_col_index + self.filter_shape[1] >= self.input_shape[1]:
              continue

            in_error[input_row_index : input_row_index + self.filter_shape[0], input_col_index : input_col_index + self.filter_shape[1], channel] += self.weights[filter] * output_error[row, col, filter]
            dWeights[filter] = self.input[row_iteration : row_iteration + self.filter_shape[0], input_col_index : input_col_index + self.filter_shape[1], channel] * output_error[row, col, filter]

    self.weights -= learning_rate * dWeights
    
    return in_error
  
  def initialize(self, input_data):    
      self.input_shape = input_data.shape
      self.input_depth = self.input_shape[2]
    
      self.output_shape = (int((self.input_shape[0] - self.filter_shape[0] + 2 * self.padding[0]) / self.stride + 1), int((self.input_shape[1] - self.filter_shape[1] + 2 * self.padding[1]) / self.stride + 1), self.filters_count)        
      self.weights = np.random.rand(self.filters_count, self.filter_shape[0], self.filter_shape[1]) - 0.5

In [0]:
%load_ext cython

In [0]:
%%cython

cimport numpy as np

cpdef np.ndarray convolve_2d_stride(np.ndarray data, np.ndarray filter, np.ndarray result, int stride):
  
  cdef int output_rows_count = data.shape[0]
  cdef int output_cols_count = data.shape[1]
  
  cdef int filter_rows_count = data.shape[0]
  cdef int filter_cols_count = data.shape[1]
  
  cdef int result_rows_count = data.shape[0]
  cdef int result_cols_count = data.shape[1]

  cdef int row = 0
  cdef int col = 0
  
  cdef int input_row_index = 0
  cdef int input_col_index = 0
    
  for row in range(output_rows_count):     
    input_row_index = row * stride

    for col in range(output_cols_count):        
      input_col_index = col * stride  

      if input_row_index + filter_rows_count >= result_rows_count or input_col_index + filter_cols_count >= result_cols_count:
        continue

      result[input_row_index : input_row_index + result_rows_count, input_col_index : input_col_index + filter_cols_count] += filter * data[row, col]
      
  return result  

In [0]:
class ConvolutionalLayer(Layer):
  
  def __init__(self, filters_count, filter_shape, padding=(0, 0), stride=1):
    self.filters_count = filters_count
    self.filter_shape = filter_shape
    self.stride = stride
    self.padding = (padding[0], padding[1], 0)
        
    self.input_shape = None
    self.input_depth = None

    self.output_shape = None
    self.weights = None
    
  def forward_propagation(self, input_data):
    
    if self.input_shape is None:
      self.initialize(input_data)
    
    self.input = np.pad(input_data, ((self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (self.padding[2], self.padding[2])), 'constant') 
    self.output = np.zeros(self.output_shape)

    for channel in range(self.input_depth):
      windows = skimage.util.view_as_windows(self.input[:, :, channel], self.filter_shape, self.stride)
      
      for filter in range(self.filters_count):
        self.output[:, :, filter] += np.tensordot(windows, self.weights[filter], axes=((2,3),(0,1)))

    return self.output
    
  def backward_propagation(self, output_error, learning_rate):
    in_error = np.zeros(self.input_shape)
    dWeights = np.zeros((self.filters_count, self.filter_shape[0], self.filter_shape[1]))

    for channel in range(self.input_depth):
      
      windows = skimage.util.view_as_windows(self.input[:, :, channel], self.filter_shape, self.stride)
      
      for filter in range(self.filters_count):
        in_error[:, :, channel] += convolve_2d_stride(output_error, self.weights[filter], in_error[:, :, channel], self.stride)
        dWeights  = np.tensordot(windows, output_error[:, :, filter], axes=((0,1),(0,1)))
  
    self.weights -= learning_rate * dWeights
    
    return in_error
  
  def initialize(self, input_data):    
      self.input_shape = input_data.shape
      self.input_depth = self.input_shape[2]
    
      self.output_shape = (int((self.input_shape[0] - self.filter_shape[0] + 2 * self.padding[0]) / self.stride + 1), int((self.input_shape[1] - self.filter_shape[1] + 2 * self.padding[1]) / self.stride + 1), self.filters_count)        
      self.weights = np.random.rand(self.filters_count, self.filter_shape[0], self.filter_shape[1]) - 0.5

In [0]:
class FlattenLayer(Layer):
  
  def forward_propagation(self, input_data):
    self.input_shape = input_data.shape
    return input_data.flatten()
  
  def backward_propagation(self, output_error, learning_rate):
    return output_error.reshape(self.input_shape)

### Pooling

In [0]:
class PoolingLayer(Layer):
  
  def __init__(self, pool_shape=(2,2), stride=2):
    self.pool_shape = pool_shape
    self.stride = stride
    self.padding = (int((self.pool_shape[0] - self.stride) / 2), int((self.pool_shape[1] - self.stride) / 2), 0)
        
    self.input_shape = None
    self.input_depth = None
    
    self.output_shape = None
    self.weights = None
    
  def forward_propagation(self, input):
    raise NotImplementedError
    
  def backward_propagation(self, output_error, learning_rate):
    raise NotImplementedError
    
  def initialize(self, input_data):    
    
    self.input_shape = input_data.shape
    self.input_depth = self.input_shape[2]
    
    self.output_shape = (int((self.input_shape[0] - self.pool_shape[0] + 2 * self.padding[0]) / self.stride + 1), int((self.input_shape[1] - self.pool_shape[1] + 2 * self.padding[1]) / self.stride + 1), self.input_depth)            
    self.weights = np.random.rand(self.pool_shape[0], self.pool_shape[1], self.input_depth) - 0.5

In [0]:
class MaxPoolingLayer(PoolingLayer):  
  
  def forward_propagation(self, input_data):
    
    if self.input_shape is None:
      self.initialize(input_data)
    
    self.input = np.pad(input_data, ((self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (self.padding[2], self.padding[2])), 'constant')
    self.output = np.zeros(self.output_shape)

    for layer in range(self.input_depth):
        row_iteration = 0

        for row in range(0, self.input_shape[0], self.stride):
          col_iteration = 0

          for col in range(0, self.input_shape[1], self.stride):        
            if row + self.pool_shape[0] >= self.input_shape[0] or col + self.pool_shape[1] >= self.input_shape[1]:
              continue

            self.output[row_iteration, col_iteration, layer] = np.amax(self.input[row : row + self.pool_shape[0], col : col + self.pool_shape[1], layer])

            col_iteration += 1

          row_iteration += 1

    return self.output
    
  def backward_propagation(self, output_error, learning_rate):
    in_error = np.zeros(self.input.shape)
    
    for layer in range(self.input_depth):

      row_iteration = 0

      for row in range(output_error.shape[0]):     
        input_row_index = row * self.stride

        for col in range(output_error.shape[1]):        
          input_col_index = col * self.stride  

          if input_row_index + self.pool_shape[0] >= self.input_shape[0] or input_col_index + self.pool_shape[1] >= self.input_shape[1]:
            continue
            
          pool = self.input[input_row_index : input_row_index + self.pool_shape[0], input_col_index : input_col_index + self.pool_shape[1], layer]
          mask = (pool == np.max(pool))
          in_error[input_row_index : input_row_index + self.pool_shape[0], input_col_index : input_col_index + self.pool_shape[1], layer] = mask * output_error[row, col, layer]
    
    return in_error

## Activation Functions

In [0]:
def tanh(X):
  return np.tanh(X)

def tanh_prime(X, output):
  return 1 - np.tanh(X) ** 2

In [0]:
def relu(X):
  return np.maximum(X, np.zeros(X.shape))
  
def relu_prime(X, output):
  X[X > 0] = 1
  X[X <= 0] = 0
  
  return X

In [0]:
def sigmoid(X):
  return 1. / (1. + np.exp(-X))

def sigmoid_prime(X, output):
  f = sigmoid(X)
  return f * (1 - f)

In [0]:
def softmax(X):
  exps = np.exp(X - X.max())
  return exps / np.sum(exps)

def softmax_prime(X, output):
  result = np.zeros(X.shape)

  for i in range(len(output)):
    for j in range(len(X)):
      if i == j:
        result = output[i] * (1 - X[i])
      else: 
        result = -output[i] * X[j]
    
  return result

## Loss Functions

In [0]:
def mse(y, y_pred):    
  return np.mean(np.power(y - y_pred, 2))

def mse_prime(y, y_pred):
  return 2 * (y_pred - y) / y.size

In [0]:
def cross_entropy(y, y_pred):
  epsilon = 1e-5
  
  return (-np.log(y_pred[y] + epsilon))

def cross_entropy_prime(y, y_pred):
  y_pred[y] -= 1  
  
  return y_pred

## Main Model

In [0]:
class NeuralNetwork:
  def __init__(self):
    self.layers = []
    self.loss = None
    self.loss_prime = None

  def add(self, layer):
    self.layers.append(layer)
    return self

  def use(self, loss, loss_prime):
    self.loss = loss
    self.loss_prime = loss_prime

    return self

  def fit(self, x_train, y_train, epochs, learning_rate):
    samples = len(x_train)

    for i in range(epochs):
      err = 0

      for j in range(samples):
        output = x_train[j]
        for layer in self.layers:
          output = layer.forward_propagation(output)

        err += self.loss(y_train[j], output)

        error = self.loss_prime(y_train[j], output)
        for layer in reversed(self.layers):
            error = layer.backward_propagation(error, learning_rate)

      err /= samples
      print('epoch %d/%d   error=%f' % (i+1, epochs, err))

    return self

  def predict(self, input_data):
      samples = len(input_data)
      result = []

      for i in range(samples):
          output = input_data[i]
          for layer in self.layers:
              output = layer.forward_propagation(output)
          result.append(output)

      return result

# Training

## Data Setup

In [0]:
from keras.datasets import mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()

Downloading data from https://s3.amazonaws.com/img-datasets/mnist.npz


In [0]:
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')

x_train /= 255
x_test /= 255

x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)

In [0]:
y_train = keras.utils.to_categorical(y_train, 10)

## Neural Network

In [0]:
fc_net = NeuralNetwork()

(fc_net
  .add(FCLayer(100))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(DropoutLayer(p=0.75))
  .add(FCLayer(50))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(10))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(ActivationLayer(softmax, softmax_prime))
  .use(mse, mse_prime)
  .fit(x_train[0:1000], y_train[0:1000], epochs=30, learning_rate=0.1)
)

epoch 1/30   error=0.093278
epoch 2/30   error=0.089956
epoch 3/30   error=0.088449
epoch 4/30   error=0.087800
epoch 5/30   error=0.086694
epoch 6/30   error=0.085783
epoch 7/30   error=0.084316
epoch 8/30   error=0.082568
epoch 9/30   error=0.080677
epoch 10/30   error=0.077986
epoch 11/30   error=0.075121
epoch 12/30   error=0.071991
epoch 13/30   error=0.069615
epoch 14/30   error=0.066807
epoch 15/30   error=0.065202
epoch 16/30   error=0.063865
epoch 17/30   error=0.061491
epoch 18/30   error=0.061052
epoch 19/30   error=0.059308
epoch 20/30   error=0.058032
epoch 21/30   error=0.057582
epoch 22/30   error=0.056205
epoch 23/30   error=0.055539
epoch 24/30   error=0.055000
epoch 25/30   error=0.054030
epoch 26/30   error=0.053283
epoch 27/30   error=0.052523
epoch 28/30   error=0.051948
epoch 29/30   error=0.051270
epoch 30/30   error=0.051382


<__main__.NeuralNetwork at 0x7feea8c8b898>

In [0]:
preds = net.predict(x_test[:100])

## Convolutional Neural Network

In [0]:
conv_net = NeuralNetwork()

(conv_net
  .add(ConvolutionalLayer(filter_shape=(3, 3), filters_count=5, stride=2, padding=(1,1)))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(MaxPoolingLayer(pool_shape=(2, 2)))
  .add(FlattenLayer())
  .add(FCLayer(100))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(DropoutLayer(p=0.75))
  .add(FCLayer(50))  
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(10))
  .add(FlattenLayer())
  .add(ActivationLayer(tanh, tanh_prime))
  .add(ActivationLayer(softmax, softmax_prime))
  .use(mse, mse_prime)
  .fit(x_train[0:1000], y_train[0:1000], epochs=30, learning_rate=0.1)
)

epoch 1/30   error=0.090917
epoch 2/30   error=0.085968
epoch 3/30   error=0.080977
epoch 4/30   error=0.076153
epoch 5/30   error=0.072818
epoch 6/30   error=0.070183
epoch 7/30   error=0.067707
epoch 8/30   error=0.065412
epoch 9/30   error=0.063300
epoch 10/30   error=0.062564
epoch 11/30   error=0.060378
epoch 12/30   error=0.059850
epoch 13/30   error=0.058600
epoch 14/30   error=0.058431
epoch 15/30   error=0.056987
epoch 16/30   error=0.055938
epoch 17/30   error=0.055670
epoch 18/30   error=0.054547
epoch 19/30   error=0.054616
epoch 20/30   error=0.054824
epoch 21/30   error=0.053635
epoch 22/30   error=0.053197
epoch 23/30   error=0.053238
epoch 24/30   error=0.053151
epoch 25/30   error=0.052795
epoch 26/30   error=0.052544
epoch 27/30   error=0.052048
epoch 28/30   error=0.051052
epoch 29/30   error=0.052164
epoch 30/30   error=0.051023


<__main__.NeuralNetwork at 0x7feea8c79fd0>

In [0]:
conv_preds = conv_net.predict(x_test[:100])

## Evaluation

Keep in mind that we've only run a small portion of the data and the result will not be too astonishing. But you know why this implementation is badass. It's cause you've gained knowledge!

In [0]:
from sklearn.metrics import accuracy_score

In [0]:
normal_results = [np.argmax(sample) for sample in preds]
conv_results = [np.argmax(sample) for sample in conv_preds]

In [0]:
print('Accuracy | Normal net: {}; Conv Net: {}'.format(accuracy_score(y_test[:100], normal_results), accuracy_score(y_test[:100], conv_results)))

Accuracy | Normal net: 0.7; Conv Net: 0.8


# Keras

In [0]:
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K

batch_size = 128
num_classes = 10
epochs = 12

# input image dimensions
img_rows, img_cols = 28, 28

# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adadelta(),
              metrics=['accuracy'])

model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=epochs,
          verbose=1,
          validation_data=(x_test, y_test))
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

W0711 13:31:44.495379 139740251035520 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:74: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.

W0711 13:31:44.525401 139740251035520 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:517: The name tf.placeholder is deprecated. Please use tf.compat.v1.placeholder instead.

W0711 13:31:44.535834 139740251035520 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:4138: The name tf.random_uniform is deprecated. Please use tf.random.uniform instead.

W0711 13:31:44.574197 139740251035520 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:3976: The name tf.nn.max_pool is deprecated. Please use tf.nn.max_pool2d instead.

W0711 13:31:44.577066 139740251035520 deprecation_wrapp

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


W0711 13:31:44.780761 139740251035520 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/math_grad.py:1250: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Train on 60000 samples, validate on 10000 samples
Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12
Test loss: 0.0251252648020788
Test accuracy: 0.9926
