In [114]:
import numpy as np
# jupyter中导入自写的库，可以把 .ipynb保存成 .py
from activators import ReluActivator, IdentityActivator

In [115]:
# 定义一个类，保存卷积层的参数和梯度
class Filter(object):
    def __init__(self, width, height, depth):
        # 初始权重
        self.weights = np.random.uniform(-1e-4, 1e-4, (depth, height, width))
        # 初始偏置
        self.bias = 0
        self.weights_grad = np.zeros(self.weights.shape)
        self.bias_grad = 0
        
    def __repr__(self):
        return 'filter weights: \n%s \nbias: \n%s' % (repr(self.weights), repr(self.bias))
    
    def get_weights(self):
        return self.weights
    
    def get_bias(self):
        return self.bias
    
    def update(self, learning_rate):
        self.weights -= learning_rate * self.weights_grad
        self.bias -= learning_rate * self.bias_grad

In [116]:
"""
卷积层的前向传播:
1.获取卷积区域
2.进行卷积运算
3.增加zero_padding
4.进行前向传播
"""
# 获取卷积区域
def get_patch(input_array, i, j, filter_width, filter_height, stride):
    """
    从输入数组中获取本次卷积的区域
    自动适配输入为2D和3D的情况
    """
    start_i = i * stride
    start_j = j * stride
    if input_array.ndim == 2:
        input_array_conv = input_array[
            start_i : start_i + filter_height,
            start_j : start_j + filter_width
        ]
        print ("input_array_conv: ",input_array_conv)
        return input_array_conv
    
    elif input_array.ndim == 3:
        input_array_conv = input_array[:,
            start_i : start_i + filter_height,
            start_j : start_j + filter_width
        ]
        print ("input_array_conv: ",input_array_conv)
        return input_array_conv        

In [117]:
def get_max_index(array):
    max_i = 0
    max_j = 0
    max_value = array[0, 0]
    for i in range(array.shape[0]):
        for j in range(array.shape[1]):
            if array[i, j] > max_value:
                max_value = array[i, j]
                max_i, max_j = i, j
    return max_i, max_j

In [118]:
# 进行卷积运算
def conv(input_array, kernel_array, output_array, stride, bias):
    """
    计算卷积，自动适配输入为2D和3D的情况

    """
    channel_number = input_array.ndim
    output_width = output_array.shape[1]
    output_height = output_array.shape[0]
    kernel_width = kernel_array.shape[-1]
    kernel_height = kernel_array.shape[-2]
    for i in range(output_height):
        for j in range(output_width):
            output_array[i][j] = (
                get_patch(input_array, i, j, kernel_width, kernel_height,
                          stride) * kernel_array).sum() + bias

In [119]:
# 增加zero_padding
def padding(input_array, zp):
    """
    为数组增加zero padding, 自动适配输入为2D和3D的情况

    """
    if zp == 0:
        return input_array
    else:
        if input_array.ndim == 3:
            input_width = input_array.shape[2]
            input_height = input_array.shape[1]
            input_depth = input_array.shape[0]
            padded_array = np.zeros((
                input_depth,
                input_height + 2 * zp,
                input_width + 2 * zp))
            padded_array[:,
                zp : zp + input_height,
                zp : zp + input_width] = input_array
            return [padded_array]
        elif input_array.ndim == 2:
            input_width = input_array.shape[1]
            input_height = input_array.shape[0]
            padded_array = np.zeros((
                input_height + 2 * zp,
                input_width + 2 * zp))
            padded_array[zp : zp + input_height,
                zp : zp + input_width] = input_array
            return padded_array

In [120]:
# element_wise_op函数是将每个组的元素对应相乘
def element_wise_op(array, op):
    for i in np.nditer(array, op_flags = ['readwriter']):
        i[...] = op(i)

In [121]:
"""
卷积层的反向传播
1.将误差传递到上一层
2.保存传递到上一层的sensitivity map的数组
3.计算代码梯度
4.按照梯度下降法更新参数
"""

'\n卷积层的反向传播\n1.将误差传递到上一层\n2.保存传递到上一层的sensitivity map的数组\n3.计算代码梯度\n4.按照梯度下降法更新参数\n'

In [122]:
# 定义一个卷积层
class ConvLayer(object):
    """
    主要参数:
    input_width: 输入图片的宽度
    input_height: 输入图片的长度
    channel_width: 通道数，彩色为3，灰色为1
    filter_width: 卷积核的宽
    filter_height: 卷积核的长
    filter_number: 卷积核的数量
    zero_padding: 补零长度
    stride：步长
    activator: 激活函数
    learning_rate: 学习率
    """
    
    # ConvLayer类中用到的calculate_output_size函数，用于计算通过卷积运算后的feature_map大小

    @staticmethod
    def calculate_output_size(input_size, filter_size, zero_padding, stride):
        return int((input_size - filter_size + 2 * zero_padding) / stride + 1)
    
    def __init__(self, input_width, input_height,
                 channel_number, filter_width,
                 filter_height, filter_number,
                 zero_padding, stride, activator,
                 learning_rate):
        self.input_width = input_width
        self.input_height = input_height
        self.channel_number = channel_number
        self.filter_width = filter_width
        self.filter_height = filter_height
        self.filter_number = filter_number
        self.zero_padding = zero_padding
        self.stride = stride
        self.activator = activator
        self.learning_rate = learning_rate
        # 卷积运算后的output的宽和长
        self.output_width = ConvLayer.calculate_output_size(
                                                            self.input_width,
                                                            filter_width,
                                                            zero_padding,
                                                            stride)
        self.output_height = ConvLayer.calculate_output_size(
                                                            self.input_height,
                                                            filter_height,
                                                            zero_padding,
                                                            stride)
        # 通过numpy把output转换成数组
        self.output_array = np.zeros((self.filter_number, self.output_height, self.output_width))
        # 定义一个空的filters，用于存放卷积核
        self.filters = []
        for i in range(filter_number):
            self.filters.append(Filter(filter_width, filter_height, self.channel_number))
            
    # 进行前向传播
    def forward(self, input_array):
        """
        计算卷积层的输出
        输出结果保存在self.output_array
        """
        self.input_array = input_array
        self.padded_input_array = padding(input_array, self.zero_padding)
        for f in range(self.filter_number):
            filter = self.filters[f]
            conv(self.padded_input_array,
                 filter.get_weights(), self.output_array[f],
                 self.stride, filter.get_bias())

        element_wise_op(self.output_array, self.activator.forward)
        
    def backward(self, input_array, sensitivity_array, activator):
        '''
        计算传递给前一层的误差项，以及计算每个权重的梯度
        前一层的误差项保存在self.delta_array
        梯度保存在Filter对象的weights_grad
        '''
        self.forward(input_array)
        self.bp_sensitivity_map(sensitivity_array,
                                activator)
        self.bp_gradient(sensitivity_array)
        
    def update(self):
        '''
        按照梯度下降，更新权重
        '''
        for filter in self.filters:
            filter.update(self.learning_rate)

    # 将误差传递到上一层
    def bp_sensitivity_map(self, sensitivity_array, activator):
        """
        计算传递到上一层sensitivity map
        sensitivity_array:本层的sensitivity map
        activator:上一层的激活函数
        """
        # 处理卷积步长，对原始sensitivity map进行扩展
        expanded_array = self.expand_sensitivity_map(sensitivity_array)
        # full卷积，对sensitivity map进行zero padding
        # 虽然原始输入的zero padding单元也会获得残差
        # 但这个残差不需要继续向上传递，因此就不计算了
        expanded_width = expanded_array.shape[2]
        zp = (self.input_width + self.filter_width - 1 - expanded_width) / 2
        padding_array = padding(expanded_array, zp)
        # 初始化delta_array,用于保存传递到上一层的
        # sensitivity map
        self.delta_array = self.create_delta_array()
        # 对于具有多个filter的卷积层来说，最终传递到上一层的
        # sensitivity map 相当于所有的filter的
        # sensitivity map之和
        for f in range(self.filter_number):
            filter = self.filters[f]
            # 将filter权重翻转180度
            flipped_weights = np.array(map(
                lambda i: np.rot90(i, 2),
                filter.get_weights()))
            # 计算与一个filter对应的delta_array
            delta_array = self.create_delta_array()
            for d in range(delta_array.shape[0]):
                conv(padded_array[f], flipped_weights[d],
                     delta_array[d], 1, 0)
            self.delta_array += delta_array
        # 将计算结果与激活函数的偏导数做element_wise乘法操作
        derivative_array = np.array(self.input_array)
        element_wise_op(derivative_array,
                        activator.backward)
        self.delta_array *= derivative_array    
        
    # 计算代码梯度
    def bp_gradient(self, sensitivity_array):
        # 处理卷积步长，对原始sensitivity map进行扩展
        expanded_array = self.expand_sensitivity_map(sensitivity_array)
        for f in range(self.filter_number):
            # 计算每个权重的梯度
            filter = self.filters[f]
            for d in range(filter.weights.shape[0]):
                conv(self.padded_input_array[d], expanded_array[f], filter.weights_grad[d], 1, 0)
            # 计算偏置项的梯度
            filter.bias_grad = expanded_array[f].sum()
            
    def expand_sensitivity_map(self, sensitivity_array):
        depth = sensitivity_array.shape[0]
        # 确定扩展后sensitivity map的大小
        # 计算stride为1时sensitivity map的大小
        expanded_width = (self.input_width - 
            self.filter_width + 2 * self.zero_padding + 1)
        expanded_height = (self.input_height - 
            self.filter_height + 2 * self.zero_padding + 1)
        # 构建新的sensitivity_map
        expand_array = np.zeros((depth, expanded_height, 
                                 expanded_width))
        # 从原始sensitivity map拷贝误差值
        for i in range(self.output_height):
            for j in range(self.output_width):
                i_pos = i * self.stride
                j_pos = j * self.stride
                expand_array[:,i_pos,j_pos] = \
                    sensitivity_array[:,i,j]
        return expand_array
    
    # 保存传递到上一层的sensitivity map的数组
    def create_delta_array(self):
        return np.zeros((self.channel_number, self.input_height, self.input_width))

In [123]:
"""
MaxPooling层的训练
1.定义MaxPooling类
2.前向传播计算
3.反向穿播计算
"""
# 定义MaxPooling类
class MaxPoolingLayer(object):
    def __init__(self, input_width, input_height,
                 channel_number, filter_width,
                 filter_height, stride):
        self.input_width = input_width
        self.input_height = input_height
        self.channel_number = channel_number
        self.filter_width = filter_width
        self.filter_height = filter_height
        self.stride = stride
        self.output_width = (input_width - filter_width) / self.stride + 1
        self.output_height = (input_height - filter_height) / self.stride + 1
        self.output_array = np.zeros((self.channel_number, self.output_height, self.output_width))

    # 前向传播
    def forward(self, input_array):
        for d in range(self.channel_number):
            for i in range(self.output_height):
                for j in range(self.output_width):
                    self.output_array[d, i, j] = (
                        get_patch(input_array[d], i, j,
                                 self.filter_width,
                                 self.filter_height,
                                 self.stride).max())
                    
                    
    # 反向传播
    def backward(self, input_array, sensitivity_array):
        self.delta_array = np.zeros(input_array.shape)
        for d in range(self.channel_number):
            for i in range(self.output_height):
                for j in range(self.output_width):
                    patch_array = get_patch(
                        input_array[d], i, j,
                        self.filter_width,
                        self.filter_height,
                        self.stride)
                    k, l = get_max_index(patch_array)
                    self.delta_array[d,
                                     i * self.stride + k,
                                     j * self.stride + l] = \
                                     sensitivity_array[d, i, j]

In [124]:
# 测试CNN

In [125]:
def init_test():
    a = np.array(
        [[[0,1,1,0,2],
          [2,2,2,2,1],
          [1,0,0,2,0],
          [0,1,1,0,0],
          [1,2,0,0,2]],
         [[1,0,2,2,0],
          [0,0,0,2,0],
          [1,2,1,2,1],
          [1,0,0,0,0],
          [1,2,1,1,1]],
         [[2,1,2,0,0],
          [1,0,0,1,0],
          [0,2,1,0,1],
          [0,1,2,2,2],
          [2,1,0,0,1]]])
    b = np.array(
        [[[0,1,1],
          [2,2,2],
          [1,0,0]],
         [[1,0,2],
          [0,0,0],
          [1,2,1]]])
    cl = ConvLayer(5,5,3,3,3,2,1,2,IdentityActivator(),0.001)
    cl.filters[0].weights = np.array(
        [[[-1,1,0],
          [0,1,0],
          [0,1,1]],
         [[-1,-1,0],
          [0,0,0],
          [0,-1,0]],
         [[0,0,-1],
          [0,1,0],
          [1,-1,-1]]], dtype=np.float64)
    cl.filters[0].bias=1
    cl.filters[1].weights = np.array(
        [[[1,1,-1],
          [-1,-1,1],
          [0,-1,1]],
         [[0,1,0],
         [-1,0,-1],
          [-1,1,0]],
         [[-1,0,0],
          [-1,0,1],
          [-1,0,0]]], dtype=np.float64)
    return a, b, cl

In [126]:
def test():
    a, b, cl = init_test()
    cl.forward(a)
    print ("前向传播结果:", cl.output_array)
    cl.backward(a, b, IdentityActivator())
    cl.update()
    print ("反向传播后更新得到的filter1:",cl.filters[0])
    print ("反向传播后更新得到的filter2:",cl.filters[1])

In [127]:
def gradient_check():
    '''
    梯度检查
    '''
    # 设计一个误差函数，取所有节点输出项之和
    error_function = lambda o: o.sum()
    
    # 计算forward值
    a, b, cl = init_test()
    cl.forward(a)
    
    # 求取sensitivity map
    sensitivity_array = np.ones(cl.output_array.shape,
                                dtype = np.float64)
    # 计算梯度
    cl.backward(a, sensitivity_array, IdentityActivator())
    # 检查梯度
    epsilon = 10e-4
    for d in range(cl.filters[0].weights_grad.shape[0]):
        for i in range(cl.filters[0].weights_grad.shape[1]):
            for j in range(cl.filters[0].weights_grad.shape[2]):
                cl.filters[0].weights[d, i, j] += epsilon
                cl.forward(a)
                err1 = error_function(cl.output_array)
                cl.filters[0].weights[d, i, j] -= 2 * epsilon
                cl.forward(a)
                err2 = error_function(cl.output_array)
                expect_grad = (err1 - err2) / (2 * epsilon)
                cl.filters[0].weights[d, i, j] += epsilon
                print ('weights(%d,%d,%d): expected - actural %f - %f' % (
                    d, i, j, expect_grad, cl.filters[0].weights_grad[d, i, j]))

In [128]:
def init_pool_test():
    a = np.array(
        [[[1,1,2,4],
          [5,6,7,8],
          [3,2,1,0],
          [1,2,3,4]],
         [[0,1,2,3],
          [4,5,6,7],
          [8,9,0,1],
          [3,4,5,6]]], dtype = np.float64)

    b = np.array(
        [[[1,2],
          [2,4]],
         [[3,5],
          [8,2]]], dtype = np.float64)

    mpl = MaxPoolingLayer(4, 4, 2, 2, 2, 2)

    return a, b, mpl

In [129]:
def test_pool():
    a, b, mpl = init_pool_test()
    mpl.forward(a)
    print ('input array:\n%s\noutput array:\n%s' % (a, mpl.output_array))
    mpl.backward(a, b)
    print ('input array:\n%s\nsensitivity array:\n%s\ndelta array:\n%s' % (a, b, mpl.delta_array))

In [130]:
def test_example():
    a = np.array(
        [[1,0,1,0],
          [1,1,1,0],
          [1,0,1,0],
          [0,0,1,0]],
          dtype = np.float64)
    b = np.array(
        [[[1,-1],
          [1,-1]],
         [[1,1],
          [-1,-1]]], dtype = np.float64)

In [131]:
if __name__ == "__main__":
    test()

AttributeError: 'list' object has no attribute 'ndim'