In [6]:
from datasets import load_dataset
import numpy as np

In [7]:
# Load the dataset registered under the name "mnist" via `datasets.load_dataset`.
dataset = load_dataset("mnist")

In [8]:
# Pull the "train" and "test" splits out of the loaded dataset object.
train_dataset = dataset["train"]
test_dataset = dataset["test"]

In [9]:
# Shape of the first training image after conversion to a NumPy array
# (the recorded cell output is (28, 28)).
np.array(train_dataset[0]['image']).shape

(28, 28)

In [52]:
# Implementing LeNet
# The first convolution layer uses 6 feature maps (filters).
# With a kernel size of 5 the output size is (28 - 5) + 1 = 24.
# The convolution layer is written generically so it can be reused for any architecture.
class Conv_Layer:
    """Naive, loop-based 2-D convolution layer operating on one sample.

    The input is indexed as ``(input_channel, input_size, input_size)`` and the
    output is stored as an array of shape
    ``(output_chanel, output_size, output_size)`` with
    ``output_size = (input_size - filter_shape) // stride + 1`` (no padding).

    Filter weights and biases are initialised on construction; the two
    ``initialize_*`` methods can be called again to re-draw them.
    """

    def __init__(self, input_channel, input_size, stride, filter_shape, output_chanel_or_filter_num):
        """Store the layer geometry and create the learnable parameters.

        Args:
            input_channel: number of channels of the input.
            input_size: height/width of the (square) input.
            stride: step between two consecutive filter positions (>= 1).
            filter_shape: height/width of the (square) filter.
            output_chanel_or_filter_num: number of filters, i.e. output channels.

        Raises:
            ValueError: if ``stride`` is smaller than 1 or the filter is larger
                than the input.
        """
        if stride < 1:
            raise ValueError("stride must be a positive integer")
        if filter_shape > input_size:
            raise ValueError("filter_shape must not exceed input_size")

        self.input_channel = input_channel
        self.input_size = input_size
        self.stride = stride
        self.filter_shape = filter_shape
        # Number of filters == number of output channels.
        self.output_chanel = output_chanel_or_filter_num

        self.initialize_filer_weights()
        self.initialize_filter_biases()

    def _output_size(self):
        """Return the number of valid filter positions along one axis."""
        return (self.input_size - self.filter_shape) // self.stride + 1

    def initialize_filer_weights(self):
        """Draw small random weights, one filter per (output, input) channel pair."""
        self.filter_weights = 0.01 * np.random.randn(
            self.output_chanel, self.input_channel, self.filter_shape, self.filter_shape
        )

    # Correctly spelled alias; the original (misspelled) name is kept for callers.
    initialize_filter_weights = initialize_filer_weights

    def initialize_filter_biases(self):
        """Create one zero bias per output channel, stored with shape (output_chanel, 1)."""
        # The shape must be passed as a tuple; a second positional argument is the dtype.
        self.filter_biases = np.zeros((self.output_chanel, 1))

    def forward(self, input):
        """Convolve ``input`` with every filter and store the result in ``self.output``.

        Args:
            input: array-like indexed as (input_channel, input_size, input_size).
        """
        input = np.asarray(input, dtype=float)
        # Both attribute names are kept because each was assigned by the original code.
        self.inputs = input
        self.input = input

        size = self.filter_shape
        output_size = self._output_size()
        output = np.zeros((self.output_chanel, output_size, output_size))

        for i, kernel in enumerate(self.filter_weights):
            for out_row in range(output_size):
                row = out_row * self.stride
                for out_column in range(output_size):
                    column = out_column * self.stride
                    # Slice the same window from every input channel, multiply
                    # element-wise with the filter and sum everything up.
                    window = input[:, row:row + size, column:column + size]
                    output[i, out_row, out_column] = (window * kernel).sum() + self.filter_biases[i, 0]

        self.output = output

    def backward(self, dvalues):
        """Compute gradients w.r.t. biases, filter weights and inputs.

        Args:
            dvalues: upstream gradient indexed as
                (output_chanel, output_size, output_size).

        Sets ``self.dbiases`` (same shape as ``filter_biases``),
        ``self.dweights`` (same shape as ``filter_weights``) and
        ``self.dinputs`` (same shape as the stored inputs).
        """
        dvalues = np.asarray(dvalues, dtype=float)
        size = self.filter_shape

        # Keep the bias gradient in the (output_chanel, 1) layout of the biases.
        self.dbiases = np.sum(dvalues, axis=(1, 2)).reshape(-1, 1)
        # Force float so integer inputs/weights do not break the accumulation.
        self.dinputs = np.zeros_like(self.inputs, dtype=float)
        self.dweights = np.zeros_like(self.filter_weights, dtype=float)

        for i, kernel in enumerate(self.filter_weights):
            for out_row in range(dvalues.shape[1]):
                row = out_row * self.stride
                for out_column in range(dvalues.shape[2]):
                    column = out_column * self.stride
                    gradient = dvalues[i, out_row, out_column]
                    # Every input element of the window contributed through the filter...
                    self.dinputs[:, row:row + size, column:column + size] += kernel * gradient
                    # ...and every filter weight contributed through the window.
                    self.dweights[i] += gradient * self.inputs[:, row:row + size, column:column + size]
                
        

In [11]:
# Pooling layers have no learnable parameters
class Pooling:
    """Base class that stores the configuration shared by the pooling layers."""

    def __init__(self, pooling_frame_size, input_channel, input_size, stride=0):
        """Remember the window size, channel count, input size and stride.

        A ``stride`` of 0 means "not given": the stride then equals the
        pooling window size.
        """
        self.pooling_frame_size = pooling_frame_size
        self.input_channel = input_channel
        self.input_size = input_size
        self.stride = stride if stride != 0 else self.pooling_frame_size

    def pooling(self, inputs):
        """Placeholder for the forward pass; does nothing here."""
        return None

    def backward(self, dvalues):
        """Placeholder for the backward pass; does nothing here."""
        return None

class Average_Pooling(Pooling):
    """Average pooling over square windows, applied to each channel independently."""

    def __init__(self, pooling_frame_size, input_channel, input_size, stride=0):
        """Forward all settings, including ``stride``, to the base class.

        A ``stride`` of 0 (the default) makes the base class use the window
        size as stride.
        """
        super().__init__(pooling_frame_size, input_channel, input_size, stride)

    def pooling(self, inputs):
        """Average every pooling window and store the result in ``self.output``.

        Args:
            inputs: array-like indexed as (input_channel, input_size, input_size).

        ``self.output`` has shape (input_channel, output_size, output_size) with
        ``output_size = (input_size - pooling_frame_size) // stride + 1``.
        """
        inputs = np.asarray(inputs, dtype=float)
        self.inputs = inputs

        frame = self.pooling_frame_size
        output_size = (self.input_size - frame) // self.stride + 1
        output = np.zeros((self.input_channel, output_size, output_size))

        for channel in range(self.input_channel):
            for out_row in range(output_size):
                top = out_row * self.stride
                for out_column in range(output_size):
                    left = out_column * self.stride
                    window = inputs[channel, top:top + frame, left:left + frame]
                    output[channel, out_row, out_column] = np.mean(window)

        self.output = output

    def backward(self, dvalues):
        """Spread every upstream gradient evenly over the window it came from.

        Args:
            dvalues: upstream gradient with the same shape as ``self.output``.

        Sets ``self.dinputs`` with the same shape as the stored inputs.
        """
        dvalues = np.asarray(dvalues, dtype=float)
        frame = self.pooling_frame_size
        # Each of the frame * frame window elements contributed equally to the mean.
        share = 1.0 / (frame * frame)

        self.dinputs = np.zeros_like(self.inputs, dtype=float)
        for channel in range(dvalues.shape[0]):
            for out_row in range(dvalues.shape[1]):
                top = out_row * self.stride
                for out_column in range(dvalues.shape[2]):
                    left = out_column * self.stride
                    # += so that overlapping windows (stride < frame) accumulate.
                    self.dinputs[channel, top:top + frame, left:left + frame] += (
                        dvalues[channel, out_row, out_column] * share
                    )


In [12]:
class Max_Pooling(Pooling):
    """Max pooling over square windows, applied to each channel independently."""

    def __init__(self, pooling_frame_size, input_channel, input_size, stride=0):
        """Forward all settings to the base class.

        A ``stride`` of 0 (the default) makes the base class use the window
        size as stride.
        """
        super().__init__(pooling_frame_size, input_channel, input_size, stride)

    def pooling(self, inputs):
        """Take the maximum of every pooling window and store it in ``self.output``.

        Args:
            inputs: array-like indexed as (input_channel, input_size, input_size).

        Besides ``self.output`` this records, for the backward pass:
          * ``self.max_positions`` - input (row, column) of the maximum for
            every output element, shape (input_channel, out, out, 2);
          * ``self.dinputs_max_filter`` - input-shaped mask holding 1 wherever
            an element was selected as a maximum ("filter" here is the plain
            English word, not a CNN filter).
        """
        inputs = np.asarray(inputs)
        self.inputs = inputs

        frame = self.pooling_frame_size
        output_size = (self.input_size - frame) // self.stride + 1
        output = np.zeros((self.input_channel, output_size, output_size), dtype=inputs.dtype)

        self.dinputs_max_filter = np.zeros_like(inputs)
        self.max_positions = np.zeros((self.input_channel, output_size, output_size, 2), dtype=int)

        for channel in range(self.input_channel):
            for out_row in range(output_size):
                top = out_row * self.stride
                for out_column in range(output_size):
                    left = out_column * self.stride
                    window = inputs[channel, top:top + frame, left:left + frame]

                    # Convert the flat argmax into (row, column) inside the window.
                    window_row, window_column = np.unravel_index(np.argmax(window), window.shape)
                    max_row = top + window_row
                    max_column = left + window_column

                    output[channel, out_row, out_column] = window[window_row, window_column]
                    self.max_positions[channel, out_row, out_column] = (max_row, max_column)
                    self.dinputs_max_filter[channel, max_row, max_column] = 1

        self.output = output

    def backward(self, dvalues):
        """Route every upstream gradient to the input element that was the maximum.

        Args:
            dvalues: upstream gradient with the same shape as ``self.output``.

        Sets ``self.dinputs`` with the same shape as the stored inputs; all
        elements that were never selected as a maximum get a zero gradient.
        """
        dvalues = np.asarray(dvalues, dtype=float)
        self.dinputs = np.zeros_like(self.inputs, dtype=float)

        for channel in range(dvalues.shape[0]):
            for out_row in range(dvalues.shape[1]):
                for out_column in range(dvalues.shape[2]):
                    max_row, max_column = self.max_positions[channel, out_row, out_column]
                    # += so an element that wins several overlapping windows
                    # receives the sum of their gradients.
                    self.dinputs[channel, max_row, max_column] += dvalues[channel, out_row, out_column]

In [13]:
class Relu:
    """Rectified linear unit activation: negative values become 0."""

    def feed_forward(self, inputs):
        """Store ``inputs`` for the backward pass and write ``max(0, x)`` to ``self.output``."""
        self.inputs = inputs
        activated = np.maximum(0, inputs)
        self.output = activated

    def backward(self, dvalues):
        """Pass the upstream gradient through wherever the stored input was positive."""
        blocked = self.inputs <= 0
        gradient = dvalues.copy()
        # The gradient is zero for every input that was clamped in the forward pass.
        gradient[blocked] = 0
        self.dinputs = gradient
        

In [14]:
class Softmax:
    """Row-wise softmax activation."""

    def feed_forward(self, inputs):
        """Turn every row of ``inputs`` into probabilities and store them in ``self.output``."""
        # Subtracting the row maximum before exponentiating keeps exp() from overflowing.
        shifted = inputs - np.max(inputs, axis=1, keepdims=True)
        exponentials = np.exp(shifted)
        row_totals = np.sum(exponentials, axis=1, keepdims=True)
        self.output = exponentials / row_totals

    def backward(self, dvalues):
        """Multiply each sample's softmax Jacobian with its upstream gradient."""
        self.dinputs = np.empty_like(dvalues)

        paired_samples = zip(self.output, dvalues)
        for sample_index, (probabilities, upstream_gradient) in enumerate(paired_samples):
            # Column vector, e.g. [1, 2, 3] -> [[1], [2], [3]]
            column = probabilities.reshape(-1, 1)
            jacobian = np.diagflat(column) - column @ column.T
            self.dinputs[sample_index] = jacobian @ upstream_gradient
        

In [15]:
class Loss:
    """Base class for losses; subclasses provide ``feed_forward``."""

    def calculate(self, output_val, true_class):
        """Return the mean of the per-sample losses of a batch.

        ``output_val`` is the predicted value, ``true_class`` the target; both
        are handed unchanged to ``self.feed_forward``.
        """
        per_sample_losses = self.feed_forward(output_val, true_class)
        return np.mean(per_sample_losses)
        

class Loss_CrossCategorical(Loss):
    """Categorical cross-entropy loss for integer (index) class labels."""

    def feed_forward(self, output_val, true_class):
        """Return the negative log-likelihood of the true class for every sample.

        ``true_class`` holds one class index per sample, e.g. [0, 1, 0, 0, 2, 1, 0].
        """
        sample_count = len(output_val)

        # Keep predictions inside [1e-7, 1 - 1e-7] so log() never sees 0.
        safe_predictions = np.clip(output_val, 1e-7, 1 - 1e-7)

        # Pick, for every row, the confidence predicted for its true class.
        true_class_confidences = safe_predictions[range(sample_count), true_class]

        return -np.log(true_class_confidences)

    def backward(self, dvalues, true_class):
        """Store the gradient of the loss w.r.t. the predictions in ``self.dinputs``.

        ``dvalues`` is the prediction of the final layer (the softmax output).
        """
        sample_count = len(dvalues)
        class_count = len(dvalues[0])

        # Clip so that the division below never divides by (nearly) zero.
        safe_predictions = np.clip(dvalues, 1e-7, 1 - 1e-7)

        # One-hot encode the labels by indexing rows of the identity matrix;
        # the result has shape (batch_size, number_of_classes), like the softmax output.
        one_hot_targets = np.eye(class_count)[true_class]

        raw_gradient = -one_hot_targets / safe_predictions

        # Normalise the gradient by the number of samples.
        self.dinputs = raw_gradient / sample_count


# The combined softmax + categorical cross-entropy backward pass is intentionally left out for now.
# It will be added later.

In [16]:
class Accuracy:
    """Fraction of samples whose highest-scoring class equals the true class."""

    def calculate(self, predictiction_prop, true_class):
        """Return the mean of ``argmax(prediction, axis=1) == true_class``."""
        # Index of the largest value in every row
        winners = np.argmax(predictiction_prop, axis=1)
        hits = winners == true_class
        return np.mean(hits)

In [54]:
class Optimizer_SGD:
    """Plain stochastic gradient descent: ``parameter -= learning_rate * gradient``."""

    def __init__(self, lr=1.0):
        """Store the learning rate ``lr``."""
        self.learning_rate = lr

    def update_params(self, layer):
        """Update ``layer.weights`` / ``layer.biases`` in place from ``dweights`` / ``dbiases``."""
        layer.weights -= self.learning_rate * layer.dweights
        layer.biases -= self.learning_rate * layer.dbiases

    def update_params_CNN_layer(self, layer):
        """Update ``layer.filter_weights`` / ``layer.filter_biases`` in place.

        The step uses the gradients ``layer.dweights`` and ``layer.dbiases``
        (not the parameters themselves, which would simply scale the filters).
        """
        layer.filter_weights -= self.learning_rate * layer.dweights
        # Bring the bias gradient into the layout of the biases so the in-place
        # update works whether the gradient is stored as (n,) or (n, 1).
        bias_gradient = np.reshape(layer.dbiases, np.shape(layer.filter_biases))
        layer.filter_biases -= self.learning_rate * bias_gradient