<a href="https://colab.research.google.com/github/BiroAd/cnn_from_scratch/blob/main/cnn_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from keras.utils.np_utils import to_categorical
import time

In [None]:
class Layer:
    """Common interface implemented by every layer of the network.

    ``forward`` and ``backward`` must be overridden by subclasses.
    ``update_params`` does nothing unless a subclass overrides it.
    """

    def forward(self, params, training=True):
        """Compute the layer output for ``params``; must be overridden."""
        raise NotImplementedError

    def backward(self, backward):
        """Propagate the incoming gradient ``backward``; must be overridden."""
        raise NotImplementedError

    def update_params(self, iteration, learning_rate, beta1=0.9, beta2=0.999):
        """Apply one optimiser step; the base implementation is a no-op."""
        return None


class Convolutional_layer(Layer):
    """Stride-1 2-D convolution with zero padding and Adam-updated parameters.

    Inputs are indexed as (sample, channel, height, width). The padding is
    ``(kernel_size - 1) // 2`` on every side, so odd kernel sizes keep the
    spatial size unchanged (for ``kernel_size == 3`` this is a padding of 1).
    """

    def __init__(self, kernel_num, kernel_size, dimensionOfSample):
        """Create the layer.

        Args:
            kernel_num: number of kernels, i.e. output channels.
            kernel_size: side length of the square kernels.
            dimensionOfSample: number of input channels.
        """
        self.input = None
        self.kernel_num = kernel_num
        self.kernel_size = kernel_size
        # Zero padding applied on each side of the input in forward/backward.
        self.padding = (kernel_size - 1) // 2
        self.bias = np.zeros(kernel_num)
        # Weights are scaled by sqrt(2 / fan_in), fan_in = channels * k * k.
        self.kernels = np.random.randn(kernel_num, dimensionOfSample, kernel_size, kernel_size) * np.sqrt(
            2 / (dimensionOfSample * kernel_size ** 2))
        # First (m) and second (v) moment estimates used by the Adam update.
        self.kernels_m = np.zeros_like(self.kernels)
        self.bias_m = np.zeros_like(self.bias)
        self.kernels_v = np.zeros_like(self.kernels)
        self.bias_v = np.zeros_like(self.bias)

    def patches_generator(self, input, kernel_size):
        """Yield ``(patch, h, w)`` for every stride-1 square window.

        ``patch`` is ``input[:, :, h:h + kernel_size, w:w + kernel_size]``.
        """
        image_sample, image_channels, image_h, image_w = input.shape
        for h in range(image_h - kernel_size + 1):
            for w in range(image_w - kernel_size + 1):
                patch = input[:, :, h:(h + kernel_size), w:(w + kernel_size)]
                yield patch, h, w

    def _pad(self, array, amount):
        """Zero-pad the two trailing (spatial) axes of ``array`` by ``amount``."""
        return np.pad(array, [(0, 0), (0, 0), (amount, amount), (amount, amount)], mode='constant')

    def forward(self, forwardInput, training=True):
        """Convolve ``forwardInput`` with the kernels and add the bias.

        Args:
            forwardInput: array indexed (sample, channel, height, width).
            training: accepted for interface compatibility; unused here.

        Returns:
            Array indexed (sample, kernel, out_height, out_width).
        """
        self.input = forwardInput
        paddedInput = self._pad(forwardInput, self.padding)
        # Size the output from the padded input so any kernel size is valid.
        out_h = paddedInput.shape[2] - self.kernel_size + 1
        out_w = paddedInput.shape[3] - self.kernel_size + 1
        self.conv_output = np.zeros((forwardInput.shape[0], self.kernel_num, out_h, out_w))

        for patch, h, w in self.patches_generator(paddedInput, self.kernel_size):
            # Contract channel and both kernel axes: (sample, kernel).
            self.conv_output[:, :, h, w] = np.einsum('ijkl,mjkl->im', patch, self.kernels) + self.bias
        return self.conv_output

    def backward(self, dLdZ):
        """Compute parameter gradients and return the gradient w.r.t. the input.

        Stores the kernel gradient in ``self.dF`` and the bias gradient in
        ``self.dB``.

        Args:
            dLdZ: gradient w.r.t. the output of ``forward``.

        Returns:
            Gradient with the same shape as the cached forward input.
        """
        dX = np.zeros_like(self.input, dtype=float)
        self.dF = np.zeros_like(self.kernels, dtype=float)
        paddedInput = self._pad(self.input, self.padding)
        flipped_kernel = np.flip(self.kernels, (2, 3))
        # Padding the output gradient by (k - 1 - padding) makes the
        # flipped-kernel correlation below produce exactly the input size.
        paddedOutput = self._pad(dLdZ, self.kernel_size - 1 - self.padding)

        out_h, out_w = dLdZ.shape[2], dLdZ.shape[3]
        for k in range(self.kernel_size):
            for l in range(self.kernel_size):
                # Window of the padded input seen by kernel tap (k, l);
                # height and width are handled separately so non-square
                # feature maps work.
                window = paddedInput[:, :, k:k + out_h, l:l + out_w]
                self.dF[:, :, k, l] = np.einsum('ijkl,imkl->mj', window, dLdZ)

        for patch, h, w in self.patches_generator(paddedOutput, self.kernel_size):
            dX[:, :, h, w] = np.einsum('ijkl,jmkl->im', patch, flipped_kernel)

        self.dB = np.einsum('ijkl->j', dLdZ)
        return dX

    def update_params(self, iteration, learning_rate, beta1=0.9, beta2=0.999):
        """Apply one Adam step using the gradients stored by ``backward``.

        ``iteration + 1`` is used as the step count in the bias correction.
        """
        self.kernels_m = beta1 * self.kernels_m + (1 - beta1) * self.dF
        self.bias_m = beta1 * self.bias_m + (1 - beta1) * self.dB
        kernels_m_hat = self.kernels_m / (1 - beta1 ** (iteration + 1))
        bias_m_hat = self.bias_m / (1 - beta1 ** (iteration + 1))
        self.kernels_v = beta2 * self.kernels_v + (1 - beta2) * self.dF ** 2
        self.bias_v = beta2 * self.bias_v + (1 - beta2) * self.dB ** 2
        kernels_v_hat = self.kernels_v / (1 - beta2 ** (iteration + 1))
        bias_v_hat = self.bias_v / (1 - beta2 ** (iteration + 1))

        self.kernels -= learning_rate * kernels_m_hat / (np.sqrt(kernels_v_hat) + 1e-6)
        self.bias -= learning_rate * bias_m_hat / (np.sqrt(bias_v_hat) + 1e-6)


class MaxPoolingLayer(Layer):
    """2x2 max pooling with stride 2 over the two trailing axes.

    A trailing row/column that does not fill a complete 2x2 window (odd
    height or width) is ignored in ``forward`` and receives zero gradient in
    ``backward``.
    """

    def patches_generator(self, input):
        """Yield ``(patch, h, w)`` for each non-overlapping 2x2 window.

        Kept for callers that iterate windows directly; ``forward`` computes
        the same result without a Python-level loop.
        """
        image_sample, image_channels, image_h, image_w = input.shape
        self.image = input
        for h in range(image_h // 2):
            for w in range(image_w // 2):
                patch = input[:, :, 2 * h:2 * h + 2, 2 * w:2 * w + 2]
                yield patch, h, w

    def forward(self, ForwardInput, training=True):
        """Return the per-window maxima and cache the mask of their positions.

        Args:
            ForwardInput: array indexed (sample, channel, height, width).
            training: accepted for interface compatibility; unused here.

        Returns:
            Float array of shape (samples, channels, height // 2, width // 2).
        """
        self.input = ForwardInput
        self.image = ForwardInput
        samples, channels, height, width = ForwardInput.shape
        out_h, out_w = height // 2, width // 2

        # Region covered by complete 2x2 windows.
        covered = ForwardInput[:, :, :2 * out_h, :2 * out_w]
        # Split each spatial axis into (window index, offset inside window).
        windows = covered.reshape(samples, channels, out_h, 2, out_w, 2)
        pooled = windows.max(axis=(3, 5))
        self.forward_output = pooled.astype(float)

        # 1.0 wherever an input element equals the maximum of its window
        # (ties mark every tied position), 0.0 elsewhere.
        expanded = np.repeat(np.repeat(pooled, 2, axis=2), 2, axis=3)
        self.mask = np.zeros_like(ForwardInput, dtype=float)
        self.mask[:, :, :2 * out_h, :2 * out_w] = np.equal(covered, expanded)
        return self.forward_output

    def backward(self, backward):
        """Route the gradient to the positions that produced each maximum.

        Args:
            backward: gradient w.r.t. the output of ``forward``.

        Returns:
            float32 gradient with the shape of the cached forward input.
        """
        upsampled = np.repeat(np.repeat(backward, 2, axis=2), 2, axis=3)
        # Embed into an input-sized buffer so odd input sizes, whose last
        # row/column was never pooled, do not cause a shape mismatch.
        grad = np.zeros_like(self.mask)
        grad[:, :, :upsampled.shape[2], :upsampled.shape[3]] = upsampled
        return np.array(grad * self.mask, dtype='float32')


class ReLU(Layer):
    """Rectified linear unit: passes positive values, zeroes the rest."""

    def forward(self, S, training=True):
        """Cache ``S`` and return it with non-positive entries replaced by 0."""
        self.A = S
        is_positive = S > 0
        return np.where(is_positive, S, 0)

    def backward(self, dLdA):
        """Scale ``dLdA`` by 1 where the cached input was positive, else 0."""
        gate = np.where(self.A > 0, 1, 0)
        return gate * dLdA


class LeakyReLU(Layer):
    """Leaky ReLU: identity for non-negative inputs, slope ``alpha`` otherwise."""

    def __init__(self, alpha=0.01):
        """Store the slope ``alpha`` applied to negative inputs."""
        self.input = None
        self.alpha = alpha

    def forward(self, X, training=True):
        """Cache ``X`` and return ``X`` where ``X >= 0``, else ``alpha * X``."""
        self.input = X
        scaled = self.alpha * X
        return np.where(X >= 0, X, scaled)

    def backward(self, dLdA):
        """Return ``dLdA`` where the cached input was >= 0, else ``alpha * dLdA``."""
        scaled_grad = self.alpha * dLdA
        return np.where(self.input >= 0, dLdA, scaled_grad)


class Flatten(Layer):
    """Reshape (N, C, H, W) activations into a (C*H*W, N) float32 matrix."""

    def forward(self, A, training=True):
        """Flatten each sample of ``A`` into one column of the result.

        The input shape is remembered so ``backward`` can undo the reshape.
        """
        self.N, self.C, self.H, self.W = A.shape
        features = self.C * self.H * self.W
        flat = np.zeros((features, self.N), dtype='float32')
        # Row i of the (N, features) view is sample i flattened in C order;
        # transposing places it in column i.
        flat[...] = np.reshape(A, (self.N, features)).T
        return flat

    def backward(self, dLdS):
        """Reshape the columns of ``dLdS`` back into (N, C, H, W) float32."""
        restored = np.zeros((self.N, self.C, self.H, self.W), dtype='float32')
        # Only columns that have a matching sample slot are written; any
        # remaining slots stay zero.
        count = min(dLdS.shape[1], self.N)
        restored[:count] = np.reshape(dLdS[:, :count].T, (count, self.C, self.H, self.W))
        return restored


class Linear(Layer):
    """Fully connected layer computing ``W.T @ A + W0`` with Adam updates.

    ``W`` has shape (m, n) and ``W0`` has shape (n, 1).
    """

    def __init__(self, m, n):
        """Create a layer mapping ``m`` input rows to ``n`` output rows."""
        self.m = m
        self.n = n

        # Values cached by forward/backward.
        self.A = None
        self.dLdW = None
        self.dLdW0 = None

        # Parameters: zero bias, weights scaled by sqrt(2 / m).
        self.W0 = np.zeros([n, 1])
        self.W = np.random.randn(m, n) * np.sqrt(2 / m)

        # Adam first (m) and second (v) moment estimates.
        self.weight_m = np.zeros_like(self.W)
        self.bias_m = np.zeros_like(self.W0)
        self.weight_v = np.zeros_like(self.W)
        self.bias_v = np.zeros_like(self.W0)

    def forward(self, A, training=True):
        """Cache ``A`` and return ``W.T @ A + W0``."""
        self.A = A
        return self.W.T @ A + self.W0

    def backward(self, dLdZ):
        """Store weight/bias gradients and return the gradient w.r.t. the input."""
        self.dLdW = self.A @ dLdZ.T
        self.dLdW0 = np.sum(dLdZ, axis=1, keepdims=True)
        return self.W @ dLdZ

    def update_params(self, iteration, learning_rate, beta1=0.9, beta2=0.999):
        """Apply one Adam step using the gradients stored by ``backward``.

        ``iteration + 1`` is used as the step count in the bias correction.
        """
        step = iteration + 1
        m_correction = 1 - beta1 ** step
        v_correction = 1 - beta2 ** step

        # Weights.
        self.weight_m = beta1 * self.weight_m + (1 - beta1) * self.dLdW
        self.weight_v = beta2 * self.weight_v + (1 - beta2) * self.dLdW ** 2
        w_m_hat = self.weight_m / m_correction
        w_v_hat = self.weight_v / v_correction
        self.W -= learning_rate * w_m_hat / (np.sqrt(w_v_hat) + 1e-10)

        # Bias.
        self.bias_m = beta1 * self.bias_m + (1 - beta1) * self.dLdW0
        self.bias_v = beta2 * self.bias_v + (1 - beta2) * self.dLdW0 ** 2
        b_m_hat = self.bias_m / m_correction
        b_v_hat = self.bias_v / v_correction
        self.W0 -= learning_rate * b_m_hat / (np.sqrt(b_v_hat) + 1e-10)


class SoftMax(Layer):
    """Column-wise softmax: every column of the input is one sample."""

    def forward(self, Z, training=True):
        """Return the softmax of ``Z`` taken along axis 0.

        The per-column maximum is subtracted before exponentiating. This
        leaves the result mathematically unchanged but keeps ``np.exp`` from
        overflowing to ``inf`` (and the output from becoming NaN) when the
        logits are large.
        """
        shifted = Z - np.max(Z, axis=0, keepdims=True)
        values = np.exp(shifted)
        return values / np.sum(values, axis=0, keepdims=True)

    def backward(self, dLdZ):
        """Return the incoming gradient unchanged.

        The layer applies no Jacobian of its own, so the caller is expected
        to supply a gradient that is already taken w.r.t. the logits.
        """
        return dLdZ

    def class_fun(self, Ypred):
        """Return the index of the largest entry in each column of ``Ypred``."""
        return np.argmax(Ypred, axis=0)


class Dropout(Layer):
    """Dropout that rescales kept units by ``1 / (1 - dropout_rate)``."""

    def __init__(self, dropout_rate):
        """Store the probability of zeroing a unit.

        Raises:
            ValueError: if ``dropout_rate`` is outside ``[0, 1)``; a rate of
                1 would divide by zero when rescaling the kept units.
        """
        if not 0 <= dropout_rate < 1:
            raise ValueError("dropout_rate must be in [0, 1), got " + str(dropout_rate))
        self.dropout_rate = dropout_rate
        # Mask from the most recent training forward pass; None when the
        # last forward pass was in evaluation mode (or none has run yet).
        self.mask = None

    def forward(self, X, training=True):
        """Randomly zero entries of ``X`` when training; pass through otherwise."""
        if not training:
            # Drop any stale mask so backward() stays consistent with this pass.
            self.mask = None
            return X
        keep_prob = 1 - self.dropout_rate
        self.mask = np.random.binomial(1, keep_prob, size=X.shape) / keep_prob
        return X * self.mask

    def backward(self, dA):
        """Apply the mask of the last forward pass to the incoming gradient.

        When that pass did not drop anything (evaluation mode, or no forward
        pass yet) the gradient is returned unchanged.
        """
        if self.mask is None:
            return dA
        return dA * self.mask


class Network:
    """Sequential stack of layers trained with mini-batch gradient descent."""

    def __init__(self, layers, learning_rate=0.005):
        """Create the network.

        Args:
            layers: layers applied in order by ``forward``; the last one must
                provide ``class_fun`` (used by ``prediction``).
            learning_rate: step size handed to every layer's ``update_params``.
        """
        self.layers = layers
        self.iteration = 0
        self.learning_rate = learning_rate

    def update_layer_params(self):
        """Run one optimiser step on every layer, then advance the counter.

        The counter is passed before it is incremented so the first update
        receives 0; layers that use ``iteration + 1`` as the step count in
        their bias correction then start at step 1.
        """
        for m in self.layers:
            m.update_params(self.iteration, self.learning_rate)
        self.iteration += 1

    def forward(self, Xt, training=True):
        """Feed ``Xt`` through all layers and return the final output.

        Args:
            Xt: network input.
            training: forwarded to every layer so mode-dependent layers can
                behave differently during evaluation.
        """
        for m in self.layers:
            Xt = m.forward(Xt, training=training)
        return Xt

    def backward(self, loss):
        """Propagate the gradient ``loss`` through the layers in reverse order."""
        for m in reversed(self.layers):
            loss = m.backward(loss)

    def train(self, X, Y):
        """Run one training step on a batch.

        Returns:
            ``(correct, sum_loss)``: number of correctly classified samples
            and the summed loss of the batch.
        """
        Ypred = self.forward(X)
        err = NLL_backward(Ypred, Y)
        sum_loss = NLL_forward(Ypred, Y)
        self.backward(err)
        self.update_layer_params()
        print(sum_loss)
        acc_training = self.prediction(Ypred, Y)
        return acc_training, sum_loss

    def prediction(self, Ypred, Y):
        """Count the columns where predicted and true class indices agree."""
        max_index_pred = self.layers[-1].class_fun(Ypred)
        max_index_original = self.layers[-1].class_fun(Y)
        acc = np.sum(np.equal(max_index_pred, max_index_original).astype(int))
        return acc

    def predict(self, X, Y):
        """Evaluate on ``X`` with layers in evaluation mode.

        Args:
            X: input samples.
            Y: targets with one column per sample (same layout as the
                network output).

        Returns:
            ``(accuracy, loss)``: fraction of correct predictions and the
            summed loss.
        """
        Ypred = self.forward(X, training=False)
        acc = self.prediction(Ypred, Y)
        acc /= Ypred.shape[1]
        loss = NLL_forward(Ypred, Y)
        print(acc)
        return acc, loss

    def batch_gradient_descent(self, X_train, Y_train, X_test, Y_test, epochs, batch_size, train_size=40000):
        """Train for ``epochs`` epochs, validate after each, then test and plot.

        Args:
            X_train, Y_train: samples and targets, one row per sample; rows
                from ``train_size`` onwards are held out for validation.
            X_test, Y_test: test samples and targets, one row per sample.
            epochs: number of passes over the training rows.
            batch_size: mini-batch size.
            train_size: number of leading rows used for training.
        """
        validation_X = X_train[train_size:]
        validation_Y = Y_train[train_size:]
        X_train = X_train[:train_size]
        Y_train = Y_train[:train_size]
        num_validation = validation_X.shape[0]

        validation_loss = []
        validation_acc = []
        train_loss = []
        train_acc = []
        for epoch in range(epochs):
            train_loss_batches = []
            train_acc_batches = []
            samples_seen = 0
            mini_batch_X, mini_batch_Y = self.mini_batch_generator(X_train, Y_train, batch_size)
            for iterator, (data, label) in enumerate(zip(mini_batch_X, mini_batch_Y), start=1):
                acc, loss = self.train(data, label.T)
                train_loss_batches.append(loss)
                train_acc_batches.append(acc)
                samples_seen += data.shape[0]
                print(iterator)
            # Normalise by the samples actually processed this epoch.
            train_loss.append(np.sum(train_loss_batches) / samples_seen)
            train_acc.append(np.sum(train_acc_batches) / samples_seen)

            summary = 'Train_loss: ' + str(train_loss[epoch]) + '   ' + 'Train_accuracy:  ' + str(train_acc[epoch])
            if num_validation:
                acc, loss = self.predict(validation_X, validation_Y.T)
                validation_acc.append(acc)
                validation_loss.append(loss / num_validation)
                # Report the per-sample loss so it is comparable to Train_loss.
                summary += ('   ' + 'Validation_loss: ' + str(validation_loss[-1]) + '   ' +
                            'Validation_accuracy: ' + str(acc))
            print(summary)

        fig, axs = plt.subplots(2)
        fig.suptitle('Losses and accuracy')
        # One curve per series, indexed by epoch.
        axs[0].plot(train_loss, label='train loss')
        axs[0].plot(validation_loss, label='validation loss')
        axs[0].legend()
        axs[1].plot(train_acc, label='train accuracy')
        axs[1].plot(validation_acc, label='validation accuracy')
        axs[1].legend()

        # Targets are stored one row per sample; predict() expects columns.
        acc, loss = self.predict(X_test, Y_test.T)
        print("The test accuracy: " + str(acc) + ".  " + "The test loss: " + str(loss / X_test.shape[0]))

    def mini_batch_generator(self, train_X, train_Y, batch_size):
        """Shuffle the data and split it into equally sized mini-batches.

        Samples that do not fill a complete batch are left out of this call's
        result (a fresh permutation is drawn on every call). A ``batch_size``
        larger than the dataset yields a single batch holding all samples.

        Returns:
            ``(mini_batch_X, mini_batch_Y)`` arrays whose first axis indexes
            the batch and whose second axis indexes the sample in the batch.

        Raises:
            ValueError: if the dataset is empty or ``batch_size`` is not
                positive.
        """
        num_data = train_X.shape[0]
        if num_data == 0:
            raise ValueError("cannot build mini-batches from an empty dataset")
        if batch_size <= 0:
            raise ValueError("batch_size must be positive, got " + str(batch_size))
        batch_size = min(batch_size, num_data)
        num_batch = num_data // batch_size

        # Keep only as many shuffled indices as fill complete batches so
        # np.split always receives an evenly divisible array.
        indices = np.random.permutation(num_data)[:num_batch * batch_size]
        mini_batch_X = np.array(np.split(train_X[indices], num_batch, axis=0))
        mini_batch_Y = np.array(np.split(train_Y[indices], num_batch, axis=0))
        return mini_batch_X, mini_batch_Y


def NLL_forward(Ypred, Y):
    """Return the summed negative log-likelihood of ``Ypred`` under ``Y``.

    ``1e-10`` is added to ``Ypred`` before the logarithm so a predicted
    probability of exactly zero does not produce ``-inf``.
    """
    log_probabilities = np.log(Ypred + 1e-10)
    return np.sum(-Y * log_probabilities)


def NLL_backward(Ypred, Y):
    """Return the element-wise difference ``Ypred - Y`` used as the loss gradient."""
    gradient = Ypred - Y
    return gradient

In [None]:
# Load the CIFAR-10 train/test splits through Keras.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

# Move the last axis to position 1 and scale pixel values by 1/255.
# NOTE(review): this permutation also swaps the two middle axes; presumably
# the data arrives as (N, H, W, C), giving (N, C, W, H) - confirm intended.
axis_order = (0, 3, 2, 1)
x_train = np.transpose(x_train.astype('float32'), axis_order) / 255.0
x_test = np.transpose(x_test.astype('float32'), axis_order) / 255.0

# One-hot encode the labels over 10 classes.
y_train = to_categorical(y_train, num_classes=10)
y_test = to_categorical(y_test, num_classes=10)

# Four conv/ReLU stages with three 2x2 poolings, then two dense layers.
# Linear(96, ...) matches 6 channels * 4 * 4 if the inputs are 32x32 (assumed).
layers = [
    Convolutional_layer(20, 3, 3),
    ReLU(),
    MaxPoolingLayer(),
    Convolutional_layer(15, 3, 20),
    ReLU(),
    MaxPoolingLayer(),
    Convolutional_layer(10, 3, 15),
    ReLU(),
    Convolutional_layer(6, 3, 10),
    ReLU(),
    MaxPoolingLayer(),
    Flatten(),
    Linear(96, 40),
    ReLU(),
    Linear(40, 10),
    SoftMax(),
]
network = Network(layers)

network.batch_gradient_descent(x_train, y_train, x_test, y_test, epochs=15, batch_size=64)

274.78859579153936
1
186.47141115443168
2
157.93750403136954
3
158.0794567790229
4
152.1387754517114
5
147.99053325865597
6
148.94665637585626
7
147.58232504961305
8
147.91903628973404
9
147.1627556746133
10
147.82900753397973
11
147.8424033159298
12
146.8854235209409
13
146.875790365686
14
148.5941717369672
15
147.00074409925372
16
146.7510106473494
17
146.36392112730735
18
146.8277826954349
19
147.99653835554295
20
146.85232351237585
21
147.3829351792111
22
148.12322369771076
23
147.56998608895856
24
147.98203593767036
25
147.00356556182308
26
146.55807936396144
27
145.9026978215968
28
146.82215439637145
29
146.9807608298945
30
146.51752734650358
31
145.50533840159778
32
146.41130543255315
33
145.56002196838944
34
146.10011295509258
35
145.16139400103862
36
144.7252789058084
37
147.18509710492543
38
143.89952382478242
39
144.00579639836792
40
142.9052785893517
41
144.02545353851727
42
144.93940764031038
43
139.1527318492368
44
143.34344336644847
45
143.8593540562988
46
139.0376694306