In [1]:
import numpy as np

# Implementing convolution operations with numpy

## Single-channel convolution

In [7]:
def numpy_conv(inputs, filter, _result, padding = "VALID"):
    H, W = inputs.shape
    filter_size = filter.shape[0]
    # floor
    filter_center = int(filter_size / 2.0)
    filter_center_ceil = int(np.ceil(filter_size / 2.0))

    # Define a space of the same size as the input, but the surrounding circle will be cut off afterward.
    result = np.zeros((_result.shape))
    # Update input (In SAME mode, HW will be changed.
    # H, W = inputs.shape
    # print("new size", H, W)
    for r in range(0, H - filter_size + 1):
        for c in range(0, W - filter_size + 1):
            # The input region of the convolution kernel size
            cur_input = inputs[r:r + filter_size, c:c + filter_size]
            # The input of kernel size is multiplied by the convolution kernel
            cur_output = cur_input * filter
            # Then sum all the values
            conv_sum = np.sum(cur_output)
            # Current point output value
            result[r, c] = conv_sum
    return result

## Multi-channel convolution

In [5]:
def _conv(inputs, filter, strides = [1, 1], padding = "SAME"):
    C_in, H, W = inputs.shape
    filter_size = filter.shape[2]
    # The number of output channels, which is also the number of kernels
    C_out = filter.shape[0]
    if padding == "VALID":
        result = np.zeros(
            [C_out, int(np.ceil(H - filter_size + 1) / strides[0]), int(np.ceil(W - filter_size + 1) / strides[1])],
            np.float32)
    else:
        result = np.zeros([C_out, int(H / strides[0]), int(W / strides[1])], np.float32)
        C, H_new, W_new = inputs.shape
        pad_h = (H_new - 1) * strides[0] + filter_size - H
        pad_top = int(pad_h / 2)
        pad_down = pad_h - pad_top

        pad_w = (W_new - 1) * strides[1] + filter_size - W
        pad_left = int(pad_w / 2)
        pad_right = pad_w - pad_left
        inputs = np.pad(inputs, ((0, 0), (pad_top, pad_down), (pad_left, pad_right)), 'constant',
                        constant_values = (0, 0))
    for channel_out in range(C_out):
        for channel_in in range(C_in):
            # Data of the current channel
            channel_data = inputs[channel_in]
            # Single-core single-channel convolution, then accumulation
            result[channel_out, :, :] += numpy_conv(channel_data, filter[channel_out][channel_in], result[0], padding)
    
    return result

In [8]:
# input[C_in,H,W]
inputs = np.zeros([3,9,9])
for i in range(3):
    for j in range(9):
        for z in range(9):
            inputs[i][j][z] = i+j+z

print("input:\n",inputs,"\n")

#kernel[C_out,C_in,K,K]
filter = np.zeros([2, 3, 3, 3])
for i in range(2):
    for j in range(3):
        for x in range(3):
            for y in range(3):
                filter[i][j][x][y] = i + j + x + y


print("filter\n",filter,"\n")

final_result = _conv(inputs, filter, strides=[1,1],padding="SAME")

print("result\n",final_result,"\n")

input:
 [[[ 0.  1.  2.  3.  4.  5.  6.  7.  8.]
  [ 1.  2.  3.  4.  5.  6.  7.  8.  9.]
  [ 2.  3.  4.  5.  6.  7.  8.  9. 10.]
  [ 3.  4.  5.  6.  7.  8.  9. 10. 11.]
  [ 4.  5.  6.  7.  8.  9. 10. 11. 12.]
  [ 5.  6.  7.  8.  9. 10. 11. 12. 13.]
  [ 6.  7.  8.  9. 10. 11. 12. 13. 14.]
  [ 7.  8.  9. 10. 11. 12. 13. 14. 15.]
  [ 8.  9. 10. 11. 12. 13. 14. 15. 16.]]

 [[ 1.  2.  3.  4.  5.  6.  7.  8.  9.]
  [ 2.  3.  4.  5.  6.  7.  8.  9. 10.]
  [ 3.  4.  5.  6.  7.  8.  9. 10. 11.]
  [ 4.  5.  6.  7.  8.  9. 10. 11. 12.]
  [ 5.  6.  7.  8.  9. 10. 11. 12. 13.]
  [ 6.  7.  8.  9. 10. 11. 12. 13. 14.]
  [ 7.  8.  9. 10. 11. 12. 13. 14. 15.]
  [ 8.  9. 10. 11. 12. 13. 14. 15. 16.]
  [ 9. 10. 11. 12. 13. 14. 15. 16. 17.]]

 [[ 2.  3.  4.  5.  6.  7.  8.  9. 10.]
  [ 3.  4.  5.  6.  7.  8.  9. 10. 11.]
  [ 4.  5.  6.  7.  8.  9. 10. 11. 12.]
  [ 5.  6.  7.  8.  9. 10. 11. 12. 13.]
  [ 6.  7.  8.  9. 10. 11. 12. 13. 14.]
  [ 7.  8.  9. 10. 11. 12. 13. 14. 15.]
  [ 8.  9. 10. 11. 12. 13. 1

# Implementing dropout with numpy

In [9]:
class DropoutLayer(object):
    def __init__(self, keep_prob):
        """
        :param keep_prob - probability that given unit will not be dropped out
        """
        self._keep_prob = keep_prob
        self.mask = None
    
    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        if training:
            self._mask = (np.random.rand(*a_prev.shape) < self._keep_prob) # Calculate mask
            return self._apply_mask(a_prev, self._mask) # Dropout
        else:
            return a_prev
    
    def backward_pass(self, da_curr: np.array) -> np.array:
        return self._apply_mask(da_curr, self._mask)
    
    def _apply_mask(self, array: np.array, mask: np.array) -> np.array:
        array *= mask
        array /= self._keep_prob 
        # Divide by the ratio, keep the total unchanged, and avoid affecting the output result

        return array

In [10]:
a_prev=np.random.randint(1,10,[1,3,8,8]).astype(np.float64)
a_prev

array([[[[2., 7., 5., 4., 6., 9., 8., 5.],
         [8., 5., 9., 4., 4., 6., 6., 3.],
         [7., 2., 5., 9., 2., 3., 8., 3.],
         [9., 4., 5., 9., 8., 6., 8., 8.],
         [3., 5., 8., 3., 4., 4., 7., 8.],
         [8., 1., 5., 1., 2., 8., 4., 2.],
         [1., 8., 8., 6., 4., 9., 8., 8.],
         [1., 8., 9., 7., 1., 9., 7., 3.]],

        [[4., 6., 6., 1., 2., 3., 9., 9.],
         [2., 4., 5., 5., 8., 7., 6., 6.],
         [7., 4., 5., 2., 3., 4., 7., 9.],
         [6., 4., 3., 2., 8., 7., 2., 8.],
         [9., 7., 5., 1., 7., 8., 4., 6.],
         [7., 6., 3., 9., 7., 4., 3., 8.],
         [8., 4., 4., 8., 9., 1., 9., 3.],
         [5., 8., 4., 7., 7., 6., 3., 4.]],

        [[7., 2., 6., 8., 3., 3., 2., 8.],
         [5., 1., 5., 4., 7., 1., 4., 4.],
         [3., 7., 9., 3., 9., 7., 1., 1.],
         [2., 7., 6., 6., 1., 2., 4., 8.],
         [2., 2., 8., 8., 8., 2., 1., 6.],
         [2., 4., 5., 1., 2., 7., 3., 1.],
         [5., 8., 2., 5., 4., 7., 8., 3.],
       

In [11]:
dropout=DropoutLayer(keep_prob=0.5)
out=dropout.forward_pass(a_prev,training=True)
out

array([[[[ 0., 14.,  0.,  8.,  0.,  0., 16.,  0.],
         [ 0.,  0., 18.,  0.,  8., 12., 12.,  0.],
         [14.,  0., 10.,  0.,  4.,  6., 16.,  6.],
         [ 0.,  0., 10., 18., 16.,  0.,  0.,  0.],
         [ 0., 10., 16.,  6.,  8.,  0., 14., 16.],
         [16.,  0., 10.,  0.,  0.,  0.,  0.,  0.],
         [ 0., 16.,  0.,  0.,  8., 18.,  0., 16.],
         [ 2.,  0.,  0., 14.,  2., 18., 14.,  6.]],

        [[ 8., 12., 12.,  2.,  0.,  0.,  0.,  0.],
         [ 0.,  8.,  0., 10.,  0.,  0., 12.,  0.],
         [14.,  0.,  0.,  0.,  0.,  0.,  0., 18.],
         [12.,  8.,  0.,  0., 16.,  0.,  0.,  0.],
         [ 0.,  0., 10.,  0., 14.,  0.,  0.,  0.],
         [14., 12.,  6.,  0., 14.,  0.,  0., 16.],
         [16.,  8.,  0.,  0.,  0.,  2.,  0.,  0.],
         [ 0., 16.,  8.,  0.,  0., 12.,  0.,  8.]],

        [[14.,  4.,  0., 16.,  6.,  0.,  4., 16.],
         [10.,  2., 10.,  0.,  0.,  0.,  8.,  8.],
         [ 0., 14.,  0.,  0.,  0., 14.,  2.,  0.],
         [ 0.,  0.,  0.,  0

# Implementing pooling with numpy

In [12]:
from __future__ import annotations
from typing import Tuple

class MaxPoolLayer(object):
    def __init__(self, pool_size: Tuple[int, int], stride: int = 2):
        """
        :param pool_size - tuple holding shape of 2D pooling window
        :param stride - stride along width and height of input volume used to
        apply pooling operation
        """
        self._pool_size = pool_size
        self._stride = stride
        self._a = None
        self._cache = {}
    
    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - 4D tensor with shape(n, h_in, w_in, c)
        :output 4D tensor with shape(n, h_out, w_out, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - width of input volume
        c - number of channels of the input/output volume
        w_out - width of output volume
        h_out - width of output volume
        """
        self._a = np.array(a_prev, copy = True)
        n, h_in, w_in, c = a_prev.shape
        h_pool, w_pool = self._pool_size
        h_out = 1 + (h_in - h_pool) // self._stride
        w_out = 1 + (w_in - w_pool) // self._stride
        output = np.zeros((n, h_out, w_out, c))

        for i in range(h_out):
            for j in range(w_out):
                h_start = i * self._stride
                h_end = h_start + h_pool
                w_start = j * self._stride
                w_end = w_start + w_pool
                a_prev_slice = a_prev[:, h_start:h_end, w_start:w_end, :]
                self._save_mask(x = a_prev_slice, cords = (i, j))
                output[:, i, j, :] = np.max(a_prev_slice, axis = (1, 2))
        return output
    
    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        :param da_curr - 4D tensor with shape(n, h_out, w_out, c)
        :output 4D tensor with shape(n, h_in, w_in, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - width of input volume
        c - number of channels of the input/output volume
        w_out - width of output volume
        h_out - width of output volume
        """
        output = np.zeros_like(self._a)
        _, h_out, w_out = da_curr.shape
        h_pool, w_pool = self._pool_size

        for i in range(h_out):
            for j in range(w_out):
                h_start = i * self._stride
                h_end = h_start + h_pool
                w_start = j * self._stride
                w_end = w_start + w_pool
                output[:, h_start:h_end, w_start:w_end, :] += \
                    da_curr[:, i:i + 1, j:j + 1, :] * self._cache[(i, j)]
        return output       

    def _save_mask(self, x: np.array, cords: Tuple[int, int]) -> None:
        mask = np.zeros_like(x)
        n, h, w, c = x.shape
        x = x.reshape(n, h * w, c)
        idx = np.argmax(x, axis = 1)

        n_idx, c_idx = np.indices((n, c))
        mask.reshape(n, h * w, c)[n_idx, idx, c_idx] = 1
        self._cache[cords] = mask

In [13]:
a_prev=np.random.randint(1,10,[1,8,8,3]).astype(np.float64)
a_prev.transpose(0,3,1,2)

array([[[[3., 5., 4., 4., 6., 9., 2., 6.],
         [1., 8., 6., 3., 8., 9., 7., 9.],
         [5., 7., 3., 5., 8., 3., 5., 6.],
         [3., 6., 6., 9., 5., 6., 9., 4.],
         [7., 9., 4., 5., 3., 5., 6., 9.],
         [2., 8., 1., 8., 4., 9., 2., 2.],
         [3., 1., 9., 3., 1., 2., 9., 4.],
         [7., 3., 9., 1., 8., 1., 8., 3.]],

        [[7., 6., 7., 9., 1., 5., 3., 9.],
         [7., 9., 2., 7., 2., 2., 4., 8.],
         [7., 1., 4., 5., 3., 3., 4., 6.],
         [9., 2., 4., 2., 8., 7., 1., 1.],
         [3., 1., 9., 9., 3., 6., 3., 1.],
         [3., 1., 3., 8., 9., 1., 5., 4.],
         [7., 7., 2., 8., 8., 4., 4., 2.],
         [7., 1., 9., 1., 2., 4., 1., 1.]],

        [[5., 8., 5., 4., 3., 2., 8., 3.],
         [6., 6., 4., 2., 3., 4., 3., 9.],
         [9., 4., 1., 6., 6., 3., 7., 6.],
         [7., 7., 2., 9., 6., 3., 9., 9.],
         [3., 6., 4., 9., 8., 2., 4., 3.],
         [4., 3., 9., 4., 1., 1., 7., 4.],
         [8., 6., 2., 4., 1., 9., 7., 1.],
       

In [14]:
maxpool=MaxPoolLayer(pool_size=(2,2),stride=2)
out=maxpool.forward_pass(a_prev,training=True)
out.transpose(0,3,1,2)

array([[[[8., 6., 9., 9.],
         [7., 9., 8., 9.],
         [9., 8., 9., 9.],
         [7., 9., 8., 9.]],

        [[9., 9., 5., 9.],
         [9., 5., 8., 6.],
         [3., 9., 9., 5.],
         [7., 9., 8., 4.]],

        [[8., 5., 4., 9.],
         [9., 9., 6., 9.],
         [6., 9., 8., 7.],
         [9., 6., 9., 7.]]]])