In [1]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sbn

In [2]:
# Fully connected layer implementation
class FullConnectedLayer():
    '''
    One fully connected layer: output = activator.forward(W . input + b).

    W has shape (output_size, input_size); b and output are column vectors
    of shape (output_size, 1).
    '''

    def __init__(self, input_size, output_size, activator):
        '''
        Constructor.
        input_size: dimension of this layer's input vector
        output_size: dimension of this layer's output vector
        activator: activation object providing forward() and backward()
        '''
        self.input_size = input_size
        self.output_size = output_size
        self.activator = activator
        # Weight matrix W, initialised uniformly in [-0.1, 0.1)
        self.W = np.random.uniform(-0.1, 0.1, (output_size, input_size))
        # Bias column vector b
        self.b = np.zeros((output_size, 1))
        # Most recent output of the layer
        self.output = np.zeros((output_size, 1))

    def forward(self, input_array):
        '''
        Forward pass; the result is stored in self.output.
        input_array: input column vector whose dimension must equal input_size
        '''
        # The input is remembered because backward() needs it.
        self.input = input_array
        self.output = self.activator.forward(
            np.dot(self.W, input_array) + self.b)

    # Backward-compatible alias for the original (misspelled) method name.
    forword = forward

    def backward(self, delta_array):
        '''
        Backward pass: compute the gradients of W and b.
        delta_array: error term of this layer's output, handed over by the
                     layer that was processed just before in back-propagation

        Stores self.delta (error term to hand to the next layer in the
        backward sweep), self.W_grad and self.b_grad.
        '''
        # NOTE(review): activator.backward is applied to self.input, i.e. the
        # input is treated as the activation output of the layer feeding this
        # one -- confirm this holds for the first layer's raw input.
        self.delta = self.activator.backward(self.input) * np.dot(self.W.T, delta_array)
        self.W_grad = np.dot(delta_array, self.input.T)
        self.b_grad = delta_array

    def update(self, learning_rate):
        '''
        Apply one update step to W and b using the stored gradients.
        learning_rate: step size
        '''
        # The stored gradients are added, not subtracted: the output error
        # term is computed by the caller (label - output in Network).
        self.W += learning_rate * self.W_grad
        self.b += learning_rate * self.b_grad
        

# Activation function
class SigmoidActivator():
    '''
    Logistic sigmoid activation together with its derivative.
    '''

    def forward(self, weighted_input):
        '''
        Return 1 / (1 + exp(-weighted_input)), element-wise for arrays.
        '''
        return 1.0 / (1.0 + np.exp(-weighted_input))

    def backward(self, output):
        '''
        Return output * (1 - output).
        output: the value already produced by forward(), not the raw
                weighted input
        '''
        return output * (1 - output)
    
# Neural network class
class Network():
    '''
    Stack of FullConnectedLayer objects with sigmoid activations, trained
    one sample at a time.
    '''

    def __init__(self, layers):
        '''
        Constructor.
        layers: sequence with the number of nodes of every layer; one
                FullConnectedLayer is created per consecutive pair
        '''
        self.layers = []
        for i in range(len(layers) - 1):
            self.layers.append(
                FullConnectedLayer(
                    layers[i], layers[i+1],
                    SigmoidActivator()
                )
            )

    @staticmethod
    def _as_column(vector):
        '''
        Return vector as a float array; 1-D input becomes an (n, 1) column.
        '''
        # A flat list/array would broadcast against the (n, 1) bias and
        # output vectors into an (n, n) matrix instead of a column.
        column = np.asarray(vector, dtype=float)
        if column.ndim == 1:
            column = column.reshape(-1, 1)
        return column

    def predict(self, sample):
        '''
        Run a forward pass through every layer.
        sample: input sample (flat sequence or column vector)
        Returns the output of the last layer.
        '''
        output = self._as_column(sample)
        for layer in self.layers:
            layer.forward(output)
            output = layer.output
        return output

    def train(self, labels, data_set, rate, epoch):
        '''
        Training loop.
        labels: sample labels
        data_set: input samples
        rate: learning rate
        epoch: number of passes over data_set
        '''
        for i in range(epoch):
            for d in range(len(data_set)):
                self.train_one_sample(labels[d], data_set[d], rate)

    def train_one_sample(self, label, sample, rate):
        '''
        Forward pass, gradient computation and weight update for one sample.
        '''
        self.predict(sample)
        self.calc_gradient(label)
        self.update_weight(rate)

    def calc_gradient(self, label):
        '''
        Back-propagate the error of the most recent forward pass.
        label: target vector (flat sequence or column vector)
        Returns the error term produced by the first layer.
        '''
        label = self._as_column(label)
        output_layer = self.layers[-1]
        # Error term of the output layer: activation derivative * (label - output)
        delta = output_layer.activator.backward(output_layer.output
                                                ) * (label - output_layer.output)
        # Walk the layers from last to first, handing the error term along.
        for layer in self.layers[::-1]:
            layer.backward(delta)
            delta = layer.delta
        return delta

    def update_weight(self, rate):
        '''
        Apply the stored gradients of every layer.
        rate: learning rate
        '''
        for layer in self.layers:
            layer.update(rate)

In [26]:
import struct
from bp import *
from datetime import datetime
# Base class for data loaders
class Loader():
    '''
    Holds a data file path and sample count, and reads the raw file bytes.
    '''

    def __init__(self, path, count):
        '''
        Initialise the loader.
        path: path of the data file
        count: number of samples in the file
        '''
        self.path = path
        self.count = count

    def get_file_content(self):
        '''
        Read the whole file in binary mode and return its content.
        '''
        # The context manager closes the handle even if read() raises.
        with open(self.path, 'rb') as f:
            return f.read()

    def to_int(self, byte):
        '''
        Convert an unsigned byte to an integer.
        '''
        # Indexing a Python 3 bytes object already yields an int, so the
        # value is returned unchanged.
        return byte
    
# Image data loader
class ImageLoader(Loader):
    def get_picture(self, content, index):
        '''
        Internal helper: extract image number `index` from the file content
        as a 28x28 nested list.
        '''
        # Each image occupies 28*28 consecutive entries; reading starts
        # after a 16-entry offset.
        offset = index * 28 * 28 + 16
        return [
            [self.to_int(content[offset + row * 28 + col]) for col in range(28)]
            for row in range(28)
        ]

    def get_one_sample(self, picture):
        '''
        Internal helper: flatten a 28x28 picture row by row into a single
        784-element input vector.
        '''
        return [picture[row][col] for row in range(28) for col in range(28)]

    def load(self):
        '''
        Load the data file and return the input vectors of all samples.
        '''
        raw = self.get_file_content()
        return [
            self.get_one_sample(self.get_picture(raw, idx))
            for idx in range(self.count)
        ]
# Label data loader
class LabelLoader(Loader):
    def load(self):
        '''
        Load the data file and return the label vectors of all samples.
        '''
        raw = self.get_file_content()
        # Label values are read starting at offset 8 of the file content.
        return [self.norm(raw[idx + 8]) for idx in range(self.count)]

    def norm(self, label):
        '''
        Internal helper: turn one label value into a 10-dimensional vector
        holding 0.9 at the label's position and 0.1 everywhere else.
        '''
        target = self.to_int(label)
        return [0.9 if digit == target else 0.1 for digit in range(10)]
    
def get_training_data_set():
    '''
    Load the training data set.

    Returns a tuple (input vectors, label vectors) read from the
    60000-sample training files under ./dataset.
    '''
    images = ImageLoader('./dataset/train-images-idx3-ubyte', 60000)
    labels = LabelLoader('./dataset/train-labels-idx1-ubyte', 60000)
    samples = images.load()
    targets = labels.load()
    return samples, targets

def get_test_data_set():
    '''
    Load the test data set.

    Returns a tuple (input vectors, label vectors) read from the
    10000-sample test files under ./dataset.
    '''
    images = ImageLoader('./dataset/t10k-images-idx3-ubyte', 10000)
    labels = LabelLoader('./dataset/t10k-labels-idx1-ubyte', 10000)
    samples = images.load()
    targets = labels.load()
    return samples, targets

def get_result(vec):
    '''
    Return the index of the largest entry of vec.

    Only entries strictly greater than 0 are considered and ties resolve to
    the earliest index; 0 is returned when vec is empty or no entry
    exceeds 0.
    '''
    best_index, best_value = 0, 0
    for index, value in enumerate(vec):
        if value > best_value:
            best_index, best_value = index, value
    return best_index

def evaluate(network, test_data_set, test_labels):
    '''
    Return the fraction of test samples the network classifies wrongly.

    network: object providing predict(sample)
    test_data_set: input samples
    test_labels: label vectors, indexed in parallel with test_data_set
    '''
    total = len(test_data_set)
    # A sample counts as an error when the arg-max of its label vector
    # differs from the arg-max of the network output.
    wrong = sum(
        1
        for idx in range(total)
        if get_result(test_labels[idx])
        != get_result(network.predict(test_data_set[idx]))
    )
    return float(wrong) / float(total)

def train_and_evaluate():
    '''
    Train a 784-300-10 network on the training set until the test error
    ratio stops improving.

    One pass over the training data is run per epoch with learning rate
    0.3. Every 10 epochs the error ratio on the test set is measured;
    training stops as soon as it is higher than the previous measurement.
    '''
    last_error_ratio = 1.0
    epoch = 0
    train_data_set, train_labels = get_training_data_set()
    test_data_set, test_labels = get_test_data_set()
    network = Network([784, 300, 10])
    while True:
        epoch += 1
        network.train(train_labels, train_data_set, 0.3, 1)
        # datetime.now() is used directly: no now() helper is defined in
        # this notebook.
        print('%s epoch %d finished' % (datetime.now(), epoch))
        if epoch % 10 == 0:
            error_ratio = evaluate(network, test_data_set, test_labels)
            print('%s after epoch %d, error ratio is %f' % (datetime.now(), epoch, error_ratio))
            if error_ratio > last_error_ratio:
                break
            else:
                last_error_ratio = error_ratio
                
# Run the full train/evaluate loop; it only returns once the measured test
# error ratio increases, so this cell is long-running.
train_and_evaluate()

KeyboardInterrupt: 

In [27]:
# Load the training samples and labels into the notebook namespace.
train_data_set, train_labels = get_training_data_set()

In [38]:
# Length of the first training sample's input vector.
len(train_data_set[0])

784