In [3]:
import numpy as np

In [27]:
def Conv(arr,filt,stride,pad_width,pad_value=0):
    """Slide one filter over one image and return the 2-D feature map.

    The filter is applied without flipping (cross-correlation): every output
    cell is the sum of the element-wise product between the filter and the
    window of the padded input it covers, taken over all channels.

    Parameters
    ----------
    arr : np.ndarray
        Input of shape (rows, cols) or (rows, cols, channels).
    filt : np.ndarray
        Filter of shape (f_rows, f_cols) or (f_rows, f_cols, channels).
        Its channel count must equal that of ``arr``.
    stride : int
        Step between consecutive windows along both spatial axes.
    pad_width : int
        Number of cells added on every side of both spatial axes.
    pad_value : scalar, optional
        Constant used to fill the padding (default 0).

    Returns
    -------
    np.ndarray
        Float array of shape (out_rows, out_cols) where
        ``out_rows = (rows + 2*pad_width - f_rows) // stride + 1`` and likewise
        for the columns. An axis has length 0 when the filter does not fit.

    Raises
    ------
    AssertionError
        If ``arr`` and ``filt`` do not have the same number of channels.
    """
    # Promote single-channel 2-D inputs to (rows, cols, 1) so the slicing
    # below can always address a channel axis.
    if(len(arr.shape) == 2):
        arr = arr.reshape(arr.shape[0],arr.shape[1],1)
    if(len(filt.shape) == 2):
        filt = filt.reshape(filt.shape[0],filt.shape[1],1)

    assert(filt.shape[2]==arr.shape[2]) , "different no of channels : " + str(filt.shape[2]) + " , " + str(arr.shape[2])

    rows, cols = arr.shape[0], arr.shape[1]
    filt_rows, filt_cols = filt.shape[0], filt.shape[1]

    # Pad only the two spatial axes; the channel axis is left untouched.
    arr = np.pad(arr,((pad_width,pad_width),(pad_width,pad_width),(0,0)),'constant',constant_values=(pad_value,pad_value))

    # Output size via integer floor division (np.int no longer exists in
    # current NumPy releases). Clamp at 0 when the filter is larger than the
    # padded input so np.zeros never receives a negative dimension.
    out_rows = max((rows + 2*pad_width - filt_rows)//stride + 1, 0)
    out_cols = max((cols + 2*pad_width - filt_cols)//stride + 1, 0)
    new_arr = np.zeros((out_rows, out_cols))

    # Iterate over OUTPUT cells and derive the window origin from them. This
    # visits every valid window of the padded input, including the ones that
    # start inside the trailing padding.
    for out_r in range(out_rows):
        top = out_r * stride
        for out_c in range(out_cols):
            left = out_c * stride
            window = arr[top:top+filt_rows, left:left+filt_cols, :]
            new_arr[out_r, out_c] = np.sum(window * filt)

    return new_arr


def Pool(arr,filt_size,method,stride):
    """Down-sample an image with max or mean pooling, channel by channel.

    Parameters
    ----------
    arr : np.ndarray
        Input of shape (rows, cols) or (rows, cols, channels).
    filt_size : int
        Side length of the square pooling window.
    method : str
        ``"max"`` or ``"mean"``.
    stride : int
        Step between consecutive windows along both spatial axes.

    Returns
    -------
    np.ndarray
        Float array of shape (out_rows, out_cols, channels) with
        ``out_rows = (rows - filt_size) // stride + 1`` (same for columns).
        When there is a single channel the channel axis is dropped and the
        result is 2-D.

    Raises
    ------
    ValueError
        If ``method`` is neither ``"max"`` nor ``"mean"``.
    """
    reducers = {"max": np.max, "mean": np.mean}
    if method not in reducers:
        # Fail up front with a clear message instead of hitting an unbound
        # local variable inside the loop.
        raise ValueError('unknown pooling method: ' + repr(method) + ' (expected "max" or "mean")')
    reduce_window = reducers[method]

    rows, cols = arr.shape[0], arr.shape[1]

    # Promote single-channel 2-D inputs to (rows, cols, 1).
    if(len(arr.shape) == 2):
        arr = arr.reshape(arr.shape[0],arr.shape[1],1)

    # Output size via integer floor division (np.int no longer exists in
    # current NumPy releases); clamp at 0 when the window does not fit.
    out_rows = max((rows - filt_size)//stride + 1, 0)
    out_cols = max((cols - filt_size)//stride + 1, 0)
    new_arr = np.zeros((out_rows, out_cols, arr.shape[2]))

    for out_r in range(out_rows):
        top = out_r * stride
        for out_c in range(out_cols):
            left = out_c * stride
            window = arr[top:top+filt_size, left:left+filt_size, :]
            # Reduce over the two spatial axes only, so every channel is
            # pooled independently in one call.
            new_arr[out_r, out_c, :] = reduce_window(window, axis=(0, 1))

    # Single-channel results are returned as plain 2-D arrays.
    if(new_arr.shape[2]==1):
        new_arr = new_arr.reshape(new_arr.shape[0],new_arr.shape[1])

    return new_arr


def forward(layer_info,X):
    """Run a batch through every layer described in ``layer_info``.

    Parameters
    ----------
    layer_info : dict
        Maps a layer name to a settings dict. ``'layer'`` selects the type:
        ``"Conv"`` entries are read for ``'filters'``, ``'bias'``, ``'stride'``,
        ``'pad'`` and ``'pad value'``; ``"Pool"`` entries for ``'filt size'``,
        ``'method'`` and ``'stride'``. Entries of any other type leave the
        data unchanged. Layers run in the dict's iteration order.
    X : np.ndarray
        Input batch. A 2-D or 3-D array is treated as a single sample and
        gets a leading axis of length 1.

    Returns
    -------
    tuple
        ``(array, cache)`` where ``array`` is the output of the last layer and
        ``cache[k]`` is the input that was fed to the k-th layer.

    Notes
    -----
    A 2-D or 3-D ``'filters'`` entry is replaced in ``layer_info`` by the same
    data with a leading axis of length 1 (a single filter).
    """
    # A lone sample becomes a batch of one. After the first reshape the array
    # is 4-D, so the second test cannot fire for the same input.
    if(len(X.shape) == 3):
        X = X.reshape((1,) + X.shape)

    if(len(X.shape) == 2):
        X = X.reshape((1,) + X.shape)

    array = X
    cache = []

    for layer , info in layer_info.items():

        # Remember what went INTO this layer.
        cache.append(array)
        kind = info['layer']

        if(kind == "Conv"):
            # A lone filter becomes a bank of one; the reshaped bank is
            # stored back into the layer settings.
            if(len(info['filters'].shape) in (2, 3)):
                info['filters'] = info['filters'].reshape((1,) + info['filters'].shape)

            activations = []
            for sample in array:
                feature_maps = []
                for kernel in info['filters']:
                    fmap = Conv(sample,kernel,stride=info['stride'],pad_width=info['pad'],pad_value=info['pad value'])
                    feature_maps.append(fmap.T)
                # Stacking the transposed maps and transposing the stack back
                # puts the filter index on the last axis.
                activations.append(np.array(feature_maps).T)

            array = np.array(activations) + info['bias']

        elif(kind == "Pool"):
            pooled = []
            for sample in array:
                pooled.append(Pool(sample,filt_size=info['filt size'],method = info["method"],stride=info['stride']))
            array = np.array(pooled)

    return array , cache


def _conv_backward(dZ, A, info, l_rate):
    """Update one Conv layer in place and return the gradient w.r.t. its input."""
    pad_width , pad_value , stride = (info['pad'] , info['pad value'] , info['stride'])

    W = info['filters']                       # (filters, f_rows, f_cols, channels)
    filt_rows , filt_cols = W.shape[1] , W.shape[2]
    (m , out_rows , out_cols , _) = dZ.shape

    # Rebuild the padded input exactly as the forward pass saw it; the padded
    # cells contribute to dW, so they must carry the layer's pad value.
    spatial_pad = ((0,0),(pad_width,pad_width),(pad_width,pad_width),(0,0))
    A_pad = np.pad(A , spatial_pad , 'constant' , constant_values=pad_value)
    dA_pad = np.zeros(A_pad.shape)
    dW = np.zeros(W.shape)

    for i in range(m):
        for h in range(out_rows):
            top = h * stride                  # window origin in the padded input
            for w in range(out_cols):
                left = w * stride

                window = A_pad[i , top:top+filt_rows , left:left+filt_cols , :]
                # One upstream value per filter, shaped to broadcast over
                # (filters, f_rows, f_cols, channels).
                grad = dZ[i , h , w , :][: , None , None , None]

                # Each filter's gradient is its own upstream value times the window.
                dW += grad * window
                # The window receives every filter weighted by its upstream value.
                dA_pad[i , top:top+filt_rows , left:left+filt_cols , :] += np.sum(W * grad , axis=0)

    # Drop the padding with explicit bounds so that pad_width == 0 also works
    # (a "0:-0" slice would be empty).
    dA = dA_pad[: , pad_width:pad_width+A.shape[1] , pad_width:pad_width+A.shape[2] , :]

    db = dZ.sum(axis=(0,1,2))

    info['filters'] -= (l_rate / m) * dW
    info['bias'] -= (l_rate / m) * db

    return dA


def _pool_backward(dZ, A, info):
    """Route the upstream gradient of one Pool layer back onto its input."""
    filt_size , stride , method = (info['filt size'] , info['stride'] , info['method'])
    if method not in ('max' , 'mean'):
        raise ValueError('unknown pooling method: ' + repr(method) + ' (expected "max" or "mean")')

    (m , out_rows , out_cols , _) = dZ.shape
    dA = np.zeros(A.shape)

    for i in range(m):
        for h in range(out_rows):
            top = h * stride
            for w in range(out_cols):
                left = w * stride
                grad = dZ[i , h , w , :]      # one upstream value per channel

                if(method == 'max'):
                    window = A[i , top:top+filt_size , left:left+filt_size , :]
                    # Per-channel mask of the cells that produced the maximum;
                    # tied cells all receive the gradient.
                    mask = (window == window.max(axis=(0,1) , keepdims=True))
                    dA[i , top:top+filt_size , left:left+filt_size , :] += mask * grad
                else:
                    # Every cell of the window contributed equally to the mean.
                    dA[i , top:top+filt_size , left:left+filt_size , :] += grad / (filt_size * filt_size)

    return dA


def backward(dZ , layer_info , cache , l_rate):
    """Back-propagate ``dZ`` through the network and update the Conv layers.

    Layers are visited in reverse order. For a Conv layer the filters and bias
    are updated in place by ``l_rate / batch_size`` times their gradients, and
    the gradient w.r.t. the layer input is passed on. For a Pool layer the
    gradient is routed to the cells that produced each pooled value. Layers of
    any other type pass the gradient through unchanged.

    Parameters
    ----------
    dZ : np.ndarray
        Gradient of the loss w.r.t. the network output, with shape
        (samples, rows, cols, channels).
    layer_info : dict
        The layer settings, using the same keys that this function reads
        below. Conv filters must be 4-D:
        (filters, f_rows, f_cols, channels).
    cache : list
        ``cache[k]`` is the 4-D input that was fed to the k-th layer.
    l_rate : float
        Learning rate.

    Returns
    -------
    dict
        ``layer_info`` itself (the same object, modified in place).

    Raises
    ------
    ValueError
        If a Pool layer's ``'method'`` is neither ``"max"`` nor ``"mean"``.
    """
    keys = list(reversed(list(layer_info.keys())))
    cache = list(reversed(cache))

    for key , A in zip(keys , cache):
        info = layer_info[key]

        if(info['layer'] == 'Conv'):
            dZ = _conv_backward(dZ , A , info , l_rate)

        elif(info['layer'] == 'Pool'):
            dZ = _pool_backward(dZ , A , info)

    return layer_info

In [28]:
# Seed NumPy's global generator so the random filters, bias and inputs below
# are the same on every run.
np.random.seed(1)

# Network description; forward() applies the layers in insertion order.
layer_info = {}

# Layer 1: convolution with ten 3x3 filters over 3 input channels.
# "filters" is laid out as (no of filters , height , width , channels);
# a single filter may also be given as (height , width) or (height , width , channels).
# "bias" holds one value per filter on the last axis.
layer_info["l1"] = {
    "layer" : "Conv" ,
    "pad" : 1 ,
    "pad value" : 0 ,
    "filters" : np.random.rand(10,3,3,3) ,
    "bias" : np.random.rand(1,1,1,10) ,
    "stride" : 2 ,
}

# Layer 2: 3x3 pooling; "method" is either "max" or "mean".
layer_info["l2"] = {
    "layer" : "Pool" ,
    "filt size" : 3 ,
    "stride" : 1 ,
    "method" : "max" ,
}

# Input batch laid out as (no of samples , height , width , channels);
# forward() also accepts a single (height , width) image.
X = np.random.randn(40,10,10,3)

output,cache = forward(layer_info , X)

In [29]:
# Shape of the network output: (samples , pooled height , pooled width , no of filters).
output.shape

(40, 3, 3, 10)

In [30]:
np.random.seed(1)

# Random targets with the same shape as the network output (demo data only).
y_true = np.random.randn(*output.shape)

# Gradient of the squared-error loss 0.5 * (output - y_true)**2 with respect to
# the output. backward() SUBTRACTS l_rate * gradient from the parameters, so the
# error must be (output - y_true); the opposite sign would climb the loss.
dZ = output - y_true

# backward() updates layer_info in place and returns that same dict, so
# layer_info_mod and layer_info refer to one object.
layer_info_mod = backward(dZ ,layer_info , cache , 1)