In [None]:
from IPython.display import display, HTML
# Inject a CSS rule that forces elements with class "container" to 100% width,
# so the notebook uses the full width of the browser window.
display(HTML("<style>.container { width:100% !important; }</style>"))

# 导入库

In [None]:
import os
import sys
sys.path.append(os.pardir) #将父目录添加到默认搜寻目录
import numpy as np
import matplotlib.pyplot as plt
from dataset.mnist import load_mnist
from common.util import im2col,col2im
import math 
from common.optimizer import *
from collections import OrderedDict

In [None]:
from common.layers import Convolution,Pooling,Affine

In [None]:
# Load the training and test data; images are kept un-flattened and labels are one-hot.
(x_train, t_train), (x_test, t_test) = load_mnist(flatten=False,one_hot_label=True)
# Keep the first 100 training images/labels as a small subset.
x_train_1=x_train[:100]
t_train_1=t_train[:100]

In [None]:
def softmax(x):
    """Numerically stable softmax.

    For a 2-D input the softmax is taken independently over each row;
    for any other rank it is taken over the whole array at once.
    The maximum is subtracted before exponentiating to avoid overflow.
    """
    if x.ndim != 2:
        shifted = x - np.max(x)  # overflow guard
        exps = np.exp(shifted)
        return exps / np.sum(exps)

    # Work on the transpose so that each sample becomes a column.
    cols = x.T
    cols = cols - np.max(cols, axis=0)
    exps = np.exp(cols)
    return (exps / np.sum(exps, axis=0)).T

def cross_entropy_error(y, t):
    """Mean cross-entropy between predicted probabilities ``y`` and targets ``t``.

    ``t`` may be one-hot (same number of elements as ``y``) or an array of
    class indices. A 1-D ``y`` is treated as a batch of one sample.
    """
    if y.ndim == 1:
        y = y.reshape(1, y.size)
        t = t.reshape(1, t.size)

    # One-hot targets are converted to the index of the correct class.
    labels = t.argmax(axis=1) if t.size == y.size else t

    n_samples = y.shape[0]
    picked = y[np.arange(n_samples), labels]
    # 1e-7 keeps log() away from log(0).
    return -np.sum(np.log(picked + 1e-7)) / n_samples

class my_conv:
    """2-D convolution layer implemented with im2col / col2im.

    Parameters
    ----------
    W : ndarray of shape (FN, C, FH, FW)
        Filter weights.
    b : ndarray of shape (FN,)
        Per-filter bias.
    stride, pad : int
        Stride and zero-padding, used by both the forward and backward pass.

    After ``backward`` the parameter gradients are available as ``dW`` and ``db``.
    """

    def __init__(self, W, b, stride=1, pad=0):
        self.FN, self.C, self.FH, self.FW = W.shape
        self.stride = stride
        self.pad = pad
        # Values cached by forward() for use in backward().
        self.x = None
        self.col = None
        self.col_W = None
        self.W = W
        self.b = b  # (FN,)
        self.dW = None
        self.db = None

    def forward(self, x):
        """Convolve a (N, C, H, W) batch and return a (N, FN, OH, OW) array."""
        N, C, H, W = x.shape
        # The output size must use the same stride/pad that backward() hands
        # to col2im; the previous hard-coded stride=1/pad=0 broke any other setting.
        OH = (H + 2 * self.pad - self.FH) // self.stride + 1
        OW = (W + 2 * self.pad - self.FW) // self.stride + 1

        x_col = im2col(x, self.FH, self.FW, stride=self.stride, pad=self.pad)
        filter_flat = self.W.reshape(self.FN, -1).T  # (C*FH*FW, FN)

        x_aftercov = np.dot(x_col, filter_flat) + self.b
        # (N*OH*OW, FN) -> (N, FN, OH, OW)
        x_aftercov = x_aftercov.T.reshape(self.FN, N, OH, OW).transpose(1, 0, 2, 3)

        self.x = x
        self.col = x_col
        self.col_W = filter_flat
        return x_aftercov

    def backward(self, dout):
        """Back-propagate ``dout`` (N, FN, OH, OW); stores dW/db and returns dx."""
        FN, C, FH, FW = self.W.shape
        # Put the filter axis last so each row matches one row of self.col.
        dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.col.T, dout)
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)
        dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

        return dx
    
class my_pool:
    """Max-pooling layer implemented with im2col / col2im.

    Parameters
    ----------
    pool_h, pool_w : int
        Height and width of the pooling window.
    stride, pad : int
        Stride and zero-padding, used by both the forward and backward pass.
    """

    def __init__(self, pool_h, pool_w, stride=1, pad=0):
        self.pool_h = pool_h
        self.pool_w = pool_w
        self.stride = stride
        self.pad = pad
        # Values cached by forward() for use in backward().
        self.arg_max = None
        self.x = None

    def forward(self, x):
        """Max-pool a (N, C, H, W) batch and return a (N, C, OH, OW) array."""
        self.x = x
        N, C, H, W = x.shape
        OH = (H + 2 * self.pad - self.pool_h) // self.stride + 1
        OW = (W + 2 * self.pad - self.pool_w) // self.stride + 1

        # Use self.pad here too: OH/OW above and col2im in backward() both
        # include the padding, so im2col must agree with them.
        x_col = im2col(x, self.pool_h, self.pool_w, stride=self.stride, pad=self.pad)
        # One row per pooling window.
        x_col = x_col.reshape(-1, self.pool_h * self.pool_w)
        self.arg_max = np.argmax(x_col, axis=1)

        out = np.max(x_col, axis=1)
        out = out.reshape(N, OH, OW, C).transpose(0, 3, 1, 2)

        return out

    def backward(self, dout):
        """Route each upstream gradient to the position that held the window maximum."""
        dout = dout.transpose(0, 2, 3, 1)

        pool_size = self.pool_h * self.pool_w
        # One row per pooled output; only the arg-max column receives the gradient.
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size,))

        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)

        return dx
    
class ReLu:
    """Rectified linear unit: passes positive inputs through and zeroes the rest."""

    def __init__(self):
        # Boolean mask of the positions where the last forward input was <= 0.
        self.mask = None

    def forward(self, x):
        """Return a copy of ``x`` with all non-positive entries set to 0."""
        self.mask = (x <= 0)
        out = x.copy()
        out[self.mask] = 0

        return out

    def backward(self, dout):
        """Return ``dout`` with the gradient blocked where the forward input was <= 0.

        Works on a copy so the caller's ``dout`` array is left untouched.
        """
        dx = dout.copy()
        dx[self.mask] = 0

        return dx
    
class my_Affine:
    """Fully connected layer computing ``x . W + b``.

    Inputs of any rank are flattened to (batch, features) in ``forward`` and
    the input gradient is reshaped back to the original shape in ``backward``.
    """

    def __init__(self, W, b):
        self.W = W
        self.b = b
        self.x = None
        self.original_x_shape = None
        # Gradients of the weights and bias, stored by backward().
        self.dW = None
        self.db = None

    def forward(self, x):
        """Flatten ``x`` per sample, cache it, and return the affine output."""
        self.original_x_shape = x.shape
        self.x = x.reshape(x.shape[0], -1)
        return np.dot(self.x, self.W) + self.b

    def backward(self, dout):
        """Store dW and db, and return the gradient w.r.t. the input."""
        self.dW = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)
        grad_input = np.dot(dout, self.W.T)
        # Restore the shape the input had before flattening (tensor inputs).
        return grad_input.reshape(*self.original_x_shape)
    
def softmax_loss(X, t):
    """Apply softmax to the scores ``X`` and return the cross-entropy against ``t``."""
    return cross_entropy_error(softmax(X), t)

class SoftmaxWithLoss:
    """Softmax activation combined with cross-entropy loss."""

    def __init__(self):
        self.loss = None
        self.y = None  # softmax output
        self.t = None  # supervision (target) data

    def forward(self, x, t):
        """Cache ``t`` and softmax(``x``), and return the cross-entropy loss."""
        self.t = t
        self.y = softmax(x)
        self.loss = cross_entropy_error(self.y, self.t)

        return self.loss

    def backward(self, dout=1):
        """Return the gradient of the loss w.r.t. the pre-softmax scores.

        The result is scaled by ``dout``; with the default ``dout=1`` it is
        ``(y - t) / batch_size``. Previously ``dout`` was accepted but ignored.
        """
        batch_size = self.t.shape[0]
        if self.t.size == self.y.size:  # targets are one-hot vectors
            dx = (self.y - self.t) / batch_size
        else:
            # Targets are class indices: subtract 1 at the correct class.
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx / batch_size

        return dx * dout

In [None]:
def my_cross_entropy(x, t):
    """Mean cross-entropy of probabilities ``x`` against targets ``t``.

    ``t`` must be one-hot encoded with the same shape as ``x``. The sum over
    all elements is divided by the size of the first axis of ``x``.
    """
    eps = 1e-7  # keeps log() away from log(0)
    weighted_log = np.log(x + eps) * t
    return -np.sum(weighted_log) / x.shape[0]

In [None]:
class my_softmax_loss:
    """Softmax followed by cross-entropy loss, for one-hot targets only."""

    def __init__(self):
        self.loss = None
        self.t = None  # one-hot targets cached by forward()
        self.y = None  # softmax output cached by forward()

    def forward(self, x, t):
        """Return the cross-entropy of softmax(``x``) against one-hot ``t``."""
        self.y = softmax(x)
        self.t = t
        self.loss = my_cross_entropy(self.y, t)
        return self.loss

    def backward(self, dout=1):
        """Return ``dout * (y - t) / batch_size``.

        ``dout`` defaults to 1, so calling ``backward()`` with no argument
        behaves exactly as before.
        """
        batch_size = self.t.shape[0]
        return dout * (self.y - self.t) / batch_size

In [None]:
class my_AdaGrad:
    """AdaGrad optimizer: per-parameter learning rates from accumulated squared gradients."""

    def __init__(self, lr=0.01):
        self.lr = lr
        # Running sum of squared gradients per parameter key; created lazily.
        self.h = None

    def update(self, params, grads):
        """Apply one AdaGrad step to ``params`` in place.

        ``params`` and ``grads`` are dicts sharing the same keys. Nothing is
        returned; the arrays stored in ``params`` are modified in place so
        that any other object holding a reference to them sees the update.
        """
        if self.h is None:
            self.h = {key: np.zeros_like(value) for key, value in params.items()}

        # The step is applied on every call, including the first one. It used
        # to sit in an ``else`` branch, so the first call's gradient was lost.
        for key in params.keys():
            self.h[key] += grads[key] * grads[key]
            params[key] -= self.lr * grads[key] / (np.sqrt(self.h[key]) + 1e-7)

In [None]:
class my_cnn_network:
    """Small CNN: conv -> ReLU -> 2x2 max-pool -> affine -> ReLU -> affine -> softmax loss.

    Built from the hand-written layers my_conv, my_pool, my_Affine and
    my_softmax_loss. Parameters live in ``self.params`` under the keys
    W1/b1 (conv), W2/b2 (first affine) and W3/b3 (second affine); the layers
    hold references to the same arrays, so in-place optimizer updates are
    seen by the layers.

    Parameters
    ----------
    input_dim : tuple (C, H, W)
        Channels, height and width of one input image.
    conv_param : dict
        Only 'filter_num' and 'filter_size' are read. The 'stride' and 'pad'
        entries are not used: the convolution is built with stride 1, pad 0.
    hidden_size : int
        Width of the hidden affine layer.
    output_size : int
        Number of output classes.
    weitght_int_std : float
        Not used by this class; kept for signature compatibility.
    """

    def __init__(self,input_dim=(1,28,28),conv_param={'filter_num': 20,'filter_size': 5,"stride":1,"pad":0},
                 hidden_size=50,output_size=10,weitght_int_std=0.01):

        self.C, self.H, self.W = input_dim
        self.FN = conv_param['filter_num']
        self.FH = conv_param['filter_size']
        self.FW = conv_param['filter_size']
        stride = 1
        # Spatial size after the convolution (stride 1, no padding), per axis.
        conv_h = (self.H - self.FH) // stride + 1
        conv_w = (self.W - self.FW) // stride + 1
        # Spatial size after the 2x2 / stride-2 pooling. Integer arithmetic per
        # axis keeps the flattened size right for odd conv outputs and
        # non-square inputs (the old conv_output/2 formula was wrong for both).
        pool_h = (conv_h - 2) // 2 + 1
        pool_w = (conv_w - 2) // 2 + 1
        pool_output_size = self.FN * pool_h * pool_w  # neurons after pooling

        self.params = {}
        self.layers = OrderedDict()

        # NOTE(review): the scale uses C*H*W as fan-in; He initialisation for a
        # conv layer is usually based on C*FH*FW. Left as is, to be confirmed.
        self.params['W1'] = np.sqrt(2.0 / (self.C * self.H * self.W)) * np.random.randn(self.FN, self.C, self.FH, self.FW)
        self.params['b1'] = np.zeros(self.FN)
        self.layers['conv'] = my_conv(self.params['W1'], self.params['b1'])

        self.layers['relu1'] = ReLu()
        self.layers['pool'] = my_pool(2, 2, stride=2)

        self.params['W2'] = np.sqrt(2.0 / pool_output_size) * np.random.randn(pool_output_size, hidden_size)
        self.params['b2'] = np.zeros(hidden_size)
        self.layers['affine1'] = my_Affine(self.params['W2'], self.params['b2'])

        self.layers['relu2'] = ReLu()

        self.params['W3'] = np.sqrt(2.0 / hidden_size) * np.random.randn(hidden_size, output_size)
        self.params['b3'] = np.zeros(output_size)
        self.layers['affine2'] = my_Affine(self.params['W3'], self.params['b3'])

        self.last_layer = my_softmax_loss()
        # Loss of the most recent gradient() call.
        self.loss_value = None

    def predict(self, x):
        """Run ``x`` through every layer except the loss layer and return the scores."""
        x_predict = x
        for layer in self.layers.values():
            x_predict = layer.forward(x_predict)

        return x_predict

    def loss(self, x, t):
        """Return the softmax cross-entropy loss of the network on (x, t)."""
        x_predict = self.predict(x)
        return self.last_layer.forward(x_predict, t)

    def gradient(self, x, t):
        """Back-propagate on (x, t) and return a dict of gradients keyed like ``params``.

        The batch loss from the forward pass is stored in ``self.loss_value``.
        """
        layers = list(self.layers.values())
        layers.reverse()

        # Forward pass: fills the caches every layer needs for backward().
        self.loss_value = self.loss(x, t)

        dout = self.last_layer.backward()
        for layer in layers:
            dout = layer.backward(dout)

        grads = {}
        grads['W1'] = self.layers['conv'].dW
        grads['b1'] = self.layers['conv'].db
        grads['W2'] = self.layers['affine1'].dW
        grads['b2'] = self.layers['affine1'].db
        grads['W3'] = self.layers['affine2'].dW
        grads['b3'] = self.layers['affine2'].db

        return grads

In [None]:
class my_cnn_network1:
    """Small CNN: conv -> ReLU -> pool -> affine -> ReLU -> affine -> softmax loss.

    Variant that uses the imported Convolution and Pooling layers together
    with my_Affine and SoftmaxWithLoss, and initialises all weights as
    ``weitght_int_std * randn``. Parameters live in ``self.params`` under the
    keys W1/b1, W2/b2 and W3/b3; the layers hold references to the same arrays.

    Parameters
    ----------
    input_dim : tuple (C, H, W)
        Channels, height and width of one input image.
    conv_param : dict
        Only 'filter_num' and 'filter_size' are read; 'stride' and 'pad' are
        not forwarded to the convolution layer.
    hidden_size : int
        Width of the hidden affine layer.
    output_size : int
        Number of output classes.
    weitght_int_std : float
        Standard deviation used to scale the random initial weights.
    """

    def __init__(self,input_dim=(1,28,28),conv_param={'filter_num': 30,'filter_size': 5,"stride":1,"pad":0},
                 hidden_size=100,output_size=10,weitght_int_std=0.01):

        self.C, self.H, self.W = input_dim
        self.FN = conv_param['filter_num']
        self.FH = conv_param['filter_size']
        self.FW = conv_param['filter_size']
        stride = 1
        # Spatial size after the convolution (stride 1, no padding), per axis.
        conv_h = (self.H - self.FH) // stride + 1
        conv_w = (self.W - self.FW) // stride + 1
        # Spatial size after pooling, assuming Pooling(2,2,2) is a 2x2 window
        # with stride 2 (TODO confirm against common.layers). Integer
        # arithmetic per axis keeps the flattened size right for odd conv
        # outputs and non-square inputs.
        pool_h = (conv_h - 2) // 2 + 1
        pool_w = (conv_w - 2) // 2 + 1
        pool_output_size = self.FN * pool_h * pool_w  # neurons after pooling

        self.params = {}
        self.layers = OrderedDict()

        self.params['W1'] = weitght_int_std * np.random.randn(self.FN, self.C, self.FH, self.FW)
        self.params['b1'] = np.zeros(self.FN)
        self.layers['conv'] = Convolution(self.params['W1'], self.params['b1'])

        self.layers['relu1'] = ReLu()
        self.layers['pool'] = Pooling(2, 2, 2)

        self.params['W2'] = weitght_int_std * np.random.randn(pool_output_size, hidden_size)
        self.params['b2'] = np.zeros(hidden_size)
        self.layers['affine1'] = my_Affine(self.params['W2'], self.params['b2'])

        self.layers['relu2'] = ReLu()

        self.params['W3'] = weitght_int_std * np.random.randn(hidden_size, output_size)
        self.params['b3'] = np.zeros(output_size)
        self.layers['affine2'] = my_Affine(self.params['W3'], self.params['b3'])

        self.last_layer = SoftmaxWithLoss()
        # Loss of the most recent gradient() call.
        self.loss_value = None

    def predict(self, x):
        """Run ``x`` through every layer except the loss layer and return the scores."""
        for layer in self.layers.values():
            x = layer.forward(x)

        return x

    def loss(self, x, t):
        """Return the softmax cross-entropy loss of the network on (x, t)."""
        x_predict = self.predict(x)
        return self.last_layer.forward(x_predict, t)

    def gradient(self, x, t):
        """Back-propagate on (x, t) and return a dict of gradients keyed like ``params``.

        The batch loss from the forward pass is stored in ``self.loss_value``.
        """
        layers = list(self.layers.values())
        layers.reverse()

        # Forward pass: fills the caches every layer needs for backward().
        self.loss_value = self.loss(x, t)

        dout = 1
        dout = self.last_layer.backward(dout)
        for layer in layers:
            dout = layer.backward(dout)

        grads = {}
        grads['W1'] = self.layers['conv'].dW
        grads['b1'] = self.layers['conv'].db
        grads['W2'] = self.layers['affine1'].dW
        grads['b2'] = self.layers['affine1'].db
        grads['W3'] = self.layers['affine2'].dW
        grads['b3'] = self.layers['affine2'].db

        return grads

In [None]:
# Build the network with its default hyper-parameters.
cnnnetwork = my_cnn_network();
losslist=[]; # loss value recorded at each training iteration


In [None]:
# Optimizer option 1: AdaGrad with learning rate 0.001.
# AdaGrad is presumably provided by the star import from common.optimizer - confirm.
optimizer = AdaGrad(lr=0.001);

In [None]:
# Optimizer option 2: SGD with its default settings.
# NOTE(review): this rebinds `optimizer`, so when the cells are run top to bottom
# it replaces the AdaGrad instance created in the previous cell.
optimizer = SGD();

In [None]:
# Mini-batch training: 1000 iterations, each on a freshly sampled batch of 100.
for i in range(1000):
    # Draw the indices from the actual training-set size rather than a
    # hard-coded 60000, so the loop stays valid if the data set changes.
    batch_mask = np.random.choice(x_train.shape[0], 100)
    x_train_batch = x_train[batch_mask]
    t_train_batch = t_train[batch_mask]  # 100 randomly chosen samples per iteration

    # gradient() runs the forward pass too and stores the batch loss in loss_value.
    grads = cnnnetwork.gradient(x_train_batch, t_train_batch)
    losslist.append(cnnnetwork.loss_value)
    optimizer.update(cnnnetwork.params, grads)
    if i % 100 == 0:
        print(str(i) + '|' + str(losslist[-1]))

In [None]:
# Snapshot of the first row of W3, meant to be taken before training.
# .copy() is required: plain indexing returns a view, and optimizers that
# update the parameter arrays in place would silently change `begin` as well.
begin = cnnnetwork.params['W3'][0].copy()
print(begin)

In [None]:
# First row of W3 as it is when this cell runs (intended to be after training).
# NOTE(review): this is a view into the parameter array, not a copy, so it will
# follow any later in-place update of W3.
finish = cnnnetwork.params['W3'][0]
print(finish)

In [None]:
# Predicted class per test image: index of the largest network output along axis 1.
# NOTE(review): the whole of x_test goes through the network in a single forward
# pass; consider batching if memory becomes a problem.
predict_result = np.argmax(cnnnetwork.predict(x_test),axis=1)

In [None]:
# True class per test image: position of the largest entry in each row of t_test
# (the labels were loaded with one_hot_label=True).
true_result = np.argmax(t_test, axis=1)

In [None]:
# Number of test images whose predicted class equals the true class.
np.count_nonzero((true_result == predict_result))

In [None]:
# Test accuracy, computed from the predictions rather than from a count copied
# by hand out of an earlier cell's output (which goes stale on every re-run).
np.count_nonzero(true_result == predict_result) / len(true_result)