In [477]:
import numpy as np

### Creating a dummy data set of size 30*30

In [490]:
# Seeded generator so the dummy image is identical on every Restart & Run All.
rng = np.random.default_rng(42)
data = rng.random((30, 30))
data.shape

(30, 30)

Reshaping the data to (30, 30, 1); the trailing axis of size 1 marks it as a single-channel (grayscale) image

In [491]:
# Keep the first two axes and append a trailing axis of length 1.
data = data.reshape(*data.shape[:2], 1)

In [492]:
# Display the shape of `data` as the cell output.
data.shape

(30, 30, 1)

## Defining Filters of 3*3 size to use for convolutions

In [493]:
# 3x3 kernel: +1 across the top row, 0 across the middle row, -1 across the
# bottom row, so each response is (sum of top row) - (sum of bottom row).
filter_1 = np.array([[1] * 3, [0] * 3, [-1] * 3])

In [494]:
# Display the shape of the kernel as the cell output.
filter_1.shape

(3, 3)

## Implementing convolution

In [507]:
def process(data=None, filterArray=None, stride=1, process_name='conv', padding_size=0, pool_size=(0,0)):
    """
    Slide a window over the first two axes of ``data`` and reduce every
    window position to a single value.

    Supported operations (``process_name``):
        'conv'        - element-wise product of the window with ``filterArray``, summed
        'max_pooling' - maximum of the window
        'avg_pooling' - mean of the window

    Parameters
    ----------
    data : numpy.ndarray
        2-D ``(rows, cols)`` or 3-D ``(rows, cols, channels)`` input.
    filterArray : array-like, optional
        Kernel used by 'conv' (required for that operation). A 2-D kernel
        applied to 3-D data is used for every channel and the per-channel
        products are summed into the single output value.
    stride : int
        Step between consecutive window origins along both axes (>= 1).
    process_name : str
        One of 'conv', 'max_pooling', 'avg_pooling'.
    padding_size : int
        When > 0, ``addPadding`` is called first to zero-pad the input by
        this many rows/columns on every side.
    pool_size : tuple
        ``(rows, cols)`` of the pooling window; ignored for 'conv'.

    Returns
    -------
    numpy.ndarray or None
        Array of shape ``(out_rows, out_cols)`` with
        ``out = (size - window) // stride + 1`` (0 when the window does not
        fit). ``None`` is returned, after printing "Unsupported operation",
        when ``process_name`` is not recognised.

    Raises
    ------
    ValueError
        If ``stride`` < 1, the window has a dimension < 1, or 'conv' is
        requested without ``filterArray``.
    """
    # Reject unknown operations before touching the data; otherwise the
    # window size below would never be defined.
    if process_name not in ('conv', 'max_pooling', 'avg_pooling'):
        print("Unsupported operation")
        return None

    if stride < 1:
        raise ValueError(f"stride must be a positive integer, got {stride}")

    if padding_size > 0:
        print("Adding padding")
        data = addPadding(data, padding_size=padding_size)

    if process_name == 'conv':
        if filterArray is None:
            raise ValueError("filterArray is required when process_name='conv'")
        kernel = np.asarray(filterArray)
        filter_shape = kernel.shape
        window_rows, window_cols = filter_shape[0], filter_shape[1]
        # A (k, k) kernel multiplied with a (k, k, channels) window would be
        # broadcast against the LAST two axes (columns x channels) and give
        # wrong products. Adding a trailing axis aligns kernel rows/cols with
        # window rows/cols and broadcasts across channels instead.
        if data.ndim == 3 and kernel.ndim == 2:
            kernel = kernel[:, :, np.newaxis]
    else:
        window_rows, window_cols = pool_size[0], pool_size[1]

    if window_rows < 1 or window_cols < 1:
        raise ValueError(
            f"window size must be at least 1x1, got ({window_rows}, {window_cols})"
        )

    # Number of window origins that fit entirely inside the input. Floor
    # division matches exactly what the loops below visit; rounding the
    # fractional size up would make the final reshape fail.
    expected_outcome_row_size = max(0, (data.shape[0] - window_rows) // stride + 1)
    expected_outcome_col_size = max(0, (data.shape[1] - window_cols) // stride + 1)

    print(f"Input data shape = {data.shape}")
    if process_name == 'conv':
        print(f"Filter size = {filter_shape}")
    else:
        print(f"Pool size = {pool_size}")
    print(f"Stride = {stride}")
    print(f"Expected output size = ({expected_outcome_row_size}, {expected_outcome_col_size})")

    convoluted_values = []

    # Visit only origins whose window stays inside the input (row-major order).
    for row_start in range(0, data.shape[0] - window_rows + 1, stride):
        row_stop = row_start + window_rows
        for col_start in range(0, data.shape[1] - window_cols + 1, stride):
            col_stop = col_start + window_cols
            curr_conv_area = data[row_start:row_stop, col_start:col_stop]

            if process_name == 'conv':
                convoluted_value = np.sum(np.multiply(curr_conv_area, kernel))
            elif process_name == 'max_pooling':
                convoluted_value = np.max(curr_conv_area)
            else:  # 'avg_pooling'
                convoluted_value = np.average(curr_conv_area)

            convoluted_values.append(convoluted_value)

    return np.array(convoluted_values).reshape(expected_outcome_row_size, expected_outcome_col_size)


In [508]:
def addPadding(data, padding_size=1):
    """
    Zero-pad the first two axes of ``data`` by ``padding_size`` on every side.

    Parameters
    ----------
    data : numpy.ndarray
        2-D ``(rows, cols)`` or 3-D ``(rows, cols, channels)`` input.
    padding_size : int
        Number of zero rows/columns added before and after each of the
        first two axes (>= 0).

    Returns
    -------
    numpy.ndarray
        float64 array of shape
        ``(rows + 2*padding_size, cols + 2*padding_size, channels)``;
        2-D input is returned with a trailing channel axis of length 1.

    Raises
    ------
    ValueError
        If ``padding_size`` is negative or ``data`` is not 2-D or 3-D.
    """
    data = np.asarray(data)
    print(f"Input data shape {data.shape}")

    if padding_size < 0:
        raise ValueError(f"padding_size must be >= 0, got {padding_size}")

    if data.ndim == 2:
        # The result always carries a channel axis, so add it up front.
        data = data[:, :, np.newaxis]
    elif data.ndim != 3:
        raise ValueError(f"data must be 2-D or 3-D, got {data.ndim}-D")

    actual_data_row_size, actual_data_col_size, channels = data.shape

    padded_data = np.zeros((actual_data_row_size + 2*padding_size,
                            actual_data_col_size + 2*padding_size,
                            channels))

    # Copy the whole input into the centre in one step. The interior spans
    # [padding_size, padding_size + size) on each axis, so no rows or columns
    # are lost when padding_size > 1.
    padded_data[padding_size:padding_size + actual_data_row_size,
                padding_size:padding_size + actual_data_col_size,
                :] = data

    print(f"Padded data array shape {padded_data.shape}")
    return padded_data
        

### Doing convolution with 3*3 filter, stride=1 without padding

In [509]:
# 3x3 kernel, unit stride, no padding.
conv = process(
    data=data,
    filterArray=filter_1,
    process_name='conv',
    stride=1,
    padding_size=0,
)
conv.shape

Input data shape = (30, 30, 1)
Filter size = (3, 3)
Stride = 1
Expected output size = (28, 28)


(28, 28)

### Doing convolution with 3*3 filter, stride=2 and with Padding size of 1

In [510]:
# 3x3 kernel, stride 2, one ring of zero padding.
conv = process(
    data=data,
    filterArray=filter_1,
    process_name='conv',
    stride=2,
    padding_size=1,
)
conv.shape

Adding padding
Input data shape (30, 30, 1)
Padded data array shape (32, 32, 1)
Input data shape = (32, 32, 1)
Filter size = (3, 3)
Stride = 2
Expected output size = (15, 15)


(15, 15)

### Doing convolution with 3*3 filter, stride=2 and Padding size =2

In [511]:
# 3x3 kernel, stride 2, two rings of zero padding.
conv = process(
    data=data,
    filterArray=filter_1,
    process_name='conv',
    stride=2,
    padding_size=2,
)
conv.shape

Adding padding
Input data shape (30, 30, 1)
Padded data array shape (34, 34, 1)
Input data shape = (34, 34, 1)
Filter size = (3, 3)
Stride = 2
Expected output size = (16, 16)


(16, 16)

### Doing max pooling with stride=2

In [512]:
# 2x2 max pooling with stride 2 (non-overlapping windows).
conv = process(
    data=data,
    process_name='max_pooling',
    pool_size=(2,2),
    stride=2,
)
conv.shape

Input data shape = (30, 30, 1)
Pool size = (2, 2)
Stride = 2
Expected output size = (15, 15)


(15, 15)

### Doing average pooling with stride=1

In [506]:
# 2x2 average pooling with stride 1 (overlapping windows).
conv = process(
    data=data,
    process_name='avg_pooling',
    pool_size=(2,2),
    stride=1,
)
conv.shape

Input data shape = (30, 30, 1)
Pool size = (2, 2)
Stride = 1
Expected output size = (29, 29)


(29, 29)