# GSN 1 - Winter 2021/22 

In [None]:
# Mount Google Drive into the Colab runtime under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import trange
from sklearn.metrics import accuracy_score
from tensorflow import keras
from scipy import signal
from numpy.lib.stride_tricks import as_strided
import matplotlib.pyplot as plt
import cv2

In [None]:
"""
myLayer class parameters:
      inputs_count: int (number of inputs to each neuron of the layer)
      neuron_count: int (number of neurons in the layer)
      learning_rate: OPTIONAL - float (step size of the gradient-descent update, default 0.01)
      weights: OPTIONAL - array of shape (neuron_count, inputs_count) with the initial weights
      biases: OPTIONAL - array of shape (neuron_count,) with the initial biases
"""

class myLayer:
    """Fully connected layer trained with plain gradient descent.

    Weights are kept as a ``(neuron_count, inputs_count)`` array and biases as
    a ``(neuron_count,)`` array, so the forward pass is
    ``inputs . weights^T + biases``.
    """

    def __init__(self, inputs_count, neuron_count, learning_rate=0.01, weights=None, biases=None):
        """Create the layer.

        Args:
            inputs_count: number of inputs fed to every neuron.
            neuron_count: number of neurons (outputs) of the layer.
            learning_rate: step size applied in ``backward``.
            weights: optional initial weights; when omitted they are drawn
                from a normal distribution with standard deviation 0.01.
            biases: optional initial biases; when omitted they start at zero.
        """
        self.learning_rate = learning_rate

        # Explicitly supplied parameters win over the default initialisation.
        if weights is None:
            self.weights = np.random.normal(scale=0.01, size=(neuron_count, inputs_count))
        else:
            self.weights = np.array(weights)

        self.biases = np.zeros(neuron_count) if biases is None else np.array(biases)

    def forward(self, inputs):
        """Return the affine transform of ``inputs`` (one row per sample)."""
        projected = np.dot(inputs, self.weights.T)
        return projected + self.biases

    def backward(self, input, grad):
        """Apply one gradient-descent step and return the gradient w.r.t. ``input``.

        Args:
            input: the array that was passed to ``forward``.
            grad: gradient of the loss with respect to this layer's output.

        Returns:
            Gradient of the loss with respect to ``input``.
        """
        # Must be computed before the weights are overwritten below.
        upstream = np.dot(grad, self.weights)

        weight_step = np.dot(input.T, grad).T
        # mean * batch size: the per-sample bias gradients summed over the batch.
        bias_step = (grad.mean(axis=0) * input.shape[0]).T

        self.weights = self.weights - self.learning_rate * weight_step
        self.biases = self.biases - self.learning_rate * bias_step

        return upstream

In [None]:
class Conv2D():
    """Valid-mode, stride-1 2-D correlation layer with one kernel per channel.

    The layer accepts either a 4-D input ``(batch, height, width, depth)`` or
    a 5-D input ``(batch, prev_filters, height, width, depth)`` (the layout
    this layer itself produces). The output is always 5-D,
    ``(batch, n_filters, out_height, out_width, depth)``, with

        out[b, f, :, :, d] = biases[f, :, :, d]
                             + sum_p correlate2d(input[b, p, :, :, d],
                                                 kernels[f, :, :, d], "valid")

    Channels are not mixed: channel ``d`` of the input only contributes to
    channel ``d`` of the output.

    Biases are one value per output position (``outShape`` without its batch
    axis) and are shared by every sample in a batch, so ``forward`` and
    ``backward`` work for any batch size, not only the one given at
    construction time.
    """

    def __init__(self, input_shape, n_filters, filter_shape, padding='none', stride = 1, learning_rate=0.01):
        """Create the layer.

        Args:
            input_shape: 4-tuple ``(batch, height, width, depth)`` or 5-tuple
                ``(batch, prev_filters, height, width, depth)``.
            n_filters: number of filters (size of output axis 1).
            filter_shape: ``(kernel_height, kernel_width)``.
            padding: accepted for interface compatibility; not used, the
                layer always performs a "valid" correlation.
            stride: stored on the instance; the correlation itself always
                uses a stride of 1.
            learning_rate: step size applied in ``backward``.

        Raises:
            ValueError: if ``input_shape`` has neither 4 nor 5 entries.
        """
        self.learning_rate = learning_rate
        self.inputShape = input_shape
        if len(input_shape) == 4:
            self.batches, self.inputHeight, self.inputWidth, self.inputDepth = input_shape
        elif len(input_shape) == 5:
            (self.batches, self.prevFilters, self.inputHeight,
             self.inputWidth, self.inputDepth) = input_shape
        else:
            raise ValueError(
                "input_shape must have 4 or 5 entries, got %d" % len(input_shape))

        self.padding = 0
        self.stride = stride
        self.numFilters = n_filters
        self.filterShape = filter_shape
        self.filtersShape = (self.numFilters, self.filterShape[0], self.filterShape[1], self.inputDepth)

        self.outShape = (
            self.batches,
            self.numFilters,
            self.inputHeight - self.filterShape[0] + 1,
            self.inputWidth - self.filterShape[1] + 1,
            self.inputDepth,
        )
        self.kernels = np.random.randn(*self.filtersShape)
        # No batch axis: a bias tied to a sample's position inside a shuffled
        # minibatch carries no meaning and breaks inference on other batch sizes.
        self.biases = np.random.randn(*self.outShape[1:])

    @staticmethod
    def _merge_prev_filters(input):
        """Return ``input`` as a 4-D array, summing a 5-D input over axis 1.

        Correlation is linear, so summing the previous layer's filter maps
        first gives the same result as summing the correlated maps.
        """
        input = np.asarray(input)
        if input.ndim == 4:
            return input
        if input.ndim == 5:
            return input.sum(axis=1)
        raise ValueError("Conv2D expects a 4-D or 5-D input, got %d-D" % input.ndim)

    def forward(self, input):
        """Correlate ``input`` with every kernel and add the biases.

        Args:
            input: 4-D or 5-D array as described on the class.

        Returns:
            Array of shape ``(batch, n_filters, out_height, out_width, depth)``
            where ``batch`` is taken from ``input``.
        """
        self.input = input
        merged = self._merge_prev_filters(input)
        batches = merged.shape[0]

        # Start every sample from its own copy of the shared biases.
        self.output = np.broadcast_to(
            self.biases, (batches,) + self.biases.shape).copy()
        for batch in range(batches):
            for filt in range(self.numFilters):
                for channel in range(self.inputDepth):
                    self.output[batch, filt, :, :, channel] += signal.correlate2d(
                        merged[batch, :, :, channel],
                        self.kernels[filt, :, :, channel],
                        "valid")
        return self.output

    def backward(self, input, grad):
        """Update kernels and biases and return the gradient w.r.t. ``input``.

        Args:
            input: the array that was passed to ``forward``.
            grad: gradient of the loss with respect to the layer output,
                shape ``(batch, n_filters, out_height, out_width, depth)``.

        Returns:
            Array with the shape of ``input`` holding the loss gradient with
            respect to it.
        """
        self.input = input
        input = np.asarray(input)
        merged = self._merge_prev_filters(input)
        batches = merged.shape[0]

        kernels_gradient = np.zeros(self.filtersShape)
        merged_gradient = np.zeros(merged.shape)
        for batch in range(batches):
            for filt in range(self.numFilters):
                for channel in range(self.inputDepth):
                    out_grad = grad[batch, filt, :, :, channel]
                    # Accumulate over the batch; each filter only sees the
                    # gradient of its own output map.
                    kernels_gradient[filt, :, :, channel] += signal.correlate2d(
                        merged[batch, :, :, channel], out_grad, "valid")
                    # Uses the kernels from before the update below.
                    merged_gradient[batch, :, :, channel] += signal.convolve2d(
                        out_grad, self.kernels[filt, :, :, channel], "full")

        if input.ndim == 5:
            # Every previous-filter map entered the sum with weight 1, so each
            # one receives the same gradient.
            input_gradient = np.repeat(
                merged_gradient[:, np.newaxis], input.shape[1], axis=1)
        else:
            input_gradient = merged_gradient

        self.kernels -= self.learning_rate * kernels_gradient
        self.biases -= self.learning_rate * np.asarray(grad).sum(axis=0)
        return input_gradient

In [None]:
class Pooling2D():
    """Max / average pooling over the two spatial axes of a 5-D array.

    Inputs and outputs use the layout
    ``(batch, filters, height, width, depth)``; pooling is applied
    independently to every (batch, filter, depth) slice.
    """

    _MAX_MODES = ('max',)
    _AVG_MODES = ('avg', 'average')

    def __init__(self, kernel_size, stride, padding=0, pool_mode = 'max' ):
        """Create the layer.

        Args:
            kernel_size: side length of the square pooling window.
            stride: step between consecutive windows.
            padding: number of zero rows/columns added on each side of the
                height and width axes before pooling.
            pool_mode: ``'max'`` or ``'avg'`` (``'average'`` is accepted too).
        """
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.pool_mode = pool_mode

    def _pad(self, input):
        """Return ``input`` as an array, zero-padded on height and width only."""
        padded = np.asarray(input)
        if self.padding:
            p = self.padding
            padded = np.pad(
                padded, ((0, 0), (0, 0), (p, p), (p, p), (0, 0)), mode='constant')
        return padded

    def forward(self, input):
        """Pool ``input`` and return an array of floats.

        Returns:
            Array of shape ``(batch, filters, out_h, out_w, depth)`` with
            ``out_h = (padded_height - kernel_size) // stride + 1`` and
            likewise for ``out_w``.

        Raises:
            ValueError: if the window does not fit into the (padded) input
                or ``pool_mode`` is unknown.
        """
        padded = self._pad(input)
        batches, num_filters, h, w, depth = padded.shape
        k, s = self.kernel_size, self.stride
        out_h = (h - k) // s + 1
        out_w = (w - k) // s + 1
        if out_h < 1 or out_w < 1:
            raise ValueError("pooling window is larger than the input")

        # Read-only view holding every window of every (batch, filter, depth)
        # slice: axes are (batch, filter, out_h, out_w, k, k, depth).
        sb, sf, sh, sw, sd = padded.strides
        windows = as_strided(
            padded,
            shape=(batches, num_filters, out_h, out_w, k, k, depth),
            strides=(sb, sf, s * sh, s * sw, sh, sw, sd),
            writeable=False)

        if self.pool_mode in self._MAX_MODES:
            pooled = windows.max(axis=(4, 5))
        elif self.pool_mode in self._AVG_MODES:
            pooled = windows.mean(axis=(4, 5))
        else:
            raise ValueError("unknown pool_mode: %r" % (self.pool_mode,))
        return pooled.astype(float)

    def backward(self, input, grad, mode=None):
        """Route ``grad`` back to the positions that produced each output.

        Args:
            input: the array that was passed to ``forward``.
            grad: gradient with respect to the pooled output.
            mode: pooling mode to differentiate; defaults to the layer's own
                ``pool_mode``. ``"max"`` sends the gradient to every position
                equal to the window maximum; ``"avg"`` / ``"average"``
                spreads it evenly over the window.

        Returns:
            Array with the shape of ``input``.

        Raises:
            ValueError: if the mode is unknown.
        """
        if mode is None:
            mode = self.pool_mode
        if mode not in self._MAX_MODES and mode not in self._AVG_MODES:
            raise ValueError("unknown pooling mode: %r" % (mode,))

        padded = self._pad(input)
        grad = np.asarray(grad)
        _, _, n_H, n_W, _ = grad.shape
        k, s = self.kernel_size, self.stride

        padded_gradient = np.zeros(padded.shape)
        for h in range(n_H):
            for w in range(n_W):
                rows = slice(h * s, h * s + k)
                cols = slice(w * s, w * s + k)
                # Gradient of this output position for all batches, filters
                # and channels at once, broadcastable over the window.
                out_grad = grad[:, :, h, w, :][:, :, np.newaxis, np.newaxis, :]
                if mode in self._MAX_MODES:
                    window = padded[:, :, rows, cols, :]
                    mask = window == window.max(axis=(2, 3), keepdims=True)
                    padded_gradient[:, :, rows, cols, :] += mask * out_grad
                else:
                    padded_gradient[:, :, rows, cols, :] += out_grad / (k * k)

        if self.padding:
            p = self.padding
            # Drop the gradient that fell onto the zero padding.
            return padded_gradient[:, :, p:-p, p:-p, :]
        return padded_gradient

In [None]:
class Flatten():
    """Reshape every sample of a batch into a single feature vector."""

    def __init__(self):
        pass

    def forward(self, input):
        """Return ``input`` reshaped to ``(batch, features)``.

        Works for any number of trailing axes (for example the 5-D output of
        a convolution layer or a 4-D image batch); the first axis is kept as
        the batch axis. The original shape is remembered for ``backward``.
        """
        self.prevShape = input.shape
        batches = input.shape[0]
        # Explicit product instead of -1 so that an empty batch still reshapes.
        features = int(np.prod(input.shape[1:]))
        return input.reshape(batches, features)

    def backward(self, input, grad):
        """Reshape ``grad`` back to the shape seen by the last ``forward``."""
        return grad.reshape(self.prevShape)

In [None]:
# ACTIVATION FUNCTIONS

class sigmoid():
    """Logistic sigmoid activation, ``s(x) = 1 / (1 + exp(-x))``."""

    def forward(self, x):
        """Return the element-wise sigmoid of ``x``."""
        return 1 / (1 + np.exp(-x))

    def backward(self, x, grad):
        """Return ``grad`` multiplied by the sigmoid derivative at ``x``.

        ``x`` is the input that was given to ``forward`` (the pre-activation
        value), so the derivative ``s(x) * (1 - s(x))`` has to be evaluated
        on the activation, not on ``x`` itself.
        """
        activation = self.forward(x)
        return grad * activation * (1 - activation)

class relu():
    """Rectified linear unit: passes positive values, zeroes the rest."""

    def forward(self, x):
        """Return ``x`` with every non-positive entry multiplied by zero."""
        keep = x > 0
        return x * keep

    def backward(self, x, grad):
        """Return ``grad`` where ``x`` is positive and zero elsewhere."""
        slope = 1 * (x > 0)
        return grad * slope

class tanh():
    """Hyperbolic tangent activation."""

    def forward(self, x):
        """Return the element-wise hyperbolic tangent of ``x``.

        ``np.tanh`` is used instead of the explicit exponential quotient,
        which evaluates to ``inf / inf`` (NaN) for large ``|x|``.
        """
        return np.tanh(x)

    def backward(self, x, grad):
        """Return ``grad`` multiplied by ``1 - tanh(x)**2``.

        ``x`` is the input that was given to ``forward``.
        """
        activation = np.tanh(x)
        return grad * (1 - activation ** 2)

In [None]:
def softmax_crossentropy_with_logits(logits, reference_answers):
    """Return the per-sample cross-entropy of softmax(logits).

    Args:
        logits: array of shape ``(batch, classes)`` with unnormalised scores.
        reference_answers: integer class index for every sample.

    Returns:
        Array of shape ``(batch,)`` holding
        ``-logits[i, answer_i] + log(sum_j exp(logits[i, j]))``.
    """
    logits = np.asarray(logits, dtype=float)
    # Subtracting the row maximum leaves the loss unchanged but keeps exp()
    # from overflowing for large logits.
    shifted = logits - logits.max(axis=-1, keepdims=True)
    rows = np.arange(len(shifted))
    log_partition = np.log(np.sum(np.exp(shifted), axis=-1))
    return log_partition - shifted[rows, reference_answers]

def grad_softmax_crossentropy_with_logits(logits, reference_answers):
    """Return the gradient of the batch-mean softmax cross-entropy w.r.t. logits.

    Args:
        logits: array of shape ``(batch, classes)`` with unnormalised scores.
        reference_answers: integer class index for every sample.

    Returns:
        Array of the same shape as ``logits``:
        ``(softmax(logits) - one_hot(reference_answers)) / batch``.
    """
    logits = np.asarray(logits, dtype=float)
    one_hot = np.zeros_like(logits)
    one_hot[np.arange(len(logits)), reference_answers] = 1

    # Shift by the row maximum so exp() cannot overflow; softmax is invariant
    # to adding a constant to every logit of a row.
    exps = np.exp(logits - logits.max(axis=-1, keepdims=True))
    softmax = exps / exps.sum(axis=-1, keepdims=True)
    return (softmax - one_hot) / logits.shape[0]

In [None]:
class myNetwork:
    """Sequential container that chains layers' ``forward``/``backward`` calls.

    Every layer must provide ``forward(input)`` and ``backward(input, grad)``,
    where ``backward`` returns the gradient with respect to ``input``.
    """

    def __init__(self):
        self.layers_instances = []

    def add_layer(self, layer):
        """Append ``layer`` to the end of the network."""
        self.layers_instances.append(layer)

    def forward(self, inputs):
        """Run ``inputs`` through all layers and return the last output.

        The input of every layer is cached in ``self.inputs`` (entry ``i`` is
        what layer ``i`` received; the final entry is the network output) so
        that ``backward`` can hand each layer the value it saw.
        """
        current_state = np.asarray(inputs)
        # Cache the converted array, i.e. exactly what the first layer gets.
        self.inputs = [current_state]

        for layer in self.layers_instances:
            current_state = layer.forward(current_state)
            self.inputs.append(current_state)
        return current_state

    def backward(self, grad):
        """Back-propagate ``grad`` through the layers in reverse order.

        Must be called after ``forward``. Returns the gradient with respect
        to the network input, as produced by the first layer's ``backward``.
        """
        current_state = grad
        # zip stops at the shorter sequence, so the trailing cached network
        # output is ignored and each layer is paired with its own input.
        pairs = list(zip(self.layers_instances, self.inputs))
        for layer, layer_input in reversed(pairs):
            current_state = layer.backward(layer_input, current_state)
        return current_state

In [None]:
def iterate_minibatches(inputs, targets, batchsize, shuffle=False):
    """Yield ``(inputs, targets)`` minibatches of exactly ``batchsize`` samples.

    Args:
        inputs: indexable collection of samples.
        targets: indexable collection of labels, same length as ``inputs``.
        batchsize: number of samples per yielded batch.
        shuffle: when true, samples are drawn in a random permutation;
            otherwise batches are consecutive slices.

    A trailing group of fewer than ``batchsize`` samples is not yielded.
    Progress is displayed through ``trange``.
    """
    assert len(inputs) == len(targets)

    total = len(inputs)
    order = np.random.permutation(total) if shuffle else None

    for begin in trange(0, total - batchsize + 1, batchsize, desc='minibatch iteration'):
        end = begin + batchsize
        chosen = slice(begin, end) if order is None else order[begin:end]
        yield inputs[chosen], targets[chosen]

### Initialize and train the neural network on the MNIST dataset

In [None]:
# Load the MNIST train/test split through Keras.
(X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()

# Alternative preprocessing that flattens every image to a 784-vector
# (kept for reference; presumably for a dense-only network):
#X_train, X_test = np.reshape(X_train / 255., (60000, 28*28)), np.reshape(X_test /255. , (10000, 28 * 28))

# Divide the pixel values by 255 and append a trailing channel axis so the
# arrays match the (batch, height, width, depth) layout Conv2D expects.
# -1 lets NumPy infer the sample count instead of hard-coding 60000 / 10000.
X_train = (X_train / 255).reshape(-1, 28, 28, 1)
X_test = (X_test / 255).reshape(-1, 28, 28, 1)

print(X_test.shape)
print(X_train.shape)
print(y_train.shape)

(10000, 28, 28, 1)
(60000, 28, 28, 1)
(60000,)


In [None]:
### full network training ###
BATCH_SIZE = 32        # shared by the Conv2D input shape and the training minibatches
N_FILTERS = 4
KERNEL_SHAPE = (3, 3)
N_EPOCHS = 5

# A "valid" 3x3 correlation turns 28x28x1 images into 26x26x1 maps, one per
# filter, so the flattened feature count is 4 * 26 * 26 * 1 = 2704.
conv_features = N_FILTERS * (28 - KERNEL_SHAPE[0] + 1) * (28 - KERNEL_SHAPE[1] + 1)

net = myNetwork()

net.add_layer(Conv2D((BATCH_SIZE, 28, 28, 1), N_FILTERS, KERNEL_SHAPE))
net.add_layer(relu())
net.add_layer(Flatten())
net.add_layer(myLayer(conv_features, 512))
net.add_layer(relu())
net.add_layer(myLayer(512, 64))
net.add_layer(relu())
net.add_layer(myLayer(64, 10))

for epoch in range(N_EPOCHS):
    epoch_loss = []
    for x_batch, y_batch in iterate_minibatches(X_train, y_train, batchsize=BATCH_SIZE, shuffle=True):

        ### forward ###
        logits = net.forward(x_batch)

        ### loss calculation ###
        loss = softmax_crossentropy_with_logits(logits, y_batch)
        epoch_loss.append(np.mean(loss))

        ### backward ###
        loss_grad = grad_softmax_crossentropy_with_logits(logits, y_batch)

        net.backward(loss_grad)

    # Evaluate one test sample at a time so that no sample is dropped by the
    # fixed-size batching of iterate_minibatches.
    preds = []
    for x_batch, _ in iterate_minibatches(X_test, y_test, batchsize=1, shuffle=False):
        # Only row 0 belongs to the single sample that was fed in; selecting
        # it explicitly keeps argmax within the 10 class logits even if the
        # network output carries additional rows.
        preds.append(np.argmax(net.forward(x_batch)[0]))

    print(f'\nEpoch: {epoch + 1} \t Loss: {np.mean(epoch_loss)} \t Test accuracy: {accuracy_score(y_test, preds)} \n')

minibatch iteration: 100%|██████████| 1875/1875 [04:03<00:00,  7.70it/s]
minibatch iteration: 100%|██████████| 10000/10000 [01:01<00:00, 161.86it/s]



Epoch: 1 	 Loss: 1.0004405326792272 	 Test accuracy: 0.8898 



minibatch iteration: 100%|██████████| 1875/1875 [04:02<00:00,  7.72it/s]
minibatch iteration: 100%|██████████| 10000/10000 [01:01<00:00, 162.21it/s]



Epoch: 2 	 Loss: 0.32332958831091213 	 Test accuracy: 0.924 



minibatch iteration: 100%|██████████| 1875/1875 [04:03<00:00,  7.71it/s]
minibatch iteration: 100%|██████████| 10000/10000 [01:03<00:00, 157.27it/s]



Epoch: 3 	 Loss: 0.2321569134462044 	 Test accuracy: 0.9429 



minibatch iteration: 100%|██████████| 1875/1875 [04:02<00:00,  7.74it/s]
minibatch iteration: 100%|██████████| 10000/10000 [01:04<00:00, 155.59it/s]



Epoch: 4 	 Loss: 0.1759306098119565 	 Test accuracy: 0.9559 



minibatch iteration: 100%|██████████| 1875/1875 [04:04<00:00,  7.66it/s]
minibatch iteration: 100%|██████████| 10000/10000 [01:02<00:00, 160.17it/s]



Epoch: 5 	 Loss: 0.1405370052916032 	 Test accuracy: 0.9542 

