# Main

## import packages

In [11]:
import numpy as np
import pandas as pd
import idx2numpy

import matplotlib.pyplot as plt
import plotly.graph_objects as go

from sklearn.metrics import accuracy_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

import PIL

import time

In [12]:
def calculate_execution_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Execution time of {func.__name__}: {end_time - start_time} seconds")
        return result
    return wrapper

## class for the Activation functions

In [13]:
class Activation:
    def __init__(self, name):
        self.name = name
        if name == 'sigmoid':
            self.function = self.exponent
            self.derivative = self.exponent_der
        elif name == 'tanh':
            self.function = lambda x: np.tanh(x)
            self.derivative = lambda x: 1 - np.square(np.tanh(x))
        elif name == 'softmax':
            self.function = self.softmax
            self.derivative = self.softmax_der
        elif name == 'relu':
            self.function = lambda x: np.maximum(0, x)
            self.derivative = lambda x: x > 0
        
    def exponent(self, x):
        shift_x = x - np.max(x)
        return 1/(1 + np.exp(-shift_x))
    
    def exponent_der(self, x):
        exps = self.exponent(x)
        return exps * (1 - exps)
    
    def softmax(self, x):
        shift_x = x - np.max(x)
        exps = np.exp(shift_x)
        return exps/np.sum(exps)
    
    def softmax_der(self, x):
        array = self.softmax(x).reshape(-1,1)
        return np.diagflat(array) - np.dot(array, array.T)

## Convolution layer

In [14]:
class Conv:

    def __init__(self, num_filters = 3, size = 3, stride = 1, activation = 'relu'):
        self.next_layer = None
        self.num_filters = num_filters
        self.size = size
        self.stride = stride
        self.activation = Activation(activation)

        self.filters = np.random.randn(num_filters, size, size) / 10
    
    def add_layer(self, child):
        self.next_layer = child
        return self
    
    def iterate_regions(self, h, w):

        for i in range(h - (self.dim_f - 1)):
            for j in range(w - (self.dim_f - 1)):
                im_region = np.array([
                    np.arange(i, i + self.dim_f), 
                    np.arange(j, j + self.dim_f)
                ])
                yield im_region, i, j
    # @calculate_execution_time            
    def forward(self, image):
        self.last_input = image
        input_dimension = image.shape[1]
        output_dimension = int((input_dimension - self.size) / self.stride) + 1
        out = np.zeros((self.filters.shape[0], output_dimension, output_dimension))
        
        for f in range(self.num_filters):
            current_y = out_y = 0
            
            while current_y + self.size <= input_dimension:
                current_x = out_x = 0
                
                while current_x + self.size <= input_dimension:
                    patch = image[:, current_y:current_y + self.size, current_x:current_x + self.size]
                    out[f, out_y, out_x] = np.sum(self.filters[f] * patch)
                    current_x += self.stride
                    out_x += 1
                    
                current_y += self.stride
                out_y += 1
                
        self.last_output = out
        if self.activation:
            out = self.activation.function(out)
        return out
    # @calculate_execution_time
    def backward(self, out_prev, lr):
        input_dimension = self.last_input.shape[1]
        
        if self.activation:
            out_prev = out_prev * self.activation.derivative(self.last_output)
        
        out_next = np.zeros(self.last_input.shape)
        dfilt = np.zeros(self.filters.shape)
        
        for f in range(self.filters.shape[0]):
            current_y = out_y = 0
            
            while current_y + self.size <= input_dimension:
                current_x = out_x = 0
                
                while current_x + self.size <= input_dimension:
                    patch = self.last_input[:, current_y:current_y + self.size, current_x:current_x + self.size]
                    dfilt[f] += np.sum(out_prev[f, out_y, out_x] * patch, axis=0)
                    out_next[:, current_y:current_y + self.size, current_x:current_x + self.size] += out_prev[f, out_y, out_x] * self.filters[f]
                    current_x += self.stride
                    out_x += 1
                    
                current_y += self.stride
                out_y += 1
                
        self.filters -= lr * dfilt
        return out_next
    
    # def forward(self, input):
    #     self.last_input = input
    #     _, h, w = input.shape
    #     output = np.zeros((self.num_filters, h - (self.dim_f - 1), w - (self.dim_f - 1)))
        
        
    #     for im_region, i, j in self.iterate_regions(h, w):
    #         for f_i, filters in enumerate(self.filters):
    #             output[:, i, j] = np.sum(np.multiply(input[:, im_region[0][:, np.newaxis], im_region[1]], filters))
        
    #     return self.activation.function(output)
    
    # def backward(self, out_prev, learning_rate):
    #     out_prev = np.clip(out_prev, -1e+10, 1e+10)
    #     d_L_d_filters = np.zeros_like(self.filters)
    #     out_next = np.zeros_like(self.last_input)
        
    #     nf, h, w = out_prev.shape

    #     for im_region, i, j in self.iterate_regions(h, w):
    #         for f in range(nf):
    #             d_L_d_filters[f] += np.multiply(out_prev[f, i, j], out_prev[f, im_region[0][:, np.newaxis], im_region[1]])
    #             out_next[:, i, j] += np.sum(np.multiply(out_prev[f, im_region[0][:, np.newaxis], im_region[1]], self.filters[f]))
    #     # Update filters
    #     self.filters -= learning_rate * d_L_d_filters
    #     return out_next
    

## Pool method

In [15]:
class Pool:
    
    def __init__(self, size = 2, stride = 2, activation = 'relu'):
        self.next_layer = None
        self.stride = stride
        self.size = size
        self.activation = Activation(activation)
    
    def add_layer(self, child):
        self.next_layer = child
        return self
    
    # def iterate_regions(self, image):
    #     _, h, w = image.shape
    #     new_h = h // 2
    #     new_w = w // 2

    #     for i in range(new_h):
    #         for j in range(new_w):
    #             im_region = image[:, (i * 2):(i * 2 + (self.dim_f - 1)), (j * 2):(j * 2 + (self.dim_f - 1))]
    #             yield im_region, i, j
    # @calculate_execution_time            
    def forward(self, image):
        self.last_input = image
        
        nfilt, h_prev, w_prev = image.shape
        h = int((h_prev - self.size) / self.stride) + 1
        w = int((w_prev - self.size) / self.stride) + 1
        
        downscaled = np.zeros((nfilt, h, w))
        
        for i in range(nfilt):
            curr_y = out_y = 0
            
            while curr_y + self.size <= h_prev:
                curr_x = out_x = 0
                
                while curr_x + self.size <= w_prev:
                    patch = image[i, curr_y:curr_y + self.size, curr_x:curr_x + self.size]
                    downscaled[i, out_y, out_x] = np.max(patch)
                    curr_x += self.stride
                    out_x += 1
                    
                curr_y += self.stride
                out_y += 1
                
        return downscaled
    # @calculate_execution_time
    def backward(self, out_prev, learning_rate):
        # out_prev = np.clip(out_prev, -1e+10, 1e+10)
        out_next = np.zeros(self.last_input.shape)

        nfilt, shape, _ = self.last_input.shape
        
        for c in range(nfilt):
            current_y = out_y = 0
            while current_y + self.size <= shape:
                current_x = out_x = 0
                while current_x + self.size <= shape:
                    patch = self.last_input[c, current_y:current_y + self.size, current_x:current_x + self.size]
                    (x, y) = np.unravel_index(np.nanargmax(patch), patch.shape)
                    out_next[c, current_y + x, current_x + y] += out_prev[c, out_y, out_x]
                    current_x += self.stride
                    out_x += 1
                current_y += self.stride
                out_y += 1
        return out_next

    # def forward(self, input):
    #     self.last_input = input
        
    #     num_filters, h, w = input.shape
    #     output = np.zeros((num_filters, h // self.dim_f, w // self.dim_f))
        
    #     if self.type == 'max':
    #         for im_region, i, j in self.iterate_regions(input):
    #             output[:, i, j] = np.amax(im_region, axis=(1, 2))
        
    #     return self.activation.function(output)

    # def backward(self, out_prev, learning_rate):
    #     out_prev = np.clip(out_prev, -1e+10, 1e+10)
    #     out_next = np.zeros(self.last_input.shape)

    #     for im_region, i, j in self.iterate_regions(self.last_input):
    #         f, h, w = im_region.shape
    #         amax = np.amax(im_region, axis=(1, 2))
    #         for f2 in range(f):
    #             for i2 in range(h):
    #                 for j2 in range(w):
    #                     # If this pixel was the max value, copy the gradient to it.
    #                     if im_region[f2, i2, j2] == amax[f2]:
    #                         out_next[f2, i * 2 + i2, j * 2 + j2] = out_prev[f2, i, j]

    #         return out_next

## Dense Layer

In [16]:
class Dense():
    def __init__(self, n_inputs, n_neurons, activation):
        self.next_layer = None
        self.weights = np.random.randn(n_inputs, n_neurons)
        self.biases = np.random.randn(n_neurons)
        self.activation = Activation(activation)

    def add_layer(self, child):
        self.next_layer = child
        return self
    # @calculate_execution_time
    def forward(self, input):
        self.last_input_shape = input.shape
        input = input.flatten()
        output = np.dot(input, self.weights) + self.biases
        self.last_input = input
        self.last_output = output
        return self.activation.function(output)
    # @calculate_execution_time
    def backward(self, input, learning_rate):
        if self.next_layer is None:
            dW = input[:, np.newaxis]
        else:
            dW = input[:, np.newaxis] * self.activation.derivative(self.last_output)[:, np.newaxis]
        dW = (dW * self.last_input[np.newaxis, :]).T
        db = np.copy(input)
        dout = self.weights @ input
        self.weights -= learning_rate * dW
        self.biases -= learning_rate * db
        return dout.reshape(self.last_input_shape)

## CNN

In [17]:
class CNN:
    
    def __init__(self, layers):
        if len(layers) < 2:
            raise ValueError("Not enough layers. Should be at least 2")
        
        self.layers = [layers[0]]
        
        for layer in layers[1:]:
            self.layers[-1].add_layer(layer)
            if layer is not None and layer not in self.layers:
                self.layers.append(layer)
        
        self.early_stop = None
        
    # @calculate_execution_time
    def forward(self, image):
        for layer in self.layers:
            image = layer.forward(image)
        return image
    # @calculate_execution_time
    def backward(self, gradient, learning_rate):
        for layer in reversed(self.layers):
            gradient = layer.backward(gradient, learning_rate)

    def train(self, X_train, y_train, epochs=10, learning_rate=0.1, lr_decay=0, X_val=None, y_val=None):
        losses = []
        accuracy = []
        
        learning_rate_0 = learning_rate
        
        for epoch in range(epochs):
            # self.history['accuracy'][epoch] = []
            # self.history['loss'][epoch] = []
            for i in range(X_train.shape[0]):
                image = X_train[i]
                onehotlabel = y_train[i]
                label = np.argmax(onehotlabel)
                
                output = self.forward(image)
                
                current_accuracy = output[label]
                current_loss = -np.sum(y_train[i] * np.log(output))
                
                accuracy.append(current_accuracy)
                losses.append(current_loss)
                
                gradient = np.copy(output) - onehotlabel
                self.backward(gradient, learning_rate)
                
                # learning_rate = (lr_decay**i)*learning_rate_0
                
                # if self.early_stop is not None:
                #     if self.EarlyStopping(epoch, i, current_accuracy, current_loss):
                #         self.output = np.array(output)
                #         return 0
                
                if i % 100 == 0:
                    print(f'image: {i + 1:}')
                    print(f'accuracy: {np.mean(accuracy):.4f}, loss: {np.mean(losses):.4f}')
                    losses = []
                    accuracy = []
                    if X_val is not None and y_val is not None:
                        val_acc, val_loss = self.val_pred(X_val, y_val)
                        print(f'val_accuracy: {val_acc:.4f}, val_loss: {val_loss:.4f}')
                    print('===========')
        
        self.output = np.array(output)
        
    
    def val_pred(self, X_val, y_val):
        accuracy_list = []
        loss_list = []
        for i, image in enumerate(X_val):
            label = np.argmax(y_val[i])
            
            output = self.forward(image)
            
            accuracy = output[label]
            loss = -np.sum(y_val[i] * np.log(output, out=np.zeros_like(output), where=(output!=0)))
            
            accuracy_list.append(accuracy)
            loss_list.append(loss)
            
        return np.mean(accuracy_list), np.mean(loss_list)
        
    
    def predict(self, X):
        return np.array([np.argmax(self.forward(x), 0) for x in X])
        
    def EarlyStop(self, monitor="accuracy", min_delta=.1, patience=3):
        self.early_stop = {
            "monitor": monitor, 
            "min_delta": min_delta, 
            "patience": patience, 
        }
        
        self.history = {
            "accuracy": {},
            "loss" : {},
            "global_max_index": 0,
        }
    
    def EarlyStopping(self, epoch, im_i, accuracy, loss):
        self.history['accuracy'][epoch].append(accuracy)
        self.history['loss'][epoch].append(loss)
        
        if epoch > self.early_stop['patience']:
            if self.history[self.early_stop['monitor']][epoch][-1] > self.history[self.early_stop['monitor']][self.history['global_max_index'][0]][self.history['global_max_index'][1]]:
                self.history['global_max_index'] = (epoch, im_i)
            
            min_local_accuracy = min(self.history[self.early_stop['monitor']][epoch][-self.early_stop['patience']:])
            difference = abs(min_local_accuracy - self.history[self.early_stop['monitor']][self.history['global_max_index'][0]][self.history['global_max_index'][1]])
            
            if difference < self.early_stop['min_delta']:
                return True
        return False

## Training

### import data

In [18]:
# # data = pd.read_csv("train.csv").sample(frac=.05).to_numpy()
# X = idx2numpy.convert_from_file('D:\\Programming\\Projects\\data_sets\\t10k-images.idx3-ubyte').astype('float32')
# Y = idx2numpy.convert_from_file('D:\\Programming\\Projects\\data_sets\\t10k-labels.idx1-ubyte')

# X = X.reshape(-1, 1, 28, 28)

# # minmax
# # X = (X - np.min(X, axis=(2,3), keepdims=True)) / (np.max(X, axis=(2,3), keepdims=True) - np.min(X, axis=(2,3), keepdims=True))

# # Z-score
# X = (X - np.mean(X, axis=(2,3), keepdims=True)) / np.var(X, axis=(2,3), keepdims=True)

# Y_onehot = OneHotEncoder().fit_transform(Y.reshape(-1, 1)).toarray()

# X_train, X_test, y_train, y_test = train_test_split(X, Y_onehot, test_size=0.01, random_state=2077)

# # percent = 0.1
# # sample = np.random.choice([True, False], size=X_test.shape[0], p=[percent, 1-percent])
# # X_val = X_test[sample]
# # y_val = np.argmax(y_test[sample], 1)

In [19]:
np.random.seed(0)
np.set_printoptions(precision=2)

X = idx2numpy.convert_from_file('D:\\Programming\\Projects\\data_sets\\t10k-images.idx3-ubyte').astype('float32')
Y = idx2numpy.convert_from_file('D:\\Programming\\Projects\\data_sets\\t10k-labels.idx1-ubyte')

X= X.reshape(-1, 1, 28, 28)
X = (X - np.mean(X, axis=(2,3), keepdims=True)) / np.var(X, axis=(2,3), keepdims=True)
Y = (Y[:, None] == np.unique(Y)).astype(int)

shuffled_indices = np.random.permutation(len(X))
X = X[shuffled_indices]
Y = Y[shuffled_indices]

X_train = X[:5000]
y_train = Y[:5000]
X_val = X[5000:5100]
y_val = Y[5000:5100]

### training

In [20]:
layers = [
    Conv(num_filters=8, size=5, stride=1),
    Conv(num_filters=7, size=5, stride=1), 
    Pool(size=2, stride=2),
    Dense(n_inputs=700, n_neurons=128, activation='relu'),
    Dense(n_inputs=128, n_neurons=10, activation='softmax'),
]

CNN_model = CNN(layers)

# CNN_model.EarlyStop(monitor = "loss", min_delta = 1e-4, patience = 5)

CNN_model.train(X_train, y_train, epochs=1, learning_rate=0.01, lr_decay=0., X_val=X_val, y_val=y_val)

image: 1
accuracy: 0.0110, loss: 4.5061
val_accuracy: 0.0461, val_loss: 18.9160
image: 101
accuracy: 0.0801, loss: 6.5480
val_accuracy: 0.1023, val_loss: 2.4863
image: 201
accuracy: 0.0991, loss: 2.7980
val_accuracy: 0.1087, val_loss: 2.4873
image: 301
accuracy: 0.1472, loss: 2.8285
val_accuracy: 0.1245, val_loss: 2.6380
image: 401
accuracy: 0.0888, loss: 2.7925
val_accuracy: 0.0961, val_loss: 2.8469


KeyboardInterrupt: 

In [None]:
predictions = CNN_model.predict(X_test)

In [None]:
accuracy_score(np.argmax(y_test, 1), predictions)

0.13