# 用NumPy搭建神经网络

## 模型代码

### 导入依赖包

In [1]:
from abc import abstractmethod
import numpy as np

### 模型抽象类

In [2]:
class Module(object):
    def __init__(self) -> None:
        super(Module, self).__init__()
    
    @abstractmethod
    def forward(self):
        pass
    
    def __call__(self, *args, **kwds):
        return self.forward(*args, **kwds)

### 全连接层

In [3]:
class Linear(Module):
    def __init__(self, in_features, out_features, bias=False) -> None:
        super(Linear, self).__init__()
        self.W = np.random.normal(size=(in_features, out_features))
        self.X = None
        self.bias = None
        if bias:
            self.bias = np.random.normal(size=(out_features))
    
    def forward(self, X):
        """
        X: (batch_size, hidden_size)
        """
        self.X = X
        Y = np.dot(X, self.W)
        if self.bias is not None:
            Y = Y + self.bias
        return Y
    
    def backward(self, delta_Y, lr):
        """
        delta_Y: (batch_size, output_size)
        """
        delta_Y_ = np.dot(delta_Y, self.W.transpose())
        self.W = self.W - np.dot(self.X.transpose(), delta_Y) * lr
        if self.bias is not None:
            self.bias -= np.average(delta_Y, axis=0)
        return delta_Y_

### tanh激活函数层

In [4]:
class Tanh(Module):
    def __init__(self) -> None:
        super(Tanh, self).__init__()
        self.Y = None

    def forward(self, X):
        self.Y = np.tanh(X)
        return self.Y

    def backward(self, delta_Y):
        return np.multiply((1 - self.Y ** 2), delta_Y)

### Softmax层

In [5]:
class Softmax(Module):
    def __init__(self) -> None:
        super(Softmax, self).__init__()
        self.exps = None
        self.exps_sum = None
    
    def forward(self, X):
        """
        X: (batch_size, features)
        """
        C = np.max(X)
        self.exps = np.exp(X - C)
        self.exps_sum = np.sum(self.exps, axis=1).reshape((-1, 1))
        return np.divide(self.exps, self.exps_sum)

    def backward(self, delta_Y):
        """
        delta_Y: (batch_size, features)
        """
        exps_sum_square = self.exps_sum ** 2
        ii_matrix = np.multiply(self.exps, self.exps_sum) / exps_sum_square # (batch_size, features)
        ij_matrix = - np.matmul(self.exps[:, :, np.newaxis], self.exps[:, np.newaxis, :]) / exps_sum_square[:, :, np.newaxis] # (batch_size, features, features)
        ij_Y = np.multiply(delta_Y[:, :, np.newaxis], ij_matrix).sum(axis=1) # (batch_size, features)
        delta_Y = ij_Y + np.multiply(delta_Y, ii_matrix) # (batch_size, features)
        return delta_Y

### Log对数函数层

In [6]:
class Log(Module):
    def __init__(self) -> None:
        super(Log, self).__init__()
        self.inf = 1e-10
        self.X = None
    
    def forward(self, X):
        self.X = X
        return np.log(X + self.inf)
    
    def backward(self, delta_Y):
        return np.multiply(1 / (self.X + self.inf), delta_Y)

### 损失函数层

In [7]:
class NLLloss(Module):
    def __init__(self) -> None:
        super(NLLloss, self).__init__()
        self.target = None
        self.loss = None
    
    def forward(self, Y, target):
        """
        Y: (batch_size, features)
        target: (batch_size)
        """
        self.target = np.ones(shape=Y.shape) * 1e-6
        for i, j in enumerate(target):
            self.target[i, j] = 1
        self.loss = -np.sum(np.multiply(Y, self.target)) / len(target)
        return self.loss
    
    def backward(self):
        return - self.target * self.loss

## 训练代码

### 导入依赖包

In [8]:
from keras.datasets import mnist
import numpy as np
from tqdm import tqdm

Using TensorFlow backend.


### 数据加载与处理

In [9]:
np.random.seed(42)
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train / 255
X_test = X_test / 255

# 打乱训练集
index = list(range(len(X_train)))
np.random.shuffle(index)
X_train = X_train[index]
y_train = y_train[index]

index = list(range(len(X_test)))
np.random.shuffle(index)
X_test = X_test[index]
y_test = y_test[index]

### 模型建立

In [10]:
# 训练参数
epochs = 30
batch_size_train = 64
batch_size_test = 1000
lr = 0.0003

# 模型
linear1 = Linear(in_features=28 * 28, out_features=1024, bias=True)
tanh1 = Tanh()
linear2 = Linear(in_features=1024, out_features=10, bias=True)
softmax = Softmax()
log = Log()
nllloss = NLLloss()

### 模型训练

In [11]:
for epoch in range(epochs):
    """ train """
    process_bar = tqdm(range(len(X_train) // batch_size_train), ncols=150)
    for itor in process_bar:
        X = X_train[itor * batch_size_train: itor * batch_size_train + batch_size_train]
        y = y_train[itor * batch_size_train: itor * batch_size_train + batch_size_train]
        """ 前向传播 """
        tmp = X.reshape((batch_size_train, -1))
        tmp = linear1(tmp)
        tmp = tanh1(tmp)
        tmp = linear2(tmp)
        tmp = softmax(tmp)
        pre = np.argmax(tmp, axis=1)

        train_acc = np.sum(y == pre)
        train_total = len(y)
        
        tmp = log(tmp)
        loss = nllloss(tmp, y)
        """ 反向传播 """
        Y = nllloss.backward()
        Y = log.backward(Y)
        Y = softmax.backward(Y)
        Y = linear2.backward(Y, lr)
        Y = tanh1.backward(Y)
        Y = linear1.backward(Y, lr)

        process_bar.set_description('Train epoch:{} '.format(epoch + 1))
        process_bar.set_postfix_str('loss: {:.4f}  Acc:{:.2f}%'.format(
                                    loss, 100. * train_acc / train_total))
    
    """ test """
    test_total = 0
    test_acc = 0
    test_process_bar = tqdm(range(len(X_test) // batch_size_test), ncols=150)
    for itor in test_process_bar:
        X = X_test[itor * batch_size_test: itor * batch_size_test + batch_size_test]
        y = y_test[itor * batch_size_test: itor * batch_size_test + batch_size_test]

        tmp = X.reshape((batch_size_test, -1))
        tmp = linear1(tmp)
        tmp = tanh1(tmp)
        tmp = linear2(tmp)
        Y = softmax(tmp)
        Y = np.argmax(Y, axis=1)
        test_total += len(y)
        test_acc += np.sum(y == Y)
        test_process_bar.set_description('Test epoch:{} '.format(epoch + 1))
        test_process_bar.set_postfix_str('Acc [{}/{} ({:.2f}%)]'.format(
                                        test_acc, test_total, 100. * test_acc/test_total))

Train epoch:1 : 100%|█████████████████████████████████████████████████████████████████████| 937/937 [00:13<00:00, 71.38it/s, loss: 1.0747  Acc:93.75%]
Train epoch:2 : 100%|█████████████████████████████████████████████████████████████████████| 937/937 [00:13<00:00, 71.95it/s, loss: 0.5686  Acc:93.75%]
Train epoch:3 : 100%|█████████████████████████████████████████████████████████████████████| 937/937 [00:13<00:00, 71.20it/s, loss: 0.5429  Acc:92.19%]
Train epoch:4 : 100%|█████████████████████████████████████████████████████████████████████| 937/937 [00:13<00:00, 69.63it/s, loss: 0.5103  Acc:92.19%]
Train epoch:5 : 100%|█████████████████████████████████████████████████████████████████████| 937/937 [00:13<00:00, 68.80it/s, loss: 0.5022  Acc:93.75%]
Train epoch:6 : 100%|█████████████████████████████████████████████████████████████████████| 937/937 [00:13<00:00, 68.64it/s, loss: 0.4981  Acc:93.75%]
Train epoch:7 : 100%|█████████████████████████████████████████████████████████████████████| 93