In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
from collections import OrderedDict

In [3]:
from read_mnist import load_mnist

### CNN
- 卷积层、池化层的实现

In [4]:
def img2col(x, stride, padding, kernel_h, kernel_w):
    """
    :type x: (N, Channels, H, W),其中N表示batch大小，
    Channels表示通道数，W表示二维数组表示的图像的宽，
    H表示高
    :type stride: int, 步长
    :type kernel_h: int, 卷积核的高
    :type kernel_w: int, 卷积核的宽
    """

    N, C, H, W = x.shape
#     if padding != 0:
#         cols_add = np.zeros((x.shape[0], x.shape[1], x.shape[2], padding))
#         rows_add = np.zeros((x.shape[0], x.shape[1], padding, x.shape[3] + 2 * padding))

#         x = np.concatenate((cols_add, x, cols_add), axis=3)  # 扩充列
#         x = np.concatenate((rows_add, x, rows_add), axis=2)  # 扩充行
    if padding != 0:
        x = np.pad(x, [(0,0),(0,0),(padding, padding),(padding, padding)], 'constant') # 填充函数

    new_H = int((H + 2 * padding - kernel_h) / stride + 1)  # 向上取整
    new_W = int((W + 2 * padding - kernel_w) / stride + 1)  # 向上取整

#     new_x = np.zeros((N, new_H*new_W, C*kernel_h*kernel_w))
#     for i in range(N):
#         for j in range(new_H*new_W):
#             for v in range(C):
#                 for u in range(0, H - kernel_h + 1):
#                     for z in range(0, W - kernel_w + 1):
#                         new_x[i][j][v * (kernel_h*kernel_w):(v+1)*(kernel_h*kernel_w)] = \
#                             x[i][v][u:u+kernel_h, z:z+kernel_w].reshape(1, -1)

    new_x = np.zeros((N, C, kernel_h, kernel_w, new_H, new_W))
    for i in range(kernel_h):
        i_max = i + stride * new_H
        for j in range(kernel_w):
            j_max = j + stride * new_W
            new_x[:, :, i, j, :, :] = x[:, :, i:i_max:stride, j:j_max:stride]

    return new_x.transpose(0, 4, 5, 1, 2, 3).reshape(N * new_H * new_W, -1)

def col2img(col, input_shape, kernel_h, kernel_w, stride=1, padding=0):
    N, C, H, W = input_shape
    out_h = int((H + 2 * padding - kernel_h) / stride) + 1
    out_w = int((W + 2 * padding - kernel_w) / stride) + 1
    col = col.reshape(N, out_h, out_w, C, kernel_h, kernel_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2 * padding + stride - 1, W + 2 * padding + stride - 1))
    for y in range(kernel_h):
        y_max = y + stride * out_h
        for x in range(kernel_w):
            x_max = x + stride * out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    return img[:, :, padding:H + padding, padding:W + padding]

In [5]:
class Conv(object):
    def __init__(self, W, b, stride=1, padding=0):
        """
        :type W: (,Channels, H, W), Channels表示通道数，
        也是传向下一层的数据通道数，W表示二维数组表示
        的滤波器的宽，H表示高
        :type b: (Channels, ),每一通道上的偏置
        :type stride: 步长，默认在长、宽两个方向是相同的
        :type padding: [True, False],是否补全，若是，则补全使保证卷积前后
        数据W、H大小不变，否则不补全
        """
        self.W = W
        self.b = b
        self.stride = stride
        self.padding = padding
        self.dW = None
        self.db = None

    def forward(self, x):
        """
        :type x: (N, Channels, H, W),其中N表示batch大小，
        Channels表示通道数，W表示二维数组表示的图像的宽，
        H表示高
        """
        N, C, H, W = x.shape
        WN, WC, WH, WW = self.W.shape

        new_H = int((H + 2*self.padding - WH)/self.stride) + 1  # 向上取整
        new_W = int((W + 2*self.padding - WW)/self.stride) + 1  # 向上取整

        extend_x = img2col(x, self.stride, self.padding, WH, WW)
        expend_W = self.W.reshape(WN, -1).T
        extend_y = np.dot(extend_x, expend_W) + self.b
        y = extend_y.reshape(N, new_H, new_W, -1).transpose(0, 3, 1, 2)  # 调整轴顺序，重新变形
        return y

    def backward(self, dout):
        WN, WC, WH, WW = self.W.shape
        dout = dout.transpose(0, 2, 3, 1).reshape(-1, WN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.extend_x.T, dout)
        self.dW = self.dW.transpose(WN, WC, WH, WW)

        dextend_x = np.dot(dout, self.extend_W.T)
        dx = col2img(dextend_x, self.x.shape, WH, WW, self.stride, self.padding)
        return  dx

### 测试img2col函数

In [6]:
x = np.ones((10, 3, 5, 5))
c = Conv(W=np.ones((20, 3, 3, 3)), b=-np.ones((20,)), stride=1, padding=0)
res_x = c.forward(x)
print res_x.shape

(10, 20, 3, 3)


In [7]:
class Pooling(object):
    def __init__(self, kernel_h, kernel_w, stride=1, padding=0):
        self.kernel_h = kernel_h
        self.kernel_w = kernel_w
        self.stride = stride
        self.padding = padding
        
        self.x = None
        self.arg_max = None
    # 默认实现MaxPooling函数
    def forward(self, x):  
        N, C, H, W = x.shape
        
        new_H = int((H + 2*self.padding - self.kernel_h)/self.stride) + 1  # 向上取整
        new_W = int((W + 2*self.padding - self.kernel_w)/self.stride) + 1  # 向上取整
        
        extend_x = img2col(x, self.stride, self.padding, self.kernel_h, self.kernel_w)
        extend_x = extend_x.reshape(-1, self.kernel_h*self.kernel_w)
        
        arg_max = np.argmax(extend_x, axis=1) 
        y = np.max(extend_x, axis=1)
        y = y.reshape(N, C, new_H, new_W)
        
        self.x = x
        self.arg_max = arg_max  # 记录中间结果，在backward时方便还原哪个位置有梯度传递，哪个位置没有梯度传播
        return y
    def backward(self, dout):
        dout = dout.transpose(0, 2, 3, 1)
        
        pool_size = self.kernel_h * self.kernel_w
        
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size, ))
        
        
        # 最后一步还原为input的形状
        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2img(dcol, self.x.shape, self.kernel_h, self.kernel_w, self.stride, self.padding)
        
        return dx

In [8]:
pool = Pooling(kernel_h=3, kernel_w=3, stride=2)
res_px = pool.forward(x)
print res_px.shape

(10, 3, 2, 2)


In [12]:
a = (10, 3, 3, 3)
a + (9, )

(10, 3, 3, 3, 9)

In [22]:
class Activation_func(object):
    def __init__(self):
        pass
class ReLu(Activation_func):
    def __init__(self):
        self.mask = None
    def forward(self, x):
        self.mask = (x <= 0)
        x_copy = x.copy()
        x_copy[self.mask] = 0
        return x_copy
    
    def backward(self, dout):
        dout[self.mask] = 0
        dx = dout
        return dx

class Affine(object):
    def __init__(self, W, b, lam=0.01):
        self.W = W
        self.b = b
        self.dW = None
        self.db = None
        self.lam = lam
        self.x = None
    def forward(self, x):
        self.x = x
        return np.dot(x, self.W) + self.b
    
    def backward(self, dout):
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout) + self.lam * self.W # 损失函数添加权重衰减项之后的导数 
        self.db = np.sum(dout, axis=0)
        return dx

class softmax_cross_entropy(object):
    def __init__(self, y):
        self.out = None
        self.y = y
    def forward(self, x):
        exp_x = np.exp(x - np.max(x, axis=1).reshape(-1, 1))
        sum_x = np.sum(exp_x, axis=1)
        out = exp_x / sum_x.reshape(-1, 1)
        self.out = out
        return -np.sum(self.y*np.log(self.out + 1e-7)) / self.out.shape[0]
    def backward(self, dout):
        return dout * (self.out - self.y)

### 使用卷积层、池化层、全连接层、等构建并训练一个CNN 

In [30]:
class SimpleConvNet(object):
    def __init__(self, input_dim=(1, 28, 28), conv_param={'filter_num':30, 'filter_size':5, 
                                                          'stride':1, 'pad':0},hidden_size=100, 
                 output_size=10, weight_init_std = 0.01):
        filter_num = conv_param['filter_num']
        filter_size =conv_param['filter_size']
        filter_stride = conv_param['stride']
        filter_pad = conv_param['pad']
        
        input_size = input_dim[1]
        conv_output_size = (input_size - filter_size + 2*filter_pad) / filter_stride + 1
        pool_output_size = int(filter_num *(conv_output_size/2)*(conv_output_size/2))
        
        self.params = {}
        self.params['W1'] = weight_init_std * np.random.randn(filter_num, input_dim[0], 
                                                              filter_size, filter_size)
        self.params['b1'] = np.zeros(filter_num)
        
        self.params['W2'] = weight_init_std * np.random.randn(pool_output_size, hidden_size)
        self.params['b2'] = np.zeros(hidden_size)
        
        self.params['W3'] = weight_init_std * np.random.randn(hidden_size, output_size)
        self.params['b3'] = np.zeros(output_size)
        
        self.layers = OrderedDict()
        self.layers['Conv1'] = Conv(self.params['W1'], self.params['b1'], filter_stride, filter_pad)
        self.layers['Activation1'] = ReLu()
        
        self.layers['Pooling'] = Pooling(kernel_h=2, kernel_w=2, stride=2)
        
        self.layers['Affine1'] = Affine(self.params['W2'], self.params['b2'])
        self.layers['Activation2'] = ReLu()
        
        self.layers['Affine2'] = Affine(self.params['W3'], self.params['b3'])
        
        self.last_layer = None
    
    def forward(self, x):
        for layer in self.layers.values():
            x = layer.forward(x)
        return x
    
    def loss(self, x, t):
        y = self.forward(x)
        self.last_layer = softmax_cross_entropy(t)
        return self.last_layer.forward(y)
    
    def gradient(self, x, t):
        self.loss(x, t)
        dout = 1
        dout = self.last_layer.backward(dout)
        
        layers = list(self.layers.values)
        layers.reverse()
        
        for layer in layers:
            dout = layer.backward(dout)
        
        grads = {}
        grads['W1'] = self.layers['Conv1'].dW
        grads['b1'] = self.layers['Conv1'].db
        grads['W2'] = self.layers['Affine1'].dW
        grads['b2'] = self.layers['Affine1'].db
        grads['W3'] = self.layers['Affine2'].dW
        grads['b3'] = self.layers['Affine2'].db
        
        return grads

In [15]:
def train(images, labels, alpha=0.01, spochs=100, batch_size = 100, images_test=None, labels_test=None):
    net = SimpleConvNet()
#     print net.paras
    train_loss_list = []
    train_size = images.shape[0]
    for i in range(spochs):
        batch_mask = np.random.choice(train_size, batch_size)
        x = images[batch_mask]
        y = labels[batch_mask] # 抽样
        grad = net.gradient(x, y)
        
        
        for k,v in grad.items():
            net.paras[k] -= alpha*grad[k]
        loss = net.loss(x, y)
        print "has trained %d times, the train's loss %f" % (i+1, loss)
        if images_test is not None and labels_test is not None:
            print net.accuracy(images_test, labels_test)
        train_loss_list.append(loss)
    return net, train_loss_list

In [17]:
def load_data(path = "../data/mnist", kind = 'train', one_hot = True, normals = True):
    images, labels = load_mnist(path, kind=kind)
    if normals == True:
        images = images / 255.0
    if one_hot == True:
        labels_onehot = np.zeros((labels.shape[0], np.unique(labels).shape[0]))
        for i in range(labels_onehot.shape[0]):
            labels_onehot[i][labels[i]] = 1
        labels = labels_onehot
    return (images, labels)

In [18]:
train_x, train_y = load_data(kind='train')
test_x, test_y = load_data(kind='t10k')

In [20]:
train_cx = train_x.reshape(-1, 1, 28, 28)
test_cx  = test_x.reshape(-1, 1, 28, 28)

In [21]:
train_cx.shape

(60000, 1, 28, 28)

In [31]:
train(images=train_cx, labels=train_y, images_test=test_cx, labels_test=test_y)

ValueError: shapes (100,30,12,12) and (4320,100) not aligned: 12 (dim 3) != 4320 (dim 0)