# 使用纯Python实现的神经网络训练MNIST数据集

## 0.a结构图

![神经网络结构图](./nn.png "神经网络结构图")

## 0.b公式

- 前传：
$\begin{eqnarray*}
\left\{\begin{aligned}
& \alpha_h = \sum_{i=1}^{d}w_{ih}x_i + b_h \\
& a_h = \sigma{(\alpha_h)} \\
& \beta_j = \sum_{h=1}^{q}w_{hj}a_h + b_j \\
& \hat{y_j} = \sigma{(\beta_j)} \\
& c_i = \frac{1}{2}(\hat{y_i}-y_i)^{2}
\end{aligned}\right.
\end{eqnarray*}$

- 反传：
$\begin{eqnarray*}
\left\{\begin{aligned}
& \delta = \frac{\partial c_j}{\partial \beta_j} = (\hat{y_i}-y_i)\sigma^{'}{(\beta_j)}\\
& \Delta{b_j} = -\eta \frac{\partial c_j}{\partial b_j} = -\eta \frac{\partial c_j}{\partial \beta_j} \frac{\partial \beta_j}{\partial b_j} = -\eta \delta\\
& \Delta{w_{hj}} = -\eta \frac{\partial c_j}{\partial w_j} = -\eta \frac{\partial c_j}{\partial \beta_j} \frac{\partial \beta_j}{\partial w_j} = -\eta \delta a_h \\
& \Delta{b_h} = -\eta \frac{\partial c_j}{\partial a_h} \frac{\partial a_h}{\partial \alpha_h} \frac{\partial \alpha_h}{\partial b_h} = -\eta \frac{\partial c_j}{\partial \beta_j} \frac{\partial \beta_j}{\partial a_h} \frac{\partial a_h}{\partial \alpha_h} \frac{\partial \alpha_h}{\partial b_h} = -\eta \delta w_{hj} \sigma^{'}{(\alpha_h)} \\
& \Delta{w_{ih}} = -\eta \frac{\partial c_j}{\partial a_h} \frac{\partial a_h}{\partial \alpha_h} \frac{\partial \alpha_h}{\partial w_{ih}} = -\eta \frac{\partial c_j}{\partial \beta_j} \frac{\partial \beta_j}{\partial a_h} \frac{\partial a_h}{\partial \alpha_h} \frac{\partial \alpha_h}{\partial w_{ih}} = -\eta \delta w_{hj} \sigma^{'}{(\alpha_h)}x_i
\end{aligned}\right.
\end{eqnarray*}$

## 1.引入依赖包

In [1]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

## 2.定义神经网络

In [2]:
class NeuralNet(object):
    
    # 初始化神经网络，sizes是每层的神经元个数,len(sizes)神经网络的层数
    def __init__(self, sizes, activation='sigmoid'):
        self.sizes_     = sizes
        self.num_layers_ = len(sizes)
        self.w_ = [np.random.randn(y, x) for x, y in zip(sizes[:-1], sizes[1:])]
        self.b_ = [np.random.randn(y, 1) for y in sizes[1:]]
        
        self.activate = self.sigmoid if activation == 'sigmoid' else self.tanh
        self.activate_prime = self.sigmoid_prime if activation == 'sigmoid' else self.tanh_prime
    
    # Sigmoid函数，S型曲线，
    def sigmoid(self, z):
        return 1.0/(1.0+np.exp(-z))
    # Sigmoid函数的导函数
    def sigmoid_prime(self, z):
        return self.sigmoid(z)*(1-self.sigmoid(z))
    
    # tanh函数，S型曲线
    def tanh(self, z):
        return np.tanh(z)
    
    # tanh函数的导函数
    def tanh_prime(self, z):
        return 1 - np.tanh(z)**2
 
    # 目标函数（cost=1/2*(y_predict - y_real)**2）的导函数（cost_derivative=y_predict - y_real）
    def cost_derivative(self, output_activations, y):
        return (output_activations-y)
    
    def feedforward(self, x):
        for b, w in zip(self.b_, self.w_):
            x = self.sigmoid(np.dot(w, x)+b)
        return x
 
    def backprop(self, x, y):
        nabla_b = [np.zeros(b.shape) for b in self.b_]
        nabla_w = [np.zeros(w.shape) for w in self.w_]
 
        activation = x
        activations = [x]
        zs = []
        for b, w in zip(self.b_, self.w_):
            z = np.dot(w, activation)+b
            zs.append(z)
            activation = self.activate(z)
            activations.append(activation)

        delta = self.cost_derivative(activations[-1], y) * self.activate_prime(zs[-1])
        nabla_b[-1] = delta
        nabla_w[-1] = np.dot(delta, activations[-2].transpose())
 
        for l in range(2, self.num_layers_):
            z = zs[-l]
            sp = self.activate_prime(z)
            delta = np.dot(self.w_[-l+1].transpose(), delta) * sp
            nabla_b[-l] = delta
            nabla_w[-l] = np.dot(delta, activations[-l-1].transpose())
        return (nabla_b, nabla_w)
 
    def update_mini_batch(self, mini_batch, eta):
        nabla_b = [np.zeros(b.shape) for b in self.b_]
        nabla_w = [np.zeros(w.shape) for w in self.w_]
        for x, y in mini_batch:
            delta_nabla_b, delta_nabla_w = self.backprop(x, y)
            nabla_b = [nb+dnb for nb, dnb in zip(nabla_b, delta_nabla_b)]
            nabla_w = [nw+dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]
        self.w_ = [w-(eta/len(mini_batch))*nw for w, nw in zip(self.w_, nabla_w)]
        self.b_ = [b-(eta/len(mini_batch))*nb for b, nb in zip(self.b_, nabla_b)]
 
    # training_data是训练数据(x, y);epochs是训练次数;mini_batch_size是每次训练样本数;eta是learning rate
    def SGD(self, training_data, epochs, mini_batch_size, eta, test_data=None):
        if test_data:
            n_test = len(test_data)
 
        n = len(training_data)
        for j in  range(epochs):
            np.random.shuffle(training_data)
            mini_batches = [training_data[k:k+mini_batch_size] for k in range(0, n, mini_batch_size)]
            for mini_batch in mini_batches:
                self.update_mini_batch(mini_batch, eta)
            if test_data:
                print("Epoch {0}: {1} / {2}".format(j, self.evaluate(test_data), n_test))
            else:
                print("Epoch {0} complete".format(j))
 
    def evaluate(self, test_data):
        test_results = [(np.argmax(self.feedforward(x)), y) for (x, y) in test_data]
        return sum(int(x == y) for (x, y) in test_results)
 
    # 预测
    def predict(self, data):
        value = self.feedforward(data)
        return value.tolist().index(max(value))
 
    # 保存训练模型
    def save(self):
        pass  # 把_w和_b保存到文件(pickle)
    def load(self):
        pass

## 3.加载数据

In [3]:
# http://g.sweyla.com/blog/2012/mnist-numpy/
import os, struct
import numpy as np
from array import array as pyarray
from numpy import append, array, int8, uint8, zeros
 
def load_mnist(dataset="training_data", digits=np.arange(10), path="."):

    if dataset == "training_data":
        fname_image = os.path.join(path, 'train-images-idx3-ubyte')
        fname_label = os.path.join(path, 'train-labels-idx1-ubyte')
    elif dataset == "testing_data":
        fname_image = os.path.join(path, 't10k-images-idx3-ubyte')
        fname_label = os.path.join(path, 't10k-labels-idx1-ubyte')
    else:
        raise ValueError("dataset must be 'training_data' or 'testing_data'")
    
    flbl = open(fname_label, 'rb')
    magic_nr, size = struct.unpack(">II", flbl.read(8))
    lbl = pyarray("b", flbl.read())
    flbl.close()

    fimg = open(fname_image, 'rb')
    magic_nr, size, rows, cols = struct.unpack(">IIII", fimg.read(16))
    img = pyarray("B", fimg.read())
    fimg.close()

    ind = [ k for k in range(size) if lbl[k] in digits ]
    N = len(ind)

    images = zeros((N, rows, cols), dtype=uint8)
    labels = zeros((N, 1), dtype=int8)
    for i in range(len(ind)):
        images[i] = array(img[ ind[i]*rows*cols : (ind[i]+1)*rows*cols ]).reshape((rows, cols))
        labels[i] = lbl[ind[i]]

    return images, labels

def load_samples(dataset="training_data"):
    image,label = load_mnist(dataset)
    #print(image[0].shape, image.shape)   # (28, 28) (60000, 28, 28)
    #print(label[0].shape, label.shape)   # (1,) (60000, 1)
    #print(label[0])   # 5
 
    # 把28*28二维数据转为一维数据
    X = [np.reshape(x,(28*28, 1)) for x in image]
    X = [x/255.0 for x in X]   # 灰度值范围(0-255)，转换为(0-1)
    #print(X.shape)
    
        # 5 -> [0,0,0,0,0,1.0,0,0,0];  1 -> [0,1.0,0,0,0,0,0,0,0]
    def vectorized_Y(y): 
        e = np.zeros((10, 1))
        e[y] = 1.0
        return e

    if dataset == "training_data":
        Y = [vectorized_Y(y) for y in label]
        pair = list(zip(X, Y))
        return pair

    elif dataset == 'testing_data':
        pair = list(zip(X, label))
        return pair
    else:
        print('Something wrong')

## 4.训练

In [5]:
if __name__ == '__main__':
    INPUT = 28*28
    OUTPUT = 10
    net = NeuralNet([INPUT, 28, OUTPUT], activation='sigmoid')

    train_set = load_samples(dataset='training_data')
    test_set = load_samples(dataset='testing_data')

    net.SGD(train_set, 13, 100, 3.0, test_data=test_set)

    #准确率
    correct = 0;
    for test_feature in test_set:
        if net.predict(test_feature[0]) == test_feature[1][0]:
            correct += 1
    print "准确率: ", correct*1.0/len(test_set)

Epoch 0: 5993 / 10000
Epoch 1: 7429 / 10000
Epoch 2: 8534 / 10000
Epoch 3: 8792 / 10000
Epoch 4: 8925 / 10000
Epoch 5: 8995 / 10000
Epoch 6: 9056 / 10000
Epoch 7: 9081 / 10000
Epoch 8: 9124 / 10000
Epoch 9: 9137 / 10000
Epoch 10: 9171 / 10000
Epoch 11: 9172 / 10000
Epoch 12: 9190 / 10000
准确率:  0.919


### 下载数据

In [31]:
import os
# import gzip
import urllib2

names = ['train-images-idx3-ubyte.gz','train-labels-idx1-ubyte.gz', 't10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz']
urls  = ['http://yann.lecun.com/exdb/mnist/'+name for name in names]
for url, name in zip(urls, names):
    print 'downloading:',url
    open(name, 'wb').write(urllib2.urlopen(url).read())
    print 'finish, amount {} bytes!'.format(os.path.getsize(name))
    
    # print 'decompress {}'.format(name)
    # open(name[:-3], 'wb').write(gzip.zlib.decompress(name))
    # print 'finish.'

downloading: http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
finish, amount 9912422 bytes!
downloading: http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
finish, amount 28881 bytes!
downloading: http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
finish, amount 1648877 bytes!
downloading: http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
finish, amount 4542 bytes!


### 解压：

```shell
$ls | grep gz | xargs gzip -d

```