In [1]:
import numpy as np

np.random.seed(42)

In [2]:
# Problem dimensions shared by every convolution implementation in this notebook.
batch_size = 2
input_channels = 3
input_height = 24
input_width = 24
output_channels = 16
filter_height = 5
filter_width = 5
padding = 1
stride = 3

# Random integer input batch in [1, 100), laid out as (batch, channel, height, width).
X = np.random.randint(1, 100, (batch_size, input_channels, input_height, input_width))

# Shared kernel tensor: unit normals scaled by sqrt(2 / fan_in) (He-style scaling),
# where fan_in = filter_height * filter_width * input_channels.
# The last axis is filter_width (it used to repeat filter_height, which only
# worked because the kernel happens to be square).
fan_in = filter_height * filter_width * input_channels
global_weights = np.random.randn(
    output_channels, input_channels, filter_height, filter_width
) * np.sqrt(2 / fan_in)

# anan

In [3]:
# import numpy as np

def dilateData(input, dilate):
    """Return a copy of ``input`` with zeros inserted between spatial neighbours.

    ``dilate`` zeros are placed between every pair of adjacent entries along
    axis 2 and along axis 3; the outer border is left untouched.  A ``dilate``
    of zero (or a negative value) just returns a copy.
    """
    result = input.copy()

    if dilate > 0:
        for axis in (2, 3):
            # Each interior boundary 1..n-1 is listed ``dilate`` times, so a single
            # np.insert call drops that many zeros in front of every interior entry.
            boundaries = np.repeat(np.arange(1, result.shape[axis]), dilate)
            result = np.insert(result, boundaries, 0, axis=axis)

    return result

def padData(input, padding):
    """Zero-pad axes 2 and 3 of a copy of ``input`` by ``padding`` on every side.

    Axes 0 and 1 are left unpadded.
    """
    untouched = (0, 0)
    border = (padding, padding)
    widths = (untouched, untouched, border, border)
    return np.pad(input.copy(), pad_width=widths, mode='constant', constant_values=0.)

def getSlidingWindow(input, output_size, filter_dim, padding=0, stride=1, dilate=0):
    """Build a 6-D strided view of square windows over axes 2 and 3 of ``input``.

    Parameters
    ----------
    input : 4-D ndarray; axes 0 and 1 are carried through unchanged.
    output_size : 4-sequence; only its last two entries (number of window
        positions along axes 2 and 3) are used.
    filter_dim : side length of each square window.
    padding : zeros added on both sides of axes 2 and 3 before windowing.
    stride : step, in elements, between neighbouring windows.
    dilate : when non-zero, zeros are inserted between neighbouring entries of
        the (already padded) array along axes 2 and 3.

    Returns
    -------
    ndarray view of shape
    ``(input.shape[0], input.shape[1], out_h, out_w, filter_dim, filter_dim)``.

    Raises
    ------
    ValueError
        If the requested windows would extend past the prepared array.
        ``as_strided`` performs no such check on its own and would hand back
        whatever memory happens to follow the buffer.
    """
    X = input

    if padding != 0:
        X = np.pad(X, pad_width=((0,), (0,), (padding,), (padding,)), mode='constant', constant_values=(0.,))

    if dilate != 0:
        for i in range(1, dilate + 1):
            X = np.insert(X, range(1, X.shape[2], i), 0, axis=2)
            X = np.insert(X, range(1, X.shape[3], i), 0, axis=3)

    _, _, out_h, out_w = output_size
    out_b, out_c, _, _ = input.shape

    # Guard the raw-stride view: the last window must end inside X.
    if out_h > 0 and out_w > 0 and filter_dim > 0:
        needed_h = (out_h - 1) * stride + filter_dim
        needed_w = (out_w - 1) * stride + filter_dim
        if needed_h > X.shape[2] or needed_w > X.shape[3]:
            raise ValueError(
                "sliding windows need a %dx%d array but only %dx%d is available"
                % (needed_h, needed_w, X.shape[2], X.shape[3])
            )

    batch_str, channel_str, filter_height_str, filter_width_str = X.strides

    return np.lib.stride_tricks.as_strided(
        X,
        shape=(out_b, out_c, out_h, out_w, filter_dim, filter_dim),
        strides=(
            batch_str,
            channel_str,
            filter_height_str * stride,
            filter_width_str * stride,
            filter_height_str,
            filter_width_str,
        ),
    )


class Convolution:
    """Square-kernel 2-D convolution layer with a hand-written backward pass.

    Relies on the module-level helpers ``getSlidingWindow``, ``dilateData`` and
    ``padData`` and on the module-level ``global_weights`` array.

    NOTE(review): ``self.weights`` is bound to ``global_weights`` itself (no
    copy), so the in-place update in ``backward`` also modifies that shared array.
    """

    def __init__(self, out_channels, filter_dim, stride=1, padding=0):
        # Hyper-parameters.
        self.out_channels, self.filter_dim = out_channels, filter_dim
        self.stride, self.padding = stride, padding
        # Parameters are created on the first forward pass.
        self.weights = None
        self.biases = None
        # Cached by forward() for use in backward().
        self.input_data = None
        self.window = None

    def initialize_weights(self, in_channels):
        """Create weights and biases if they do not exist yet.

        ``in_channels`` is currently unused because the weights come from
        ``global_weights``.
        """
        if self.weights is None:
            self.weights = global_weights
        if self.biases is None:
            self.biases = np.zeros(self.out_channels)

    def forward(self, input_data):
        """Convolve ``input_data`` (4-D) with the kernels and add the biases."""
        self.input_data = input_data
        n, channels, h, w = input_data.shape
        self.initialize_weights(channels)

        # Number of window positions along each spatial axis.
        rows = (h + 2 * self.padding - self.filter_dim) // self.stride + 1
        cols = (w + 2 * self.padding - self.filter_dim) // self.stride + 1

        self.window = getSlidingWindow(
            input_data, (n, channels, rows, cols), self.filter_dim, self.padding, self.stride
        )

        response = np.einsum('bihwkl,oikl->bohw', self.window, self.weights)
        return response + self.biases[None, :, None, None]

    def backward(self, output_grad, learning_rate=0.01):
        """Update the parameters in place and return the gradient w.r.t. the input.

        The input gradient is evaluated before the weights are modified.
        """
        n, _, _, _ = self.input_data.shape

        bias_grad = np.sum(output_grad, axis=(0, 2, 3))

        # Undo the forward stride by spreading the upstream gradient out with zeros.
        grad_dilated = dilateData(output_grad, self.stride - 1)
        x_padded = padData(self.input_data, self.padding)

        # Weight gradient: slide the dilated gradient over the padded input.
        x_windows = getSlidingWindow(
            x_padded, self.weights.shape, grad_dilated.shape[2], padding=0, stride=1
        )
        weight_grad = np.einsum('bihwkl,bokl->oihw', x_windows, grad_dilated)

        # Input gradient: slide the 180-degree-rotated kernels over the
        # gradient padded by filter_dim - 1 on each side.
        grad_full = padData(grad_dilated, self.filter_dim - 1)
        flipped = np.rot90(self.weights, 2, (2, 3))
        grad_windows = getSlidingWindow(
            grad_full, x_padded.shape, self.filter_dim, padding=0, stride=1
        )
        input_grad = np.einsum('bohwkl,oikl->bihw', grad_windows, flipped)

        # Gradient step, averaged over the batch.
        self.weights -= learning_rate * weight_grad / n
        self.biases -= learning_rate * bias_grad / n

        # Drop the rows/columns that correspond to forward zero-padding.
        if self.padding > 0:
            p = self.padding
            input_grad = input_grad[:, :, p:-p, p:-p]

        return input_grad



class ReLU:
    """Element-wise rectified linear unit."""

    def __init__(self):
        self.input_data = None
        self.output_grad = None

    def forward(self, input_data):
        """Remember the input and return it with negative entries clamped to zero."""
        self.input_data = input_data
        return np.maximum(0, input_data)

    def backward(self, output_grad, learning_rate=0.01):
        """Scale ``output_grad`` by a 0/1 gate derived from the cached input.

        The gate is 1 where the input was positive and 0 where it was negative;
        entries that are neither keep their input value.
        ``learning_rate`` is accepted but not used.
        """
        gate = np.copy(self.input_data)
        np.putmask(gate, gate < 0, 0)
        np.putmask(gate, gate > 0, 1)
        return gate * output_grad

class MaxPooling:
    """Square-window max pooling with an arbitrary stride.

    The backward pass works for any combination of ``filter_dim`` and
    ``stride`` (including overlapping windows and inputs whose trailing
    rows/columns are not covered by any window).  Every element that ties for
    the maximum of a window receives that window's gradient.
    """

    def __init__(self, filter_dim, stride=1):
        self.filter_dim = filter_dim
        self.stride = stride
        self.input_data = None
        # Strided view of the input, shape (B, C, out_h, out_w, k, k).
        self.window = None
        # Boolean array shaped like ``window``: True where an element equals
        # the maximum of its window.
        self.mask = None

    def forward(self, input_data):
        """Return the per-window maximum of a 4-D ``input_data`` ndarray."""
        self.input_data = input_data
        batch_size, in_channels, height, width = input_data.shape
        k, step = self.filter_dim, self.stride

        out_height = (height - k) // step + 1
        out_width = (width - k) // step + 1

        # The last window ends at (out - 1) * step + k <= size, so this view
        # never reaches outside the input buffer.
        b_str, c_str, h_str, w_str = input_data.strides
        window = np.lib.stride_tricks.as_strided(
            input_data,
            shape=(batch_size, in_channels, out_height, out_width, k, k),
            strides=(b_str, c_str, h_str * step, w_str * step, h_str, w_str),
        )
        self.window = window

        output = np.max(window, axis=(4, 5))
        self.mask = window == output[:, :, :, :, None, None]

        return output

    def backward(self, output_grad, learning_rate=0.01):
        """Route ``output_grad`` back to the positions that produced each maximum.

        Returns a float array with the shape of the cached input.
        ``learning_rate`` is accepted but not used.
        """
        k, step = self.filter_dim, self.stride
        out_height, out_width = output_grad.shape[2], output_grad.shape[3]

        grad_input = np.zeros(self.input_data.shape)
        # For a fixed in-window offset (i, j) the touched input positions form a
        # regular grid, so each offset is one vectorised scatter-add.
        for i in range(k):
            rows = slice(i, i + out_height * step, step)
            for j in range(k):
                cols = slice(j, j + out_width * step, step)
                grad_input[:, :, rows, cols] += self.mask[:, :, :, :, i, j] * output_grad

        return grad_input

class Flatten:
    """Collapse every axis after the first into one, and undo that in backward."""

    def __init__(self):
        self.input_data = None
        self.output_grad = None

    def forward(self, input_data):
        """Cache the input and return it reshaped to ``(shape[0], -1)``."""
        self.input_data = input_data
        leading = input_data.shape[0]
        return input_data.reshape(leading, -1)

    def backward(self, output_grad, learning_rate=0.01):
        """Reshape ``output_grad`` to the cached input's shape.

        ``learning_rate`` is accepted but not used.
        """
        original_shape = self.input_data.shape
        return output_grad.reshape(original_shape)

class FullyConnected:
    """Dense layer computing ``input @ weights + biases`` with an SGD step in backward."""

    def __init__(self, out_dim):
        self.out_dim = out_dim
        self.weights = None
        self.biases = None
        self.input_data = None
        self.output_grad = None

    def initialize_weights(self, in_dim):
        """Create parameters on first use.

        Weights are unit normals scaled by ``sqrt(2 / in_dim)``; biases are zeros.
        """
        if self.weights is None:
            self.weights = np.random.randn(in_dim, self.out_dim) * np.sqrt(2 / in_dim)
        if self.biases is None:
            self.biases = np.zeros(self.out_dim)

    def forward(self, input_data):
        """Cache the 2-D ``input_data`` and return the affine transform of it."""
        self.input_data = input_data
        self.initialize_weights(input_data.shape[1])
        return np.dot(input_data, self.weights) + self.biases

    def backward(self, output_grad, learning_rate=0.01):
        """Apply one gradient step and return the gradient w.r.t. the layer input.

        The input gradient is computed with the weights that were used in the
        forward pass, i.e. before this call's update is applied.
        """
        self.output_grad = output_grad

        # Must come first: the in-place update below would otherwise leak into it.
        input_grad = np.dot(output_grad, self.weights.T)

        weight_grad = np.dot(self.input_data.T, output_grad)
        bias_grad = np.sum(output_grad, axis=0)

        self.weights -= learning_rate * weight_grad
        self.biases -= learning_rate * bias_grad

        return input_grad



class Softmax:
    """Row-wise softmax over axis 1."""

    def __init__(self):
        self.input_data = None

    def forward(self, input_data):
        """Cache the input and return softmax probabilities along axis 1.

        The row maximum is subtracted before exponentiating so the exponentials
        cannot overflow.
        """
        self.input_data = input_data
        row_max = np.max(input_data, axis=1, keepdims=True)
        scores = np.exp(input_data - row_max)
        normaliser = np.sum(scores, axis=1, keepdims=True)
        return scores / normaliser

    def backward(self, output_grad, learning_rate=0.01):
        """Return a copy of the cached forward input.

        NOTE(review): ``output_grad`` and ``learning_rate`` are ignored here, so
        no gradient is actually propagated -- confirm whether this is a
        placeholder before relying on it for training.
        """
        return np.copy(self.input_data)



# asif conv

In [4]:
# from layer import Layer

class AS_Convolution():
    """2-D convolution layer with rectangular kernels, built on strided views and einsum.

    Differences from the previous revision:
      * ``backward`` no longer returns an empty array when ``padding == 0``
        (the old ``p:-p`` slice collapsed to ``0:0``).
      * The input gradient is computed with the weights used in the forward
        pass, before the gradient step is applied.
      * The layer works on its own copy of ``global_weights`` so its in-place
        updates do not modify the shared array.
    """

    def __init__(self, num_filters, filter_dim, stride=1, padding=0):
        # num_filters: number of output channels
        # filter_dim: filter dimension (height, width)
        # stride: stride of the convolution. default: 1
        # padding: symmetric zero padding applied to both spatial axes. default: 0
        self.num_filters = num_filters
        self.filter_height, self.filter_width = filter_dim
        self.stride = stride
        self.padding = padding

        # Created on the first forward pass.
        self.weights = None
        self.biases = None

    def forward(self, input):
        """Convolve ``input`` of shape (batch, channels, height, width).

        Returns an array of shape (batch, num_filters, out_height, out_width).
        The zero-padded input is cached for ``backward``.
        """
        batch_size, num_channels, input_height, input_width = input.shape

        # weights: (num_filters, num_channels, filter_height, filter_width)
        if self.weights is None:
            # Private copy: backward() updates the weights in place.
            self.weights = global_weights.copy()

        # biases: (num_filters,), initialised to 0
        if self.biases is None:
            self.biases = np.zeros(self.num_filters)

        output_height = int((input_height - self.filter_height +
                            2 * self.padding) / self.stride + 1)
        output_width = int((input_width - self.filter_width + 2 *
                           self.padding) / self.stride + 1)

        spatial_pad = (self.padding, self.padding)
        input_padded = np.pad(
            input, ((0, 0), (0, 0), spatial_pad, spatial_pad), 'constant')

        # View every receptive field without copying:
        # (batch, channel, out_row, out_col, kernel_row, kernel_col).
        s_batch, s_chan, s_row, s_col = input_padded.strides
        input_strided = np.lib.stride_tricks.as_strided(
            input_padded,
            shape=(
                batch_size,
                num_channels,
                output_height,
                output_width,
                self.filter_height,
                self.filter_width
            ),
            strides=(
                s_batch,
                s_chan,
                s_row * self.stride,
                s_col * self.stride,
                s_row,
                s_col
            )
        )
        output = np.einsum(
            'bcijkl,fckl->bfij',
            input_strided, self.weights
        )
        output += self.biases.reshape(1, -1, 1, 1)

        self.cache = input_padded
        return output

    def backward(self, output_error, learning_rate):
        """Apply one gradient step and return the gradient w.r.t. the forward input.

        output_error: (batch, num_filters, out_height, out_width)
        learning_rate: step size; gradients are averaged over the batch.
        Returns an array with the shape of the (unpadded) forward input.
        """
        input_padded = self.cache
        batch_size = input_padded.shape[0]
        padded_height, padded_width = input_padded.shape[2], input_padded.shape[3]
        k_h, k_w = self.filter_height, self.filter_width

        # Insert (stride - 1) zeros between neighbouring rows/cols so the
        # strided forward pass can be treated as a stride-1 correlation.
        dilate = self.stride - 1
        error = np.insert(
            output_error,
            obj=np.arange(1, output_error.shape[2]).repeat(dilate),
            values=0,
            axis=2
        )
        error = np.insert(
            error,
            obj=np.arange(1, error.shape[3]).repeat(dilate),
            values=0,
            axis=3
        )

        # When the stride does not divide the padded extent evenly, the forward
        # pass never touched the trailing rows/cols; append zeros so the error
        # map lines up with a stride-1 correlation over the padded input.
        extra_rows = max(0, padded_height - k_h + 1 - error.shape[2])
        extra_cols = max(0, padded_width - k_w + 1 - error.shape[3])
        error = np.pad(
            error, ((0, 0), (0, 0), (0, extra_rows), (0, extra_cols)), 'constant')

        # Weight gradient: correlate the padded input with the error map.
        s_batch, s_chan, s_row, s_col = input_padded.strides
        input_strided = np.lib.stride_tricks.as_strided(
            input_padded,
            shape=(
                batch_size,
                input_padded.shape[1],
                k_h,
                k_w,
                error.shape[2],
                error.shape[3]
            ),
            strides=(s_batch, s_chan, s_row, s_col, s_row, s_col)
        )
        weights_error = np.einsum(
            'bcklij,bfij->fckl',
            input_strided, error
        ) / batch_size

        biases_error = np.sum(output_error, axis=(0, 2, 3)) / batch_size

        # Input gradient: full correlation of the error map with the
        # 180-degree-rotated kernels.  This must run BEFORE the update below,
        # because np.rot90 returns a view of self.weights.
        error_full = np.pad(
            error,
            ((0, 0), (0, 0), (k_h - 1, k_h - 1), (k_w - 1, k_w - 1)),
            'constant'
        )
        weights_rotated = np.rot90(self.weights, 2, (2, 3))
        e_batch, e_chan, e_row, e_col = error_full.strides
        error_strided = np.lib.stride_tricks.as_strided(
            error_full,
            shape=(
                error_full.shape[0],
                error_full.shape[1],
                padded_height,
                padded_width,
                k_h,
                k_w
            ),
            strides=(e_batch, e_chan, e_row, e_col, e_row, e_col)
        )
        input_error = np.einsum(
            'bfijkl,fckl->bcij',
            error_strided, weights_rotated
        )

        # Gradient step.
        self.weights -= learning_rate * weights_error
        self.biases -= learning_rate * biases_error

        # Drop the rows/cols that belong to the zero padding.  Guarded because
        # a ``0:-0`` slice would be empty.
        if self.padding > 0:
            p = self.padding
            input_error = input_error[:, :, p:-p, p:-p]

        return input_error


# utsa conv

In [4]:
# import numpy as np
 
# np.random.seed(42)
 
 
def getSlidingWindow(input, output_size, filter_dim, padding=0, stride=1, dilate=0):
    """Build a 6-D strided view of square windows over axes 2 and 3 of ``input``.

    input: 4-D ndarray; axes 0 and 1 are carried through unchanged.
    output_size: 4-sequence; only the last two entries (window counts along
        axes 2 and 3) are used.
    filter_dim: side length of each square window.
    padding: zeros added on both sides of axes 2 and 3 before windowing.
    stride: step, in elements, between neighbouring windows.
    dilate: when non-zero, zeros are inserted between neighbouring entries of
        the (already padded) array along the two spatial axes.

    Returns a view of shape
    (input.shape[0], input.shape[1], out_h, out_w, filter_dim, filter_dim).
    The caller is responsible for requesting only windows that fit inside the
    prepared array; ``as_strided`` does not check this.
    """
    X = input

    if padding != 0:
        X = np.pad(X, pad_width=((0,), (0,), (padding,), (padding,)),
                   mode='constant', constant_values=(0.,))

    if dilate != 0:
        for i in range(1, dilate + 1):
            # Dilation applies to the spatial axes (2 and 3); it previously
            # targeted axes 0 and 1, i.e. batch and channel.
            X = np.insert(X, range(1, X.shape[2], i), 0, axis=2)
            X = np.insert(X, range(1, X.shape[3], i), 0, axis=3)

    _, _, out_h, out_w = output_size
    out_b, out_c, _, _ = input.shape
    batch_str, channel_str, filter_height_str, filter_width_str = X.strides

    window = np.lib.stride_tricks.as_strided(
        X,
        shape=(out_b, out_c, out_h, out_w, filter_dim, filter_dim),
        strides=(batch_str, channel_str,
                 filter_height_str * stride, filter_width_str * stride,
                 filter_height_str, filter_width_str))
    return window
 
 
def dilateKoreDao(input, dilatePoriman):
    """Return a copy of ``input`` with zeros inserted between spatial neighbours.

    ``dilatePoriman`` is the number of zeros placed between each pair of
    adjacent entries along axis 2 and along axis 3.  Zero (or a negative
    value) yields a plain copy.
    """
    dilated = input.copy()
    if dilatePoriman <= 0:
        return dilated

    for axis in (2, 3):
        # Listing each interior boundary ``dilatePoriman`` times makes np.insert
        # place that many zeros in front of every interior entry in one call.
        interior = np.arange(1, dilated.shape[axis])
        dilated = np.insert(dilated, interior.repeat(dilatePoriman), 0, axis=axis)

    return dilated
 
 
def padKoreDao(input, padding):
    """Return ``input`` zero-padded by ``padding`` on every side of axes 2 and 3.

    Axes 0 and 1 are left unpadded.
    """
    keep = (0, 0)
    grow = (padding, padding)
    return np.pad(input, pad_width=(keep, keep, grow, grow),
                  mode='constant', constant_values=0.)
 
 
class Convolution:
    """Square-kernel 2-D convolution layer with a hand-written backward pass.

    Uses the module-level helpers ``getSlidingWindow``, ``dilateKoreDao`` and
    ``padKoreDao`` and the module-level ``global_weights`` array.

    NOTE(review): the weights are the ``global_weights`` object itself, not a
    copy, so the in-place update in ``backward`` changes that shared array too.
    """

    def __init__(self, out_channels, filter_dim, stride=1, padding=0):
        self.out_channels = out_channels
        self.filter_dim = filter_dim
        self.stride = stride
        self.padding = padding
        # Parameters, filled in lazily by initialize_weights().
        self.weights = None
        self.biases = None
        # State cached by forward().
        self.input_data = None
        self.window = None

    def initialize_weights(self, in_channels):
        """Populate weights/biases the first time the layer is used.

        ``in_channels`` is not consulted: the kernels come from ``global_weights``.
        """
        if self.weights is None:
            self.weights = global_weights
        if self.biases is None:
            self.biases = np.zeros(self.out_channels)

    def forward(self, input_data):
        """Return the convolution of the 4-D ``input_data`` plus per-filter biases."""
        self.input_data = input_data
        batch, channels, height, width = input_data.shape
        self.initialize_weights(channels)

        k, pad, step = self.filter_dim, self.padding, self.stride
        out_rows = (height + 2 * pad - k) // step + 1
        out_cols = (width + 2 * pad - k) // step + 1
        target = (batch, channels, out_rows, out_cols)

        self.window = getSlidingWindow(input_data, target, k, pad, step)

        feature_maps = np.einsum('bihwkl,oikl->bohw', self.window, self.weights)
        return feature_maps + self.biases[None, :, None, None]

    def backward(self, output_grad, learning_rate=0.01):
        """Take one gradient step and return the gradient w.r.t. the forward input.

        The input gradient is evaluated before the parameters are modified.
        """
        batch, _, _, _ = self.input_data.shape
        k, pad = self.filter_dim, self.padding

        db = np.sum(output_grad, axis=(0, 2, 3))

        # Spread the upstream gradient with (stride - 1) zeros between entries.
        spread_grad = dilateKoreDao(output_grad, self.stride - 1)
        padded_input = padKoreDao(self.input_data, pad)

        # dW: correlate the padded input with the spread-out gradient.
        input_patches = getSlidingWindow(
            padded_input, self.weights.shape, spread_grad.shape[2], padding=0, stride=1)
        dw = np.einsum('bihwkl,bokl->oihw', input_patches, spread_grad)

        # dX: correlate the gradient, padded by k - 1, with the rotated kernels.
        framed_grad = padKoreDao(spread_grad, k - 1)
        rotated = np.rot90(self.weights, 2, axes=(2, 3))
        grad_patches = getSlidingWindow(
            framed_grad, padded_input.shape, k, padding=0, stride=1)
        dx = np.einsum('bohwkl,oikl->bihw', grad_patches, rotated)

        # Trim the border that corresponds to the forward zero padding.
        if pad > 0:
            dx = dx[:, :, pad:-pad, pad:-pad]

        # Parameter update, averaged over the batch.
        self.weights -= learning_rate * dw / batch
        self.biases -= learning_rate * db / batch

        return dx


# nawmi conv

In [13]:
class FN_ConvLayer:
    """Forward-only square-kernel convolution layer using a strided view and einsum.

    When ``filters`` has not been set, the module-level ``global_weights``
    array is used as-is.
    """

    def __init__(self, num_filters, filter_dim, stride, padding):
        self.num_filters = num_filters
        self.filter_dim = filter_dim
        self.stride = stride
        self.padding = padding
        self.input = None
        self.filters = None
        self.bias = np.zeros(num_filters)
        self.learning_rate = 0.001
        # Output spatial size, filled in by forward().
        self.out_h = None
        self.out_w = None

    def forward(self, input):
        """Convolve a 4-D ``input`` and return (batch, num_filters, out_h, out_w)."""
        self.input = input

        if self.filters is None:
            self.filters = global_weights

        k, step, pad = self.filter_dim, self.stride, self.padding

        # Output dimensions.
        self.out_h = int(((input.shape[2] - k + 2 * pad) / step) + 1)
        self.out_w = int(((input.shape[3] - k + 2 * pad) / step) + 1)

        # Zero-pad the two spatial axes only.
        border = (pad, pad)
        padded_input = np.pad(input, ((0, 0), (0, 0), border, border), 'constant')

        # Strided view: (batch, channels, out_h, out_w, filter_dim, filter_dim).
        n_batch, n_chan = padded_input.shape[0], padded_input.shape[1]
        s_batch, s_chan, s_row, s_col = padded_input.strides
        strided_input = np.lib.stride_tricks.as_strided(
            padded_input,
            shape=(n_batch, n_chan, self.out_h, self.out_w, k, k),
            strides=(s_batch, s_chan, s_row * step, s_col * step, s_row, s_col),
        )

        response = np.einsum('bchwkl, nckl -> bnhw', strided_input, self.filters)
        return response + self.bias.reshape(1, self.num_filters, 1, 1)

# running conv

In [5]:
# Build the AS_Convolution layer from the shared hyper-parameters; filter_dim is passed as a (height, width) tuple.
asif_conv = AS_Convolution(num_filters=output_channels, filter_dim=(filter_height,filter_width), stride=stride, padding=padding)

In [6]:
asif_z = asif_conv.forward(X)

In [7]:
asif_z.shape

(2, 16, 8, 8)

In [14]:
# FN_ConvLayer takes a single integer filter size, so filter_height is used for both kernel dimensions.
nawmi_conv = FN_ConvLayer(num_filters=output_channels, filter_dim=filter_height, stride=stride, padding=padding)

In [15]:
nawmi_z = nawmi_conv.forward(X)

In [16]:
nawmi_z.shape

(2, 4, 4, 4)

In [17]:
np.allclose(asif_z, nawmi_z)

True

In [8]:
# NOTE(review): `Convolution` is defined in more than one cell of this notebook; which definition is instantiated here depends on the cell execution order.
anan_conv = Convolution(out_channels=output_channels, filter_dim=filter_height, stride=stride, padding=padding)

In [9]:
anan_z = anan_conv.forward(X)

In [10]:
anan_z.shape

(2, 16, 8, 8)

In [11]:
np.allclose(asif_z,anan_z)

True

In [10]:
utsa_conv = Convolution(out_channels=output_channels, filter_dim=filter_height, stride=stride, padding=padding)

In [11]:
utsa_z = utsa_conv.forward(X)

In [12]:
utsa_z.shape

(2, 16, 8, 8)

In [13]:
np.allclose(asif_z,utsa_z)

True

In [12]:
# Feed the layer's own forward output back in as the upstream error, with learning rate 0.1.
asif_dx = asif_conv.backward(asif_z, 0.1)

In [13]:
asif_dx.shape

(2, 3, 24, 24)

In [16]:
utsa_dx = utsa_conv.backward(utsa_z, 0.1)

In [17]:
utsa_dx.shape

(2, 3, 24, 24)

In [18]:
np.allclose(asif_dx, utsa_dx)

True

In [14]:
# Uses asif_z (not anan_z) as the upstream gradient, so this layer is driven by the same array as asif_conv.
anan_dx = anan_conv.backward(asif_z, 0.1)

In [15]:
anan_dx.shape

(2, 3, 24, 24)

In [16]:
np.allclose(asif_dx,anan_dx)

True

# utsa maxpool

In [5]:
def getSlidingWindow(input, output_size, filter_size, padding=0, stride=1, dilate=False):
    """Build a 6-D strided view of square windows over a 4-D input.

    input: ndarray of shape (B, C, H, W) - batch, channels, height, width.
    output_size: (output_height, output_width), the number of window positions.
    filter_size: side length of each square window.
    padding: zeros added on both sides of H and W (applied after dilation).
    stride: step, in elements, between neighbouring windows.
    dilate: when truthy, a single zero is inserted between every pair of
        neighbouring rows and columns before padding.

    Returns a view of shape
    (B, C, output_height, output_width, filter_size, filter_size).
    The caller must request only windows that fit inside the prepared array;
    ``as_strided`` does not check this.
    """
    modified_input = input

    # Dilate the input if requested.  The column insert used to pass ``axis=3``
    # to range() because of a misplaced parenthesis, which raised TypeError.
    if dilate:
        modified_input = np.insert(
            modified_input, range(1, input.shape[2]), 0, axis=2)
        modified_input = np.insert(
            modified_input, range(1, input.shape[3]), 0, axis=3)

    # Zero-pad the spatial axes.
    if padding != 0:
        modified_input = np.pad(
            modified_input,
            pad_width=((0,), (0,), (padding,), (padding,)),
            mode='constant', constant_values=(0.,))

    output_height, output_width = output_size
    output_batch, output_channel, _, _ = input.shape
    batch_stride, channel_stride, row_stride, col_stride = modified_input.strides

    # Axis order: batch, channel, output row, output col, filter row, filter col.
    return np.lib.stride_tricks.as_strided(
        modified_input,
        (output_batch, output_channel, output_height, output_width, filter_size, filter_size),
        (batch_stride, channel_stride, stride * row_stride, stride * col_stride, row_stride, col_stride)
    )
 
 
class UT_Maxpool:
    """Max-pooling layer with a square window, applied to each channel separately."""

    def __init__(self, filter_size, stride):
        # filter_size: side length of the square pooling window (e.g. 2 for 2x2)
        # stride: step between neighbouring windows
        self.filter_size = filter_size
        self.stride = stride
        # (input, sliding_window) saved by forward() for use in backward().
        self.cache = None

    def forward(self, input):
        """Return the per-window maximum of a (B, C, H, W) input."""
        B, C, H, W = input.shape
        H_out = (H - self.filter_size) // self.stride + 1
        W_out = (W - self.filter_size) // self.stride + 1
        sliding_window = getSlidingWindow(
            input, (H_out, W_out), self.filter_size, 0, self.stride)
        maxpool_output = np.max(sliding_window, axis=(4, 5))

        # Cache the input and the sliding window for back-propagation.
        self.cache = input, sliding_window

        return maxpool_output

    def backward(self, d_output):
        """Route ``d_output`` back to the input positions that produced each maximum.

        Contributions from overlapping windows are accumulated (they used to
        overwrite each other), and the result takes ``d_output``'s dtype, not
        the input's, so integer inputs no longer truncate the gradient.
        Every element that ties for a window's maximum receives that window's
        gradient.
        """
        source = self.cache[0]
        _, _, H, W = source.shape
        size, step = self.filter_size, self.stride
        H_out = (H - size) // step + 1
        W_out = (W - size) // step + 1

        d_output = np.asarray(d_output)
        d_input = np.zeros(source.shape, dtype=d_output.dtype)

        # One vectorised step per window position; batch and channel axes are
        # handled together.
        for k in range(H_out):
            rows = slice(k * step, k * step + size)
            for l in range(W_out):
                cols = slice(l * step, l * step + size)
                current_window = source[:, :, rows, cols]
                is_max = current_window == current_window.max(axis=(2, 3), keepdims=True)
                d_input[:, :, rows, cols] += is_max * d_output[:, :, k, l][:, :, None, None]

        return d_input


# asif maxpool

In [17]:
# from layer import Layer

class AS_MaxPooling():
    """Max pooling over arrays of shape (batch, channels, height, width).

    Two backward strategies are used:

    * When the filter is square and the stride equals its side, windows do
      not overlap, so the gradient is routed with a precomputed boolean mask.
      Every element tied for a window's maximum receives the gradient.
    * Otherwise the gradient of each window is added to the position of the
      first maximum in that window, so overlapping windows accumulate.
    """

    def __init__(self, filter_dim, stride):
        # filter_dim: filter dimension. (filter_height, filter_width)
        self.filter_height, self.filter_width = filter_dim
        self.stride = stride

    def _windows_tile_input(self):
        """Return True when windows are square and laid out without overlap."""
        # Checking only the height would send non-square filters down the
        # repeat/mask path even though their windows overlap along the width.
        return self.stride == self.filter_height == self.filter_width

    def _expand_to_input(self, pooled, height, width):
        """Repeat each pooled cell over its window and zero-pad to (height, width)."""
        expanded = pooled.repeat(self.stride, axis=2).repeat(self.stride, axis=3)
        # Rows/columns left over when the input size is not divisible by the
        # stride are filled with zeros.
        pad_height = height - expanded.shape[2]
        pad_width = width - expanded.shape[3]
        if pad_height > 0 or pad_width > 0:
            expanded = np.pad(
                expanded,
                ((0, 0), (0, 0), (0, pad_height), (0, pad_width)),
                'constant',
            )
        return expanded

    def forward(self, input):
        """Pool ``input`` and cache what ``backward`` needs.

        Args:
            input: array of shape (batch_size, num_channels, height, width).

        Returns:
            Array of shape (batch_size, num_channels, output_height,
            output_width) holding the maximum of each window.
        """
        batch_size, num_channels, height, width = input.shape

        output_height = int(
            (height - self.filter_height) / self.stride + 1)
        output_width = int(
            (width - self.filter_width) / self.stride + 1)

        # View every window without copying: the two trailing axes walk
        # inside a window, the two before them step from window to window.
        batch_step, channel_step, row_step, col_step = input.strides
        input_strided = np.lib.stride_tricks.as_strided(
            input,
            shape=(
                batch_size,
                num_channels,
                output_height,
                output_width,
                self.filter_height,
                self.filter_width,
            ),
            strides=(
                batch_step,
                channel_step,
                row_step * self.stride,
                col_step * self.stride,
                row_step,
                col_step,
            ),
        )
        output = np.max(input_strided, axis=(4, 5))

        # max_value_mask -> (batch_size, num_channels, input_height, input_width)
        # Only built for non-overlapping square windows; None selects the
        # per-window fallback in backward.
        max_value_mask = None
        if self._windows_tile_input():
            # Positions in the zero-padded border may compare equal to the
            # input, but backward zero-pads the gradient there as well.
            max_value_mask = np.equal(
                self._expand_to_input(output, height, width), input)

        self.cache = input, max_value_mask

        return output

    def backward(self, output_error, learning_rate):
        """Propagate ``output_error`` back to the cached input.

        Args:
            output_error: array of shape (batch_size, num_channels,
                out_height, out_width).
            learning_rate: not used by this layer.

        Returns:
            Array with the shape of the cached input holding the gradient
            with respect to that input.
        """
        x, max_value_mask = self.cache
        batch_size, num_channels, height, width = x.shape

        if max_value_mask is not None:
            # Non-overlapping windows: spread each gradient over its window
            # and keep it only where the maximum was found.
            repeated_output_error = self._expand_to_input(
                output_error, height, width)
            return max_value_mask * repeated_output_error

        _, _, output_height, output_width = output_error.shape
        input_error = np.zeros(x.shape)
        batch_idx = np.arange(batch_size)[:, None]
        channel_idx = np.arange(num_channels)[None, :]

        for h in range(output_height):
            top = h * self.stride
            for w in range(output_width):
                left = w * self.stride
                window = x[
                    :,
                    :,
                    top:top + self.filter_height,
                    left:left + self.filter_width
                ]
                win_h, win_w = window.shape[2], window.shape[3]
                # First maximum of every (batch, channel) window, as an index
                # into the row-major flattened window.
                flat_index = window.reshape(
                    batch_size, num_channels, win_h * win_w).argmax(axis=2)
                max_row, max_col = np.divmod(flat_index, win_w)

                # add overlapped gradients
                # https://ai.stackexchange.com/a/17109
                # Each (batch, channel) pair is addressed once per window, so
                # the in-place add below never hits a repeated index.
                input_error[
                    batch_idx, channel_idx, top + max_row, left + max_col
                ] += output_error[:, :, h, w]

        return input_error


# anan maxpool

In [4]:
def getSlidingWindow(input, output_size, filter_dim, padding=0, stride=1, dilate=0):
    """Return a strided view of square windows over the last two axes of ``input``.

    ``input`` is first dilated (zeros inserted between neighbouring rows and
    columns, once per dilation pass), then zero-padded on both sides of the
    last two axes. The result has shape
    (batch, channels, out_h, out_w, filter_dim, filter_dim), where out_h and
    out_w are the last two entries of ``output_size`` and batch/channels come
    from ``input``.

    The view is created with ``as_strided``; this function performs no check
    that ``output_size`` fits the dilated/padded array, so the caller is
    responsible for passing a consistent value.
    """
    arr = input

    if dilate != 0:
        # Each pass puts a zero between every pair of adjacent rows, then
        # between every pair of adjacent columns, of the current array.
        for _pass in range(dilate):
            for axis in (2, 3):
                arr = np.insert(arr, range(1, arr.shape[axis]), 0, axis=axis)

    if padding != 0:
        spatial_pad = (padding,)
        arr = np.pad(
            arr,
            pad_width=((0,), (0,), spatial_pad, spatial_pad),
            mode='constant',
            constant_values=(0.,),
        )

    # Only the spatial entries of output_size are used; batch and channel
    # counts are taken from the original input.
    _, _, windows_down, windows_across = output_size
    n_batch, n_channels, _, _ = input.shape
    batch_step, channel_step, row_step, col_step = arr.strides

    view_shape = (n_batch, n_channels, windows_down, windows_across, filter_dim, filter_dim)
    view_strides = (
        batch_step,
        channel_step,
        row_step * stride,
        col_step * stride,
        row_step,
        col_step,
    )
    return np.lib.stride_tricks.as_strided(arr, shape=view_shape, strides=view_strides)

class AN_MaxPooling:
    """Max pooling with a square window over (batch, channels, height, width) arrays.

    Works for any stride: windows may tile the input, overlap
    (stride < filter_dim) or leave gaps (stride > filter_dim). Every element
    tied for a window's maximum receives that window's gradient.
    """

    def __init__(self, filter_dim, stride=1):
        self.filter_dim = filter_dim
        self.stride = stride
        self.input_data = None
        # Strided view of the input, one (filter_dim, filter_dim) patch per
        # output cell; set by forward.
        self.window = None
        # Boolean array shaped like self.window: True where an element equals
        # the maximum of its own window; set by forward.
        self.mask = None

    def forward(self, input_data):
        """Return the per-window maxima of ``input_data``.

        Args:
            input_data: array of shape (batch, channels, height, width).

        Returns:
            Array of shape (batch, channels, out_height, out_width).
        """
        self.input_data = input_data
        batch_size, in_channels, height, width = input_data.shape
        size, step = self.filter_dim, self.stride

        out_height = (height - size) // step + 1
        out_width = (width - size) // step + 1

        # Built directly with as_strided so the layer does not depend on
        # which getSlidingWindow definition is live in the notebook.
        batch_step, channel_step, row_step, col_step = input_data.strides
        window = np.lib.stride_tricks.as_strided(
            input_data,
            shape=(batch_size, in_channels, out_height, out_width, size, size),
            strides=(
                batch_step,
                channel_step,
                row_step * step,
                col_step * step,
                row_step,
                col_step,
            ),
        )
        self.window = window

        output = np.max(window, axis=(4, 5))

        # Comparing each window with its own maximum (instead of tiling the
        # output over the input) stays correct when windows overlap or when
        # the stride differs from the filter size.
        self.mask = window == output[:, :, :, :, None, None]

        return output

    def backward(self, output_grad, learning_rate=0.01):
        """Return the gradient with respect to the input seen by ``forward``.

        Args:
            output_grad: array of shape (batch, channels, out_height,
                out_width).
            learning_rate: not used by this layer.

        Returns:
            Float array with the shape of the forward input.
        """
        size, step = self.filter_dim, self.stride
        out_height, out_width = self.mask.shape[2:4]

        input_grad = np.zeros(self.input_data.shape)

        # Gradient of every window, kept only at that window's maxima.
        routed = self.mask * np.asarray(output_grad)[:, :, :, :, None, None]
        if routed.size == 0:
            return input_grad

        # For a fixed in-window offset (d_row, d_col) the touched input
        # positions are all distinct, so a strided in-place add is safe;
        # summing over the offsets accumulates overlapping windows.
        for d_row in range(size):
            row_stop = d_row + step * (out_height - 1) + 1
            for d_col in range(size):
                col_stop = d_col + step * (out_width - 1) + 1
                input_grad[:, :, d_row:row_stop:step, d_col:col_stop:step] += \
                    routed[:, :, :, :, d_row, d_col]

        return input_grad


# run maxpool

In [18]:
# Shape of the shared test input X used by every pooling layer below.
X.shape

(2, 3, 24, 24)

In [19]:
# 2x2 window with stride 2, i.e. non-overlapping windows.
asif_mp = AS_MaxPooling(filter_dim=(2,2), stride=2)

In [20]:
asif_z = asif_mp.forward(X)

In [21]:
asif_z.shape

(2, 3, 12, 12)

In [17]:
utsa_mp = UT_Maxpool(filter_size=2, stride=2)

In [18]:
utsa_z = utsa_mp.forward(X)

In [19]:
# NOTE(review): the recorded output below, (2, 3, 4, 4), does not match
# asif_z.shape (2, 3, 12, 12) recorded above, yet the next cell records
# np.allclose(asif_z, utsa_z) as True. The execution counts are also out of
# order, so this output looks stale -- re-run the cells to confirm.
utsa_z.shape

(2, 3, 4, 4)

In [20]:
# Forward results of the two implementations should agree element-wise.
np.allclose(asif_z, utsa_z)

True

In [22]:
# NOTE(review): the class defined in the "anan maxpool" cell is named
# AN_MaxPooling; MaxPooling is not defined in the cells visible here --
# confirm which class this comparison is meant to exercise.
anan_mp = MaxPooling(filter_dim=2, stride=2)

In [23]:
anan_z = anan_mp.forward(X)

In [24]:
np.allclose(asif_z, anan_z)

True

In [25]:
# The pooled outputs are reused as stand-in upstream gradients for backward.
asif_dx = asif_mp.backward(asif_z, 0.01)

In [23]:
utsa_dx = utsa_mp.backward(utsa_z)

In [26]:
anan_dx = anan_mp.backward(anan_z, 0.01)

In [24]:
# Backward results should agree as well.
np.allclose(asif_dx, utsa_dx)

True

In [27]:
np.allclose(asif_dx, anan_dx)

True