In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import abc
import yaml
from functools import reduce

In [2]:
def yaml_reader():
    with open("config.yaml","r") as file:
        config = yaml.safe_load(file)
    return config

In [3]:
config = yaml_reader()

In [4]:
testdir = config["dataset_parameters"]["testdir"] #This is where dataset located. Change it to the relevant.
traindir = config["dataset_parameters"]["traindir"]

In [5]:
rate_learning = config["training_parameters"]["learning_rate"]
epochs = config["training_parameters"]["num_epochs"]  # After 30th epoch we can see the beginning of overfitting at this parameters. I guess there could be a bit more complexity of model than it need.
bs = config["training_parameters"]["batch_size"] # Change this parameter according to hardware.
dropout_rate = config["model_parameters"]["dropout_rate"] #A little bit increase of this probabilty will occur as bad converge
wd = config["model_parameters"]["weight_decay"] # Weight decay for weight regularization

In [6]:
data_train = pd.read_csv(traindir).to_numpy()
data_test = pd.read_csv(testdir).to_numpy()

In [7]:
class Tensor():
    def __init__(self, value: float):
        """
        """
        self.val = value
        self.grads_list = []

    def flatten(self):
        return np.array(self.val).reshape(-1)
        
    def __str__(self):
        """
        """
        return f"{self.val}"
    def __add__(self, obj) -> float:
        """
        """
        return self.val + obj.val
    def __sub__(self, obj) -> float:
        """
        """
        return self.val - obj.val
    def __mul__(self, obj) -> float:
        """
        """
        return self.val * obj.val
    def __truediv__(self, obj) -> float:
        """
        """
        return self.val / obj.val        

In [8]:
class Layer(abc.ABC):
    @abc.abstractclassmethod
    def __init__(self, heigth: int, width: int):
        pass
    def __call__(self, input: np.ndarray) -> np.ndarray:
        return self._call(input)
        
    @abc.abstractclassmethod
    def _call(self, input: np.ndarray):
        pass   

In [9]:
class FC(Layer):
    def __init__(self, heigth: int, width: int, var=1) -> None:
        """
        """
        self.weights = np.random.normal(0, var, size=(heigth, width))
    def _call(self, input: Tensor) -> np.ndarray:
        """
        """
        if (len(input.shape)) > 1:
            input = input.flatten()
        return input@self.weights
    def __str__(self) -> None:
        """
        """
        return f"{self.weights}"

In [10]:
class ActivationFunction(abc.ABC):
    @abc.abstractclassmethod
    def __init__(self):
        pass
    def __call__(self, input: np.ndarray) -> np.ndarray:
        return self._call(input)
    @abc.abstractclassmethod
    def _call(self, input: np.ndarray):
        pass

In [11]:
class ReLU(ActivationFunction):
    def __init__(self: ActivationFunction):
        return
    def _call(self: ActivationFunction, input: np.ndarray) -> np.ndarray:
        vect_relu = np.vectorize(self.relu)
        return vect_relu(input)

    def relu(self, x):
        return x if x > 0 else 0

In [12]:
class Softmax(ActivationFunction):
    def __init__(self: ActivationFunction):
        return
    def _call(self:ActivationFunction, input: np.ndarray) -> np.ndarray:
        input = input.reshape(-1)
        exp_input = sum(np.exp(input))
        return [np.exp(x)/exp_input for x in input]

In [13]:
class MnistClassifier():
    def __init__(self, list_of_layers): 
        self.architecture = list_of_layers 
    def __call__(self, x):
        return reduce(lambda acc, func: func(acc), self.architecture, x)

In [14]:
class Conv2d(Layer):
    def __init__(self, in_channel: int, out_channel:int, kernel_size: list = [2,2], var: float =1.0):
        self.kernel_size = kernel_size
        self.in_channel = in_channel
        self.out_channel = out_channel
        self.weights = np.random.normal(0, var, size=(out_channel, in_channel, kernel_size[0], kernel_size[1]))

    def get_image_sector(self, image: np.ndarray, j: int, k: int) -> np.ndarray:
        """
        """
        sector = image[j:j+self.kernel_size[0],k:k+self.kernel_size[1]]
        return sector.reshape(-1)

    def _call(self, input: np.ndarray) -> np.ndarray:
        self.img_h, self.img_w = input.shape[-2:]
        try:
            input = input.reshape(-1, self.in_channel, self.img_w, self.img_h)
        except e:
            print(e)
            return
        self.batch_size = input.shape[0]
        return self.conv(input)
    
    def conv(self, images: np.ndarray) -> np.ndarray:
        conved_image = np.zeros([self.batch_size, self.out_channel, self.img_h-self.kernel_size[0]+1, self.img_w-self.kernel_size[1]+1])
        for i, img in enumerate(images):
            for j, out_filter in enumerate(self.weights):
                in_channels_array = []
                for k, in_filter in enumerate(out_filter):
                    sector_channel = self.crop_image_sector(img[k])
                    proccesed_image_separate = np.zeros([len(sector_channel), 1])
                    for l, sector in enumerate(sector_channel):
                        proccesed_image_separate[l] = np.sum(np.dot(sector.reshape(self.kernel_size), in_filter))
                    in_channels_array.append(proccesed_image_separate)
                in_channels_array = np.array(in_channels_array)
                conved_image[i] = np.sum(in_channels_array, axis=0).reshape(self.img_h-self.kernel_size[0]+1, self.img_w-self.kernel_size[1]+1)
        return conved_image
            

    def crop_image_sector(self, image: np.ndarray) -> list:
        sectors = []
        for j in range(self.img_h):
            for k in range(self.img_w):
                sectors.append(self.get_image_sector(image, j, k))
            sectors = [list(sector) for sector in sectors if len(sector) >= np.prod(self.kernel_size)]
        return np.array(sectors)

In [21]:
class BatchNorm2d(Layer):
    def __init__(self, channels: int):
        self.betas = np.random.random([1, channels])
        self.gammas = np.random.random([1, channels])
        self.mean = 0
        self.std = 0
        self.eps = 1e-6
        self.channels = channels
        self.batch_normalize = np.vectorize(self.normalize)
    def _call(self, input:np.ndarray):
        self.calculate_mean()
        self.calculate_std()
        return self.batch_normalize(input)

    def calculate_mean(self, input:np.ndarray) -> None:
        self.mean = np.mean(input)

    def calculate_std(self, input:np.ndarray) -> None:
        self.std = np.std(input)

    def normalize(self, x):
        for i in range(self.channels):
            x = self.gammas[i]+self.betas[i]*(x - self.mean)/(self.std+self.eps)
        return x
            

In [16]:
class MaxPool2d(Layer):
    def __init__(self, heigth: int, width: int, in_channel: int) -> None:
        """
        """
        self.kernel_size = [heigth, width]
        self.in_channel = in_channel

    def _call(self, input: np.ndarray) -> np.ndarray:
        """
        """
        self.img_h, self.img_w = input.shape[-2:]
        try:
            input = input.reshape(-1, self.in_channel, self.img_w, self.img_h)
        except e:
            print(e)
            return
        self.batch_size = input.shape[0]
        self.out_h, self.out_w = self.calculate_out_shape()
        out = np.array(self.pool(input))
        return out.reshape(self.batch_size, self.in_channel, self.out_h, self.out_w)

    def calculate_out_shape(self):
        if self.img_h%self.kernel_size[0] == 0:
            out_h = self.img_h//self.kernel_size[0]
        else:
            out_h = self.img_h//self.kernel_size[0] + 1
        if self.img_w%self.kernel_size[1] == 0:
            out_w = self.img_w // self.kernel_size[1]
        else:
            out_w = self.img_w // self.kernel_size[1] + 1
        return out_h, out_w
            
    def get_image_sector(self, image: np.ndarray, j: int, k: int) -> np.ndarray:
        """
        """
        sector = image[j*self.kernel_size[0]:(j+1)*self.kernel_size[0],k*self.kernel_size[1]:(k+1)*self.kernel_size[1]]
        return sector.reshape(-1)

    def pool(self, images: np.ndarray) -> np.ndarray:
        """
        """
        out = np.zeros([self.batch_size, self.in_channel, self.out_h, self.out_w])
        for i, img in enumerate(images):
            sectors = []
            for l in range(self.in_channel):
                for j in range(self.img_h):
                    for k in range(self.img_w):
                        sectors.append(self.get_image_sector(img[l], j, k))
            sectors = [sector.tolist() for sector in sectors if sector.size > 0]
            max_sectors = np.array([max(sector) for sector in sectors])
            out[i] = max_sectors.reshape(self.in_channel, self.out_h, self.out_w)
        return out
            
            

In [17]:
model = MnistClassifier([
    Conv2d(in_channel=1, out_channel=3),
    MaxPool2d(2,2, in_channel=3),
    ReLU(),
    Conv2d(in_channel=3, out_channel=16),
    MaxPool2d(2,2, in_channel=16),
    ReLU(),
    Conv2d(in_channel=16, out_channel=32),
    MaxPool2d(2,2, in_channel=32),
    ReLU(),
    Conv2d(in_channel=32, out_channel=64),
    MaxPool2d(2,2, in_channel=64),
    ReLU(),
    FC(192, 32),
    ReLU(),
    FC(32, 10),
    Softmax()
]
)

In [18]:
input = np.random.randint(0, 255, size=(1, 3, 29, 29))

In [19]:
predict = model(input)

  exp_input = sum(np.exp(input))
  return [np.exp(x)/exp_input for x in input]
  return [np.exp(x)/exp_input for x in input]
