# Creating a Conv2D Transform from Scratch in Numpy
***

This notebook details how to construct a conv2D transform from scratch, which is the foundation of the convolutional layers used in modern computer vision techniques based on deep learning with neural networks. 

Also covered are kernel transforms for:
* Dilated Convolutions
* Custom Sampling Grids

### Imports

In [None]:
import numpy as np

### Optimized Multi-Input, Multi-Channel 2D Convolution

In [None]:
def subsample_images(images, n_x, n_y, filters_shape, stride):
    '''
    Extracts image subsamples corresponding to the sub-arrays that would
    be element-wise multiplied with a set of kernels in a 2D convolution. 
    
    Parameters
    ----------
    images : numpy array
        Array of 2D images to be subsampled, of dimension
        (batch_size, width, height, num_channels). 
    
    n_x : int
        Number of sampling steps along the x-dimension.
    
    n_y : int
        Number of sampling steps along the y-dimension.
    
    filters_shape : tuple
        Shape of the filter array being convolved with the 2D image, 
        given as (num_filters, kernel_width, kernel_height, num_channels).
    
    stride : tuple
        Stride of the sampling steps along the x and y dimensions, 
        given as (stride_x, stride_y).
    
    
    Returns
    -------
    subsamples : numpy array
        Array containing the image subsamples having dimension
        (batch_size, kernel_width * kernel_height * num_channels). 
        Note that the subsamples have been reshaped to 1D vectors and
        concatenated across channels, so that they can be efficiently
        convolved with the filters using a dot product. 
        
    '''
    
    (_, k_width, k_height, num_channels) = filters_shape 
    batch_size = images.shape[0]
    
    subsamples = np.zeros((n_x * n_y, batch_size, k_width * k_height * num_channels))
    ids = 0
    for dx in range(n_x): 
        for dy in range(n_y):
            x = stride[0] * dx
            y = stride[1] * dy
            sample = images[:, x:(x + k_width), y:(y + k_height), :]
            sample = sample.reshape((batch_size, k_width * k_height * num_channels), order='C')
            subsamples[ids, :, :] = sample
            ids += 1
                 
    return subsamples

    
def conv2d_transform(inputs, filters, stride=(1, 1), padding='valid'):
    '''
    Applies a 2-dimensional convolution of user-provided filters and inputs 
    using the specified stride and padding. 
    
    To optimize computational efficiency, the following approach has been taken:
    *  Compute image subsamples corresponding to the sub-arrays multiplied by the
        convolutional kernel for each stranslation step. 
    * Unravel the x and y image subsample dimensions into a single vector and 
        concatenate vectors across the channels. Vectorize all convolutional filters
        using the same procedure. 
    * Compute the convolution as a dot-product between the above unraveled image
        subsample vector and convolutional filters, then reshape the output into
        a 2D feature map. 
    

    Parameters
    ----------
    inputs : numpy array
        Array containing input images or feature maps 
        of shape (batch_size, width, height, num_channels). 
        
    filters : numpy array
        Array containing convolution filters of shape
        (num_filters, kernel_width, kernel_height, num_channels). 
        
    stride : tuple
        Specifies the stride (translation step size) of the convolutional
        kernel across the input images (feature maps). Usage: (stride_x, stride_y). 
        Stride must be specified as an integer greater than 0. 
    
    padding : string
        Desired padding type, 'valid' or 'same'; 'valid' padding does not pad the inputs
        with zeros and hence the output layer will be a smaller size; 'same' padding
        add zero padding of width 1 around the edge of the input arrays to preserve
        size when convolved with a kernel using a stride of 1. 
        
        
    Returns
    -------
    outputs : numpy array
        Array containing the output feature maps of the convolution layer, having shape
        (batch_size, x_pixels, y_pixels)
        
    
    Written by Ryan P. Marchildon, April 2020
    
    '''
    batch_size, width, height, num_channels = inputs.shape
    num_filters, k_width, k_height, k_channels = filters.shape
    (stride_x, stride_y) = stride
    
    # check assumptions on input parameters
    assert padding == 'valid' or 'same', "Padding must be equal to 'valid' or 'same'."
    assert type(stride) == tuple, "Stride must be specified as a tuple of form (stride_x, stride_y)."
    assert type(stride_x) or type(stride_y) == int, "Stride must be specified as a tuple of integers."
    assert stride[0] and stride[1] > 0, 'Stride cannot be less than 1 along any dimension.'
    assert len(inputs.shape) == 4, "Input must be an array of shape (batch_size, width, height, num_channels)."
    assert len(filters.shape) == 4, "Filters must be an array of shape (num_filters, width, height, num_channels)."
        
    # pad the input arrays with zeros
    if padding == 'same':
        inputs = np.pad(inputs, ((0, 0), (1, 1), (1, 1), (0, 0)), 'constant', constant_values=0)
        batch_size, width, height, num_channels = inputs.shape # update input dimensions

    # compute the expected dimensions of the output feature map, which equals the
    # number of kernel steps (translations) across the image
    n_x = int((width - k_width)/stride_x + 1)
    n_y = int((height - k_height)/stride_y + 1)

    # obtain the image subsamples
    subsamples = subsample_images(inputs, n_x, n_y, filters.shape, stride)
            
    # reshape the filters to a single vector that is concatenated across channels
    filters = filters.reshape((num_filters, k_width * k_height * num_channels), order='C')
    
    # compute the convolution by taking the dot product with the unravelled kernels 
    output = np.tensordot(subsamples, filters, axes=(2, 1))

    # reshape output to (batch_size, x_pixels, y_pixels, num_feature_maps)
    output = output.transpose(1, 0, 2) # re-order the axes
    output = output.reshape((batch_size, n_x, n_y, num_filters), order='C')

    return output


# UNIT TESTS
# ----------
# multi-channel input (e.g. RBG)
x1 = np.array([
    [0, 2, 2, 2, 2],
    [0, 2, 1, 2, 1],
    [2, 1, 2, 2, 2],
    [2, 2, 1, 1, 2],
    [1, 2, 2, 0, 2]
])

x2 = np.array([
    [2, 1, 0, 2, 2],
    [1, 2, 1, 1, 0],
    [2, 2, 0, 0, 2],
    [0, 2, 0, 1, 1],
    [2, 0, 0, 1, 1]
])

x3 = np.array([
    [1, 2, 0, 0, 2],
    [0, 1, 1, 2, 2],
    [2, 2, 2, 2, 2],
    [2, 1, 2, 2, 2],
    [1, 2, 2, 1, 0]
])

test_input_mc = np.stack([x1, x2, x3], axis=-1)
test_input_mc= test_input_mc.reshape(1, *test_input_mc.shape)

# multi-channel filter (1 filter contains multiple kernels)
k1 = np.array([
    [0, 0, -1],
    [0, 0, -1],
    [1, -1, 0]
])

k2 = np.array([
    [-1, 0, 1],
    [-1, 0, 0],
    [1, 0, 1]
])

k3 = np.array([
    [-1, 0, -1],
    [-1, -1, 1],
    [0, 1, 1]
])

test_filter_mc = np.stack((k1, k2, k3), axis=-1)
test_filter_mc= test_filter_mc.reshape(1, *test_filter_mc.shape)

# output feature maps (only 1 since there is only 1 filter applied to 1 image)
test_output_mc = np.array([
    [2, 2, 0],
    [1, -4, -5],
    [-2, -8, -5]
])
test_output_mc = test_output_mc.reshape(1, *test_output_mc.shape, 1)

print('Test Passed?', 
      np.array_equal(
          test_output_mc, 
          conv2d_transform(
              test_input_mc, 
              test_filter_mc, 
              padding='same', 
              stride=(2, 2))))

### Dilated Convolution

In [None]:
def dilate(kernel, d=0):
    '''
    Applies a dilation to an input convolution kernel. 
    
    
    Parameters
    ----------
    d : int 
        The dilation factor (zero returns a conventional convolution).
        
        
    Returns
    -------
    output : numpy array  
        The dilated kernel.
    
    '''
    assert type(kernel) == np.ndarray, "Input kernel must be an numpy array."
    assert type(d) == int, "Dilation factor 'd' must be a non-negative integer."
    assert d >= 0, "Dilation factor 'd' must be a non-negative integer."
    
    if d == 0:
        return kernel
    else: 
        # how many zeros are we injecting
        num_zeros_x = (kernel.shape[0] - 1)*d
        num_zeros_y = (kernel.shape[1] - 1)*d
        
        # initialize dilated array
        output = np.zeros((kernel.shape[0] + num_zeros_x, kernel.shape[1] + num_zeros_y))
        
        # populate the non-zero entries
        for dx in range(kernel.shape[0]):
            for dy in range(kernel.shape[1]):
                output[(d + 1)*dx, (d + 1)* dy] = kernel[dx, dy]
                
        return output
    
    
# UNIT TESTS
# ----------
test_kernel = np.array([
    [1, 0, 1],
    [0, 1, 1],
    [1, 0, 1]
])

test_dilation_output = np.array([
    [1, 0, 0, 0, 1], 
    [0, 0, 0, 0, 0], 
    [0, 0, 1, 0, 1], 
    [0, 0, 0, 0, 0],
    [1, 0, 0, 0, 1]
])

print('Test Passed?', np.array_equal(test_dilation_output, dilate(test_kernel, 1)))

### Kernel with a Custom Sampling Grid

In [None]:
def transform_kernel(weights, sampling_grid):
    '''
    Defines a new convolution kernel where the weights are
    redistributed with an offset defined by sampling_grid 
    relative to their original coordinates. 
    
    Example Usage:

        A 3x3 weights array of:
            1 1 1
            1 1 1
            1 1 1

        and 3x3x2 sampling_grid of:
            [-1, -2] [-1, 0] [-1, 2]
            [0, -1] [0, 0] [0, 1]
            [1, -2] [1, 0] [1, 2]

        would return the following transformed kernel:
            1 0 0 1 0 0 1
            0 0 0 0 0 0 0
            0 1 0 1 0 1 0
            0 0 0 0 0 0 0
            1 0 0 1 0 0 1
            
    
    Parameters
    ----------
    weights : numpy array
    
        An array containing the convolutional filter weights of dimension
        (kernel_x_size, kernel_y_size, num_channels). 
    
    sampling_grid : numpy array
        An array containing sampling offset coordinates for each of the 
        convolutional filter weights. Of dimension (kernel_x_size, kernel_y_size, 2),
        where along the third axis, the offsets are specified as [x_offset, y_offset]. 
        Note that offsets are relative to the coordinate of the original kernel element. 
    
    
    Returns
    -------
    transformed_kernel : numpy array
    

    '''
    weights_span_x, weights_span_y, num_channels = weights.shape
    
    # transform the coordinates of the sampling grid to be 
    # relative to the [0, 0] element of the original kernel
    for x in range(sampling_grid.shape[0]):
        for y in range(sampling_grid.shape[1]):
            sampling_grid[x, y, 0] += x 
            sampling_grid[x, y, 1] += y

    # translate the coordinate system to put the origin
    # at the top-most left sampling coordinate
    x_min = np.min(sampling_grid[:, :, 0])
    y_min = np.min(sampling_grid[:, :, 1])
    sampling_grid[:, :, 0] -= x_min
    sampling_grid[:, :, 1] -= y_min

    # create a sparse array of the minimum size needed
    # to represent the transformed kernel and populate
    # it with the kernel weights at the sample grid coordinates
    x_max = np.max(sampling_grid[:, :, 0])
    y_max = np.max(sampling_grid[:, :, 1])
    transformed_kernel = np.zeros((x_max + 1, y_max + 1, num_channels))
    for x in range(weights.shape[0]):
        for y in range(weights.shape[1]):
            x_sample = sampling_grid[x, y, 0]
            y_sample = sampling_grid[x, y, 1]
            transformed_kernel[x_sample, y_sample, :] = weights[x, y, :]
    
    
    return transformed_kernel


# UNIT TESTS
# ----------
test_weights = np.ones((3, 3, 3))

test_sampling_grid = np.array([
    [[-1, -2], [-1, 0], [-1, 2]], 
    [[0, -1], [0, 0], [0, 1]], 
    [[1, -2], [1, 0], [1, 2]]
])

output_sub_array = np.array([
    [1, 0, 0, 1, 0, 0, 1], 
    [0, 0, 0, 0, 0, 0, 0], 
    [0, 1 ,0, 1, 0, 1, 0], 
    [0, 0, 0, 0, 0, 0, 0], 
    [1, 0, 0, 1, 0, 0, 1]
])

test_output = np.repeat(output_sub_array[:, :, np.newaxis], 3, axis=2)

print('Test Passed?', np.array_equal(test_output, transform_kernel(test_weights, test_sampling_grid)))

#### Note: Convolving with a 9x9 Input to Obtain a 7x7 Output

Assuming the image size is larger than the size of the transformed kernel, the dimensions of the output would be:
* `n_x = (image_width - transformed_kernel_width + 1)`
* `n_y = (image_height - transformed_kernel_height + 1)`

To achieve the desired output size, we would need to pad the input image with:
* `pad_x = (7 - n_x)`, zeros along x
* `pad_y = (7 - n_y)`, zeros along y

If the image is smaller than the kernel, the above equations still give us the correct number of zeros to pad. For example, if along x, the kernel has size 11 and the image size 9, then n_x = -1, and pad_x = 8. This will reshape the input to be (8 + 9) = 17 across which, when convolved with the size 11 filter, gives us an output dimension of (17 - 11 + 1) = 7.

Note that padding would be asymmetric in the case that pad_x or pad_y are odd integers.
Assymetric padding can be achieved using the code shown below. 


In [None]:
import math

pad_x = 2
pad_y = 3

print('Before:')
print(test_input_mc[0, :, :, 0])

print('\nAfter:')
padding = ((0, 0), (math.floor(pad_x/2), math.ceil(pad_x/2)), (math.floor(pad_y/2), math.ceil(pad_y/2)), (0, 0))
temp = np.pad(test_input_mc, padding, 'constant', constant_values=0)
print(temp[0, :, :, 0])
