In [None]:
# !pip install opencv-python
# !pip install tqdm

In [None]:
import numpy as np
import pandas as pd
import math
import os
import cv2
import matplotlib.pyplot as plt
from tqdm import tqdm
import pickle
from numpy.lib.stride_tricks import as_strided
import time


In [None]:
# Text file describing the network architecture; passed to Model(...) below.
MODEL_INIT_FILE = 'model_desc.txt'
# IMAGE_DATASET_DIR = './TRAIN_IMAGES/'
# CSV_FILE = 'training-a_revised.csv'
# Training images directory and the CSV that maps image file names to labels.
IMAGE_DATASET_DIR = './NumtaDB/training-a/'
CSV_FILE = './NumtaDB/training-a.csv'
# Number of samples per mini-batch.
MINI_BATCH_SIZE = 64
IMAGE_DIM = 28 # Height and width (in pixels) every image is resized to

# Dataset placeholders for the training images and one-hot labels; the training
# cell rebinds these names to stacked NumPy arrays.
X_train = []
y_train = []

<h2>SOFTMAX LAYER</h2>

In [None]:
class Softmax_Layer:
    """Softmax output layer: converts per-sample scores into class probabilities."""

    def __init__(self):
        self.layer_type = 'Softmax'

    def __str__(self):
        return f"{self.layer_type} Layer"

    def forward(self, X):
        """Return row-wise softmax of ``X``.

        Args:
            X: array of shape (batch, classes) holding raw scores.

        Returns:
            Array of the same shape whose rows are non-negative and sum to 1.
        """
        # Subtracting the row maximum leaves the result unchanged mathematically
        # but keeps np.exp from overflowing on large scores.
        shifted = X - np.max(X, axis=1, keepdims=True)
        Z = np.exp(shifted)
        # Normalise over the class axis (axis 1), one distribution per sample.
        return Z / np.sum(Z, axis=1, keepdims=True)

    def backward(self, dZ, learning_rate=0.0001):
        """Pass the incoming gradient through unchanged (as a copy).

        The training loop feeds ``softmax_output - one_hot_labels`` as ``dZ``,
        which is already the gradient with respect to the softmax input, so no
        further transformation is applied here. ``learning_rate`` is unused.
        """
        return np.copy(dZ)

<h2>ReLU ACTIVATION</h2>

In [None]:
class ReLU_Activation:
    """Element-wise rectified linear unit."""

    def __init__(self):
        self.layer_type = 'ReLU'

    def __str__(self):
        return f"{self.layer_type} Activation"

    def forward(self, X):
        """Return a copy of ``X`` with every negative entry replaced by 0.

        The input is remembered on the instance so ``backward`` can tell which
        entries were active.
        """
        self.X = X

        activated = np.array(X, copy=True)
        np.putmask(activated, activated < 0, 0)
        return activated

    def backward(self, dZ, learning_rate=0.0001):
        """Gate ``dZ`` by the 0/1 derivative of ReLU at the cached input.

        Entries where the cached input was positive let the gradient through;
        negative and zero entries block it. ``learning_rate`` is unused.
        """
        gate = np.array(self.X, copy=True)

        # Order matters: negatives become 0 first, then positives become 1.
        np.putmask(gate, gate < 0, 0)
        np.putmask(gate, gate > 0, 1)
        return gate * dZ

<h2>FULLY CONNECTED LAYER</h2>

In [None]:
class Fully_Connected_Layer:
    """Dense layer computing ``X @ W + b`` with lazily initialised parameters."""

    def __init__(self, output_dim):
        self.layer_type = 'Fully Connected'
        self.output_dim = output_dim
        # Created on the first forward pass, once the input width is known.
        self.W = None
        self.b = None

    def __str__(self):
        return f"Fully Connected Layer(output_dim={self.output_dim})"

    def forward(self, X):
        """Apply the affine map to a batch.

        Args:
            X: array of shape (batch, features).

        Returns:
            Array of shape (batch, output_dim).
        """
        self.X = X

        if self.W is None:
            # He initialisation scales by the number of inputs per unit
            # (fan-in), i.e. the feature count -- not the batch size.
            fan_in = X.shape[1]
            self.W = np.random.randn(fan_in, self.output_dim) * math.sqrt(2 / fan_in)

        if self.b is None:
            self.b = np.zeros((1, self.output_dim))

        return X @ self.W + self.b

    def backward(self, dZ, learning_rate=0.0001):
        """Update ``W``/``b`` with one gradient step and return the input gradient.

        Args:
            dZ: gradient w.r.t. this layer's output, shape (batch, output_dim).
            learning_rate: step size for the parameter update.

        Returns:
            Gradient w.r.t. the cached input, shape (batch, features).
        """
        batch = self.X.shape[0]

        # Parameter gradients are averaged over the batch.
        dW = self.X.T @ dZ / batch
        db = np.sum(dZ, axis=0, keepdims=True) / batch  # same shape as self.b
        # Computed with the pre-update weights.
        dX = dZ @ self.W.T

        self.W = self.W - learning_rate * dW
        self.b = self.b - learning_rate * db

        return dX

<h2>FLATTENING LAYER</h2>

In [None]:
class Flatenning_Layer:
    """Collapses every non-batch axis into one so dense layers can consume it."""

    def __init__(self):
        self.layer_type = 'Flatten'

    def __str__(self):
        return f"{self.layer_type} Layer"

    def forward(self, X):
        """Reshape ``X`` to (batch, -1) and remember its original shape."""
        self.input_shape = X.shape
        batch = X.shape[0]
        return np.reshape(X, (batch, -1))

    def backward(self, dZ, learning_rate=0.0001):
        """Return a copy of ``dZ`` reshaped to the shape seen by ``forward``.

        ``learning_rate`` is unused; the layer has no parameters.
        """
        restored = np.array(dZ, copy=True)
        return restored.reshape(self.input_shape)

<h1>MAX POOLING</h1>

In [None]:

class Max_Pooling:
    """Max pooling over square windows of a (batch, height, width, channels) array."""

    def __init__(self, filter_dim, stride):
        self.layer_type = 'Max Pooling'
        self.filter_dim = filter_dim
        self.stride = stride

    def __str__(self):
        return f"{self.layer_type} (filter_dim={self.filter_dim}, stride={self.stride})"

    def forward(self, X):
        """Return the per-window maximum and remember where each maximum was.

        Args:
            X: array of shape (batch, height, width, channels).

        Returns:
            Array of shape (batch, new_h, new_w, channels).

        Side effects:
            Stores ``X_shape`` and ``Z_Max_idx``. ``Z_Max_idx`` has the output's
            shape and holds, for every output cell, the flat position
            ``row * filter_dim + col`` of the (first) maximum inside its window.
        """
        self.X_shape = X.shape
        n, h, w, c = X.shape
        f, s = self.filter_dim, self.stride
        new_h = (h - f) // s + 1
        new_w = (w - f) // s + 1

        # Read-only view exposing every pooling window without copying X.
        windows = np.lib.stride_tricks.as_strided(
            X,
            shape=(n, new_h, new_w, f, f, c),
            strides=(X.strides[0], s * X.strides[1], s * X.strides[2],
                     X.strides[1], X.strides[2], X.strides[3]),
            writeable=False
        )
        self.X_strided_shape = windows.shape

        # Move the two window axes last and merge them so that a single
        # argmax yields the flat in-window position of the maximum.
        flat_windows = windows.transpose(0, 1, 2, 5, 3, 4).reshape(n, new_h, new_w, c, f * f)
        self.Z_Max_idx = flat_windows.argmax(axis=-1)

        return flat_windows.max(axis=-1)

    def backward(self, dZ, learning_rate=0.0001):
        """Route each output gradient back to the input cell that produced the max.

        Args:
            dZ: gradient w.r.t. the pooled output, shape (batch, new_h, new_w, channels).
            learning_rate: unused; the layer has no parameters.

        Returns:
            Gradient w.r.t. the input, with the shape seen by ``forward``.
        """
        f, s = self.filter_dim, self.stride
        n, new_h, new_w, c = dZ.shape
        dX = np.zeros(self.X_shape)

        # Open-mesh indices that broadcast against Z_Max_idx.
        sample, out_row, out_col, channel = np.ix_(
            np.arange(n), np.arange(new_h), np.arange(new_w), np.arange(c)
        )
        # Window origin plus the in-window offset decoded from the flat index.
        row = out_row * s + self.Z_Max_idx // f
        col = out_col * s + self.Z_Max_idx % f

        if s >= f:
            # Windows do not overlap, so every target cell is written at most once.
            dX[sample, row, col, channel] = dZ
        else:
            # Overlapping windows can share a maximum; contributions must accumulate.
            np.add.at(dX, (sample, row, col, channel), dZ)

        return dX

    # The names below are kept only so existing callers continue to work;
    # they all delegate to ``backward``.
    def backward_stackoverflow(self, dZ, learning_rate=0.0001):
        """Alias of ``backward``."""
        return self.backward(dZ, learning_rate)

    def backward_(self, dZ, learning_rate=0.0001):
        """Alias of ``backward``."""
        return self.backward(dZ, learning_rate)

    def backward1(self, del_v, lr):
        """Alias of ``backward``."""
        return self.backward(del_v, lr)



<h1>CONVOLUTION</h1>

In [None]:
class Convolution:
    """2-D convolution over (batch, height, width, channels) arrays.

    Weights have shape (num_output_channels, filter_dim, filter_dim, in_channels)
    and are created on the first forward pass.
    """

    def __init__(self, num_output_channels, filter_dim, stride=1, padding=0):
        self.layer_type = 'Convolution'
        self.num_output_channels = num_output_channels
        self.filter_dim = filter_dim
        self.stride = stride
        self.padding = padding
        self.W = None
        self.b = None

    def __str__(self):
        return f"{self.layer_type} (num_output_channels={self.num_output_channels}, filter_dim={self.filter_dim}, stride={self.stride}, padding={self.padding})"

    def _pad(self, X):
        """Zero-pad the two spatial axes of ``X`` by ``self.padding`` on each side."""
        p = self.padding
        return np.pad(X, ((0, 0), (p, p), (p, p), (0, 0)), 'constant')

    def _windows(self, X_pad, out_h, out_w):
        """Read-only (batch, out_h, out_w, f, f, channels) view of all filter windows."""
        f, s = self.filter_dim, self.stride
        s0, s1, s2, s3 = X_pad.strides
        return np.lib.stride_tricks.as_strided(
            X_pad,
            shape=(X_pad.shape[0], out_h, out_w, f, f, X_pad.shape[3]),
            strides=(s0, s * s1, s * s2, s1, s2, s3),
            writeable=False
        )

    def forward(self, X):
        """Convolve a batch with the layer's filters.

        Args:
            X: array of shape (batch, height, width, in_channels).

        Returns:
            Array of shape (batch, out_h, out_w, num_output_channels).
        """
        self.X = X
        f, s, p = self.filter_dim, self.stride, self.padding

        out_h = (X.shape[1] - f + 2 * p) // s + 1
        out_w = (X.shape[2] - f + 2 * p) // s + 1
        # Kept for existing readers of this attribute (equals out_w for square inputs).
        self.output_dim = out_h

        X_pad = self._pad(X)

        if self.W is None:
            # He initialisation: scale by the number of inputs feeding one
            # output value (fan-in), not by the batch size.
            fan_in = f * f * X.shape[3]
            self.W = np.random.randn(self.num_output_channels, f, f, X.shape[3]) * math.sqrt(2 / fan_in)
        if self.b is None:
            self.b = np.zeros((self.num_output_channels))

        windows = self._windows(X_pad, out_h, out_w)
        # Contract the two filter axes and the channel axis of every window
        # against the matching axes of W.
        return np.tensordot(windows, self.W, axes=([3, 4, 5], [1, 2, 3])) + self.b

    def backward(self, dZ, learning_rate=0.0001):
        """Update the filters with one gradient step and return the input gradient.

        Args:
            dZ: gradient w.r.t. the output, shape (batch, out_h, out_w, num_output_channels).
            learning_rate: step size for the parameter update.

        Returns:
            Gradient w.r.t. the cached input, with exactly the input's shape.
        """
        f, s, p = self.filter_dim, self.stride, self.padding
        batch, out_h, out_w, _ = dZ.shape

        X_pad = self._pad(self.X)
        windows = self._windows(X_pad, out_h, out_w)

        # Parameter gradients, averaged over the batch.
        dW = np.tensordot(dZ, windows, axes=([0, 1, 2], [0, 1, 2])) / batch
        db = np.sum(dZ, axis=(0, 1, 2)) / batch

        # Input gradient: every output cell spreads dZ @ W over the window it
        # was computed from. Filter tap (i, j) touches padded positions
        # (i + s*r, j + s*c), which is a strided slice of the padded gradient.
        dX_pad = np.zeros(X_pad.shape)
        row_span = s * (out_h - 1) + 1
        col_span = s * (out_w - 1) + 1
        for i in range(f):
            for j in range(f):
                dX_pad[:, i:i + row_span:s, j:j + col_span:s, :] += dZ @ self.W[:, i, j, :]

        # Drop the gradient that fell on the zero padding.
        in_h, in_w = self.X.shape[1], self.X.shape[2]
        dX = np.ascontiguousarray(dX_pad[:, p:p + in_h, p:p + in_w, :])

        self.W = self.W - learning_rate * dW
        self.b = self.b - learning_rate * db

        return dX

<h1>MODEL</h1>

In [None]:
class Model:
    """Sequential network whose layers are listed in a plain-text description file.

    Each non-empty, non-comment line of the file names one layer followed by
    its integer arguments, separated by whitespace:

        CONV <num_output_channels> <filter_dim> <stride> <padding>
        MAXPOOL <filter_dim> <stride>
        FC <output_dim>
        FLATTEN
        RELU
        SOFTMAX

    Lines whose first non-blank character is ``#`` are ignored.
    """

    def __init__(self, filePath):
        self.layers = []
        self.filePath = filePath
        self.build_model()

    def __str__(self):
        string = 'MODEL DETAILS:\n\n'
        string += ''.join(f"Layer {i+1}: {layer}\n" for i, layer in enumerate(self.layers))
        return string

    def build_model(self):
        """Parse ``self.filePath`` and append the described layers to ``self.layers``.

        A missing file or an unknown layer name is reported with ``print`` and
        parsing stops; layers read before the problem stay in the model.
        """
        if not os.path.exists(self.filePath):
            print('File does not exist')
            return

        with open(self.filePath, 'r') as file:
            for raw_line in file:
                # Strip first so indented comments and blank lines are skipped too.
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue

                # split() without an argument tolerates tabs and repeated spaces.
                tokens = line.split()
                layer_name = tokens[0].upper()

                if layer_name == 'FC':
                    layer = Fully_Connected_Layer(int(tokens[1]))
                elif layer_name == 'CONV':
                    # stride and padding fall back to the layer defaults when omitted.
                    layer = Convolution(*[int(tok) for tok in tokens[1:5]])
                elif layer_name == 'MAXPOOL':
                    layer = Max_Pooling(int(tokens[1]), int(tokens[2]))
                elif layer_name == 'FLATTEN':
                    layer = Flatenning_Layer()
                elif layer_name == 'RELU':
                    layer = ReLU_Activation()
                elif layer_name == 'SOFTMAX':
                    layer = Softmax_Layer()
                else:
                    print('Invalid layer name')
                    return

                self.layers.append(layer)

    def forward(self, X):
        """Feed ``X`` through every layer in order and return the final output."""
        for layer in self.layers:
            X = layer.forward(X)
        return X

    def backward(self, dZ, learning_rate=0.0001):
        """Back-propagate ``dZ`` through the layers in reverse order.

        Every layer exposes the same ``backward(dZ, learning_rate)`` method, so
        no layer type is treated specially. Returns the gradient w.r.t. the
        network input.
        """
        for layer in reversed(self.layers):
            dZ = layer.backward(dZ, learning_rate)
        return dZ

    def train(self, X, Y, learning_rate=0.0001, epochs=10, batch_size=64):
        """Run mini-batch gradient descent and print the loss after each epoch.

        Args:
            X: training inputs; the first axis indexes samples.
            Y: one-hot labels aligned with ``X``.
            learning_rate: step size passed to every layer.
            epochs: number of passes over ``X``.
            batch_size: samples per update; the last batch may be smaller.
        """
        for epoch in tqdm(range(epochs)):
            for i in range(0, X.shape[0], batch_size):
                X_batch = X[i:i+batch_size]
                Y_batch = Y[i:i+batch_size]
                Z = self.forward(X_batch)
                # Gradient of cross-entropy w.r.t. the softmax input.
                dZ = Z - Y_batch
                self.backward(dZ, learning_rate)
            print(f"Epoch {epoch+1} completed. Loss: {self.loss(X, Y)}")

    def predict(self, X):
        """Return the index of the highest-scoring class for every sample."""
        Z = self.forward(X)
        return np.argmax(Z, axis=1)

    def evaluate(self, X, Y):
        """Return the accuracy on ``(X, Y)`` as a percentage (0-100)."""
        Y_pred = self.predict(X)
        Y_true = np.argmax(Y, axis=1)
        return np.sum(Y_pred == Y_true) / len(Y_true) * 100

    def loss(self, X, Y):
        """Return the cross-entropy between ``Y`` and the network output, summed over samples."""
        Z = self.forward(X)
        # A predicted probability of exactly 0 would give log(0) = -inf and,
        # multiplied by a 0 label, NaN; clip to keep the loss finite.
        Z = np.clip(Z, 1e-12, None)
        return np.sum(-Y * np.log(Z))

    

# PROCESS DATASET

In [None]:
def read_image(path):
    """Load an image as an inverted, normalised grayscale array.

    Args:
        path: location of the image file.

    Returns:
        float32 array of shape (IMAGE_DIM, IMAGE_DIM, 1) with values in [0, 1],
        where the original pixel value ``v`` is mapped to ``(255 - v) / 255``.

    Raises:
        FileNotFoundError: if OpenCV could not read the file.
    """
    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals a missing or unreadable file by returning None
    # instead of raising; fail here with the offending path.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")

    img = cv2.resize(img, (IMAGE_DIM, IMAGE_DIM))
    # Add a trailing channel axis (1 = grayscale) and switch to float.
    img = img.reshape(IMAGE_DIM, IMAGE_DIM, 1).astype('float32')
    # Invert, then scale to [0, 1].
    img = 255 - img
    img /= 255
    return img

In [None]:
# https://pytorch.org/tutorials/beginner/basics/data_tutorial.html#creating-a-custom-dataset-for-your-files
class CustomImageDataset:
    """Index-addressable dataset of (image, one-hot label) pairs described by a CSV.

    The CSV's first column is read as the image file name (relative to
    ``img_dir``) and its fourth column as the integer class label.
    """

    NUM_CLASSES = 10
    FILENAME_COLUMN = 0
    LABEL_COLUMN = 3

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        """
        Args:
            annotations_file: path of the CSV listing file names and labels.
            img_dir: directory containing the image files.
            transform: optional callable applied to each loaded image.
            target_transform: optional callable applied to each one-hot label.
        """
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx`` of the CSV."""
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, self.FILENAME_COLUMN])
        image = read_image(img_path)

        # One-hot encode without building a full identity matrix per item.
        digit = int(self.img_labels.iloc[idx, self.LABEL_COLUMN])
        label = np.zeros(self.NUM_CLASSES)
        label[digit] = 1.0

        # Honour the optional hooks accepted by the constructor.
        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)
        return image, label

In [None]:
class CustomDataLoader:
    """Iterates over a dataset in mini-batches of stacked arrays.

    The loader can be iterated any number of times; every ``iter()`` call
    starts a fresh pass. With ``shuffle=True`` each pass visits the samples in
    a new random order.
    """

    def __init__(self, dataset, batch_size=32, shuffle=False):
        """
        Args:
            dataset: object supporting ``len()`` and integer indexing that
                returns ``(image, label)`` pairs.
            batch_size: samples per batch; the final batch may be smaller.
            shuffle: draw a new random sample order for every pass.

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self._start_pass()

    def _start_pass(self):
        """Rewind to the first sample and fix the visiting order for this pass."""
        self.current_idx = 0
        count = len(self.dataset)
        self._order = np.random.permutation(count) if self.shuffle else np.arange(count)

    def __len__(self):
        """Number of batches in one full pass."""
        return -(-len(self.dataset) // self.batch_size)

    def __iter__(self):
        # Reset here so a second ``for`` loop over the loader is not empty.
        self._start_pass()
        return self

    def __next__(self):
        """Return the next ``(images, labels)`` batch as stacked arrays."""
        if self.current_idx >= len(self._order):
            raise StopIteration

        # Slicing past the end simply yields the shorter final batch.
        picked = self._order[self.current_idx:self.current_idx + self.batch_size]
        self.current_idx += len(picked)

        batch = [self.dataset[int(i)] for i in picked]
        images, labels = zip(*batch)
        images = np.stack(images)
        labels = np.stack(labels)

        return images, labels


In [None]:
# Training pipeline: labels come from CSV_FILE, pixels from IMAGE_DATASET_DIR.
dataset = CustomImageDataset(annotations_file=CSV_FILE, img_dir=IMAGE_DATASET_DIR)
# Batch size comes from the configuration cell instead of a repeated literal.
dataloader = CustomDataLoader(dataset, batch_size=MINI_BATCH_SIZE, shuffle=False)

# BUILD MODEL

In [None]:
# How many mini-batches to pull from the loader for this run.
# Set to None to load the whole dataset.
MAX_TRAIN_BATCHES = 1
# Seed for NumPy's global RNG so the lazily initialised weights are reproducible.
SEED = 42

# Collect into cell-local lists so re-running this cell starts from scratch
# instead of appending to (or failing on) state left by a previous run.
train_images, train_labels = [], []
for images, labels in dataloader:
    train_images.append(images)
    train_labels.append(labels)
    if MAX_TRAIN_BATCHES is not None and len(train_images) >= MAX_TRAIN_BATCHES:
        break

X_train = np.concatenate(train_images)
y_train = np.concatenate(train_labels)
print(X_train.shape, y_train.shape)

model = Model(MODEL_INIT_FILE)
print(model)

np.random.seed(SEED)
model.train(X_train, y_train, learning_rate=0.001, epochs=2, batch_size=MINI_BATCH_SIZE)

# SAVE MODEL

In [None]:
# Serialise the trained model object to disk.
with open('model.pkl', 'wb') as model_file:
    model_file.write(pickle.dumps(model))

# TESTING

In [None]:
# NOTE(review): TEST_MODEL_PATH is defined but never loaded in this cell; the
# in-memory `model` from the training cell is what gets evaluated. Confirm
# whether a saved model was meant to be loaded here.
TEST_MODEL_PATH = './saved_model/model_cnn.pkl'
TEST_DATA_PATH = './NumtaDB/training-b/'
TEST_DATA_LABEL_PATH = './NumtaDB/training-b.csv'

test_dataset = CustomImageDataset(annotations_file=TEST_DATA_LABEL_PATH, img_dir=TEST_DATA_PATH)
test_dataloader = CustomDataLoader(test_dataset, batch_size=128, shuffle=False)

# Per-batch accuracies are printed as before; the running totals give one
# sample-weighted figure for the whole test set (the last batch may be smaller,
# so averaging the per-batch percentages would be biased).
correct_total = 0.0
sample_total = 0
for images, labels in test_dataloader:
    batch_accuracy = model.evaluate(images, labels)
    print(batch_accuracy)
    correct_total += batch_accuracy / 100 * len(labels)
    sample_total += len(labels)

if sample_total:
    print(f"Overall accuracy: {correct_total / sample_total * 100:.2f}% on {sample_total} samples")