In [2]:
import numpy as np

## Neural Networks: For Beginners, By Beginners
by Emil Krumov


## Introduction

So, neural nets. It’s the first thing that pops up in the minds of most of the common coders when they hear the buzzwords artificial intelligence and/or machine learning. Although not being the most fundamental material in the book, it is actually a not so bad starting point if explained in a beginner-friendly language.

Throughout this article I will take you on a journey starting from the very beginning of the neural networks ideology, take you through the core modern principles that make it learn, and finally show you a step-by-step implementation of a neural network model from scratch featuring Fully Connected, Activation, Flatten, Convolution and Pooling layers. This implementation is heavily based on and inspired by this amazing article by
Omar Aflak
which is a must-read for everyone who wants to learn more on the mathematical background of neural networks.

## Understanding Neural Networks

The history of neural networks traces back to 1943 when neurophysiologist Warren McCulloch and mathematician Walter Pitts portrayed a model of a human brain neuron with a simple electronic circuit which took a set of inputs, multiplied them by weighted values and put them through a threshold gate which gave as output a value of 0 or 1, based on the threshold value. This model was called the McCulloch-Pitts perceptron.

<img src="perceptron.gif" width=350 height=350 />
Image source: wikimedia commons


This idea was taken further by a psychologist called Rosenblatt who created the mathematical model of the perceptron and called it Mark I Perceptron. It was based on the McCulloch-Pitts model and was one of the first attempts to make a machine learn. The perceptron model also took a set of binary inputs which were then multiplied by weighted values(representing the synapse strength). Then a bias typically having a value of 1 was added(an offset that ensures that more functions are computable with the same input) and once again the output was set to 0 or 1 based on a threshold value. The input mentioned above is either the input data or other perceptrons’ outputs.

While the McCulloch-Pitts model was a groundbreaking research at that time, it lacked a good mechanism of learning which made it unsuitable for the area of AI.

Rosenblatt took inspiration from Donald Hebb’s thesis that learning occurred in the human brain through formation and change of synapses between neurons and then came up with the idea to replicate it in its own way. He thought of a perceptron which takes a training set of input-output examples and forms(learns) a function by changing the weights of the perceptron.

### The implementation took four steps:

1.  Initialize a perceptron with random weights
2.  For each example in the training set, compute the output
3.  If the output should have been 1 but was 0 instead, increase the weights with input 1 and vice-versa — if the output is 1 but should’ve been 0, decrease the weights with input of 1.
4.  Repeat steps 2–4 for each example until the perceptron outputs correct values

This set of instructions are what modern perceptrons are based on. Due to significant increase of computing power however, we can now work with many more perceptrons grouped together forming a neural network.

<img src="schem_of_perceptron_classifier.webp" width=350 height=350 />
Image source: mixtend

However, they are not just randomly put in the network but are actually part of another building block — a layer.

## Layers

A layer is made of perceptrons which are linked to the perceptrons of the previous and the next layers if such do happen to exist. Every layer defines it’s own functionality and therefore serves its own purpose. Neural networks consist of an input layer(takes the initial data), an output layer(returns the overall result of the network), and hidden layers(one or many layers with different sizes(number of perceptrons) and functionality).

<img src="layers.webp" width=350 height=350 />
Source: cs231n.github.io

In order for the network to be able to learn and produce results each layer has to implement two functions — forward propagation and backward propagation(shortly backpropagation).

In [23]:
# Base Class

class Layer:
  
  def __init__(self):
    self.input = None
    self.output = None
    
  def forward_propagation(self, input):
    raise NotImplementedError
    
  def backward_propagation(self, output_error, learning_rate):
    raise NotImplementedError



Imagine a train travelling between point A(input) and point B(output) which changes direction each time it reaches one of the points. The A to B course takes one or more samples from the input layer and carries it through the forward propagation functions of all hidden layers consecutively, until point B is reached(and a result is produced). Backpropagation is basically the same thing only in the opposite direction — the course takes the data through the backpropagation methods of all layers in a reverse order until it reaches point A. What differs the two courses though is what happens inside of these methods.

Forward propagation is only responsible for running the input through a function and return the result. No learning, only calculations. Backpropagation is a bit trickier because it is responsible for doing two things:

* Update the parameters of the layer in order to improve the accuracy of the forward propagation method.
* Implement the derivative of the forward propagation function and return the result.

So how and why does that happen exactly. The mystery unravels at point B — before the train changes direction and goes through the backpropagation of all the layers. In order to tune our model we need to answer two questions:

* How good the model’s result is compared with the actual output?
* How do we minimize this difference?

The process of answering the first question is known as calculating the error. To do that we use cost functions(synonyms with loss functions).

## $ MSE = \frac{1}{n} \sum_{i=0}^{N} (y_{Ei} - y_{Pi})^2$
Mean Squared Error(MSE) between expected (E) and predicted (P) values also called the sum of squared residuals. 
(This is the reduced $\chi^2$ statistic with the individual errors, $\sigma_i$ set to one)

It is a pretty straightforward function — we sum the squares of the difference between the actual output and the model’s output and we calculate the mean. But to help our model implementing MSE only isn’t going to be of any significant help. We must implement its derivative as well.

In [24]:
def mse(y, y_pred):    
  return np.mean((y - y_pred)**2)

def mse_prime(y, y_pred):
  return 2 * (y_pred - y) / y.size

But why do we need this? Because of the infamous…
## Gradient Descent

The last thing we need to do here is to show our model how to minimize the error. To do that we need an optimization algorithm(optimizer). Once again, there are many kinds of optimizers all serving the same purpose but for the sake of keeping things simple but still meaningful we are going to use the most widely used and the one which many other optimization algorithms are based on. Behold the mighty Gradient Descent:

<img src="Cost_v_weight.webp" width=650 height=350 />
Graphical representation of Gradient Descent | Source: Medium

Doesn’t look as scary as it sounds, does it? Good news everybody, it is a relatively simple concept. By definition, the gradient is a fancy word for derivative, or the rate of change of a function.

<img src="3Dgradient_descent.webp" width=650 height=350 />
3D representation of Gradient | Source: OReilly

So let’s imagine our model is a ball. The surface represents the gradient(derivative) of the error. We want the ball to roll down the surface(descent) as low as possible in order to decrease the altitude(the error). Taking it to Math level — we need to reach a global(or at least a good enough local) minimum.

In order to make the ball move though, we need to update our parameters at a certain rate — called learning rate. This is a predefined parameter that we pass to our model before we run it. Those kind of parameters are called <em>hyperparameters</em> and have a huge role in our model’s performance. Here is what I mean:

<img src="learning_rate.webp" width=650 height=350 />
Significance of Learning Rate | Source: analyticsvidhya.com

If we choose a learning rate that is too big the parameters will change drastically and we might skip the minimum. If our learning rate is too small on the other hand, it will take too much time and hence computing power to reach a satisfying result. That’s why tuning this parameter by testing the model with different values of the learning rate is rather important. It is highly recommended to start with a learning rate of 0.1 or 0.01 and start tuning from there on.

## Back to back(propagation)

Now we need to update the model’s parameters layer by layer by passing the appropriate data to the backpropagation methods. The backpropagation takes two parameters — output error and the learning rate. Output error is calculated either as the result of the derivative of the cost function or as the result of the backpropagation of the previous layer(if looked from point B to point A) — as written above, the backward propagation should give as a result the derivative of the forward propagation function. By doing this each layer shows its predecessor its error.

So in other words if for some reason we had a Sine Layer it would look something like this:

In [25]:
class SineLayer(Layer):
  
  def forward_propagation(self, input_data):
    return np.sin(input_data)
    
  def backward_propagation(self, output_error, learning_rate):
  
    # some update logic based on the output_error and learning_rate
    
    return np.cos(output_error)

Now that we’ve got the two parameters needed, the backpropagation should update the layers weights(if such are present). Since every type of layer is different, it defines its own logic for parameter tuning — something which we will cover in a bit.

## Wrapping up Gradient Descent

When each layer’s backpropagation is complete and our train arrives at point A, it takes the next sample(or set of samples) and starts its course through the hidden layers’ forward propagation functions once again — only this time, they should perform a bit better. This process continues on and on until training is completed and/or an error minimum has been reached.

Now that we’ve explained all the theory behind gradient descent, here is how it should look in code:

In [26]:
#This is just an example it will break without inputs!

for j in range(samples_length):
  output = x_train[j]
  
  for layer in layers:
    output = layer.forward_propagation(output)

  error = loss_prime(y_train[j], output)
  
  for layer in reversed(layers):
      error = layer.backward_propagation(error, learning_rate)

NameError: name 'samples_length' is not defined

I hope this snippet gives much more clearance on the algorithm itself. The only thing that we haven’t fully covered yet is what types of layers we can use in a network and how to implement them.

## Basic Layers

As though there are many kinds of layers to choose from for a starter, the infamous Fully-Connected Layer is undoubtedly the best choice.
### Fully-Connected

The Fully-Connected Layer is the most widely used class type. Its principles of work are based on the Rosenblatt model and are as follow:

1. Every single perceptron from the previous layer is linked to every single perceptron of this layer.
2. Each link has a weighted value(weight).
3. A bias (offset) is added to the results.
4. The layer’s weights are held in a 2D array with size m x n (where m is the number of perceptrons in the previous layer and n is the number of perceptrons in this layer). They are initialized as random values.
5. The layer’s bias is held in a 1D array with size n. It is initialized as random values.

<img src="fully_connected_layer.webp" width=350 height=350 />
Visual representation of Fully-Connected (FC) Layer | Source: cs231n.github.io

Now let’s head to our implementation:

In [27]:
class FCLayer(Layer):
  
  def __init__(self, output_size):    
    self.output_size = output_size
    self.bias = np.random.rand(1, self.output_size) - 0.5
    
    self.weights = None
    
  def forward_propagation(self, input_data):
    if self.weights is None:
      self.initialize(input_data)
    
    self.input = input_data.reshape((1, -1)) # ensure data is a 1D array
    self.output = np.dot(self.input, self.weights) + self.bias # Multiply the links by their weights
    
    return self.output
  
  def backward_propagation(self, output_error, learning_rate):
    input_error = np.dot(output_error, self.weights.T) # calculate derivative of the forward propagation
    weights_error = np.dot(self.input.T, output_error) # calculate the weights error
    
    self.weights -= learning_rate * weights_error # update the weights
    self.bias -= learning_rate * output_error # update the error
    
    return input_error
  
  def initialize(self, input_data):        
    self.weights = np.random.rand(input_data.size, self.output_size) - 0.5

As you can see the implementation of our to methods here is not something too complicated as long as you know basic linear algebra. And although relatively simple, this is a completely useful and optimized layer implementation which we will easily put to use later.

The only problem with Fully-Connected Layers though is that they are <strong>linear</strong>. In fact, most layers have completely linear logic. A linear function is a polynomial of one degree. Using only such functions hinders the model’s ability to learn complex functional mappings, hence, learning is limited. That’s why(by convention) it is good to add non-linear functionality after every linear layer using activation layers.

## Activation Layer

Activation layers are just like any other type of layer except they don’t have weights but use a non-linear function over the input instead.

In [28]:
class ActivationLayer(Layer):
  
  def __init__(self, activation, activation_prime):
    self.activation = activation
    self.activation_prime = activation_prime
    
  def forward_propagation(self, input_data):
    self.input = input_data
    self.output = self.activation(self.input)
    
    return self.output
  
  def backward_propagation(self, output_error, learning_rate):
    return self.activation_prime(self.input, self.output) * output_error

A good example of such activation function is tanh which stands for hyperbolic tangent.
<img src = "tanh.webp" width = 350 height = 350 />
tanh compared to sinh and cosh | Source: Wikipedia

Since we are going to need it when we begin building our model, we need to implement it:

In [29]:
def tanh(X): # used for forward propagation
  return np.tanh(X)

def tanh_prime(X, output): # used for backpropagation
  return 1 - np.tanh(X) ** 2 # derivative of tanh

Now that we have our two most important layers implemented, let’s proceed to implementing the whole Neural Network class.

## Neural Network Implementation

There are several methods that need to be implemented

* a <strong>constructor</strong> — here we need to pass the <em>hyperparameters</em> (learning rate and number of epochs — the number of times our network will run over the input dataset); initialization of necessary fields
* <strong> add layer method</strong> — pass an instance of a layer; used for model construction; can (should) be used several times in order to add several layers;
* <strong> use <em>cost function</em> method</strong> — specify the cost function to be used when training the model
* <strong>fit method</strong> — a standard name for the method that performs the training process; here is where we will place the gradient descent snippet from earlier
* <strong> predict method</strong> — a standard name for the method that is used to calculate results only; it is useful once the training process is complete

And here goes the code:

In [30]:
class NeuralNetwork:
  def __init__(self):
    self.layers = []
    self.loss = None
    self.loss_prime = None

  def add(self, layer):
    self.layers.append(layer)
    return self

  def use(self, loss, loss_prime):
    self.loss = loss
    self.loss_prime = loss_prime

    return self

  def fit(self, x_train, y_train, epochs, learning_rate):
    samples = len(x_train)

    for i in range(epochs): # each epoch represents one iteration over all samples
      err = 0

      for j in range(samples): # our gradient descent logic from earlier in the article
        output = x_train[j]
        for layer in self.layers:
          output = layer.forward_propagation(output)

        err += self.loss(y_train[j], output)

        error = self.loss_prime(y_train[j], output)
        for layer in reversed(self.layers):
            error = layer.backward_propagation(error, learning_rate)

      err /= samples
      print('epoch %d/%d   error=%f' % (i+1, epochs, err))

    return self

  def predict(self, input_data):
      samples = len(input_data)
      result = []

      for i in range(samples): # basically running the forward propagation of all layers to get the result
          output = input_data[i]
          for layer in self.layers:
              output = layer.forward_propagation(output)
          result.append(output)

      return result

You may have noticed the return self statement being present at the end of every method. The reason I put this is that it allows us to do method chaining. If you are not sure what I am talking about, you are going to see a good example in a bit.

Now let’s put it to work. We are going to use the <strong>MNIST</strong> database for classifying handwritten digits. You can download it from http://yann.lecun.com/exdb/mnist/, or you can easily import it from <strong>Keras</strong>.

In [31]:
from keras.datasets import mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()

Since the pixel values are represented in the range [0; 255], we are going to scale that down to a range of [0.0, 1.0] or binary.

Another thing we did is we made the y(the results) be a little bit more convenient(note keras.utils.to_categorical). What it does is it represents the numeric result in a one-shot vector:

5 => [0, 0, 0, 0, 0, 1, 0, 0, 0, 0]

This is helpful because our network’s output layer is going to consist of 10 nodes, each holding an output value. Since the ideal case output would be a correct one-hot vector it is now easier for the cost function to do its job.

Now let’s construct our first network by putting some FC and activation layers:


In [32]:
fc_net = NeuralNetwork()

(fc_net
  .add(FCLayer(100))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(50))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(10))
  .add(ActivationLayer(tanh, tanh_prime))
  .use(mse, mse_prime)
  .fit(x_train[0:4000], y_train[0:4000], epochs=30, learning_rate=0.1)
)

preds = fc_net.predict(x_test[:100])


epoch 1/30   error=20.312698
epoch 2/30   error=20.303001
epoch 3/30   error=20.303000
epoch 4/30   error=20.303000
epoch 5/30   error=20.303000
epoch 6/30   error=20.303000
epoch 7/30   error=20.303000
epoch 8/30   error=20.303000
epoch 9/30   error=20.303000
epoch 10/30   error=20.303000
epoch 11/30   error=20.303000
epoch 12/30   error=20.303000
epoch 13/30   error=20.303000
epoch 14/30   error=20.303000
epoch 15/30   error=20.303000
epoch 16/30   error=20.303000
epoch 17/30   error=20.303000
epoch 18/30   error=20.303000
epoch 19/30   error=20.303000
epoch 20/30   error=20.303000
epoch 21/30   error=20.303000
epoch 22/30   error=20.303000
epoch 23/30   error=20.303000
epoch 24/30   error=20.303000
epoch 25/30   error=20.303000
epoch 26/30   error=20.303000
epoch 27/30   error=20.303000
epoch 28/30   error=20.303000
epoch 29/30   error=20.303000
epoch 30/30   error=20.303000


The reason why we use only the first 1000 samples is that it is going to run too long if we use all the samples. Using more samples in this case will give you better results, so you can try a bigger range if you have the time.

Before we run this though, we need to add one more final touch — another pre-output activation function — Softmax. What it does is it normalizes an array with n elements in a probability distribution array consisting of n probabilities proportional to the exponentials of the input numbers, or simply put — calculates the probability that the sample matches a certain class.

### softmax function: $\mathrm{s}(z_i) = \frac{e^{z_i}}{\sum e^{z_j}}$

And the implementation:

In [33]:
def softmax(X):
  exps = np.exp(X - X.max())
  return exps / np.sum(exps)

def softmax_prime(X, output):
  result = np.zeros(X.shape)

  for i in range(len(output)):
    for j in range(len(X)):
      if i == j:
        result = output[i] * (1 - X[i])
      else: 
        result = -output[i] * X[j]
    
  return result

where we used:

## $\frac {\partial s_i}{\partial z_j} = s_i \frac {\partial}{\partial z_j} \mathrm{log} s_i = s_i \cdot (\delta_{ij} - s_j) $ 

for the derivative. The derivation is here: https://towardsdatascience.com/derivative-of-the-softmax-function-and-the-categorical-cross-entropy-loss-ffceefc081d1

Let’s try it once again, only this time we are going to have the <strong>Softmax</strong> activation as our final layer.

In [34]:
fc_net = NeuralNetwork()

(fc_net
  .add(FCLayer(100))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(50))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(10))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(ActivationLayer(softmax, softmax_prime))
  .use(mse, mse_prime)
  .fit(x_train[0:1000], y_train[0:1000], epochs=50, learning_rate=0.01)
)

preds = fc_net.predict(x_test[:100])

epoch 1/50   error=27.233803
epoch 2/50   error=27.233610
epoch 3/50   error=27.233606
epoch 4/50   error=27.233604
epoch 5/50   error=27.233603
epoch 6/50   error=27.233603
epoch 7/50   error=27.233603
epoch 8/50   error=27.233603
epoch 9/50   error=27.233602
epoch 10/50   error=27.233602
epoch 11/50   error=27.233602
epoch 12/50   error=27.233602
epoch 13/50   error=27.233602
epoch 14/50   error=27.233602
epoch 15/50   error=27.233602
epoch 16/50   error=27.233602
epoch 17/50   error=27.233602
epoch 18/50   error=27.233602
epoch 19/50   error=27.233602
epoch 20/50   error=27.233601
epoch 21/50   error=27.233601
epoch 22/50   error=27.233601
epoch 23/50   error=27.233601
epoch 24/50   error=27.233601
epoch 25/50   error=27.233601
epoch 26/50   error=27.233601
epoch 27/50   error=27.233601
epoch 28/50   error=27.233601
epoch 29/50   error=27.233601
epoch 30/50   error=27.233601
epoch 31/50   error=27.233601
epoch 32/50   error=27.233601
epoch 33/50   error=27.233601
epoch 34/50   error

Now that we’ve trained our data, let’s evaluate our final model.

## Evaluation

Keep in mind that we have implemented a small model for educational purposes. It is not going to produce a quite high result. I would highly recommend playing around with it in order to get a better accuracy.

In order to evaluate our results, we are going to use a simple utility from <strong>sklearn</strong> which shows as the accuracy of our model.

In [35]:
from sklearn.metrics import accuracy_score

normal_results = [np.argmax(sample) for sample in preds]

print('Accuracy | Normal net: {}'.format(accuracy_score(y_test[:100], normal_results)))

Accuracy | Normal net: 0.05


Now that you know the basic of neural network construction we can proceed to the more advanced stuff…


## Convolutional Neural Networks(CNN)

The main problem with using FC layers for image recognition tasks is that it uses each pixel as a separate feature, which doesn’t seem quite right, given the fact that neural networks are aiming at replicating the human brain. And what helps our brain identify visual input is not just random points, but patterns of points instead. Here’s where processes called convolution and correlation come in handy.
### Convolution and Correlation

Convolution and correlation in image processing are almost identical processes having one small difference in their implementation. Both of them take an image I, produce a matrix O based on another matrix (called filter or feature map) F. Here is what I mean

Let’s start with correlation:
<img src = "correlate_matrix.webp" width = 650 height = 350 />
Visual representation of Correlation | Source: My Late Night PowerPoint Creations
<img src = "correlate_matrix2.webp" width = 650 height = 350 />

Let’s explain what is happening. The filter is acting like a sliding window which goes through the array, step by step, sliding right until it reaches the end of the row. Then it slides down and starts at the beginning of the row. At each position we sum the dot product of the filter F and the respective window of the input image I. Each position represents a cell in the output O.

### Now what about convolution?

The only difference between correlation and convolution is that convolution works with the same filter F, only rotated 180 degrees.

<img src = "convolution.webp" width = 650 height = 350 />

### But how does that helps us?

By using this sort of filters, going through our image we extract not only one pixel, but a whole sector of pixels, enabling our model to map more complex image features like lines and shapes.

For example, if you give a convolution layer a cat image it will be able to recognize small features like nose and ears. Then by adding more convolution layers after that it may recognize features of a larger scale like a head or a tail.

The way this works is having several filters per convolution layer, all initialized with random numbers. The goal behind this is to have each filter activate on different features(one features trains to detect the nose, other the eyes, etc.).

Usually when dealing with multi-channel images(like RGB) your filters will be 3-dimensional and will have the depth equal to the image channels count. That way you only focus on the 2D process.

#### Padding and Stride

The problem with convolution and correlation is that the corner pixels don’t get much attention — they only get processed once. If you want to change that, you can add a padding.

<img src = "padding.webp" width = 350 height = 350 />

Now even the corners will be processed as much as the other parts of the image.

Sometimes, however we don’t want to pay as much attention to each pixel(highly applicable when dealing with larger images). Having each filter go through every possible position is sometimes an overkill. That’s why we can configure a stride.

The stride tells our filter how much it should slide when going to its next position. Having a stride of 2 will skip one position with each shift which reduces the size of the output and makes computation lighter.

<img src = "stride.webp" width = 650 height = 350 />

<em>Having no stride basically means having a stride of 1 — the filters cover every possible position without skipping</em>.

## Implementation

First let’s list our parameters:

* Filters Count — how many filters are we going to have in the layer
* Filter Size — how big our filters will be
* Padding — how much zero padding are we going to have(we are going to use a tuple format for this one (x, y), where x stands for column zero padding and y stands for row zero padding).
* Stride — just a plain number showing how many positions should a filter move per slide

NB! Because we are going to initialize our filters with random values, there is practically no difference between using correlation or convolution so we are going to use correlation for simplicity.

Here’s a very iterative(for-loopy) approach on implementing a convolution layer:

In [36]:
class ConvolutionalLayerSlow(Layer):
  
  def __init__(self, filters_count, filter_shape, padding=(0, 0), stride=1):
    self.filters_count = filters_count
    self.filter_shape = filter_shape
    self.stride = stride
    self.padding = (padding[0], padding[1], 0)
        
    self.input_shape = None
    self.input_depth = None

    self.output_shape = None
    self.weights = None
   
  def forward_propagation(self, input_data):
    
    if self.input_shape is None:
      self.initialize(input_data)
    
    self.input = np.pad(input_data, ((self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (self.padding[2], self.padding[2])), 'constant') 
    self.output = np.zeros(self.output_shape)

    for filter in range(self.filters_count):
      for channel in range(self.input_depth):
        row_iteration = 0

        for row in range(0, self.input_shape[0], self.stride):
          col_iteration = 0

          for col in range(0, self.input_shape[1], self.stride):        
            if row + self.filter_shape[0] >= self.input_shape[0] or col + self.filter_shape[1] >= self.input_shape[1]:
              continue

            self.output[row_iteration, col_iteration, filter] += np.sum(self.input[row : row + self.filter_shape[0], col + self.filter_shape[1], channel] * self.weights[filter])    

            col_iteration += 1

          row_iteration += 1

    return self.output
    
  def backward_propagation(self, output_error, learning_rate):
    in_error = np.zeros(self.input_shape)
    dWeights = np.zeros((self.filters_count, self.filter_shape[0], self.filter_shape[1]))

    for filter in range(self.filters_count):
      for channel in range(self.input_depth):
      
        row_iteration = 0

        for row in range(output_error.shape[0]):     
          input_row_index = row * self.stride

          for col in range(output_error.shape[1]):        
            input_col_index = col * self.stride  

            if input_row_index + self.filter_shape[0] >= self.input_shape[0] or input_col_index + self.filter_shape[1] >= self.input_shape[1]:
              continue

            in_error[input_row_index : input_row_index + self.filter_shape[0], input_col_index : input_col_index + self.filter_shape[1], channel] += self.weights[filter] * output_error[row, col, filter]
            dWeights[filter] = self.input[row_iteration : row_iteration + self.filter_shape[0], input_col_index : input_col_index + self.filter_shape[1], channel] * output_error[row, col, filter]

    self.weights -= learning_rate * dWeights
    
    return in_error
  
  def initialize(self, input_data):    
      self.input_shape = input_data.shape
      self.input_depth = self.input_shape[2]
    
      self.output_shape = (int((self.input_shape[0] - self.filter_shape[0] + 2 * self.padding[0]) / self.stride + 1), int((self.input_shape[1] - self.filter_shape[1] + 2 * self.padding[1]) / self.stride + 1), self.filters_count) # using a simple formula to calculate output size based on input size, padding and stride
      self.weights = np.random.rand(self.filters_count, self.filter_shape[0], self.filter_shape[1]) - 0.5

Though being a bit easier to understand, it is too slow because it lacks vectorization (uses for-loops instead of array-based operations). That’s why I dug around and found some very useful library methods that helped me optimize my layer:

* <strong>skimage.util.view_as_windows</strong> — I used it to automatically get the positions where my filters should go instead of looping through them on my own.
* <strong>np.tensordot</strong> — I used it to multiply the windows(which I got from the method mentioned above) with the filters.

Unfortunately, I could not find an easy way to vectorize the calculation of in_error (the closest I got was using scipy.signal.convolve_2d, but it did not work with stride), until it occurred to me that vectorized operations are usually for loops written in a lower level language so I implemented my own method called convolve_2d_stride using Cython (a library which lets you easily integrate C code into Python).

In [1]:
%load_ext cython
%%cython

cimport numpy as np

cpdef np.ndarray convolve_2d_stride(np.ndarray data, np.ndarray filter, np.ndarray result, int stride):
  
  cdef int output_rows_count = data.shape[0]
  cdef int output_cols_count = data.shape[1]
  
  cdef int filter_rows_count = data.shape[0]
  cdef int filter_cols_count = data.shape[1]
  
  cdef int result_rows_count = data.shape[0]
  cdef int result_cols_count = data.shape[1]

  cdef int row = 0
  cdef int col = 0
  
  cdef int input_row_index = 0
  cdef int input_col_index = 0
    
  for row in range(output_rows_count):     
    input_row_index = row * stride

    for col in range(output_cols_count):        
      input_col_index = col * stride  

      if input_row_index + filter_rows_count >= result_rows_count or input_col_index + filter_cols_count >= result_cols_count:
        continue

      result[input_row_index : input_row_index + result_rows_count, input_col_index : input_col_index + filter_cols_count] += filter * data[row, col]
      
  return result  

SyntaxError: invalid syntax (1447325188.py, line 4)

For more info on Cython please visit their documentation (https://cython.readthedocs.io/en/latest/).

Now that we have all our puzzle pieces on our table, let’s assemble our optimized convolution layer:

In [None]:
class ConvolutionalLayer(Layer):
  
  def __init__(self, filters_count, filter_shape, padding=(0, 0), stride=1):
    self.filters_count = filters_count
    self.filter_shape = filter_shape
    self.stride = stride
    self.padding = (padding[0], padding[1], 0)
        
    self.input_shape = None
    self.input_depth = None

    self.output_shape = None
    self.weights = None
    
  def forward_propagation(self, input_data):
    
    if self.input_shape is None:
      self.initialize(input_data)
    
    self.input = np.pad(input_data, ((self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (self.padding[2], self.padding[2])), 'constant') 
    self.output = np.zeros(self.output_shape)

    for channel in range(self.input_depth):
      windows = skimage.util.view_as_windows(self.input[:, :, channel], self.filter_shape, self.stride)
      
      for filter in range(self.filters_count):
        self.output[:, :, filter] += np.tensordot(windows, self.weights[filter], axes=((2,3),(0,1)))

    return self.output
    
  def backward_propagation(self, output_error, learning_rate):
    in_error = np.zeros(self.input_shape)
    dWeights = np.zeros((self.filters_count, self.filter_shape[0], self.filter_shape[1]))

    for channel in range(self.input_depth):
      
      windows = skimage.util.view_as_windows(self.input[:, :, channel], self.filter_shape, self.stride)
      
      for filter in range(self.filters_count):
        in_error[:, :, channel] += convolve_2d_stride(output_error, self.weights[filter], in_error[:, :, channel], self.stride) # here we use the cython function from earlier
        dWeights  = np.tensordot(windows, output_error[:, :, filter], axes=((0,1),(0,1)))
  
    self.weights -= learning_rate * dWeights
    
    return in_error
  
  def initialize(self, input_data):    
      self.input_shape = input_data.shape
      self.input_depth = self.input_shape[2]
    
      self.output_shape = (int((self.input_shape[0] - self.filter_shape[0] + 2 * self.padding[0]) / self.stride + 1), int((self.input_shape[1] - self.filter_shape[1] + 2 * self.padding[1]) / self.stride + 1), self.filters_count)        
      self.weights = np.random.rand(self.filters_count, self.filter_shape[0], self.filter_shape[1]) - 0.5

Speaking of optimization, the next set of layers that we will look at will help us save our GPUs from melting.
Pooling Layers

The idea behind pooling layers is to reduce the spatial size of the output of previous convolution layers. This helps our model for two reasons:

1. It lowers down the computational power needed.
2. It creates a lower resolution version of the input. This process is also called down-sampling. The reason this helps is Convolution layers’s filters are often bound to an exact position in the image. Having a small movement or distortion may result in a different output which is not desirable. Down-sampled images still contain the larger structural features, only excluding fine details that may hinder the model’s performance.

An alternative to down-sampling that one can use to solve this problem is use a larger stride in the Convolution layer on the first place.

The usual pattern met in most CNN models is as follows:

* Convolution Layer
* Activation Layer
* Pooling Layer

<img src="CNN_layers.webp" width = 650 height = 350 />

A typical CNN pattern | Source: jefkine.com

### Principle of work

The way pooling works is somewhat similar to convolution and correlation. Once again we have a window(called pool) which slides over our input. The pool however does not contain any data like the filter/feature map did. When a pool moves in a position it calculates a result based only on the values in this position(like average or max).

To help you understand how it works we are going to implement the most widely-used type — Max Pooling Layer.
### Max Pooling Layer

The idea is simple — when the pool slides to a position, the maximum value in that position is stored in the output.

<img src = "max_pooling_layer.webp" width = 650 height = 350 />
Visual representation of Max Pooling | Source: My Late Night PowerPoint Creations

### Parameters

The list here is a lot simpler than the one for Convolution layer

* Pool Shape— the shape of the pool, described in a tuple, a good default is (2, 2)
* Stride — same as in convolution, a good default is 2(most of the cases it is good to match the dimensions of the pool)

### Implementation

It is a good idea to implement a base class once again, since some operation are the same for all pooling layers(like initialization for instance):

In [38]:
class PoolingLayer(Layer):
  
  def __init__(self, pool_shape=(2,2), stride=2):
    self.pool_shape = pool_shape
    self.stride = stride
    self.padding = (int((self.pool_shape[0] - self.stride) / 2), int((self.pool_shape[1] - self.stride) / 2), 0)
        
    self.input_shape = None
    self.input_depth = None
    
    self.output_shape = None
    self.weights = None
    
  def forward_propagation(self, input):
    raise NotImplementedError
    
  def backward_propagation(self, output_error, learning_rate):
    raise NotImplementedError
    
  def initialize(self, input_data):    
    
    self.input_shape = input_data.shape
    self.input_depth = self.input_shape[2]
    
    self.output_shape = (int((self.input_shape[0] - self.pool_shape[0] + 2 * self.padding[0]) / self.stride + 1), int((self.input_shape[1] - self.pool_shape[1] + 2 * self.padding[1]) / self.stride + 1), self.input_depth)            
    self.weights = np.random.rand(self.pool_shape[0], self.pool_shape[1], self.input_depth) - 0.5

And here goes the <strong>Max Pooling</strong>:

In [39]:
class MaxPoolingLayer(PoolingLayer):  
  
  def forward_propagation(self, input_data):
    
    if self.input_shape is None:
      self.initialize(input_data)
    
    self.input = np.pad(input_data, ((self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (self.padding[2], self.padding[2])), 'constant')
    self.output = np.zeros(self.output_shape)

    for layer in range(self.input_depth):
        row_iteration = 0

        for row in range(0, self.input_shape[0], self.stride):
          col_iteration = 0

          for col in range(0, self.input_shape[1], self.stride):        
            if row + self.pool_shape[0] >= self.input_shape[0] or col + self.pool_shape[1] >= self.input_shape[1]:
              continue

            self.output[row_iteration, col_iteration, layer] = np.amax(self.input[row : row + self.pool_shape[0], col : col + self.pool_shape[1], layer])

            col_iteration += 1

          row_iteration += 1

    return self.output
    
  def backward_propagation(self, output_error, learning_rate):
    in_error = np.zeros(self.input.shape)
    
    for layer in range(self.input_depth):

      row_iteration = 0

      for row in range(output_error.shape[0]):     
        input_row_index = row * self.stride

        for col in range(output_error.shape[1]):        
          input_col_index = col * self.stride  

          if input_row_index + self.pool_shape[0] >= self.input_shape[0] or input_col_index + self.pool_shape[1] >= self.input_shape[1]:
            continue
            
          pool = self.input[input_row_index : input_row_index + self.pool_shape[0], input_col_index : input_col_index + self.pool_shape[1], layer]
          mask = (pool == np.max(pool))
          in_error[input_row_index : input_row_index + self.pool_shape[0], input_col_index : input_col_index + self.pool_shape[1], layer] = mask * output_error[row, col, layer]
    
    return in_error

I strongly encourage you to try and implement <strong>Average Pooling Layer</strong> by yourself.

Before we build our first CNN model, we need to reformat our data so that it can be used by other layers like Fully-Connected. This is needed because the last non-activation layer should always be Fully-Connected when we are solving a classification task. And because Fully-Connected would be a mess if its input is anything else then a 1D array, we must flatten(convert N-D array to 1D array) the data first.
### Flatten Layer

This is probably the simplest layer to implement. Our forward propagation needs to flatten the input and the backpropagation needs to reformat it back to its initial form:

In [74]:
class FlattenLayer(Layer):
  
  def forward_propagation(self, input_data):
    self.input_shape = input_data.shape
    return input_data.flatten()
  
  def backward_propagation(self, output_error, learning_rate):
    return output_error.reshape(self.input_shape)

Now let’s build our first CNN…
### CNN Model

We are going to build a simple CNN using all the layers we implemented in this article:

In [40]:
conv_net = NeuralNetwork()

(conv_net
  .add(ConvolutionalLayer(filter_shape=(3, 3), filters_count=5, stride=2, padding=(1,1)))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(MaxPoolingLayer(pool_shape=(2, 2)))
  .add(FlattenLayer())
  .add(FCLayer(100))
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(50))  
  .add(ActivationLayer(tanh, tanh_prime))
  .add(FCLayer(10))
  .add(FlattenLayer())
  .add(ActivationLayer(tanh, tanh_prime))
  .add(ActivationLayer(softmax, softmax_prime))
  .use(mse, mse_prime)
  .fit(x_train[0:4000], y_train[0:4000], epochs=30, learning_rate=0.1)
)

conv_preds = conv_net.predict(x_test[:100])

NameError: name 'ConvolutionalLayer' is not defined

Now let’s see how well it performs:

In [None]:
conv_results = [np.argmax(sample) for sample in conv_preds]

print('Accuracy | Conv Net: {}'.format(accuracy_score(y_test[:100], conv_results)))

It should perform better than our normal net, although it can actually achieve much higher results if you play around with it and have the patience to wait for the training process to finish. 