In [1]:
import numpy as np

In [2]:
class Loader(object):
    def __init__(self, path, count):
        # count: the sample number 
        self.path = path
        self.count = count
    def get_file_content(self):
        f = open(self.path, 'rb')
        content = f.read()   # 读取字节流
        f.close()
        return content   # 放回字节数组

In [3]:
class ImageLoader(Loader):
    # 从文件数组中获取第index个图片数据
    def get_picture(self, content, index):
        start = index*28*28 + 16  # 文件头16个字节，后面每28*28个字节跟一个图片数据
        picture = []
        for i in range(28):
            picture.append([])
            for j in range(28):
                byte = content[start + i*28 + j]
                picture[i].append(byte)
        return picture
    # 获取一个图像数据sample，这里需要将图片转换成为784行向量的形式
    def get_one_sample(self, picture):
        sample = []
        for i in range(28):
            for j in range(28):
                sample.append(picture[i][j])
        return sample
    # 加载数据文件，获取全部样本向量。onerow来表示是否转化成为一行向量
    def load(self, onerow = False):
        content = self.get_file_content()
        data_set = []
        for index in range(self.count):
            onepic = self.get_picture(content, index)
            if onerow : onepic = self.get_one_sample(onepic)
            data_set.append(onepic)
        return data_set

In [4]:
# 数据标签加载器
class LabelLoader(Loader):
    def load(self):
        content = self.get_file_content()
        labels = []
        for index in range(self.count):
            onelable = content[index + 8] # 文件头有8个字节
            onelablevec = self.norm(onelable)
            labels.append(onelablevec)
        return labels
    def norm(self, label):
        label_vec = []
        for i in range(10):
            if i == label:
                label_vec.append(0.9)
            else:
                label_vec.append(0.1)
        return label_vec

In [5]:
# 获取训练数据集, onerow表示是否转化为行向量
def get_training_data_set(num, onerow = False):
    image_loader = ImageLoader("train-images.idx3-ubyte", num)
    label_loader = LabelLoader("train-labels.idx1-ubyte", num)
    return image_loader.load(onerow), label_loader.load()
def get_test_data_set(num, onerow = False):
    image_loader = ImageLoader("t10k-images.idx3-ubyte", num)
    label_loader = LabelLoader("t10k-labels.idx1-ubyte", num)
    return image_loader.load(onerow), label_loader.load()

In [6]:
# 定义一个函数，将784行向量打印出来
def printImg(onepic):
    onepic = onepic.reshape(28, 28)
    for i in range(28):
        for j in range(28):
            if onepic[i][j] == 0 :
                print(" ", end = '')
            else :
                print("*", end = '')
        print('')

In [7]:
if __name__ == "__main__":
    train_data_set, train_data_label = get_training_data_set(100)
    train_data_set = np.array(train_data_set)
    train_data_labels = np.array(train_data_label)
    for i in range(10):
        onepic = train_data_set[i]
        printImg(onepic)
        print(train_data_labels[i].argmax())

                            
                            
                            
                            
                            
            ************    
        ****************    
       ****************     
       ***********          
        ******* **          
         *****              
           ****             
           ****             
            ******          
             ******         
              ******        
               *****        
                 ****       
              *******       
            ********        
          *********         
        **********          
      **********            
    **********              
    ********                
                            
                            
                            
5
                            
                            
                            
                            
               *****        
              ******        
            

### 建立全连接网络模块

In [8]:
import random
import datetime
import numpy as np

In [9]:
# sigmoid 函数类
class SigmoidActivator(object):
    def forward(self, x): # 前向传播计算输出
        return (1 / ( 1+ np.exp(-x)))
    def backward(self, output): # 反向传播计算梯度
        return np.multiply(output, (1-output))

In [None]:
#### 实现全连接层的每一层，作为一个类来实现
class FullyConnectedLayer(object):
    def __init__(self, input_size, output_size, activator):
        self.input_size = input_size
        self.output_size = output_size
        self.activator = activator
        self.W = np.random.rand(output_size, input_size)
        self.b = np.zeros((output_size, 1)) 
        self.output = np.zeros((output_size, 1)) # 输出初始化为全零列向量
    #前向计算，计算输出。input_array：输入向量，维度必须是input_size
    def forward(self, input_array):
        self.input = input_array
        self.output = self.activator.forward(np.dot(self.W, input_array) + self.b)
    def backward(self, delta_array):
        self.delta = np.multiply(self.activator.backward(self.input), np.dot(self.W.T, delta_array))
        self.W_grad = np.dot(delta_array, self.input.T)
        self.b_grad = delta_array
        
    def update(self, lr):
        self.W -= lr*self.W_grad
        self.b -= lr*self.b_grad

In [None]:
#### 实现神经网络类
class NetWork(object):
    def __init__(self, layers):
        # 通过layers初始化一个神经网络结构,包含描述神经网络的输入层节点数，隐含层节点数，输出层节点数
        self.layers = []
        for i in range(len(layers) - 1):
            self.layers.append(FullyConnectedLayer(layers[i], layers[i+1], SigmoidActivator()))
    def train(self, labels, data_set, lr, epoch):
        # 训练函数：输入labels:样本标签矩阵，data_set:数据集，lr:学习率，epoch:训练次数
        for i in range(epoch):
            for j in range(len(data_set)):
                oneobject = np.array(data_set[j]).reshape(-1, 1)  # 一维列向量
                onelabel = np.array(labels[j]).reshape(-1, 1)
                self.train_one_sample(oneobject, onelabel, lr)
    def train_one_sample(self, oneobject, onelabel, lr):
        self.predict(oneobject)
        self.calc_gradient(onelabel)
        self.update_weight(lr)
    def predict(self, oneobject):
        oneobject = oneobject.reshape(-1, 1)
        output = oneobject
        for layer in self.layers:
            layer.forward(output)
            output = layer.output
        return output
    def calc_gradient(self, label):
        delta = np.multiply(self.layers[-1].activator.backward(self.layers[-1].output), (label - self.layers[-1].output))
        for layer in self.layers[::-1]:
            layer.backward(delta)
            delta = layer.delta
        return delta
    def update_weight(self, lr):
        for layer in self.layers:
            layer.update(lr)

In [12]:
# 根据返回结果，计算所属类型
def value2type(vec):
    # 获取概率最大的分类, 返回的是一个vec是一个列向量，获取最大的一个值
    return vec.argmax(axis = 0)

In [13]:
# 对网络进行评估，用错误率来评估网络优劣
def evaluate(network, test_data, test_labels):
    error = 0
    total = test_data.shape[0]
    for i in range(total):
        label = value2type(test_labels[i])
        predict = value2type(network.predict(test_data[i]))
        if label != predict:
            error += 1
    return float(error) / float(total)

In [14]:
if __name__ == '__main__':
    # 用神经网络实现and运算
    data_set = np.array([[0, 0], [1, 0], [0, 1], [1, 1]])
    labels = np.array([[1, 0], [1, 0], [1, 0], [0, 1]])
    net = NetWork([2, 1, 2])
    lr = 2
    epoch = 10000
    net.train(labels, data_set, lr, epoch)
    for layer in net.layers:
        print("Weight:", layer.W)
        print("Bias:", layer.b)
    for i in range(2):
        for j in range(2):
            sample = np.array([[i,j]])
            result = net.predict(sample)
            type = value2type(result)
            print("The predict of this classification is: ", type)

Weight: [[1.31801499 1.1922764 ]]
Bias: [[2.56634141]]
Weight: [[-4.59209893]
 [ 5.2841543 ]]
Bias: [[-7.27570186]
 [ 6.62041882]]
The predict of this classification is:  [1]
The predict of this classification is:  [1]
The predict of this classification is:  [1]
The predict of this classification is:  [1]


### 训练手写数字识别器

In [17]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [20]:
# 训练和评估，每训练10轮，评估一次精确度，当开始accuracy开始下降的时候，停止
def train_evalute():
    # 获取6000个训练数据，并且变成1行
    train_set, train_labels = get_training_data_set(6000, True)
    test_set, test_labels = get_test_data_set(1000, True) # 获取测试数据集
    train_data = np.array(train_set)
    test_data = np.array(test_set)
    train_labels = np.array(train_labels)
    test_labels = np.array(test_labels)
    # 定义神经网络结构，输入为784， 中间神经元300个，输出为10的分类
    network = NetWork([784, 300, 10])
    epoch = 0
    error_rate = 0.0
    error_rates = []
    while True:
        epoch += 1
        lr = 0.3
        network.train(train_labels, train_data, lr, 1)  # 开始训练学习率为0.3，训练1次
        error_rate = evaluate(network, test_data, test_labels)
        error_rates.append(error_rate)
        print("%s epoch %d finished with %.3f error rate" % (datetime.datetime.now(), epoch, error_rate))
        if error_rate < 0.1:
            break
#         if epoch %10 == 0:
#             error_rate = evaluate(network, test_data, test_labels)
#             print("%s After epoch %d, the error rate is: %.3f" % (datetime.datetime.now(), epoch, error_rate))
#             # 定义停止条件
#             if error_rate < 0.1:
#                 break
    index = 0
    for layer in network.layers:
        np.savetxt("MNIST-W" + str(index), layer_w)
        np.savetxt("MNIST-B" + str(index), layer_b)
        index += 1
        print(layer_w)
        print(layer_b)
    plt.plot(list(range(len(error_rates))), error_rates)
if __name__ == "__main__":
    train_evalute()

KeyboardInterrupt: 