In [3]:
import numpy as np
import pandas as pd
import copy

In [50]:
def validity_check(image, conv_filter, padding, stride):
    
    is_valid = True
    image_shape = (np.shape(image[0]))
    
    filter_shape = (np.shape(conv_filter))
    
    
    if len(image_shape) != len(filter_shape) - 1:
        print('Dimensions of both image and filter should be equal')
        is_valid = False
    
    if len(image_shape) == 3 or len(filter_shape) > 3:
        if image_shape[-1] != filter_shape[-1]:
            print('Number of channels in both image and filter should be equal')
            is_valid = False
        
    if filter_shape[1] != filter_shape[2]:
        print("Filter should be of a square matrix")
        is_valid = False
        
        

    if filter_shape[1] % 2 == 0:
        print('dimensions of filter should be of odd dimensions not even')
        is_valid = False
    

    '''
    convolution_result is the array we obtain after running the filter all over the image
    '''    
    if is_valid:
        '''one or more filters but only one channel (also for image)'''
        if len(image_shape) == 2 and len(filter_shape) == 3:
            
            convolution_result = np.zeros((
                            np.int16(((image_shape[0] + 2 * padding - filter_shape[1]) / stride) + 1),
                            np.int16(((image_shape[1] + 2 * padding - filter_shape[2]) / stride) + 1)))
            
        '''one or more filters with more than one channel (also for image)'''
        if len(image_shape) == 3 and len(filter_shape) == 4:
            convolution_result = np.zeros((
                            np.int16(((image_shape[0] + 2 * padding - filter_shape[1]) / stride) + 1),
                            np.int16(((image_shape[1] + 2 * padding - filter_shape[2]) / stride) + 1),
                            image_shape[3]))
    else:
        convolution_result = -1
    
    return [is_valid, convolution_result]


def Relu(x):
    indices = np.where(x < 0)
    x[indices] = 0
    return x

In [131]:
'''
The dimensions represent height, width and no_of_channels in the input image and filters
'''


class Conv_pool:
    '''
    self.conv_result, self.pooling_result and self.relu_result  are of shape (m, n, o, p)
    m - length of  conv_result (or pooling_result or relu_result)
    n - number of different conv_results (produced by each image in the input) or pooling_results
    0 - number of rows in each entry in  conv_result (or pooling_result)
    p - number of columns in each entry in conv_result (or pooling_result)
    '''
    def __init__(self, image, conv_filter, padding, stride, pooling_filter_size, pooling_filter_stride):
        self.curr_image         = image
        self.conv_filter        = conv_filter
        self.conv_filter_stride = stride
        self.pool_filter_size   = pooling_filter_size
        self.pool_filter_stride = pooling_filter_stride
        self.padding            = padding
        self.conv_result        = np.array([])
        self.relu_result        = np.array([])
        self.pooling_result     = np.array([])
            
    
    
    #####################################################
    
    def apply_convolution_every_image(self, image, convolution_result):
        conv_filter  = self.conv_filter
        stride       = self.conv_filter_stride
        image_shape  = np.shape(image)
        filter_shape = np.shape(conv_filter)
        
        no_of_filters         = filter_shape[0]
        convoluted_image_list = []
        
        '''this loop runs for all the different filters in the conv_filter'''
        for filter_num in range(0, no_of_filters):
            convoluted_image = np.array([])
            filter_ = conv_filter[filter_num, :]
    
            '''if image has more than one channel'''
            if len(image_shape) == 3 and image_shape[-1] == filter_shape[-1]:

                convoluted_image = self.convolve_the_image_by_filter(image[:, :, 0], filter_[:, :, 0], 
                                                                   convolution_result)
                for channel in range(1, filter_shape[-1]):
                    convoluted_image = convoluted_image + self.convolve_the_image_by_filter(image[:, :, channel], 
                                                                                             filter_[:, :, channel],
                                                                                                 convolution_result)
            '''if image has only one channel'''
            if len(image_shape) == 2:
                
                convoluted_image = self.convolve_the_image_by_filter(image, filter_, convolution_result)
            '''here, you used copy.copy() so as to refrain the convoluted_image_list getting over-written
            by the new entries into the list'''
            
            convoluted_image_list.append(copy.copy(convoluted_image))
            
            
        return convoluted_image_list
            
    
    ######################################################
    
    def start_convolving_the_image(self, convolution_result):
        
        arr_of_convs = []
        no_of_images = len(self.curr_image)
        
        '''apply convolution for all the images in the input'''
        for image_num in range(0, no_of_images):
            
            curr_img = self.curr_image[image_num, :]
            convoluted_image_list = self.apply_convolution_every_image(curr_img, convolution_result)
            arr_of_convs.append(convoluted_image_list)
            
        self.conv_result = np.array(arr_of_convs)

     
    
    #########################################################
    '''
    when supplied with both image and filter, perform either convolution or pooling
    '''
    def apply_conv_or_pooling_on_data(self, image, filter_size, result, s
                                                tride_length, is_pooling, conv_filter = None):
        
        row_count = 0
        column_count = 0

        image_shape = np.shape(image)[1]
        print(np.shape(image))
        for row in range(0, image_shape, stride_length):
            if row + filter_size <= image_shape:
                column_count = 0
                for column in range(0, image_shape, stride_length):

                    if column + filter_size <= image_shape:
                        
                        '''present active region is the region in the image which will be multiplied
                            by the filter'''
                        if not is_pooling:
                            
                            present_active_region           = image[row : (row + filter_size), 
                                                                          column : (column + filter_size)]
                            image_area_product_filter       = present_active_region * conv_filter
                            sum_of_all_elements             = np.sum(image_area_product_filter)
                            result[row_count, column_count] = sum_of_all_elements
                        
                        if is_pooling:
                            
                            active_region       = image[row : (row + filter_size), 
                                                                    column : (column + filter_size)] 
                            result[row_count, column_count] = np.max(active_region)

                            
                        column_count = column_count + 1
            row_count = row_count + 1
        
        return result
        
        
           
    ######################################################
    
    def convolve_the_image_by_filter(self, image, conv_filter, convolution_result):
        
        stride      = self.conv_filter_stride
        image_shape = np.shape(image)    
        filter_size = np.shape(conv_filter)[0]
        
        result = self.apply_conv_or_pooling_on_data(image, filter_size, 
                                                    convolution_result, stride, False, conv_filter)
        return result

    
    
    ######################################################
    
    def relu_on_convolution_result(self):
        convolution_result_ = self.conv_result
        
        arr_of_relus                 = []
        conv_result_shape            = np.shape(convolution_result_)
        no_of_images_convoluted      = conv_result_shape[0]
        no_of_times_image_convoluted = conv_result_shape[1]
        
        
        for image_num in range(0, no_of_images_convoluted):
            
            relu_result_list = []
            curr_conv_result = convolution_result_[image_num, :]
        
        
            for result_num in range(0, no_of_times_image_convoluted):
                
                current_convoluted_image = curr_conv_result[result_num, :]
                indices = np.where(current_convoluted_image <= 0)
                current_convoluted_image[indices] = 0
                relu_result_list.append(current_convoluted_image)

            arr_of_relus.append(relu_result_list)
            
        self.relu_result = np.array(arr_of_relus)

        
    ######################################################
    
    '''max pooling'''    
    def pooling_on_relu_result(self):
        relu_result = self.relu_result
        pool_size   = self.pool_filter_size
        stride      = self.pool_filter_stride
        

        relu_result_shape            = np.shape(relu_result)
        no_of_images_relued          = relu_result_shape[0]
        no_of_relu_results_per_image = np.shape(relu_result)[1]
        relu_result_shape_each_image = np.shape(relu_result)[2]

        arr_of_pools = []
        
        for image_num in range(0,  no_of_images_relued):
    
            curr_image_relu      = relu_result[image_num, :]
            pooling_result_list  = []
            for relu_result_num in range(0, no_of_relu_results_per_image):
                curr_relu = curr_image_relu[relu_result_num, :]
                pooling_result   = np.zeros((np.uint16(((relu_result_shape_each_image - pool_size) / stride) + 1),
                                       np.uint16(((relu_result_shape_each_image - pool_size) / stride) + 1)))
                curr_relu_result = curr_image_relu[relu_result_num, :]
                result = self.apply_conv_or_pooling_on_data(image = curr_relu, filter_size = pool_size, 
                                                            result = pooling_result, stride_length = stride, 
                                                             is_pooling = True)
                pooling_result_list.append(result)


            arr_of_pools.append(pooling_result_list)
        self.pooling_result = np.array(arr_of_pools)
        
    
    

    ######################################################
    
    def CNN(self):
        
        '''
        validity_convolution_result[0] = Boolean_value
        validity_convolution_result[1] = convolution_result
        '''
        
        validity_convolution_result = validity_check(self.curr_image, self.conv_filter, 
                                                            self.padding, self.conv_filter_stride)
        
        if validity_convolution_result[0]:
            convolution_result = validity_convolution_result[1]
            self.start_convolving_the_image(convolution_result)
            self.relu_on_convolution_result()
            self.pooling_on_relu_result()

                    

In [31]:
def Relu(x):
    indices = np.where(x <= 0)
    x[indices] = 0
    return x
    
def softmax(x):
    x = x.reshape(1, -1)
    
    max_item = np.max(x)
    print('max is ' + str(max_item))
    exp_x = np.exp(x - max_item)
    res = np.true_divide(exp_x, np.sum(exp_x))
    return res



def cross_entropy(output, labels):
    return -np.sum(labels * np.log(output))



class fully_connected_layers_in_CNN:
    def __init__(self, conv_pool_object, labels, no_of_neurons):
        self.conv_pool_result = conv_pool_object
        self.pooling_result = conv_pool_object.pooling_result
        self.labels = labels
        self.pooling_result_shape = np.shape(self.pooling_result)
        self.neurons = no_of_neurons
    
    ##########################################################
    
    def feed_forward_in_fc_layers(self):
        pooling_result = self.pooling_result
        no_of_neurons = self.neurons
        no_of_pool_results, no_of_rows, no_of_columns = self.pooling_result_shape
        self.fc_layer_1 = pooling_result.reshape(((no_of_pool_results * no_of_rows * no_of_columns), 1))
        
        no_of_rows_in_fc_layer_1 = self.fc_layer_1.shape[0]
        self.weights_1 = np.random.random((no_of_neurons[0], no_of_rows_in_fc_layer_1))
        self.bias_1 = np.zeros(shape = (no_of_neurons[0], 1))
        
        self.z_1 = np.add(np.dot(self.weights_1, self.fc_layer_1), self.bias_1)
        print(np.shape(self.z_1))
        self.fc_layer_2 = Relu(self.z_1)
        
        no_of_rows_in_fc_layer_2 = self.fc_layer_2.shape[0]
        self.weights_2 = np.random.random((no_of_neurons[1], no_of_rows_in_fc_layer_2))
        self.bias_2 = np.zeros(shape = (no_of_neurons[1], 1))
        
        self.z_2 = np.add(np.dot(self.weights_2, self.fc_layer_2), self.bias_2)
        
        self.predicted_probs = softmax(self.z_2)
        print('pr ' + str(self.predicted_probs))
        self.loss = cross_entropy(self.predicted_probs, self.labels)
        self.diff_between_pred_and_actual_labels = self.predicted_probs - self.labels

    
    

In [7]:

class back_propagation_in_CNN:
    
    
    ##########################################################
    
    def __init__(self, fc_layer_object):
        self.fc_layer_object = fc_layer_object
        self.conv_pool_result = fc_layer_object.conv_pool_result
    
    
    
    
    ##########################################################
    
    def get_the_index(self, conv_sub_array):
        position_of_max_element = np.nanargmax(conv_sub_array)
        index_of_max_element = np.unravel_index(position_of_max_element, conv_sub_array.shape)
        return index_of_max_element
    
    
    ##########################################################
    
    def backpropagation_wrt_pooling_result(self, derivative_pool_result):
        
        
        conv_result = self.conv_pool_result.conv_result
        pool_filter_size = self.conv_pool_result.pool_filter_size
        pool_filter_stride = self.conv_pool_result.pool_filter_stride
        
        no_of_convolved_images, no_of_rows, no_of_columns = np.shape(conv_result)
        _, pool_row_length, _ = np.shape(derivative_pool_result)
        output = np.zeros(shape = np.shape(conv_result))
    
        row_count = 0
        column_count = 0
        for image_num in range(0, no_of_convolved_images):
            current_image = conv_result[image_num, :]
            row_count = 0
            for row in range(0, no_of_rows, pool_filter_stride):
                if row + pool_filter_size <= no_of_rows:
                    column_count = 0
                    for column in range(0, no_of_columns, pool_filter_stride):
                        if column + pool_filter_size <= no_of_columns:
                            image_sub_array = current_image[row : row + pool_filter_size, column : column + pool_filter_size]
                            '''index of the max element in the original conv_image (in the area covered by pool filter)'''
                            (i, j) = self.get_the_index(image_sub_array)
                            if row_count <= pool_row_length:
                                output[image_num, row + i, column + j] = derivative_pool_result[image_num, 
                                                                                            row_count, column_count]
                            column_count = column_count + 1
                
                row_count = row_count + 1
            
        return output
                              
    
    
    ##########################################################
    
    def backpropagation_wrt_convolution_layer(self, der_conv_prev):
                
        conv_curr_image = self.conv_pool_result.curr_image
        conv_image_curr_list = [conv_curr_image]
        conv_image_curr = np.array(conv_image_curr_list)
        conv_filter = self.conv_pool_result.conv_filter
        filter_stride = self.conv_pool_result.conv_filter_stride
        no_of_filters, flt_size, no_of_columns_f = np.shape(conv_filter)
        
        len_of_shape_of_curr_image = len(np.shape(conv_image_curr))
        
        _,no_of_rows, no_of_columns = np.shape(conv_image_curr)

        
        der_wrt_filter = np.zeros(np.shape(conv_filter))
        der_wrt_bias = np.zeros((no_of_filters, 1))
        dconv_result = np.zeros(np.shape(conv_image_curr))
        
        r_count = 0
        c_count = 0
        
        for filter_num in range(0, no_of_filters, filter_stride):
            r_count = 0
            for r in range(0, no_of_rows):
                if r + flt_size <= no_of_rows:
                    c_count = 0
                    for c in range(0, no_of_columns, filter_stride):
                        if c + flt_size <= no_of_columns:
                            '''update the filter'''
                            
                            der_wrt_filter += der_conv_prev[filter_num, r_count, c_count] * conv_image_curr[:, 
                                                                                                         r : r + flt_size,
                                                                                                         c : c + flt_size]
                            
                            dconv_result[:, r : r + flt_size, c : c + flt_size] += der_conv_prev[filter_num, r_count, c_count] * conv_filter[filter_num, :]
                            
                            c_count = c_count + 1
                r_count = r_count + 1
        
            der_wrt_bias[filter_num] = np.sum(der_conv_prev[filter_num])
        
        
        return dconv_result, der_wrt_filter, der_wrt_bias
                                
    
    
    
    ##########################################################
    
    def start_back_propagation(self):
        fc_layer_object = self.fc_layer_object
        fc_layer_1 = fc_layer_object.fc_layer_1
        self.derivative_wrt_weights_2  = np.dot(fc_layer_object.diff_between_pred_and_actual_labels.T, 
                                                       fc_layer_object.z_1.T)
        
        temporary_result = np.dot(fc_layer_object.weights_2.T,  
                                                    fc_layer_object.diff_between_pred_and_actual_labels.T)
        
        print(fc_layer_object.fc_layer_1)
        self.derivative_wrt_weights_1 = np.dot(temporary_result,
                                                  fc_layer_object.fc_layer_1.T)
        
        self.derivative_wrt_bias_2 = np.sum(fc_layer_object.diff_between_pred_and_actual_labels, 
                                                axis = 1).T
        self.derivative_wrt_bias_1 = np.sum(temporary_result, axis = 1).reshape(fc_layer_object.bias_1.shape)
        
        
        der_fc_layer_1 = np.dot(fc_layer_object.weights_1.T, temporary_result)
        reshaped_dfc_layer_1 = der_fc_layer_1.reshape(fc_layer_object.pooling_result_shape)
        '''back propagating through max-pooling layer (updating only those neurons with highest value)'''
        der_pool_layer = self.backpropagation_wrt_pooling_result(reshaped_dfc_layer_1)
        
        '''back propagating through Relu layer'''
        der_pool_layer[fc_layer_object.conv_pool_result.conv_result <= 0] = 0
        
        der_image, der_filter, der_conv_bias = self.backpropagation_wrt_convolution_layer(der_pool_layer)
        
        self.return_result = [der_filter, der_conv_bias]
        
    


In [132]:
l1_filter = np.zeros((2,3,3))

l1_filter[0, :, :] = np.array([[[-1, 0, 1],   
                                  [-1, 0, 1],   
                                   [-1, 0, 1]]])  
l1_filter[1, :, :] = np.array([[[1,   1,  1],   
                                   [0,   0,  0],   
                                    [-1, -1, -1]]]) 

image_1 = np.random.uniform(low = 0, high = 1, size = (8, 8))
image_2 = np.random.uniform(low = 0, high = 1, size = (8, 8))
image = np.array([image_1, image_2])

cnn = Conv_pool(image, l1_filter, 0, 1, 2, 2)
cnn.CNN()

(8, 8)
(8, 8)
(8, 8)
(8, 8)
(6, 6)
(6, 6)
(6, 6)
(6, 6)


In [134]:
(cnn.pooling_result)[0]

array([[[0.        , 0.75122844, 0.19901213],
        [1.99486622, 0.20382838, 0.43536085],
        [1.90555471, 0.30021249, 1.21539214]],

       [[0.90325235, 1.59180576, 1.11952759],
        [0.80867883, 0.08104513, 0.        ],
        [0.77199353, 0.39044037, 0.14608003]]])

In [42]:
np.random.randint(low = 0, high = 20, size = 1)[0]

7

In [45]:
np.shape(image[0])

(8, 8)

In [48]:
len(image)

2

In [49]:
for r in range(0, len(image)):
    print(image[r, :])

[[0.0550647  0.83951965 0.18522795 0.08386781 0.35454334 0.59298627
  0.78457192 0.03404839]
 [0.95565334 0.39545117 0.03907397 0.92021912 0.70602216 0.87389992
  0.11821481 0.03111724]
 [0.00812017 0.56634476 0.58355534 0.38592402 0.4160145  0.87923553
  0.384459   0.68813559]
 [0.73067203 0.70937883 0.06138757 0.47535108 0.53887899 0.69277726
  0.15300017 0.48714134]
 [0.11198032 0.49223351 0.35898734 0.05932251 0.34018652 0.95632993
  0.6948106  0.99277468]
 [0.40080755 0.71693071 0.16046499 0.05338222 0.4467253  0.0430063
  0.4192921  0.01485212]
 [0.00767441 0.46227389 0.7079541  0.15649137 0.51665108 0.12283493
  0.26025794 0.11353354]
 [0.00569858 0.38398884 0.84689602 0.98643414 0.64524681 0.84656047
  0.34218255 0.20739032]]
[[0.24237134 0.72422269 0.47493803 0.09080515 0.28915886 0.89303331
  0.43415994 0.54804265]
 [0.12902376 0.35094135 0.02964232 0.47700454 0.70893824 0.02000631
  0.48480454 0.59718791]
 [0.39099842 0.853302   0.10030094 0.31821206 0.53865502 0.94441426
  