https://www.cnblogs.com/charlotte77/p/7783261.html

## 手写一个卷积神经网络

4).进行前向传播

\begin{equation} \begin{aligned} out_{o_{11}} &= activators(net_{o_{11}}) \\ &=max(0,net_{o_{11}}) = 1 \end{aligned} \end{equation}

In [255]:
import numpy as np

class ConvLayer(object):
    '''
    参数含义：
    input_width:输入图片尺寸——宽度
    input_height:输入图片尺寸——长度
    channel_number:通道数，彩色为3，灰色为1
    filter_width:卷积核的宽
    filter_height:卷积核的长
    filter_number:卷积核数量
    zero_padding：补零长度
    stride:步长
    activator:激活函数
    learning_rate:学习率
    '''
    def __init__(self, input_width, input_height,
                 channel_number, filter_width,
                 filter_height, filter_number,
                 zero_padding, stride, activator,
                 learning_rate):
        self.input_width = input_width
        self.input_height = input_height
        self.channel_number = channel_number
        self.filter_width = filter_width
        self.filter_height = filter_height
        self.filter_number = filter_number
        self.zero_padding = zero_padding
        self.stride = stride
        self.output_width = \
            ConvLayer.calculate_output_size(
            self.input_width, filter_width, zero_padding,
            stride)
        self.output_height = \
            ConvLayer.calculate_output_size(
            self.input_height, filter_height, zero_padding,
            stride)
        print(self.filter_number,
            self.output_height, self.output_width)
        self.output_array = np.zeros((self.filter_number,
            self.output_height, self.output_width))
        self.filters = []
        for i in range(filter_number):
            self.filters.append(Filter(filter_width,
                filter_height, self.channel_number))
        self.activator = activator
        self.learning_rate = learning_rate
    
    def calculate_output_size(input_size,
        filter_size, zero_padding, stride):
        return (input_size - filter_size +
                 2 * zero_padding) // stride + 1
    
    def forward(self, input_array):
        '''
        计算卷积层的输出
        输出结果保存在self.output_array
        '''
        self.input_array = input_array
        self.padded_input_array = padding(input_array,
            self.zero_padding)
        for f in range(self.filter_number):
            filter = self.filters[f]
            conv(self.padded_input_array,
                filter.get_weights(), self.output_array[f],
                self.stride, filter.get_bias())
        element_wise_op(self.output_array,
                        self.activator.forward)
        
    def element_wise_op(array, op):
         for i in np.nditer(array,
            op_flags=['readwrite']):
            i[...] = op(i)
            
    def backward(self, input_array, sensitivity_array, 
                 activator):
        '''
        计算传递给前一层的误差项，以及计算每个权重的梯度
        前一层的误差项保存在self.delta_array
        梯度保存在Filter对象的weights_grad
        '''
        self.forward(input_array)
        self.bp_sensitivity_map(sensitivity_array,
                                activator)
        self.bp_gradient(sensitivity_array)
        
    def update(self):
        '''
        按照梯度下降，更新权重
        '''
        for filter in self.filters:
            filter.update(self.learning_rate)

    def bp_sensitivity_map(self, sensitivity_array,
                           activator):
        '''
        计算传递到上一层的sensitivity map
        sensitivity_array: 本层的sensitivity map
        activator: 上一层的激活函数
        '''
        # 处理卷积步长，对原始sensitivity map进行扩展
        expanded_array = self.expand_sensitivity_map(
            sensitivity_array)
        # full卷积，对sensitivitiy map进行zero padding
        # 虽然原始输入的zero padding单元也会获得残差
        # 但这个残差不需要继续向上传递，因此就不计算了
        expanded_width = expanded_array.shape[2]
        zp = (self.input_width +  
              self.filter_width - 1 - expanded_width) // 2
        padded_array = padding(expanded_array, zp)
        # 初始化delta_array，用于保存传递到上一层的
        # sensitivity map
        self.delta_array = self.create_delta_array()
        # 对于具有多个filter的卷积层来说，最终传递到上一层的
        # sensitivity map相当于所有的filter的
        # sensitivity map之和
        for f in range(self.filter_number):
            filter = self.filters[f]
            # 将filter权重翻转180度
            flipped_weights = np.array(list(map(
                lambda i: np.rot90(i, 2), 
                filter.get_weights())))
            # 计算与一个filter对应的delta_array
            delta_array = self.create_delta_array()
            for d in range(delta_array.shape[0]):
                print('shit', flipped_weights)
                conv(padded_array[f], flipped_weights[d],
                    delta_array[d], 1, 0)
            self.delta_array += delta_array
        # 将计算结果与激活函数的偏导数做element-wise乘法操作
        derivative_array = np.array(self.input_array)
        element_wise_op(derivative_array, 
                        activator.backward)
        self.delta_array *= derivative_array

    def bp_gradient(self, sensitivity_array):
        # 处理卷积步长，对原始sensitivity map进行扩展
        expanded_array = self.expand_sensitivity_map(
            sensitivity_array)
        for f in range(self.filter_number):
            # 计算每个权重的梯度
            filter = self.filters[f]
            for d in range(filter.weights.shape[0]):
                conv(self.padded_input_array[d], 
                     expanded_array[f],
                     filter.weights_grad[d], 1, 0)
            # 计算偏置项的梯度
            filter.bias_grad = expanded_array[f].sum()

    def expand_sensitivity_map(self, sensitivity_array):
        depth = sensitivity_array.shape[0]
        # 确定扩展后sensitivity map的大小
        # 计算stride为1时sensitivity map的大小
        expanded_width = (self.input_width - 
            self.filter_width + 2 * self.zero_padding + 1)
        expanded_height = (self.input_height - 
            self.filter_height + 2 * self.zero_padding + 1)
        # 构建新的sensitivity_map
        expand_array = np.zeros((depth, expanded_height, 
                                 expanded_width))
        # 从原始sensitivity map拷贝误差值
        for i in range(self.output_height):
            for j in range(self.output_width):
                i_pos = i * self.stride
                j_pos = j * self.stride
                expand_array[:,i_pos,j_pos] = \
                    sensitivity_array[:,i,j]
        return expand_array

    def create_delta_array(self):
        return np.zeros((self.channel_number,
            self.input_height, self.input_width))

In [256]:
class IdentityActivator(object):
    def forward(self, weighted_input):
        return weighted_input

    def backward(self, output):
        return 1

class ReluActivator(object):
    def forward(self, weighted_input):
        #return weighted_input
        return max(0, weighted_input)

    def backward(self, output):
        return 1 if output > 0 else 0

In [257]:
class SigmoidActivator(object):
    def forward(self, weighted_input):
        return 1.0 // (1.0 + np.exp(-weighted_input))
    def backward(self, output):
        return output * (1 - output)

保存卷积层的参数和梯度

In [258]:
class Filter(object):
    def __init__(self, width, height, depth):
        #初始权重
        self.weights = np.random.uniform(-1e-4, 1e-4,
            (depth, height, width))
        #初始偏置
        self.bias = 0
        self.weights_grad = np.zeros(
            self.weights.shape)
        self.bias_grad = 0

    def __repr__(self):
        return 'filter weights:\n%s\nbias:\n%s' % (
            repr(self.weights), repr(self.bias))

    def get_weights(self):
        return self.weights

    def get_bias(self):
        return self.bias

    def update(self, learning_rate):
        self.weights -= learning_rate * self.weights_grad
        self.bias -= learning_rate * self.bias_grad

## 卷积层的前向传播

1).获取卷积区域

In [259]:
# 获取卷积区域
def get_patch(input_array, i, j, filter_width,
              filter_height, stride):
    '''
    从输入数组中获取本次卷积的区域，
    自动适配输入为2D和3D的情况
    '''
    start_i = i * stride
    start_j = j * stride
    if input_array.ndim == 2:
        input_array_conv = input_array[
            start_i : start_i + filter_height,
            start_j : start_j + filter_width]
        print("input_array_conv:",input_array_conv)
        return input_array_conv

    elif input_array.ndim == 3:
        input_array_conv = input_array[:,
            start_i : start_i + filter_height,
            start_j : start_j + filter_width]
        print("input_array_conv:",input_array_conv)
        return input_array_conv

2).进行卷积运算


\begin{equation} 
    \begin{aligned} 
        \ net_{o_{11}}&= conv (input,filter)\\  &= i_{11} \times h_{11} + i_{12} \times h_{12} +i_{21} \times h_{21} + i_{22} \times h_{22}\\  &=1 \times 1 + 0 \times (-1) +1 \times 1 + 1 \times (-1)=1 
    \end{aligned} 
\end{equation}


In [260]:
def conv(input_array,
         kernel_array,
         output_array,
         stride, bias):
    '''
    计算卷积，自动适配输入为2D和3D的情况
    '''
    channel_number = input_array.ndim
    output_width = output_array.shape[1]
    output_height = output_array.shape[0]
    kernel_width = kernel_array.shape[-1]
    kernel_height = kernel_array.shape[-2]
    for i in range(output_height):
        for j in range(output_width):
            output_array[i][j] = (
                get_patch(input_array, i, j, kernel_width,
                    kernel_height, stride) * kernel_array
                ).sum() + bias

3).增加zero_padding

In [261]:
#增加Zero padding
def padding(input_array, zp):
    '''
    为数组增加Zero padding，自动适配输入为2D和3D的情况
    '''
    if zp == 0:
        return input_array
    else:
        if input_array.ndim == 3:
            input_width = input_array.shape[2]
            input_height = input_array.shape[1]
            input_depth = input_array.shape[0]
            padded_array = np.zeros((
                input_depth,
                input_height + 2 * zp,
                input_width + 2 * zp))
            padded_array[:,
                zp : zp + input_height,
                zp : zp + input_width] = input_array
            return padded_array
        elif input_array.ndim == 2:
            input_width = input_array.shape[1]
            input_height = input_array.shape[0]
            padded_array = np.zeros((
                input_height + 2 * zp,
                input_width + 2 * zp))
            padded_array[zp : zp + input_height,
                zp : zp + input_width] = input_array
            return padded_array

## 5.卷积层的反向传播

1).将误差传递到上一层

In [262]:
def bp_sensitivity_map(self, sensitivity_array,
                           activator):
        '''
        计算传递到上一层的sensitivity map
        sensitivity_array: 本层的sensitivity map
        activator: 上一层的激活函数
        '''
        # 处理卷积步长，对原始sensitivity map进行扩展
        expanded_array = self.expand_sensitivity_map(
            sensitivity_array)
        # full卷积，对sensitivitiy map进行zero padding
        # 虽然原始输入的zero padding单元也会获得残差
        # 但这个残差不需要继续向上传递，因此就不计算了
        expanded_width = expanded_array.shape[2]
        zp = (self.input_width +
              self.filter_width - 1 - expanded_width) // 2
        padded_array = padding(expanded_array, zp)
        # 初始化delta_array，用于保存传递到上一层的
        # sensitivity map
        self.delta_array = self.create_delta_array()
        # 对于具有多个filter的卷积层来说，最终传递到上一层的
        # sensitivity map相当于所有的filter的
        # sensitivity map之和
        for f in range(self.filter_number):
            filter = self.filters[f]
            # 将filter权重翻转180度
            flipped_weights = np.array(map(
                lambda i: np.rot90(i, 2),
                filter.get_weights()))
            # 计算与一个filter对应的delta_array
            delta_array = self.create_delta_array()
            for d in range(delta_array.shape[0]):
                conv(padded_array[f], flipped_weights[d],
                    delta_array[d], 1, 0)
            self.delta_array += delta_array
        # 将计算结果与激活函数的偏导数做element-wise乘法操作
        derivative_array = np.array(self.input_array)
        element_wise_op(derivative_array,
                        activator.backward)
        self.delta_array *= derivative_array

In [263]:
def create_delta_array(self):
    return np.zeros((self.channel_number,
             self.input_height, self.input_width))

In [264]:
def bp_gradient(self, sensitivity_array):
        # 处理卷积步长，对原始sensitivity map进行扩展
        expanded_array = self.expand_sensitivity_map(
            sensitivity_array)
        for f in range(self.filter_number):
            # 计算每个权重的梯度
            filter = self.filters[f]
            for d in range(filter.weights.shape[0]):
                conv(self.padded_input_array[d],
                     expanded_array[f],
                     filter.weights_grad[d], 1, 0)
            # 计算偏置项的梯度
            filter.bias_grad = expanded_array[f].sum()

In [265]:
def update(self):
    '''
         按照梯度下降，更新权重
    '''
    for filter in self.filters:
        filter.update(self.learning_rate)

## 6.MaxPooling层的训练

1).定义MaxPooling类

In [266]:
class MaxPoolingLayer(object):
    def __init__(self, input_width, input_height,
                 channel_number, filter_width,
                 filter_height, stride):
        self.input_width = input_width
        self.input_height = input_height
        self.channel_number = channel_number
        self.filter_width = filter_width
        self.filter_height = filter_height
        self.stride = stride
        self.output_width = (input_width -
            filter_width) // self.stride + 1
        self.output_height = (input_height -
            filter_height) // self.stride + 1
        self.output_array = np.zeros((self.channel_number,
            self.output_height, self.output_width))
    # 前向传播
    def forward(self, input_array):
        for d in range(self.channel_number):
            for i in range(self.output_height):
                for j in range(self.output_width):
                    self.output_array[d,i,j] = (
                        get_patch(input_array[d], i, j,
                            self.filter_width,
                            self.filter_height,
                            self.stride).max())
                    
    #反向传播
    def backward(self, input_array, sensitivity_array):
        self.delta_array = np.zeros(input_array.shape)
        for d in range(self.channel_number):
            for i in range(self.output_height):
                for j in range(self.output_width):
                    patch_array = get_patch(
                        input_array[d], i, j,
                        self.filter_width,
                        self.filter_height,
                        self.stride)
                    k, l = get_max_index(patch_array)
                    self.delta_array[d,
                        i * self.stride + k,
                        j * self.stride + l] = \
                        sensitivity_array[d,i,j]

In [267]:
def init_test():
    a = np.array(
        [[[0,1,1,0,2],
          [2,2,2,2,1],
          [1,0,0,2,0],
          [0,1,1,0,0],
          [1,2,0,0,2]],
         [[1,0,2,2,0],
          [0,0,0,2,0],
          [1,2,1,2,1],
          [1,0,0,0,0],
          [1,2,1,1,1]],
         [[2,1,2,0,0],
          [1,0,0,1,0],
          [0,2,1,0,1],
          [0,1,2,2,2],
          [2,1,0,0,1]]])
    b = np.array(
        [[[0,1,1],
          [2,2,2],
          [1,0,0]],
         [[1,0,2],
          [0,0,0],
          [1,2,1]]])
    cl = ConvLayer(5,5,3,3,3,2,1,2,IdentityActivator(),0.001)
    cl.filters[0].weights = np.array(
        [[[-1,1,0],
          [0,1,0],
          [0,1,1]],
         [[-1,-1,0],
          [0,0,0],
          [0,-1,0]],
         [[0,0,-1],
          [0,1,0],
          [1,-1,-1]]], dtype=np.float64)
    cl.filters[0].bias=1
    cl.filters[1].weights = np.array(
        [[[1,1,-1],
          [-1,-1,1],
          [0,-1,1]],
         [[0,1,0],
         [-1,0,-1],
          [-1,1,0]],
         [[-1,0,0],
          [-1,0,1],
          [-1,0,0]]], dtype=np.float64)
    return a, b, cl

In [268]:
def test():
    a, b, cl = init_test()
    cl.forward(a)
    print("前向传播结果:", cl.output_array)
    cl.backward(a, b, IdentityActivator())
    cl.update()
    print("反向传播后更新得到的filter1:",cl.filters[0])
    print("反向传播后更新得到的filter2:",cl.filters[1])

if __name__ == "__main__":
        test()

2 3 3
input_array_conv: [[[0. 0. 0.]
  [0. 0. 1.]
  [0. 2. 2.]]

 [[0. 0. 0.]
  [0. 1. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 2. 1.]
  [0. 1. 0.]]]
input_array_conv: [[[0. 0. 0.]
  [1. 1. 0.]
  [2. 2. 2.]]

 [[0. 0. 0.]
  [0. 2. 2.]
  [0. 0. 2.]]

 [[0. 0. 0.]
  [1. 2. 0.]
  [0. 0. 1.]]]
input_array_conv: [[[0. 0. 0.]
  [0. 2. 0.]
  [2. 1. 0.]]

 [[0. 0. 0.]
  [2. 0. 0.]
  [2. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [1. 0. 0.]]]
input_array_conv: [[[0. 2. 2.]
  [0. 1. 0.]
  [0. 0. 1.]]

 [[0. 0. 0.]
  [0. 1. 2.]
  [0. 1. 0.]]

 [[0. 1. 0.]
  [0. 0. 2.]
  [0. 0. 1.]]]
input_array_conv: [[[2. 2. 2.]
  [0. 0. 2.]
  [1. 1. 0.]]

 [[0. 0. 2.]
  [2. 1. 2.]
  [0. 0. 0.]]

 [[0. 0. 1.]
  [2. 1. 0.]
  [1. 2. 2.]]]
input_array_conv: [[[2. 1. 0.]
  [2. 0. 0.]
  [0. 0. 0.]]

 [[2. 0. 0.]
  [2. 1. 0.]
  [0. 0. 0.]]

 [[1. 0. 0.]
  [0. 1. 0.]
  [2. 2. 0.]]]
input_array_conv: [[[0. 0. 1.]
  [0. 1. 2.]
  [0. 0. 0.]]

 [[0. 1. 0.]
  [0. 1. 2.]
  [0. 0. 0.]]

 [[0. 0. 1.]
  [0. 2. 1.]
  [0. 0. 0.]]]
input_a