### Libraries

In [1]:
import math
import time 
import numpy as np
import sympy as sym
import scipy.special
import tensorflow as tf
from tensorflow.keras import *
import matplotlib.pyplot as plt
from keras.datasets import mnist
from tensorflow.keras.layers import Dense, Dropout, Flatten
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import random
import pickle
from tensorflow.keras.applications.vgg16 import VGG16

In [2]:
def check_gpu():
  """Report whether TensorFlow can see a GPU.

  Returns:
    True (after printing the device name) when
    ``tf.test.gpu_device_name()`` is '/device:GPU:0', otherwise False.
  """
  # The former `%tensorflow_version 2.x` line magic was removed from this
  # function: it is IPython-only syntax (a SyntaxError in plain Python) and it
  # ran after `import tensorflow`, so it could not influence the version used.
  device_name = tf.test.gpu_device_name()
  if device_name != '/device:GPU:0':
    return False
  print('Found GPU at: {}'.format(device_name))
  return True

def conv_to_latex(data_acc):
  """Format benchmark rows as two strings of coordinate pairs.

  Args:
    data_acc: Iterable of rows indexed as ``row[0]``, ``row[1]``, ``row[2]``
      (used by the driver cell as data size, accuracy and seconds).

  Returns:
    A 2-tuple of strings. The first holds one ``(row[0], row[1])`` pair per
    row, the second one ``(row[2] / 60 rounded to 2 digits, row[0])`` pair per
    row; every pair is preceded by a single space.
  """
  accuracy_points = ''.join(f' {(row[0], row[1])}' for row in data_acc)
  time_points = ''.join(f' {(round(row[2]/60, 2), row[0])}' for row in data_acc)
  return accuracy_points, time_points


## Data sets 

In [3]:
# Mount Google Drive at /content/gdrive/ (Google Colab only).
# NOTE(review): the EuroSat cell reads "./gdrive/My Drive/..." with a relative
# path, which presumably relies on the working directory being /content - confirm.
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


### Mnist

In [4]:
# mnist.load_data() returns ((x_train, y_train), (x_test, y_test)); the two
# tuples were previously unpacked in the opposite order, which swapped the
# training and test splits.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Scale the 8-bit pixel values to the range [0, 1].
x_train, x_test = x_train/255.0, x_test/255.0

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


### EuroSat

In [5]:
# Load the pickled EuroSat images (X) and labels (Y) from the mounted Drive.
# `with` closes each file handle even if unpickling fails.
# NOTE: pickle.load can execute arbitrary code - only load files you trust.
with open("./gdrive/My Drive/Data/EuroSat/X_euroSat.pickle", "rb") as pickle_in:
  X = pickle.load(pickle_in)

with open("./gdrive/My Drive/Data/EuroSat/Y_euroSat.pickle", "rb") as pickle_in:
  Y = pickle.load(pickle_in)


In [6]:
# Index that separates the first 80 % of the samples from the remaining 20 %.
val_split = int(round(0.8 * len(X)))
X_train, X_test = X[:val_split], X[val_split:]
Y_train, Y_test = Y[:val_split], Y[val_split:]

# Keep at most 10000 training and 2000 test samples as NumPy arrays.
# These assignments rebind x_train / y_train / x_test / y_test, replacing any
# MNIST arrays loaded by the earlier cell.
x_train, y_train = np.array(X_train[:10000]), np.array(Y_train[:10000])
x_test, y_test = np.array(X_test[:2000]), np.array(Y_test[:2000])

# Divide the pixel values by 255.
x_train = x_train / 255.0
x_test = x_test / 255.0

## From scratch

### Vanilla NN from Scratch

In [7]:
class NeuralNetwork:
    """Fully connected feed-forward network implemented with NumPy.

    The nested classes separate the network description (``Architecture``),
    per-sample gradient-descent training (``Training``) and accuracy
    measurement (``Evaluation``).
    """

    class Architecture:
        """Namespace for the classes that describe a network."""

        class ArchitectureLayer:
            """A layer described by a node count and an activation name."""

            def __init__(self, nodes, activation_function):
                """Store the layer description.

                Args:
                    nodes: Number of nodes in the layer.
                    activation_function: Name of the activation function;
                        only "sigmoid" is implemented.
                """
                self.__nodes = nodes
                self.__af = activation_function

            def call_activation_function(self, x):
                """Apply the layer's activation function element-wise to ``x``.

                Raises:
                    ValueError: If the configured activation is not
                        implemented (previously ``None`` was returned, which
                        only failed later inside ``np.dot``).
                """
                if self.__af == "sigmoid":
                    return 1 / (1 + np.exp(-x))
                raise ValueError(f"Unsupported activation function: {self.__af!r}")

            @property
            def nodes(self):
                """Number of nodes in the layer (read-only)."""
                return self.__nodes

        class ArchitectureNetwork:
            """Layers, learning rate and the weight matrices between layers."""

            def __init__(self, layers, learning_rate):
                """Create the network and randomly initialise its weights.

                Args:
                    layers: Sequence of ``ArchitectureLayer`` objects, input
                        layer first.
                    learning_rate: Step size used by ``Training.train``.
                """
                self.__layers = layers
                self.__lr = learning_rate
                # One matrix per pair of consecutive layers with shape
                # (nodes of next layer, nodes of current layer), drawn
                # uniformly from [-0.5, 0.5).
                self.__w = [np.random.rand(self.__layers[layer + 1].nodes, self.__layers[layer].nodes) - 0.5
                            for layer in range(len(self.__layers) - 1)]

            @property
            def layers(self):
                """The layer descriptions (read-only)."""
                return self.__layers

            @property
            def lr(self):
                """The learning rate (read-only)."""
                return self.__lr

            @property
            def w(self):
                """List of weight matrices; element ``i`` maps layer ``i`` to ``i + 1``."""
                return self.__w

            @w.setter
            def w(self, w):
                self.__w = w

    class Training:
        """Trains an ``ArchitectureNetwork`` in place, one sample at a time."""

        def __init__(self, model):
            self.__model = model

        def train(self, train_images, train_labels):
            """Run one pass over the data, updating the weights after every sample.

            Args:
                train_images: Iterable of arrays; each is flattened to a
                    column vector whose length must match the input layer.
                train_labels: Iterable of integer class indices.

            Returns:
                The trained model (the same object that was passed in).
            """
            model = self.__model
            weight_layers = len(model.layers) - 1
            for image, label in zip(train_images, train_labels):
                # Same input transform as Evaluation.evaluate.
                image = image * 0.99 + 0.1
                # Target vector: 0.99 for the labelled class, 0.1 elsewhere.
                targets = np.zeros(model.layers[-1].nodes) + 0.1
                targets[label] = 0.99
                targets = np.array(targets, ndmin=2).T

                # Forward pass, remembering the output of every layer.
                activations = [np.array(image.flatten(), ndmin=2).T]
                for i in range(weight_layers):
                    signal = np.dot(model.w[i], activations[-1])
                    activations.append(model.layers[i].call_activation_function(signal))

                # Backward pass from the last weight matrix to the first.
                error = targets - activations[-1]
                for k in range(weight_layers - 1, -1, -1):
                    layer_out = activations[k + 1]
                    # Propagate the error with the weights that produced the
                    # forward pass, i.e. BEFORE they are updated below.
                    propagated_error = np.dot(model.w[k].T, error)
                    model.w[k] += model.lr * np.dot(error * layer_out * (1 - layer_out),
                                                    np.transpose(activations[k]))
                    error = propagated_error
            return model

        def epoch_training(self, train_images, train_labels, epochs, test_images, test_labels):
            """Train for several epochs, evaluating on the test data after each.

            The training data is reshuffled with ``np.random.permutation`` at
            the start of every epoch, so images and labels must support
            NumPy integer-array indexing.

            Returns:
                Tuple ``(model, accuracies)`` with one accuracy per epoch.
            """
            evaluator = NeuralNetwork.Evaluation(self.__model)
            accuracies = []
            for _epoch in range(epochs):
                shuffle_data = np.random.permutation(len(train_images))
                train_images = train_images[shuffle_data]
                train_labels = train_labels[shuffle_data]
                self.train(train_images, train_labels)
                accuracies.append(evaluator.evaluate(test_images, test_labels))
            return self.__model, accuracies

    class Evaluation:
        """Measures the classification accuracy of a network."""

        def __init__(self, model):
            self.__model = model

        def evaluate(self, test_images, test_labels):
            """Return the fraction of images whose arg-max output equals the label.

            Raises:
                ZeroDivisionError: If ``test_images`` is empty.
            """
            model = self.__model
            correct = 0
            for image, label in zip(test_images, test_labels):
                # Same input transform as Training.train.
                image = image * 0.99 + 0.1
                outputs = np.array(image.flatten(), ndmin=2).T
                for i in range(len(model.layers) - 1):
                    signal = np.dot(model.w[i], outputs)
                    outputs = model.layers[i].call_activation_function(signal)
                if np.argmax(outputs) == label:
                    correct += 1
            return correct / len(test_images)
        
        

### CNN from scratch

In [8]:
class ConvolutionalNeuralNetwork:  # NeuralNetwork
    """Minimal CNN (convolution -> max pooling -> softmax) implemented with NumPy.

    Every layer processes a single image at a time and caches what its own
    ``back_prop`` needs during ``forward_prop``.
    """

    class Conv_op:
        """Stride-1 convolution without padding, using square filters."""

        def __init__(self, num_filters, filter_size, channels=3):
            """Create the layer with randomly initialised filters.

            Args:
                num_filters: Number of filters (depth of the output).
                filter_size: Height and width of every filter.
                channels: Number of channels of the input images.
            """
            self.__num_filters = num_filters
            self.__filter_size = filter_size
            self.__channels = channels
            # Filters are stored channels-first:
            # (num_filters, channels, filter_size, filter_size).
            self.__conv_filter = np.random.randn(self.__num_filters, self.__channels, filter_size, filter_size) \
                               / (filter_size * filter_size)

        def __to_filter_layout(self, image_patch):
            """Return the patch with the axis order of a single filter.

            Patches cut from (height, width, channels) images are
            channels-last while the filters are channels-first. Multiplying
            them directly only broadcasts when filter_size == channels and
            even then pairs up the wrong axes. 2-D patches are returned as
            they are; they broadcast against the single filter channel.
            """
            if image_patch.ndim == 3:
                return np.moveaxis(image_patch, -1, 0)
            return image_patch

        def image_region(self, image):
            """Yield ``(patch, row, col)`` for every filter position.

            The image is cached for ``back_prop`` once iteration starts.
            """
            height, width = image.shape[:2]
            self.__image = image
            size = self.__filter_size
            for j in range(height - size + 1):
                for k in range(width - size + 1):
                    yield image[j:(j + size), k:(k + size)], j, k

        def forward_prop(self, image):
            """Convolve ``image`` with every filter.

            Args:
                image: Array of shape (height, width) for one channel or
                    (height, width, channels).

            Returns:
                Array of shape (height - filter_size + 1,
                width - filter_size + 1, num_filters).

            Raises:
                ValueError: If the image's channel count differs from the
                    one the layer was created with.
            """
            image_channels = image.shape[2] if image.ndim == 3 else 1
            if image_channels != self.__channels:
                raise ValueError(f'Expected an image with {self.__channels} channel(s), '
                                 f'got shape {image.shape}')
            height, width = image.shape[:2]
            conv_out = np.zeros((height - self.__filter_size + 1, width - self.__filter_size + 1, self.__num_filters))
            for image_patch, i, j in self.image_region(image):
                conv_out[i, j] = np.sum(self.__to_filter_layout(image_patch) * self.__conv_filter, axis=(1, 2, 3))
            return conv_out

        def back_prop(self, dL_dout, learning_rate):
            """Update the filters from the loss gradient of this layer's output.

            Uses the image cached by the last ``forward_prop`` call.

            Returns:
                The gradient of the loss with respect to the filters (not
                with respect to the input).
            """
            dL_dF_params = np.zeros(self.__conv_filter.shape)
            for image_patch, i, j in self.image_region(self.__image):
                patch = self.__to_filter_layout(image_patch)
                for k in range(self.__num_filters):
                    dL_dF_params[k] += patch * dL_dout[i, j, k]
            self.__conv_filter -= learning_rate * dL_dF_params
            return dL_dF_params

    class Max_Pool:
        """Non-overlapping max pooling over square windows."""

        def __init__(self, filter_size):
            self.__filter_size = filter_size

        def image_region(self, image):
            """Yield ``(window, row, col)`` for every complete pooling window.

            The image is cached for ``back_prop`` once iteration starts.
            """
            new_height = image.shape[0] // self.__filter_size
            new_width = image.shape[1] // self.__filter_size
            self.__image = image
            for i in range(new_height):
                for j in range(new_width):
                    image_patch = image[(i * self.__filter_size): (i * self.__filter_size + self.__filter_size),
                                  (j * self.__filter_size):(j * self.__filter_size + self.__filter_size)]
                    yield image_patch, i, j

        def forward_prop(self, image):
            """Return the per-channel maximum of every pooling window.

            Args:
                image: Array of shape (height, width, num_filters).
            """
            height, width, num_filters = image.shape
            output = np.zeros((height // self.__filter_size, width // self.__filter_size, num_filters))
            for image_patch, i, j in self.image_region(image):
                output[i, j] = np.amax(image_patch, axis=(0, 1))
            return output

        def back_prop(self, dL_dout):
            """Route each output gradient to the position(s) holding the window maximum.

            Returns:
                Array shaped like the cached input; non-maximum positions
                are zero.
            """
            dL_dmax_pool = np.zeros(self.__image.shape)
            size = self.__filter_size
            for image_patch, i, j in self.image_region(self.__image):
                height, width = image_patch.shape[:2]
                is_max = image_patch == np.amax(image_patch, axis=(0, 1))
                dL_dmax_pool[i * size:i * size + height, j * size:j * size + width] = is_max * dL_dout[i, j]
            # Return only after ALL windows were processed; returning inside
            # the loop propagated the gradient of the first window only.
            return dL_dmax_pool

    class Softmax:
        """Fully connected layer followed by a softmax."""

        def __init__(self, input_node, softmax_node):
            """
            Args:
                input_node: Length of the flattened input.
                softmax_node: Number of output classes.
            """
            self.__weights = np.random.randn(input_node, softmax_node) / input_node
            self.__bias = np.zeros(softmax_node)
            self.__softmax_nodes = softmax_node

        def forward_prop(self, image):
            """Flatten ``image`` and return the class probabilities."""
            self.__orig_im_shape = image.shape
            image_modified = image.flatten()
            self.__modified_imput = image_modified
            output_val = np.dot(image_modified, self.__weights) + self.__bias
            # Softmax is invariant to a constant shift; subtracting the
            # maximum keeps np.exp from overflowing for large activations.
            self.__out = output_val - np.max(output_val)
            exp_out = np.exp(self.__out)
            return exp_out / np.sum(exp_out, axis=0)

        def back_prop(self, dL_dout, learning_rate):
            """Update weights and bias; return the loss gradient w.r.t. the input.

            Only the first non-zero entry of ``dL_dout`` is used (the
            trainer passes a gradient that is non-zero at the label index
            only). If every entry is zero nothing is updated and ``None`` is
            returned.
            """
            for i, grad in enumerate(dL_dout):
                if grad == 0:
                    continue
                transformation_eq = np.exp(self.__out)
                S_total = np.sum(transformation_eq)
                # Derivative of softmax output i with respect to every total.
                dy_dz = -transformation_eq[i] * transformation_eq / (S_total ** 2)
                dy_dz[i] = transformation_eq[i] * (S_total - transformation_eq[i]) / (S_total ** 2)
                dz_dw = self.__modified_imput
                dz_db = 1
                dz_d_inp = self.__weights
                dL_dz = grad * dy_dz
                dL_dw = dz_dw[np.newaxis].T @ dL_dz[np.newaxis]
                dL_db = dL_dz * dz_db
                # Computed before the in-place weight update below.
                dL_d_inp = dz_d_inp @ dL_dz
                self.__weights -= learning_rate * dL_dw
                self.__bias -= learning_rate * dL_db
                return dL_d_inp.reshape(self.__orig_im_shape)

        @property
        def snodes(self):
            """Number of output classes (read-only)."""
            return self.__softmax_nodes

    class Training:
        """Chains the three layers and trains them one image at a time."""

        def __init__(self, conv, pool, softmax):
            self.__conv = conv
            self.__pool = pool
            self.__softmax = softmax

        def cnn_forward_prop(self, image, label):
            """Run the forward pass for one image.

            Returns:
                Tuple ``(probabilities, cross_entropy_loss, correct)`` where
                ``correct`` is 1 if the arg-max class equals ``label``, else 0.
            """
            # 0.5 is subtracted from the pixel values before the convolution.
            out_p = self.__conv.forward_prop(image - 0.5)
            out_p = self.__pool.forward_prop(out_p)
            out_p = self.__softmax.forward_prop(out_p)
            cross_ent_loss = -np.log(out_p[label])
            accuracy_eval = 1 if np.argmax(out_p) == label else 0
            return out_p, cross_ent_loss, accuracy_eval

        def train_cnn(self, image, label, learning_rate=.005):
            """Do one forward and backward pass for one image.

            Returns:
                Tuple ``(loss, correct)`` from the forward pass.
            """
            out, loss, acc = self.cnn_forward_prop(image, label)
            # Gradient of the cross-entropy loss w.r.t. the probabilities:
            # non-zero at the true class only.
            gradient = np.zeros(self.__softmax.snodes)
            gradient[label] = -1 / out[label]
            grad_back = self.__softmax.back_prop(gradient, learning_rate)
            grad_back = self.__pool.back_prop(grad_back)
            self.__conv.back_prop(grad_back, learning_rate)
            return loss, acc

        def epochs_training(self, epochs, train_images, train_labels, test_images, test_labels):
            """Train for several epochs, evaluating on the test data after each.

            The training data is reshuffled at the start of every epoch and
            progress is printed every 100 samples.

            Returns:
                List with the test accuracy after every epoch.
            """
            accuracies = []
            evaluator = ConvolutionalNeuralNetwork.Evaluation(self)
            for epoch in range(epochs):
                print(f'Epoch {epoch+1}/{epochs}')
                shuffle_data = np.random.permutation(len(train_images))
                train_images = train_images[shuffle_data]
                train_labels = train_labels[shuffle_data]
                num_correct = 0
                for i, (im, label) in enumerate(zip(train_images, train_labels)):
                    if i % 100 == 0:
                        print(f'{int(i / 100)+1} steps out of {int(len(train_images)/100)} '
                              f'steps: Accuracy: {num_correct / 100}')
                        num_correct = 0
                    _loss, accu = self.train_cnn(im, label)
                    num_correct += accu
                accuracies.append(evaluator.evaluate_cnn(test_images, test_labels))
            return accuracies

    class Evaluation:
        """Measures the accuracy of a ``Training`` pipeline on labelled images."""

        def __init__(self, model):
            self.__model = model

        def evaluate_cnn(self, test_images, test_labels):
            """Print and return the fraction of correctly classified images.

            Raises:
                ZeroDivisionError: If ``test_images`` is empty.
            """
            num_correct = 0
            for im, label in zip(test_images, test_labels):
                _probs, _loss, accu = self.__model.cnn_forward_prop(im, label)
                num_correct += accu
            accuracy = num_correct / len(test_images)
            print('Test Accuracy:', accuracy)
            return accuracy


## Testing

### Approach Mnist

In [9]:
def v_nn_s(train_images, train_labels, test_images, test_labels):
  """Train the from-scratch dense network (784-100-10, sigmoid) and return its accuracy.

  The number of epochs is read from the notebook-level ``epochs`` setting.

  Returns:
    Test accuracy measured after the last epoch.
  """
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  layer_specs = [NeuralNetwork.Architecture.ArchitectureLayer(784, "sigmoid"),
                 NeuralNetwork.Architecture.ArchitectureLayer(100, "sigmoid"),
                 NeuralNetwork.Architecture.ArchitectureLayer(10, "sigmoid")]

  nn = NeuralNetwork.Architecture.ArchitectureNetwork(layer_specs, 0.1)
  trainer = NeuralNetwork.Training(nn)
  nn, accuracies = trainer.epoch_training(train_images, train_labels, epochs, test_images, test_labels)
  return accuracies[-1]

def v_nn_tf(train_images, train_labels, test_images, test_labels):
  """Train a dense Keras network on 28x28 images and return its test accuracy.

  The model flattens the input and applies sigmoid Dense layers with 784, 100
  and 10 units. The number of epochs is read from the notebook-level
  ``epochs`` setting.

  Returns:
    Accuracy reported by ``model.evaluate`` on the test data.
  """
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  model = tf.keras.models.Sequential([
          tf.keras.layers.Flatten(input_shape=(28, 28)),
          tf.keras.layers.Dense(784, activation='sigmoid'),
          tf.keras.layers.Dense(100, activation='sigmoid'),
          tf.keras.layers.Dense(10, activation='sigmoid')
          ])

  model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

  model.fit(train_images, train_labels, epochs=epochs, validation_data=(test_images, test_labels))

  _test_loss, test_acc = model.evaluate(test_images,  test_labels)
  return test_acc
 

def cnn_s(train_images, train_labels, test_images, test_labels):
  """Train the from-scratch CNN (4 3x3 filters, 2x2 max pool, 10 classes).

  The softmax input size and the channel count are derived from
  ``train_images.shape``; for (N, 28, 28) images this yields the previously
  hard-coded 13*13*4 inputs and a single channel. The number of epochs is
  read from the notebook-level ``epochs`` setting.

  Returns:
    Test accuracy measured after the last epoch.
  """
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  num_filters, filter_size, pool_size = 4, 3, 2
  height, width = train_images.shape[1:3]
  channels = train_images.shape[3] if train_images.ndim == 4 else 1
  pooled_height = (height - filter_size + 1) // pool_size
  pooled_width = (width - filter_size + 1) // pool_size

  conv = ConvolutionalNeuralNetwork.Conv_op(num_filters, filter_size, channels=channels)
  pool = ConvolutionalNeuralNetwork.Max_Pool(pool_size)
  softmax = ConvolutionalNeuralNetwork.Softmax(pooled_height * pooled_width * num_filters, 10)

  model = ConvolutionalNeuralNetwork.Training(conv, pool, softmax)
  accuracies = model.epochs_training(epochs, train_images, train_labels, test_images, test_labels)
  return accuracies[-1]

def cnn_tf(train_images, train_labels, test_images, test_labels):
  """Train a Keras network on (28, 28, 1) images and return its test accuracy.

  The number of epochs is read from the notebook-level ``epochs`` setting.

  NOTE(review): despite the name this model contains no convolution layers
  (Flatten followed by Dense layers) - confirm that this is intended.

  Returns:
    Accuracy reported by ``model.evaluate`` on the test data.
  """
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  model = tf.keras.models.Sequential([
          tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
          tf.keras.layers.Dense(784, activation='sigmoid'),
          tf.keras.layers.Dense(100, activation='sigmoid'),
          # No activation: the loss below is configured with from_logits=True
          # and therefore expects raw logits. Feeding it sigmoid outputs (as
          # before) squeezes the "logits" into (0, 1) and cripples training.
          tf.keras.layers.Dense(10)
          ])

  model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

  model.fit(train_images, train_labels, epochs=epochs, validation_data=(test_images, test_labels))
  _test_loss, test_acc = model.evaluate(test_images,  test_labels)
  return test_acc


### Approach EuroSat

In [11]:
def cnn_s(train_images, train_labels, test_images, test_labels):
  """Train the from-scratch CNN (4 3x3 filters, 2x2 max pool, 10 classes).

  The softmax input size and the channel count are derived from
  ``train_images.shape``; for (N, 80, 80, 3) images this yields the
  previously hard-coded 39*39*4 inputs and 3 channels. The number of epochs
  is read from the notebook-level ``epochs`` setting.

  Returns:
    Test accuracy measured after the last epoch.
  """
  print('')
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  num_filters, filter_size, pool_size = 4, 3, 2
  height, width = train_images.shape[1:3]
  channels = train_images.shape[3] if train_images.ndim == 4 else 1
  pooled_height = (height - filter_size + 1) // pool_size
  pooled_width = (width - filter_size + 1) // pool_size

  conv = ConvolutionalNeuralNetwork.Conv_op(num_filters, filter_size, channels=channels)
  pool = ConvolutionalNeuralNetwork.Max_Pool(pool_size)
  softmax = ConvolutionalNeuralNetwork.Softmax(pooled_height * pooled_width * num_filters, 10)

  model = ConvolutionalNeuralNetwork.Training(conv, pool, softmax)
  accuracies = model.epochs_training(epochs, train_images, train_labels, test_images, test_labels)
  return accuracies[-1]

def cnn_tf(train_images, train_labels, test_images, test_labels):
  """Train a small Keras CNN on (80, 80, 3) images and return its test accuracy.

  Architecture: Conv2D(4, 3x3, relu) -> MaxPooling2D(2x2) -> Flatten ->
  Dense(10, sigmoid). Training always runs for 20 epochs and holds out 20 %
  of the training data for validation.

  Returns:
    Accuracy reported by ``model.evaluate`` on the test data.
  """
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  # tf.keras is referenced explicitly instead of relying on the names that
  # `from tensorflow.keras import *` happens to export.
  model = tf.keras.models.Sequential()
  model.add(tf.keras.layers.Conv2D(4, (3, 3), activation='relu', input_shape=(80, 80, 3)))
  model.add(tf.keras.layers.MaxPooling2D((2, 2)))
  model.add(tf.keras.layers.Flatten())
  model.add(tf.keras.layers.Dense(10, activation='sigmoid'))

  model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

  model.fit(train_images, train_labels, epochs=20, validation_split=0.2)
  _test_loss, test_acc = model.evaluate(test_images,  test_labels)
  return test_acc

def p_nn(train_images, train_labels, test_images, test_labels):
  """Train a classifier on top of a frozen, ImageNet-pretrained VGG16 base.

  Expects (80, 80, 3) images. Training always runs for 20 epochs and holds
  out 20 % of the training data for validation.

  Returns:
    Accuracy reported by ``model.evaluate`` on the test data.
  """
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  base_model = VGG16(input_shape = (80, 80, 3), include_top = False, weights = 'imagenet')
  # Freeze the convolutional base; only the new head is trained.
  base_model.trainable = False

  model = tf.keras.models.Sequential()
  model.add(base_model)
  model.add(tf.keras.layers.Flatten())
  # No activation: the loss below is configured with from_logits=True and
  # therefore expects raw logits, not sigmoid outputs as before.
  model.add(tf.keras.layers.Dense(10))

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer='Adam', metrics=['accuracy'])

  model.fit(train_images, train_labels, epochs=20, validation_split=0.2)
  _test_loss, test_acc = model.evaluate(test_images,  test_labels)
  return test_acc



### Optimized Models 

In [12]:
def cnn_tf(train_images, train_labels, test_images, test_labels):
  """Train the larger Keras CNN on (80, 80, 3) images and return its test accuracy.

  Architecture: three Conv2D(relu) + MaxPooling2D stages with 64, 64 and 128
  filters, Dropout(0.5), Flatten, Dense(512, relu), Dropout(0.5) and
  Dense(10, sigmoid). Training always runs for 30 epochs and holds out 20 %
  of the training data for validation.

  Returns:
    Accuracy reported by ``model.evaluate`` on the test data.
  """
  print('')
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  # tf.keras is referenced explicitly instead of relying on the names that
  # `from tensorflow.keras import *` happens to export.
  keras_layers = tf.keras.layers
  model = tf.keras.models.Sequential()
  model.add(keras_layers.Conv2D(64, (3, 3), activation='relu', input_shape=(80, 80, 3)))
  model.add(keras_layers.MaxPooling2D((2, 2)))
  model.add(keras_layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(keras_layers.MaxPooling2D((2, 2)))
  model.add(keras_layers.Conv2D(128, (3, 3), activation='relu'))
  model.add(keras_layers.MaxPooling2D((2, 2)))
  model.add(keras_layers.Dropout(0.5))
  model.add(keras_layers.Flatten())
  model.add(keras_layers.Dense(512, activation='relu'))
  model.add(keras_layers.Dropout(0.5))
  model.add(keras_layers.Dense(10, activation='sigmoid'))

  model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

  model.fit(train_images, train_labels, epochs=30, validation_split=0.2)
  _test_loss, test_acc = model.evaluate(test_images,  test_labels)
  return test_acc

def p_nn(train_images, train_labels, test_images, test_labels):
  """Train a classifier head on top of a frozen, ImageNet-pretrained VGG16 base.

  Head: Flatten -> Dense(256, relu) -> Dropout(0.5) -> Dense(10). Expects
  (80, 80, 3) images. Training always runs for 30 epochs and holds out 20 %
  of the training data for validation.

  Returns:
    Accuracy reported by ``model.evaluate`` on the test data.
  """
  print('')
  # Take the size from the data itself rather than from the driver loop's
  # `batch` variable, so the function also works when called on its own.
  print(f'Datasize: {len(train_images)}')
  base_model = VGG16(input_shape = (80, 80, 3), include_top = False, weights = 'imagenet')
  # Freeze the convolutional base; only the new head is trained.
  base_model.trainable = False

  keras_layers = tf.keras.layers
  model = tf.keras.models.Sequential()
  model.add(base_model)
  model.add(keras_layers.Flatten())
  model.add(keras_layers.Dense(256, activation='relu'))
  model.add(keras_layers.Dropout(0.5))
  # No activation: the loss below is configured with from_logits=True and
  # therefore expects raw logits, not sigmoid outputs as before.
  model.add(keras_layers.Dense(10))

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer='Adam', metrics=['accuracy'])

  model.fit(train_images, train_labels, epochs=30, validation_split=0.2)
  _test_loss, test_acc = model.evaluate(test_images,  test_labels)
  return test_acc

### Function calls

In [13]:
def call_func(train_images, train_labels, test_images, test_labels, train_images_r, test_images_r):
  """Run the approach selected by the notebook-level ``func`` variable.

  Args:
    train_images, test_images: Images as loaded.
    train_labels, test_labels: Integer class labels.
    train_images_r, test_images_r: Image variants passed to ``cnn_tf`` only
      (the driver cell reshapes 28x28 images to (28, 28, 1) for it).

  Returns:
    The accuracy returned by the selected approach.

  Raises:
    ValueError: If ``func`` names no known approach (previously this
      surfaced as an UnboundLocalError on ``accuracies``).
  """
  # The if-chain keeps the lookup lazy: only the selected function has to be
  # defined, so cells defining the other approaches may remain un-run.
  if func == 'v_nn_s':
    return v_nn_s(train_images, train_labels, test_images, test_labels)
  if func == 'v_nn_tf':
    return v_nn_tf(train_images, train_labels, test_images, test_labels)
  if func == 'cnn_s':
    return cnn_s(train_images, train_labels, test_images, test_labels)
  if func == 'cnn_tf':
    return cnn_tf(train_images_r, train_labels, test_images_r, test_labels)
  if func == 'p_nn':
    return p_nn(train_images, train_labels, test_images, test_labels)
  raise ValueError(f'Unknown approach: {func!r}')


### Testing Mnist 

In [None]:
data = [10, 50, 100, 500, 1000, 5000, 10000]  # training-set sizes to benchmark
approach_list = ['cnn_s']  # available: 'v_nn_s', 'v_nn_tf', 'cnn_s', 'cnn_tf', 'p_nn'

latex_data = []
epochs = 5  # read by the approach functions
r = 5  # robustness: repetitions averaged per data size (must be >= 1)

gpu = check_gpu()

# `func` and `batch` are deliberately module-level loop variables:
# call_func() selects the approach through `func`, and the approach functions
# may read `batch[0]` for their "Datasize" message.
for func in approach_list:
  # Fresh [size, accuracy_sum, seconds_sum] accumulators for every approach.
  # Sharing one list across approaches mixed their results, and appending on
  # the first repetition broke the r == 1 case with an IndexError.
  data_acc = [[size, 0.0, 0.0] for size in data]

  for rep in range(r):
    for batch in data_acc:
      print('')
      print(f'Datasize: {batch[0]}, Repetition: {rep+1}')
      train_images = x_train[:batch[0]]
      train_labels = y_train[:batch[0]]
      # The test set is 20 % of the training-set size.
      test_images = x_test[:int(batch[0]*0.2)]
      test_labels = y_test[:int(batch[0]*0.2)]

      # 28x28 images get an explicit channel axis for the Keras CNN.
      if train_images.shape[1:] == (28, 28):
        train_images_r = train_images.reshape(train_images.shape[0], 28, 28, 1)
        test_images_r = test_images.reshape(test_images.shape[0], 28, 28, 1)
      else:
        train_images_r, test_images_r = train_images, test_images

      start = time.perf_counter()

      # select approach:
      if gpu:
        with tf.device('/device:GPU:0'):
          accuracy = call_func(train_images, train_labels, test_images, test_labels, train_images_r, test_images_r)
      else:
        accuracy = call_func(train_images, train_labels, test_images, test_labels, train_images_r, test_images_r)

      # process data
      batch[1] += accuracy
      batch[2] += time.perf_counter() - start

  # Turn the sums into averages over the r repetitions.
  for batch in data_acc:
    batch[1], batch[2] = batch[1]/r, batch[2]/r

  print('')
  latex_data.append(f'Appraoch: {func} with (batch, accuracy), (time, batch) {conv_to_latex(data_acc)}')
  print(latex_data)


In [None]:
# Print every collected result string on its own line.
for a in latex_data:
  print(a)