In [6]:
# pickle is part of the Python standard library and must not be pip-installed.
# %pip install numpy pandas scikit-learn tqdm
# %pip install matplotlib
# %pip install opencv-python

In [7]:
from matplotlib import pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.metrics import log_loss

import numpy as np
import pandas as pd
import pickle
import cv2
import tqdm

In [8]:
class ConvolutionLayer:
    """2-D convolution layer operating on (batch, channel, height, width) arrays.

    Weights and biases are created lazily on the first `forward` call, once the
    number of input channels is known. `backward` performs a plain SGD update.
    """

    def __str__(self) -> str:
        return "ConvolutionLayer"

    def __init__(self, n_filter, kernel_size, stride, padding):
        """Store the hyper-parameters; parameters are allocated in `forward`.

        Args:
            n_filter: number of output channels.
            kernel_size: side length of the square kernel.
            stride: step between neighbouring windows.
            padding: number of zero rows/columns added on every side.
        """
        self.kernel_size = kernel_size
        self.n_filter = n_filter
        self.stride = stride
        self.padding = padding

        self.weights = None
        self.biases = None

        # (input, windows) pair saved by forward() for use in backward().
        self.cache = None

    def _get_windows(self, input, output_size, kernel_size, padding=0, stride=1, dilate=0):
        """Return a strided view of all `kernel_size` x `kernel_size` windows.

        Args:
            input: 4-D array (batch, channel, height, width).
            output_size: 4-tuple; only the last two entries (number of window
                positions along height and width) are used.
            kernel_size: side length of each square window.
            padding: zeros added on every side of the two spatial axes.
            stride: step between neighbouring windows.
            dilate: number of zeros inserted between neighbouring elements of
                the two spatial axes before padding.

        Returns:
            A 6-D view shaped (batch, channel, out_h, out_w, kernel, kernel).
        """
        working_input = input

        # Dilate: insert `dilate` zeros between neighbouring rows / columns.
        if dilate != 0:
            row_slots = np.repeat(np.arange(1, input.shape[2]), dilate)
            col_slots = np.repeat(np.arange(1, input.shape[3]), dilate)
            if row_slots.size:
                working_input = np.insert(working_input, row_slots, 0, axis=2)
            if col_slots.size:
                working_input = np.insert(working_input, col_slots, 0, axis=3)

        # Symmetric zero padding of the spatial axes.
        if padding != 0:
            working_input = np.pad(
                working_input,
                pad_width=((0, 0), (0, 0), (padding, padding), (padding, padding)),
                mode='constant',
                constant_values=0,
            )

        _, _, out_h, out_w = output_size
        out_b, out_c, _, _ = input.shape

        # as_strided performs no bounds checking, so make sure the buffer really
        # contains every requested window; missing rows/columns are zeros.
        short_h = max((out_h - 1) * stride + kernel_size - working_input.shape[2], 0)
        short_w = max((out_w - 1) * stride + kernel_size - working_input.shape[3], 0)
        if short_h or short_w:
            working_input = np.pad(
                working_input,
                pad_width=((0, 0), (0, 0), (0, short_h), (0, short_w)),
                mode='constant',
                constant_values=0,
            )

        batch_str, channel_str, kern_h_str, kern_w_str = working_input.strides

        return np.lib.stride_tricks.as_strided(
            working_input,
            (out_b, out_c, out_h, out_w, kernel_size, kernel_size),
            (batch_str, channel_str, stride * kern_h_str, stride * kern_w_str, kern_h_str, kern_w_str)
        )

    def forward(self, input):
        """Convolve `input` with the layer's filters and add the biases.

        Args:
            input: array shaped (batch, channel, height, width).

        Returns:
            Array shaped (batch, n_filter, out_h, out_w).

        Raises:
            ValueError: if the padded input is smaller than the kernel.
        """
        n, c, h, w = input.shape

        if self.weights is None:
            # He initialisation: standard deviation sqrt(2 / fan_in).
            fan_in = self.kernel_size * self.kernel_size * c
            self.weights = np.random.randn(self.n_filter, c, self.kernel_size, self.kernel_size) * np.sqrt(2 / fan_in)
        if self.biases is None:
            self.biases = np.random.randn(self.n_filter)

        out_h = (h - self.kernel_size + 2 * self.padding) // self.stride + 1
        out_w = (w - self.kernel_size + 2 * self.padding) // self.stride + 1
        if out_h < 1 or out_w < 1:
            raise ValueError(
                f"Input of spatial size ({h}, {w}) is too small for kernel_size="
                f"{self.kernel_size} with padding={self.padding}"
            )

        windows = self._get_windows(input, (n, c, out_h, out_w), self.kernel_size, self.padding, self.stride)
        out = np.einsum('bihwkl,oikl->bohw', windows, self.weights)
        out += self.biases[None, :, None, None]

        self.cache = input, windows
        return out

    def backward(self, dout, learning_rate):
        """Back-propagate `dout`, update the parameters and return d(loss)/d(input).

        Args:
            dout: gradient w.r.t. this layer's output, same shape as the array
                returned by the preceding `forward` call.
            learning_rate: SGD step size applied to weights and biases.

        Returns:
            Gradient w.r.t. the layer input, same shape as that input.
        """
        x, windows = self.cache
        n, c, h, w = x.shape

        # The input gradient is a "full" correlation of the stride-dilated dout
        # with the 180-degree rotated kernel, which needs kernel_size-1-padding
        # zeros on every side. If that is negative, compute a larger map with
        # no padding and crop the leading rows/columns instead.
        full_pad = self.kernel_size - 1 - self.padding
        crop = max(-full_pad, 0)
        full_pad = max(full_pad, 0)

        dout_windows = self._get_windows(
            dout,
            (n, c, h + crop, w + crop),
            self.kernel_size,
            padding=full_pad,
            stride=1,
            dilate=self.stride - 1,
        )
        rot_kern = np.rot90(self.weights, 2, axes=(2, 3))

        db = np.sum(dout, axis=(0, 2, 3))
        dw = np.einsum('bihwkl,bohw->oikl', windows, dout)
        dx = np.einsum('bohwkl,oikl->bihw', dout_windows, rot_kern)
        if crop:
            dx = dx[:, :, crop:crop + h, crop:crop + w]

        # dx was computed above with the pre-update weights.
        self.weights -= learning_rate * dw
        self.biases -= learning_rate * db
        return dx


class ReLUActivationLayer:
    """Element-wise rectified linear unit: max(x, 0)."""

    def __init__(self) -> None:
        # Input of the latest forward() call; backward() needs it for the mask.
        self.input = None

    def __str__(self) -> str:
        return "ReLUActivationLayer"

    def forward(self, input):
        """Return `input` with negative entries replaced by zero."""
        self.input = input
        return np.maximum(input, 0)

    def backward(self, output, learning_rate):
        """Propagate the upstream gradient through the activation.

        Args:
            output: gradient w.r.t. this layer's output (same shape as the
                input of the last `forward` call).
            learning_rate: unused; kept for a uniform layer interface.

        Returns:
            `output` where the forward input was positive, zero elsewhere.

        Raises:
            RuntimeError: if called before `forward`.
        """
        if self.input is None:
            raise RuntimeError("ReLUActivationLayer.backward called before forward")
        # The derivative is 1 where the *forward input* was positive, so the
        # upstream gradient passes through there and is blocked elsewhere.
        return np.where(self.input > 0, output, 0)


class MaxPoolingLayer:
    """Max pooling over square windows of a (batch, channel, height, width) array."""

    def __str__(self) -> str:
        return "MaxPoolingLayer"

    def __init__(self, pool_size, stride):
        """
        Args:
            pool_size: side length of the square pooling window.
            stride: step between neighbouring windows.
        """
        self.pool_size = pool_size
        self.stride = stride
        # Input of the latest forward() call; backward() locates the maxima in it.
        self.input = None

    def forward(self, input):
        """Return the per-channel maximum of every pooling window.

        Args:
            input: array shaped (batch, channel, height, width).

        Returns:
            Float array shaped (batch, channel, out_h, out_w).
        """
        self.input = input
        batch_size, n_channel, height, width = input.shape

        output_h = max((height - self.pool_size) // self.stride + 1, 0)
        output_w = max((width - self.pool_size) // self.stride + 1, 0)

        output = np.zeros((batch_size, n_channel, output_h, output_w))

        # Loop over window positions only; batch and channel are vectorised so
        # each channel is pooled independently.
        for h in range(output_h):
            top = h * self.stride
            for w in range(output_w):
                left = w * self.stride
                window = input[:, :, top:top + self.pool_size, left:left + self.pool_size]
                output[:, :, h, w] = window.max(axis=(2, 3))

        return output

    def backward(self, output, learning_rate):
        """Route the upstream gradient to the positions that held each maximum.

        Args:
            output: gradient w.r.t. this layer's output,
                shaped (batch, channel, out_h, out_w).
            learning_rate: unused; kept for a uniform layer interface.

        Returns:
            Gradient w.r.t. the forward input (same shape). When several
            entries of a window tie for the maximum, each receives the full
            upstream value. Contributions of overlapping windows are summed.
        """
        batch_size, n_channel, height, width = output.shape
        grad_input = np.zeros(self.input.shape)

        for h in range(height):
            top = h * self.stride
            for w in range(width):
                left = w * self.stride
                window = self.input[:, :, top:top + self.pool_size, left:left + self.pool_size]
                is_max = window == window.max(axis=(2, 3), keepdims=True)
                # Accumulate: with stride < pool_size the windows overlap.
                grad_input[:, :, top:top + self.pool_size, left:left + self.pool_size] += (
                    is_max * output[:, :, h, w][:, :, None, None]
                )

        return grad_input


class FlatteningLayer:
    """Collapses every axis after the first into a single feature axis."""

    def __init__(self) -> None:
        # Most recent forward() input; its shape is restored in backward().
        self.input = None

    def __str__(self) -> str:
        return "FlatteningLayer"

    def forward(self, input):
        """Remember `input` and return it reshaped to (input.shape[0], -1)."""
        self.input = input
        n_rows = input.shape[0]
        return input.reshape((n_rows, -1))

    def backward(self, output, learning_rate):
        """Reshape the incoming gradient to the shape of the remembered input.

        `learning_rate` is accepted for interface uniformity and not used.
        """
        original_shape = self.input.shape
        return output.reshape(original_shape)


class DenseLayer:
    """Fully connected layer: output = input @ weights + biases."""

    def __init__(self, n_output):
        """
        Args:
            n_output: number of output features.
        """
        self.n_output = n_output
        self.weights = None
        self.biases = None
        # Input of the latest forward() call, needed for the weight gradient.
        self.input = None

    def __str__(self) -> str:
        return "DenseLayer"

    def forward(self, input):
        """Apply the affine map to a (batch, n_input) array.

        Weights (scaled by 1/sqrt(n_input)) and biases are drawn from a
        standard normal distribution on the first call.

        Returns:
            Array shaped (batch, n_output).
        """
        self.input = input
        batch_size, n_input = input.shape

        if self.weights is None:
            self.weights = np.random.randn(n_input, self.n_output) / np.sqrt(n_input)
        if self.biases is None:
            self.biases = np.random.randn(self.n_output)

        output = np.dot(input, self.weights) + self.biases
        return output

    def backward(self, output, learning_rate):
        """Back-propagate `output`, update the parameters and return d(loss)/d(input).

        Args:
            output: gradient w.r.t. this layer's output, shaped (batch, n_output).
            learning_rate: SGD step size applied to weights and biases.

        Returns:
            Gradient w.r.t. the layer input, shaped (batch, n_input).
        """
        batch_size = output.shape[0]

        # Parameter gradients are averaged over the batch.
        grad_weights = np.dot(self.input.T, output) / batch_size
        grad_biases = np.mean(output, axis=0)

        # The input gradient flows through the (pre-update) weights, not
        # through the weight gradient.
        grad_input = np.dot(output, self.weights.T)

        self.weights -= learning_rate * grad_weights
        self.biases -= learning_rate * grad_biases

        return grad_input


class SoftMaxLayer:
    """Row-wise softmax over axis 1."""

    def __str__(self) -> str:
        return "SoftMaxLayer"

    def forward(self, input):
        """Return the softmax of each row of `input`.

        The row maximum is subtracted before exponentiating so that the
        largest exponent is exp(0).
        """
        row_max = np.max(input, axis=1, keepdims=True)
        exponentials = np.exp(input - row_max)
        normaliser = exponentials.sum(axis=1, keepdims=True)
        return exponentials / normaliser

    def backward(self, output, learning_rate):
        """Hand the incoming gradient back unchanged.

        `CNN.fit` feeds `y_pred - y_true` into the backward pass, so no
        additional factor is applied here. `learning_rate` is unused.
        """
        return output


In [9]:
# Directory that contains both the label CSV and the image folder.
DATASET_DIR = "../../../numta"
# Base name shared by the label file (`<name>.csv`) and the image folder (`<name>/`).
DATASET_NAME = "training-b"
# Size passed to cv2.resize for every loaded image.
IMAGE_SHAPE = (28, 28)

class CNN:
    """Sequential container that chains layers exposing forward()/backward()."""

    def __init__(self):
        # Layers in the order they are applied during the forward pass.
        self.layers = []

    def add(self, layer):
        """Append `layer` to the end of the stack."""
        self.layers.append(layer)

    def forward(self, input):
        """Feed `input` through every layer in order and return the last output."""
        activation = input
        for layer in self.layers:
            activation = layer.forward(activation)
        return activation

    def backward(self, output, learning_rate) -> None:
        """Push `output` through the layers' backward() methods, last layer first."""
        gradient = output
        for layer in self.layers[::-1]:
            gradient = layer.backward(gradient, learning_rate)

    def fit(self, X_train, y_train, learning_rate=0.01, epochs=10, batch_size=32):
        """Train on mini-batches of image file names and digit labels.

        Args:
            X_train: sequence of image file names, resolved relative to
                `{DATASET_DIR}/{DATASET_NAME}/`.
            y_train: sequence of integer labels used to index a 10x10 identity
                matrix (one-hot rows).
            learning_rate: step size forwarded to every layer's backward().
            epochs: number of passes over the data.
            batch_size: number of samples per mini-batch.
        """
        one_hot_rows = np.eye(10)

        for epoch in range(epochs):
            n_batch = int(np.ceil(len(X_train) / batch_size))

            for batch in range(n_batch):
                lo = batch * batch_size
                hi = lo + batch_size

                # Load this batch's images as grayscale and invert the intensities.
                images = [
                    255 - load_image_as_grayscale(f"{DATASET_DIR}/{DATASET_NAME}/{img_name}")
                    for img_name in X_train[lo:hi]
                ]
                batch_X = np.array(images)

                # One-hot encode this batch's labels.
                y_true = np.array([one_hot_rows[digit] for digit in y_train[lo:hi]])

                y_pred = self.forward(batch_X)
                self.backward(y_pred - y_true, learning_rate)

                print(f"Epoch: {epoch+1}/{epochs} Batch: {batch+1}/{n_batch}", end="\r")

            print()

    def predict(self, X):
        """Return the index of the largest forward() output along axis 1."""
        scores = self.forward(X)
        return np.argmax(scores, axis=1)
    

# def cross_entropy_loss(y_true, y_pred):
#     return np.sum(-1 * np.sum(y_true * np.log(y_pred), axis=0))

def split_dataset(X, y, test_size=0.2, shuffle=True):
    """Split features and labels into disjoint train and test parts.

    Args:
        X: anything accepted by `pd.DataFrame` (one row per sample).
        y: one label per row of `X`.
        test_size: fraction of the rows (between 0 and 1) placed in the test set.
        shuffle: if True the rows are randomly permuted (using NumPy's global
            random state) before splitting; if False the first rows form the
            train set and the last rows the test set, in their original order.

    Returns:
        Tuple `(X_train, y_train, X_test, y_test)`; the X parts are DataFrames
        with a fresh 0..n-1 index, the y parts are NumPy arrays.

    Raises:
        ValueError: if `X` and `y` differ in length or `test_size` is outside [0, 1].
    """
    features = pd.DataFrame(X).reset_index(drop=True)
    labels = np.asarray(y)

    n_rows = len(features)
    if len(labels) != n_rows:
        raise ValueError(f"X has {n_rows} rows but y has {len(labels)} labels")
    if not 0 <= test_size <= 1:
        raise ValueError(f"test_size must be within [0, 1], got {test_size}")

    n_test = int(round(n_rows * test_size))
    order = np.random.permutation(n_rows) if shuffle else np.arange(n_rows)

    # Slice one permutation by position so that train and test can never
    # share a row and every row lands in exactly one of them.
    train_idx = order[:n_rows - n_test]
    test_idx = order[n_rows - n_test:]

    X_train = features.iloc[train_idx].reset_index(drop=True)
    y_train = labels[train_idx]

    X_test = features.iloc[test_idx].reset_index(drop=True)
    y_test = labels[test_idx]

    return X_train, y_train, X_test, y_test



def load_dataset():
    """Read `{DATASET_DIR}/{DATASET_NAME}.csv` and keep the two columns used here.

    Returns:
        DataFrame with the columns "filename" and "digit", in that order.
    """
    csv_path = f"{DATASET_DIR}/{DATASET_NAME}.csv"
    wanted_columns = ["filename", "digit"]
    frame = pd.read_csv(csv_path)
    return frame[wanted_columns]


def load_image_as_grayscale(image_path):
    """Load an image as single-channel grayscale, resized to IMAGE_SHAPE.

    Args:
        image_path: path of the image file.

    Returns:
        Array with a leading axis of length 1 wrapped around the resized
        image, i.e. shaped (1, rows, cols).

    Raises:
        OSError: if the file is missing or cannot be decoded.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than inside cv2.resize.
    if img is None:
        raise OSError(f"Could not read image: {image_path}")
    img = cv2.resize(img, IMAGE_SHAPE)
    return np.array([img])


def main():
    """Build the network, train it on the dataset and round-trip it through pickle."""

    cnn = CNN()
    cnn.add(ConvolutionLayer(n_filter=3, kernel_size=3, stride=1, padding=1))
    cnn.add(ReLUActivationLayer())
    cnn.add(MaxPoolingLayer(pool_size=2, stride=1))

    cnn.add(ConvolutionLayer(n_filter=3, kernel_size=3, stride=1, padding=1))
    cnn.add(ReLUActivationLayer())
    cnn.add(MaxPoolingLayer(pool_size=2, stride=1))

    cnn.add(FlatteningLayer())

    cnn.add(DenseLayer(n_output=84))
    cnn.add(DenseLayer(n_output=10))

    cnn.add(SoftMaxLayer())

    dataset = load_dataset()
    print(f"Dataset Size: {dataset.shape}")

    X = dataset["filename"].values
    y = dataset["digit"].values

    cnn.fit(X, y)

    # Context managers guarantee the file handles are flushed and closed.
    with open("cnn.pkl", "wb") as model_file:
        pickle.dump(cnn, model_file)
    print("Pickle File Dumped")

    # NOTE: pickle.load can execute arbitrary code; this is only acceptable
    # because the file was written by this very function a moment ago.
    with open("cnn.pkl", "rb") as model_file:
        cnn_x = pickle.load(model_file)
    print("Pickle File Loaded")

    # Sanity check that the parameters survived the round trip.
    if np.allclose(cnn.layers[0].biases, cnn_x.layers[0].biases):
        print("Biases are same")

if __name__ == "__main__":
    main()

Dataset Size: (359, 2)
Epoch: 1/10 Batch: 12/12
Epoch: 2/10 Batch: 12/12
Epoch: 3/10 Batch: 12/12
Epoch: 4/10 Batch: 12/12
Epoch: 5/10 Batch: 12/12
Epoch: 6/10 Batch: 12/12
Epoch: 7/10 Batch: 12/12
Epoch: 8/10 Batch: 12/12
Epoch: 9/10 Batch: 12/12
Epoch: 10/10 Batch: 12/12
Pickle File Dumped
Pickle File Loaded
Biases are same


### Debug Cell

In [10]:
# # batch_size, n_channel, height, width
# input_shape = (10, 4, 32, 32)
# input = np.random.randn(*input_shape)

# print("input shape: ", input.shape)

# # n_filter, filter_size, stride, padding
# con = ConvolutionLayer(n_filter=5, kernel_size=3, stride=1, padding=1)
# relu = ReLUActivationLayer()
# max = MaxPoolingLayer(pool_size=2, stride=1)
# flat = FlatteningLayer()
# dens = DenseLayer(n_output=10)
# smax = SoftMaxLayer()

# output = con.forward(input)
# print("Convolution done")
# print("output shape: ", output.shape)

# output = relu.forward(output)
# print("ReLU done")
# print("output shape: ", output.shape)

# output = max.forward(output)
# print("MaxPooling done")
# print("output shape: ", output.shape)

# output = flat.forward(output)
# print("Flattening done")
# print("output shape: ", output.shape)

# output = dens.forward(output)
# print("Dense done")
# print("output shape: ", output.shape)

# output = smax.forward(output)
# print("Softmax done")
# print("output shape: ", output.shape)

# print("*" * 30)
# print("*" * 30)

# learning_rate = 0.1
# output = smax.backward(output, learning_rate)
# print("Softmax backward done")
# print("output shape: ", output.shape)

# output = dens.backward(output, learning_rate)
# print("Dense backward done")
# print("output shape: ", output.shape)

# output = flat.backward(output, learning_rate)
# print("Flattening backward done")
# print("output shape: ", output.shape)

# output = max.backward(output, learning_rate)
# print("MaxPooling backward done")
# print("output shape: ", output.shape)

# output = relu.backward(output, learning_rate)
# print("ReLU backward done")
# print("output shape: ", output.shape)

# output = con.backward(dout=output, learning_rate=learning_rate)
# print("Convolution backward done")
# print("output shape: ", output.shape)
