In [None]:
import numpy as npy
import cupy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import scipy.io as scio
from time import time
from eznf import datasets

### 辅助函数的定义

In [None]:
def onehot(i: int):
    oh = npy.zeros(10)
    if(i == 10):
        oh[0] = 1
    else:
        oh[i] = 1
    return oh

def accuracy(m, x: np.ndarray, y: np.ndarray, batch_size):
    return (m(x) == y.argmax(axis=0)).sum() / batch_size

def flatten(x: np.ndarray, batch_size: int):
    return x.reshape([batch_size, -1], order='C').T

def reflatten(x: np.ndarray, shape: list or tuple):
    """
        shape: batch_size * channels * w * h
    """
    return x.T.reshape(shape, order='C')

### 激活函数和导数定义

In [None]:
# 线性
def linear(x: np.ndarray, w: np.ndarray):
    if(w.shape[1] != x.shape[0]):
        raise ValueError('could not multiply shape {} x {} with {} x {}'.format(*w.shape, *x.shape))
    return w @ x

# relu激活函数
def relu(x: np.ndarray):
    return x * (x > 0)

# softmax激活函数
def softmax(x: np.ndarray):
    phi = x.max(axis=0)
    e = np.exp(x - phi)
    return e / e.sum(axis=0)

# 交叉熵损失
def cross_entropy(a: np.ndarray, y: np.ndarray, batch_size: int):
    return -(y*np.log(a)).sum() / batch_size

# 线性导数
def dlinear(x: np.ndarray, w: np.ndarray):
    return w

# relu导数
def drelu(x: np.ndarray):
    return x > 0


# 交叉熵损失导数
def dcross_entropy(a: np.ndarray, y: np.ndarray):
    return a - y

### 卷积和池化及其导数定义

In [None]:
# 卷积, stride = 1, padding = 0
def cov2d(x: np.ndarray, w: np.ndarray):
    """
        x: batch_size * channels * w * h
        w: out_channels * channels * w * h
    """    
    batch_size, in_channels, x_size, _ = x.shape
    out_channels, _, w_size, _ = w.shape
    steps = x_size - w_size + 1

    _w = w.reshape([out_channels, -1], order='C')   # 将卷积核展平
    _z = np.zeros([in_channels*w_size**2, batch_size*steps**2]) # 为将输入展平做准备

    for i in range(w_size):
        for j in range(w_size):
            _z[i*w_size+j::w_size**2, :] = x[:, :, i:steps+i, j:steps+j].transpose(1,0,2,3).reshape([in_channels,-1], order='C')    # 将输入展平(im2col)

    z = (_w @ _z).reshape([out_channels, batch_size, steps, steps], order='C').transpose(1,0,2,3)   # 矩阵相乘后还原(卷积)
    return z

# 最大池化, stride = p_size, padding = 0
def max_pooling(x: np.ndarray, p_size: int):
    """
        x: batch_size * channels * w * h
    """    
    batch_size, c, x_size, _ = x.shape

    if(int(x_size/p_size) != x_size/p_size):
        raise ValueError('x_size must be divided exactly by p_size')
    steps = int(x_size/p_size)
    _z = np.zeros([p_size**2, batch_size*c*steps**2])
    

    for i in range(p_size):
        for j in range(p_size):
            _z[i*p_size+j, :] = x[:, :, i::p_size, j::p_size].reshape([-1], order='C')    # 将输入展平(im2col)

    z = _z.max(axis=0).reshape([batch_size, c, steps, steps])
    arg_max = _z.argmax(axis=0)
    return z, arg_max

# 卷积对w导数, stride = 1, padding = 0
def dwcov2d(x: np.ndarray, z: np.ndarray):
    """
        x: batch_size * channels * w * h
        z: batch_size * channels * w * h
    """
    batch_size, x_c, x_size, _ = x.shape
    _, z_c, z_size, _ = z.shape
    steps = x_size - z_size + 1

    _z = z.transpose(1,0,2,3).reshape([z_c, -1], order='C')
    _x = np.zeros([batch_size*z_size**2, steps**2*x_c])
    for i in range(steps):
        for j in range(steps):
            loc = i*steps+j
            _x[:,loc::steps**2] =  x[:, :, i:i+z_size, j:j+z_size].transpose(1,0,2,3).reshape([x_c,-1]).T   # 方法同cov2d, 采用im2col算法

    dw = (_z @ _x).reshape(z_c,x_c,steps,steps) / batch_size
    return dw

# 卷积对x导数, stride = 1, padding = 0
def dxcov2d(w: np.ndarray, z: np.ndarray):
    """
        w: out_channels * channels * w * h
        z: batch_size * channels * w * h
    """
    _, in_c, w_size, _ = w.shape
    batch_size, out_c, z_size, _ = z.shape
    x_size = z_size + w_size - 1

    # time1 = time()
    x = np.zeros([batch_size, in_c, x_size, x_size])
    for i in range(z_size):
        for j in range(z_size):
            x[:, :, i:i+w_size, j:j+w_size] += (w[:,None,:,:,:] * z[:, :, i, j].transpose(1,0)[:,:,None,None,None]).sum(axis=0) # numpy广播机制牛逼！
    # print('dx: ', time() - time1)
    
    return x

# 最大池化导数, stride = p_size, padding = 0
def dmax_pooling(z: np.ndarray, p_size: int, argmax: np.ndarray):
    """
        z: batch_size * channels * w * h
    """    
    batch_size, c, z_size, _ = z.shape
    x_size = z_size * p_size
    dx = np.zeros([batch_size*c*z_size*z_size, p_size*p_size])
    dx[np.arange(argmax.size), argmax] = z.flatten()
    dx = dx.reshape([batch_size,c,z_size,z_size,p_size,p_size], order='C').transpose(0,1,4,5,2,3)
    dxx = np.zeros([batch_size, c, x_size, x_size])
    
    for i in range(p_size):
        for j in range(p_size):
            dxx[:,:,i::p_size,j::p_size] = dx[:, :, i, j,:,:]
    return dxx

### 类化

In [None]:
class Linear:
    def __init__(self, input_size, output_size, alpha, batch_size):
        self.alpha = alpha
        self.batch_size = batch_size
        self.w = 0.1*np.random.randn(output_size, input_size)
        self.x = None

    def __call__(self, x: np.ndarray):
        self.x = x
        return linear(x, self.w)
    
    def backward(self, output):
        dl =  dlinear(self.x, self.w).T @ output
        self.w = self.w - self.alpha * ((output @ self.x.T) / self.batch_size)
        return dl


class ReLU:
    def __init__(self):
        self.x = None

    def __call__(self, x: np.ndarray):
        self.x = x
        return relu(x)
    
    def backward(self, output):
        return drelu(self.x) * output


class Cov2d:
    def __init__(self, input_channel, output_channel, kernel_size, alpha):
        self.alpha = alpha
        self.w = 0.1*np.random.randn(output_channel, input_channel, kernel_size, kernel_size)
        self.x = None

    def __call__(self, x: np.ndarray):
        self.x = x
        return cov2d(x, self.w)
    
    def backward(self, output):
        # dc = dxcov2d(self.w, output)
        self.w = self.w - self.alpha * dwcov2d(self.x, output)
        # return dc


class MaxPooling:
    def __init__(self, kernel_size: int):
        self.kernel_size = kernel_size
        self.argmax = None

    def __call__(self, x: np.ndarray):
        max_z, self.argmax = max_pooling(x, self.kernel_size)
        return max_z

    def backward(self, output):
        return dmax_pooling(output, self.kernel_size, self.argmax)


class Flatten:
    def __init__(self):
        self.shape = None
    
    def __call__(self, x: np.ndarray):
        self.shape = x.shape
        return flatten(x, self.shape[0])

    def backward(self, output):
        return reflatten(output, self.shape)


class CrossEntropyLoss:
    def __init__(self, batch_size):
        self.a = None
        self.y = None
        self.batch_size = batch_size

    def __call__(self, x: np.ndarray, y: np.ndarray):
        self.y = y
        self.a = softmax(x)
        return cross_entropy(self.a, y, self.batch_size)

    def backward(self):
        return dcross_entropy(self.a, self.y)

### 全连接神经网络

In [None]:
class MLP:
    def __init__(self, alpha, batch_size):
        self.networks = [
            Linear(784, 512, alpha, batch_size),
            ReLU(),
            Linear(512, 256, alpha, batch_size),
            ReLU(),
            Linear(256, 64, alpha, batch_size),
            ReLU(),
            Linear(64, 32, alpha, batch_size),
            ReLU(),
            Linear(32, 10, alpha, batch_size),
        ]
        self.loss_f = CrossEntropyLoss(batch_size)

    def train(self, x, y):
        x = x / 255
        for n in self.networks:
            x = n(x)
        return self.loss_f(x, y)

    def __call__(self, x):
        x = x / 255
        for n in self.networks:
            x = n(x)
        return x.argmax(axis=0)

    def backward(self):
        delta = self.loss_f.backward()
        for n in reversed(self.networks):
            delta = n.backward(delta)

### 卷积神经网络

In [None]:
class CNN:
    def __init__(self, alpha, batch_size):
        self.networks = [
            Cov2d(1, 4, 3, alpha),
            MaxPooling(2),
            Flatten(),
            Linear(676, 512, alpha, batch_size),
            ReLU(),
            Linear(512, 256, alpha, batch_size),
            ReLU(),
            Linear(256, 10, alpha, batch_size),
        ]
        self.loss_f = CrossEntropyLoss(batch_size)
    
    def train(self, x, y):
        x = x / 255
        for n in self.networks:
            x = n(x)
        return self.loss_f(x, y)

    def __call__(self, x):
        x = x / 255
        for n in self.networks:
            x = n(x)
        return x.argmax(axis=0)
        

    def backward(self):
        delta = self.loss_f.backward()
        for n in reversed(self.networks):
            delta = n.backward(delta)

## 加载数据集

In [None]:
dataset = datasets.MNIST('./', True)
data = dataset.get()
X_train, Y_train, X_test, Y_test = data
Y_train = npy.vstack([onehot(i) for i in Y_train]).T
Y_train = np.array(Y_train)
Y_test = npy.vstack([onehot(i) for i in Y_test]).T
Y_test = np.array(Y_test)

## 定义神经网络

In [None]:
batch_size = 4096
m = MLP(alpha=0.1, batch_size=batch_size)
c = CNN(alpha=0.01, batch_size=batch_size)  # 学习率再大就溢出了

## 训练

### MLP

In [None]:
X_train = npy.vstack([i.flatten() for i in X_train]).T
X_train = np.array(X_train)

samples = int(X_train.shape[1] / batch_size)
epoches = 10
loss_m = []

with tqdm(total=epoches) as t:
    for i in range(epoches):
        los = 0
        for j in range(samples):
            los += m.train(X_train[:,j*batch_size: (j+1)*batch_size], Y_train[:,j*batch_size: (j+1)*batch_size])
            m.backward()
            
        
        loss_m.append(los/samples)
        t.set_description('Epoch {}'.format(i), refresh=False)
        t.set_postfix(loss=los/samples, refresh=False)
        t.update(1)

### CNN

In [None]:
X_train, Y_train, X_test, Y_test = data
Y_train = npy.vstack([onehot(i) for i in Y_train]).T
Y_train = np.array(Y_train)
Y_test = npy.vstack([onehot(i) for i in Y_test]).T
Y_test = np.array(Y_test)
X_train = X_train[:,None,:,:]
X_test = X_test[:,None,:,:]

X_train = np.array(X_train)


samples = len(X_train) // batch_size
epoches = 10

loss_c = []

for i in c.networks:
    if('alpha' in dir(i)):
        if('batch_size' in dir(i)):
            i.alpha = 0.0001
        else:
            i.alpha = 0.001


with tqdm(total=epoches) as t:
    for i in range(epoches):
        los = 0
        for j in range(samples):
            los += c.train(X_train[j*batch_size:(j+1)*batch_size], Y_train[:,j*batch_size:(j+1)*batch_size])
            c.backward()
            
        loss_c.append(los/samples)
        
        t.set_description('Epoch {}'.format(i), refresh=False)
        t.set_postfix(loss=los/samples, refresh=False)  # , acc=accuracy(c, X_train, y, len(X_train))
        t.update(1)

## 预测

In [None]:
fig1 = plt.subplot(121)
fig1.plot(range(len(loss_m)), np.array(loss_m).get())
fig2 = plt.subplot(122)
fig2.plot(range(len(loss_c)), np.array(loss_c).get())
plt.title('Loss with epoches')
plt.xlabel('epoches')
plt.ylabel('loss')
plt.savefig('./loss.jpg')