requirements:
 - python >= 3.8.5
 - numpy >= 1.20.1
 - torch >= 1.7.1
 - ~~torchvision = 0.8.2~~

In [503]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2021/2/25 14:33
# @Author  : xujian
# @FileName: conv-ops.ipynb
# @Software: jupyter notebook
# @Blog    ：

import sys
import platform
import numpy as np
import torch
import torchvision

# Report the Python interpreter version
print(f'python: {platform.python_version()}')

# Report the NumPy version
print(f'numpy: {np.__version__}')

# Report the torch & torchvision versions
print(f'torch: {torch.__version__}')
print(f'torchvision: {torchvision.__version__}')


python: 3.8.5
numpy: 1.20.1
torch: 1.7.1
torchvision: 0.8.2


# 卷积运算
## 2D卷积（conv）

$Y = W*X + bias$

### img2col 函数

In [504]:
def img2col(input, out_h, out_w, k_h, k_w, stride):
    """
    Unfold every sliding window of an image batch into one column.

    Windows are visited in row-major order (left to right, then top to
    bottom).  Channels are kept as a separate axis so the result can be used
    for per-channel operations as well as for convolution.

    :param input: input image, format: NCHW, (batch, channel, height, width);
                  any padding must already have been applied by the caller
    :param out_h: output image height (number of window rows)
    :param out_w: output image width (number of window columns)
    :param k_h: filter kernel height
    :param k_w: filter kernel width
    :param stride: filter stride
    :return: output: coled-image, format: (batch, channel, k_h*k_w, out_h*out_w)
    """
    batch, channel, _, in_w = input.shape
    window_size = k_h * k_w

    # one column per window position, zero-initialised float64
    output = np.zeros((batch, channel, window_size, out_h * out_w))

    # top-left corner of the current window
    conv_h_idx = 0
    conv_w_idx = 0

    # The window position does not depend on batch or channel, so every
    # batch/channel pair is sliced at once instead of looping over them.
    for k in range(out_h * out_w):
        if conv_w_idx + k_w > in_w:
            # end of the row of windows: go back to the left edge and move
            # down by one stride
            conv_w_idx = 0
            conv_h_idx += stride
        window = input[:, :,
                       conv_h_idx:conv_h_idx + k_h,
                       conv_w_idx:conv_w_idx + k_w]
        # flatten each (k_h, k_w) window row-major into a column
        output[:, :, :, k] = window.reshape(batch, channel, window_size)
        conv_w_idx += stride

    return output
    

需要说明的是，卷积操作需要对所有channel的数据进行求和（大部分彩色图片都是3通道的，上图示例只显示了单通道的情况），但是池化操作需要分开计算各个channel的数据。考虑到复用问题，上面的`img2col`函数也是每个通道分开处理的，这样可以直接应用于池化操作。在卷积操作时需要对`img2col`输出的各个通道的数据进行合并。

### 2D卷积前向操作
conv2d_forward 函数

In [505]:
def conv2d_forward(input, weights, bias, stride, pad):
    """
    2D convolution forward pass computed as img2col followed by one matmul.

    :param input   : input image, format: NCHW, (batch, channel, height, width)
    :param weights : convolution filter weights, format: (count, channel, height, width)
    :param bias    : bias, one value per filter (`count` values)
    :param stride  : stride
    :param pad     : TOTAL number of zero rows/columns added along each spatial
                     axis; int(pad/2) go before the data and the rest after it
    :return: output: conved image, format: (batch, count, out_h, out_w)
    :raises ValueError: if the channel counts of `input` and `weights` differ,
                        or the kernel is larger than the padded image
    """
    batch, in_channel, in_h, in_w = input.shape
    count, w_channel, k_h, k_w = weights.shape

    # Fail early with a readable message instead of a matmul shape error.
    if in_channel != w_channel:
        raise ValueError(
            "input channel:{} is not equal weight channel:{}".format(in_channel, w_channel))
    if in_h + pad < k_h or in_w + pad < k_w:
        raise ValueError(
            "kernel ({}, {}) is larger than the padded input ({}, {})".format(
                k_h, k_w, in_h + pad, in_w + pad))

    # calculate the output height & width
    out_h = (in_h + pad - k_h) // stride + 1
    out_w = (in_w + pad - k_w) // stride + 1

    # zero padding on the two spatial axes only
    pad_before = int(pad / 2)
    pad_after = pad - pad_before
    pad_input = np.pad(
        input,
        ((0, 0), (0, 0), (pad_before, pad_after), (pad_before, pad_after)))

    # img2col: [batch, channel, k_h*k_w, out_h*out_w]
    col_input = img2col(pad_input, out_h, out_w, k_h, k_w, stride)

    # merge the channel axis into the column axis:
    # [batch, channel*k_h*k_w, out_h*out_w]
    col_input = col_input.reshape(batch, -1, out_h * out_w)

    # flatten every filter to one row: [count, channel*k_h*k_w]
    weights_flatten = weights.reshape(count, -1)

    # compute convolution, output: [batch, count, out_h*out_w]
    output = weights_flatten @ col_input + bias.reshape(-1, 1)

    # reshape output to [batch, count, out_h, out_w]
    return output.reshape(batch, count, out_h, out_w)

#### 2D卷积前向操作-测试验证程序：


In [506]:
import torch.nn.functional as F

# input image: one single-channel 5x5 sample
input = np.random.randn(1,1,5,5)

#******** conv params *****************#
# conv params
# filter kernel: two 3x3 filters over one input channel
weights = np.random.randn(2,1,3,3)
# bias: one value per filter (all zero here)
# bias = np.random.randn(weights.shape[0], weights.shape[1], 1)
bias = np.zeros((weights.shape[0]))
# stride
stride = 1
# pad: per-side padding, as torch's `padding` argument expects it
pad = 1

# numpy implementation; it is given pad*2 because conv2d_forward splits its
# `pad` argument between the two sides of each spatial axis
output = conv2d_forward(input, weights, bias, stride, pad*2)

# torch reference; torch.Tensor(...) produces float32 tensors, so a small
# non-zero difference from the float64 NumPy result is expected
input_tensor = torch.Tensor(input)
weights_tensor = torch.Tensor(weights)
bias_tensor = torch.Tensor(bias)

out_tensor = F.conv2d(input_tensor, weights_tensor, bias_tensor, stride=stride, padding=pad)

# compute the diff: sum of absolute element-wise differences
diff = np.abs(output - out_tensor.numpy()).sum()

print(f'diff is {diff}')

diff is 4.5616584365519575e-06


### col2img 函数


In [507]:
def col2img(input_col, pad_h, pad_w, kernel_h, kernel_w, channel, pad, stride):
    """
    Rebuild an image from its column form (inverse of img2col) and strip the
    padding.

    Pixels covered by several windows are simply overwritten by the later
    window; nothing is accumulated.

    :param input_col: col_image, format: (batch, channel*k_h*k_w, out_h*out_w)
    :param pad_h: height of the padded image the columns were taken from
    :param pad_w: width of the padded image the columns were taken from
    :param kernel_h: filter kernel height
    :param kernel_w: filter kernel width
    :param channel: unused, kept for interface compatibility; the channel
                    count is derived from `input_col`
    :param pad: TOTAL padding per spatial axis; int(pad/2) rows/columns are
                removed before the data and the rest after it
    :param stride: filter stride
    :return: out: image without padding, format:
             (batch, channel, pad_h - pad, pad_w - pad)
    """
    batch = input_col.shape[0]
    n_cols = input_col.shape[-1]
    pad_h = int(pad_h)
    pad_w = int(pad_w)

    # the channel count follows from the column layout
    channel = input_col.shape[1] // (kernel_h * kernel_w)

    # padded output image
    out_pad = np.zeros((batch, channel, pad_h, pad_w))

    # split channels and restore each column to a (kernel_h, kernel_w) patch
    patches = input_col.reshape(batch, channel, kernel_h, kernel_w, n_cols)

    # top-left corner of the current window
    h_idx = 0
    w_idx = 0

    # The window position depends only on the column index, so all
    # batch/channel pairs are written at once.  The indices start from zero
    # exactly once per scan; carrying them over from one channel to the next
    # would push every later channel out of bounds.
    for col_idx in range(n_cols):
        if w_idx + kernel_w > pad_w:
            # end of the row of windows: back to the left edge, one stride down
            w_idx = 0
            h_idx += stride
        # recover img data
        out_pad[:, :, h_idx:h_idx + kernel_h, w_idx:w_idx + kernel_w] = \
            patches[..., col_idx]
        w_idx += stride

    # remove padding
    if pad < 1:
        return out_pad
    pad_before = int(pad / 2)
    pad_after = pad - pad_before
    return out_pad[:, :, pad_before:-pad_after, pad_before:-pad_after]

#### col2img的测试程序：

In [508]:
# check col2img

def test_col2img():
    """
    Round-trip check: img2col followed by col2img must give back the original
    image.  Prints the summed absolute difference, 0.0 for an exact match.
    """
    image = np.random.randn(1, 1, 5, 5)
    # only the kernel shape of these weights is used
    weights = np.random.rand(1, 1, 3, 3)
    pad = 2
    stride = 1

    n, c, in_h, in_w = image.shape
    n, c, k_h, k_w = weights.shape

    # out_w & out_h
    out_h = (in_h + pad - k_h) // stride + 1
    out_w = (in_w + pad - k_w) // stride + 1

    # padding, split between the two sides of each spatial axis
    pad_before = int(pad / 2)
    pad_after = pad - pad_before
    image_pad = np.pad(
        image,
        ((0, 0), (0, 0), (pad_before, pad_after), (pad_before, pad_after)))

    image_col = img2col(image_pad, out_h, out_w, k_h, k_w, stride)

    # merge the channel axis into the column axis:
    # [batch, channel*k_h*k_w, out_h*out_w]
    image_col = image_col.reshape(image_col.shape[0], -1, image_col.shape[3])

    restored = col2img(image_col, int(in_h + pad), int(in_w + pad), k_h, k_w, c, pad, stride)

    # Sum absolute differences: with a signed sum, positive and negative
    # errors could cancel and hide a mismatch.
    diff = np.abs(image - restored).sum()

    print(f"img2col - col2img diff is {diff}")


if __name__ == "__main__":
    test_col2img()

img2col - col2img diff is 0.0


### 2D卷积反向传播

conv2d_backward 函数

(待补充... ...)

## 2D反卷积（转置卷积）

### weights2col 函数

In [509]:
def weights2col(weights, in_h, in_w, stride, pad):
    """
    Expand convolution kernels into a dense matrix, so that a convolution over
    the flattened padded image becomes a single matrix product.

    Column (n, k) holds filter n placed at window position k inside an
    otherwise zero padded image.

    :param weights: format: (count, channel, kernel_h, kernel_w)
    :param in_h: input image height (without padding)
    :param in_w: input image width (without padding)
    :param stride: filter stride
    :param pad: TOTAL padding per spatial axis (pad_h = in_h + pad)
    :return: w_col: coled-weights, format: [channel*pad_h*pad_w, count*out_h*out_w]
    :raises ValueError: if the kernel is larger than the padded image
    """
    # get weights params
    count, channel, k_h, k_w = weights.shape

    # get padding_h, padding_w
    pad_h = in_h + pad
    pad_w = in_w + pad

    if k_h > pad_h or k_w > pad_w:
        raise ValueError(
            "kernel ({}, {}) is larger than the padded input ({}, {})".format(
                k_h, k_w, pad_h, pad_w))

    # get out_h, out_w
    out_h = (pad_h - k_h) // stride + 1
    out_w = (pad_w - k_w) // stride + 1

    w_col = np.zeros((count, channel, pad_h, pad_w, out_h * out_w))

    # The window position is the same for every filter and channel, so all of
    # them are written in one assignment per position.
    for row in range(out_h):
        top = row * stride
        for col in range(out_w):
            left = col * stride
            # assign weights values
            w_col[:, :, top:top + k_h, left:left + k_w, row * out_w + col] = weights

    # previous layout (kept for reference):
    #   count (output_channel) of the weights stayed an independent axis,
    #   reshaping to [count, channel*pad_h*pad_w, out_h*out_w]
    #
    # current layout:
    #   count (output_channel) is concatenated with the last axis
    #   !!! speed test still pending @author

    # transpose to  [channel, pad_h, pad_w, count, out_h*out_w]
    w_col = np.transpose(w_col, (1, 2, 3, 0, 4))
    print(f'transposed2 w_col : {w_col.shape}')

    # reshape to  [channel*pad_h*pad_w, count*out_h*out_w]
    # rows merge channel, pad_h, pad_w
    # columns merge count (new_channel), out_h*out_w
    w_col = w_col.reshape(channel * pad_h * pad_w, count * out_h * out_w)

    return w_col

#### weights2col 的测试程序：

In [510]:
def test_weights2col():
    """Print the weights2col expansion of a small, hand-checkable kernel set."""
    # Random image; only its spatial size is consumed below.
    image = np.random.rand(1, 2, 3, 3)
    in_h, in_w = image.shape[2:]

    # Two filters with two channels each, 2x2 kernels with recognisable values.
    kernels = np.array([
        [[[1, 2], [3, 4]], [[5, 6], [7, 8]]],
        [[[11, 12], [13, 14]], [[15, 16], [17, 18]]],
    ])
    print(kernels)

    # stride 1, no padding
    columns = weights2col(kernels, in_h, in_w, 1, 0)

    print(columns)
    print(columns.shape)


test_weights2col()

[[[[ 1  2]
   [ 3  4]]

  [[ 5  6]
   [ 7  8]]]


 [[[11 12]
   [13 14]]

  [[15 16]
   [17 18]]]]
transposed2 w_col : (2, 3, 3, 2, 4)
[[ 1.  0.  0.  0. 11.  0.  0.  0.]
 [ 2.  1.  0.  0. 12. 11.  0.  0.]
 [ 0.  2.  0.  0.  0. 12.  0.  0.]
 [ 3.  0.  1.  0. 13.  0. 11.  0.]
 [ 4.  3.  2.  1. 14. 13. 12. 11.]
 [ 0.  4.  0.  2.  0. 14.  0. 12.]
 [ 0.  0.  3.  0.  0.  0. 13.  0.]
 [ 0.  0.  4.  3.  0.  0. 14. 13.]
 [ 0.  0.  0.  4.  0.  0.  0. 14.]
 [ 5.  0.  0.  0. 15.  0.  0.  0.]
 [ 6.  5.  0.  0. 16. 15.  0.  0.]
 [ 0.  6.  0.  0.  0. 16.  0.  0.]
 [ 7.  0.  5.  0. 17.  0. 15.  0.]
 [ 8.  7.  6.  5. 18. 17. 16. 15.]
 [ 0.  8.  0.  6.  0. 18.  0. 16.]
 [ 0.  0.  7.  0.  0.  0. 17.  0.]
 [ 0.  0.  8.  7.  0.  0. 18. 17.]
 [ 0.  0.  0.  8.  0.  0.  0. 18.]]
(18, 8)


### 2D卷积前向操作(by weights)
conv2d_forward_by_weights 函数

In [511]:
def conv2d_forward_by_weights(input, weights, bias, stride, pad):
    """
    2D convolution forward pass that expands the WEIGHTS (via weights2col)
    instead of the image, then multiplies the flattened padded image by them.

    :param input   : input image, format: NCHW, (batch, channel, height, width)
    :param weights : convolution filter weights, format: (w_num, channel, height, width)
    :param bias    : bias, one value per filter (`w_num` values)
    :param stride  : stride
    :param pad     : TOTAL padding per spatial axis; int(pad/2) go before the
                     data and the rest after it
    :return: output: conved image, format: (batch, w_num, out_h, out_w)
    :raises AssertionError: if the channel counts of `input` and `weights`
                            differ, or `bias` does not have `w_num` values
    """
    batch, in_channel, in_h, in_w = input.shape
    w_num, w_channel, k_h, k_w = weights.shape

    # A forward convolution needs the input channels to match the per-filter
    # channel count (weights.shape[1]), not the number of filters.
    assert in_channel == w_channel, \
        "input channel:{} is not equal weight channel:{}".format(in_channel, w_channel)

    # check if bias' dim is equal to weights' number?
    assert bias.shape[0] == w_num, \
        "bias dim:{} is not equal weight w_num:{}".format(bias.shape[0], w_num)

    # calculate the output height & width
    out_h = (in_h + pad - k_h) // stride + 1
    out_w = (in_w + pad - k_w) // stride + 1

    # weights2col
    # w_col : [channel*pad_h*pad_w, w_num*out_h*out_w]
    w_col = weights2col(weights, in_h, in_w, stride, pad)

    # zero padding on the two spatial axes only
    pad_before = int(pad / 2)
    pad_after = pad - pad_before
    input_pad = np.pad(
        input,
        ((0, 0), (0, 0), (pad_before, pad_after), (pad_before, pad_after)))

    # flatten every padded image to one row: [batch, 1, channel*pad_h*pad_w]
    input_flatten = input_pad.reshape(batch, 1, -1)

    # The filter axis is merged into the column axis of w_col, so one matmul
    # covers all filters (the older variant repeated the input per filter).
    # output: [batch, 1, w_num*out_h*out_w]
    output = input_flatten @ w_col

    # reshape output to [batch, w_num, out_h*out_w]
    output = output.reshape(batch, w_num, out_h * out_w)

    # add bias, reshaped to [w_num, 1] so it broadcasts over positions
    output = output + bias.reshape(-1, 1)

    # reshape output to [batch, w_num, out_h, out_w]
    return output.reshape(batch, w_num, out_h, out_w)

#### 2D卷积前向（via weights2col）-测试验证程序：


In [512]:
import torch.nn.functional as F

# input image: batch of 2, 3 channels, 5x5, scaled up by 20
input = np.random.randn(2,3,5,5)*20

#******** conv params *****************#
# conv params
# filter kernel: three 3x3 filters over three input channels
weights = np.random.randn(3,3,3,3)*20
# bias: one random value per filter
bias = np.random.randn(weights.shape[0])
# bias = np.zeros((weights.shape[0]))
# stride
stride = 1
# pad: per-side padding, as torch's `padding` argument expects it
pad = 1

# numpy, img2col variant; pad*2 because the NumPy functions split their `pad`
# argument between the two sides of each spatial axis
output = conv2d_forward(input, weights, bias, stride, pad*2)
print(f'output:{output.shape}')
# print(output)
print('*'*20)

# numpy, weights2col variant
output_w = conv2d_forward_by_weights(input, weights, bias, stride, pad*2)
print(f'output_w:{output_w.shape}')
# print(output_w)
print('*'*20)

# torch reference; torch.Tensor(...) produces float32 tensors, so a small
# non-zero difference from the float64 NumPy results is expected
input_tensor = torch.Tensor(input)
weights_tensor = torch.Tensor(weights)
bias_tensor = torch.Tensor(bias)

out_tensor = F.conv2d(input_tensor, weights_tensor, bias_tensor, stride=stride, padding=pad)
# print(out_tensor.data)
print('*'*20)

# compute the diff between the img2col variant and torch
diff = np.abs(output - out_tensor.numpy()).sum()
print(f'diff is {diff}')

# compute the diff between the weights2col variant and torch
diff = np.abs(output_w - out_tensor.numpy()).sum()
print(f'diff2 is {diff}')

output:(2, 3, 5, 5)
********************
transposed2 w_col : (3, 7, 7, 3, 25)
output_w:(2, 3, 5, 5)
********************
********************
diff is 0.01766495076360286
diff2 is 0.017664950762636522


### 2D反卷积前向操作
conv_transpose2d_forward 函数

In [513]:
def conv_transpose2d_forward(input, weights, bias, stride, pad):
    """
    2D transposed convolution forward pass: the flattened input is multiplied
    by the TRANSPOSE of the weights2col matrix, then the padding is cut off.

    :param input   : input image, format: NCHW, (batch, channel, height, width)
    :param weights : filter weights, format: (w_num, w_channel, height, width),
                     w_num = input channels, w_channel = output channels
    :param bias    : bias, one value per OUTPUT channel (`w_channel` values)
    :param stride  : stride
    :param pad     : TOTAL padding per spatial axis; int(pad/2) rows/columns
                     are removed before the data and the rest after it
    :return: output: format: (batch, w_channel, out_h, out_w) with
             out = (in - 1)*stride + kernel - pad
    :raises AssertionError: if `input` does not have `w_num` channels or
                            `bias` does not have `w_channel` values
    """
    batch, in_channel, in_h, in_w = input.shape
    w_num, w_channel, k_h, k_w = weights.shape

    # check if input channel is equal to weights' number?
    assert in_channel == w_num, \
        "input channel:{} is not equal weights' number:{}".format(in_channel, w_num)

    # The bias is added along the output-channel axis below, so it must have
    # w_channel entries (not w_num).
    assert bias.shape[0] == w_channel, \
        "bias dim:{} is not equal weight out_channel:{}".format(bias.shape[0], w_channel)

    # get out_h out_w
    # reverse the computation for conv2d
    out_h = (in_h - 1) * stride + k_h - pad
    out_w = (in_w - 1) * stride + k_w - pad

    # get pad_h & pad_w: output size before the padding is removed
    pad_h = out_h + pad
    pad_w = out_w + pad

    # weights trans to col,
    # weights_col : [w_channel*pad_h*pad_w, w_num*in_h*in_w]
    weights_col = weights2col(weights, out_h, out_w, stride, pad)
    print(f'weigths_col is {weights_col.shape}')

    # transpose to [w_num(in_channel)*in_h*in_w, w_channel*pad_h*pad_w]
    weights_t = np.transpose(weights_col, (1, 0))

    # flatten input image to [batch, in_channel*in_h*in_w]
    input_flatten = input.reshape(batch, -1)

    # calc the matmul result : [batch, w_channel*pad_h*pad_w]
    output_pad = input_flatten @ weights_t

    # reshape output to [batch, w_channel, pad_h*pad_w]
    output_pad = output_pad.reshape(batch, w_channel, -1)

    # add bias, reshaped to [w_channel, 1] so it broadcasts over positions
    output_pad += bias.reshape(-1, 1)

    # reshape output to [batch, w_channel, pad_h, pad_w]
    output_pad = output_pad.reshape(batch, w_channel, pad_h, pad_w)

    # remove padding
    if pad < 1:
        return output_pad
    pad_before = int(pad / 2)
    pad_after = pad - pad_before
    return output_pad[:, :, pad_before:-pad_after, pad_before:-pad_after]
    

#### 2D反卷积-测试验证程序：


In [515]:
import torch.nn.functional as F

# input image: batch of 3, 3 channels, 5x5, scaled up by 10
# input = np.random.randn(1,2,5,5)*10
input = np.random.randn(3,3,5,5)*10
# input = np.array([[[[1,2],[3,4]]]])


#******** conv params *****************#
# conv params
# filter kernel: shape (3, 3, 3, 3), 3x3 kernels
# weights = np.random.randn(1,2,5,5)*10
weights = np.random.randn(3,3,3,3)*10
# weights = np.array([[[[5,6],[7,8]]]])

# bias: all zero, sized by weights.shape[0]
# bias = np.random.randn(weights.shape[0])
bias = np.zeros(weights.shape[0])
# stride
stride = 1
# pad: per-side padding, as torch's `padding` argument expects it
pad = 1

# numpy; pad*2 because conv_transpose2d_forward treats its `pad` argument as
# the total padding per spatial axis
output = conv_transpose2d_forward(input, weights, bias, stride, pad*2)
# print(f'output:{output.shape}')
# print(output)

# torch reference; torch.Tensor(...) produces float32 tensors, so a small
# non-zero difference from the float64 NumPy result is expected
input_tensor = torch.Tensor(input)
weights_tensor = torch.Tensor(weights)
bias_tensor = torch.Tensor(bias)

print('='*20)

out_tensor = F.conv_transpose2d(input_tensor, weights_tensor, bias_tensor, stride=stride, padding=pad)
# print(out_tensor.data)
# compute the diff: sum of absolute element-wise differences

diff = np.abs(output - out_tensor.numpy()).sum()
print(f'diff is {diff}')


print('*'*20)


transposed2 w_col : (3, 7, 7, 3, 25)
weigths_col is (147, 75)
diff is 0.0064128582784857
********************
