In [1]:
import pandas as pd
import numpy as np
import math
import os
import re

## 最基础的全连接网络

In [None]:
class FullyConnectedNetwork(object):
    def __init__(self, shape, **paras):
        self.shape = tuple(shape) # 全连接网络的机构
        self.deep = len(shape) # 网络的层数
        self.layer_id = tuple(range(self.deep))

        self.weights = {i:np.matrix(np.random.randn(self.shape[i], self.shape[i - 1])) for i in self.layer_id[1:]} # 随机初始化weights
        self.bias = {i:np.matrix(np.random.randn(self.shape[i], 1)) for i in self.layer_id[1:]}
        self.d_z = {i:None for i in self.layer_id[1:]}
        self.a = {i:None for i in self.layer_id}
        self.z = {i:None for i in self.layer_id[1:]}
        self.y_pred = None
    
    def forward_propagation(self, train_data_mat):
        self.a[0] = train_data_mat
        for i in self.layer_id[1:]:
            self.z[i] = self.weights[i] * self.a[i - 1] + self.bias[i] # z=wx + b # broadcast
            self.a[i] = self.activate(self.z[i])
        
        y_pred_mat = np.matrix(self.a[self.layer_id[-1]])
        self.y_pred = y_pred_mat
        return y_pred_mat

    def back_propagation(self, train_label_mat, y_pred):
        d_l = self.cal_loss_derivation(y_pred, train_label_mat) # 计算loss
        self.d_z[self.layer_id[-1]] = d_l
        for i in sorted(self.layer_id[1: -1], reverse=True):
            d_a_i = self.weights[i + 1].T * self.d_z[i + 1] # wixwi+1 * wi+1xb
            d_z_i = np.multiply(self.activate_derivation(self.z[i]), d_a_i)
            self.d_z[i] = d_z_i
    
    def update_paras(self, alpha=2):
        for i in self.layer_id[1:]:
            w_gradient = self.d_z[i] * self.a[i - 1].T
            self.weights[i] = self.weights[i] - alpha * w_gradient / self.d_z[i].shape[1]

            b_gradient = self.d_z[i]
            self.bias[i] = self.bias[i] - alpha * np.average(b_gradient, axis=1)
    
    def fit(self, train_data, train_label, max_round=100, batch_size=None):
        if batch_size is None:
            data_index = list(range(len(train_data)))
            for round in range(max_round):
                np.random.shuffle(data_index)
                for i in np.data_index:
                    alpha=0.2
                    train_data_mat = np.matrix(train_data[i]).T # 转置为n*b
                    train_label_mat = np.matrix(train_label[i]).T # 转置为y*b
                    y_pred = self.forward_propagation(train_data_mat)
                    self.back_propagation(train_label_mat, y_pred)
                    self.update_paras(alpha)
        else: # 小批量（批量）更新
            data_index = list(range(len(train_data)))
            for round in range(max_round):
                np.random.shuffle(data_index)
                for i in range(math.floor(len(train_data) / batch_size) + 1):
                    batch_index = data_index[batch_size * i : batch_size * (i + 1)]
                    if len(batch_index) > 0:
                        alpha = 0.01
                        batch_data_mat = np.matrix(train_data[batch_index].T)
                        batch_label_mat = np.matrix(train_label[batch_index].T)

                        y_pred = self.forward_propagation(batch_data_mat)
                        self.back_propagation(batch_label_mat, y_pred)
                        self.update_paras(alpha)

    def predict(self, test_data_vec):
        # 将向量转换为矩阵便于计算
        test_data_mat = np.matrix(test_data_vec.reshape((self.shape[0], -1)))

        a = test_data_mat
        for i in self.layer_id[1:]:
            z = self.weights[i] * a + self.bias[i] # z=wx + b
            a = self.activate(z)
        
        y_pred_mat = np.matrix(a)
        return y_pred_mat
    
    def activate(self, z):
        a = self.sigmoid(z)
        return a
    
    @staticmethod
    def sigmoid(mat):
        x = np.array(mat).ravel()                                # 铺平
        y = []
        for i in range(len(x)):
            if  x[i] >= 0:
                y.append(1 / (1 + np.exp(-x[i])))
            else:
                y.append(np.exp(x[i]) / (1 + np.exp(x[i])))      # 当某一个元素小于0时，用另一个公式计算，解决上溢问题
        return np.matrix(np.array(y).reshape(mat.shape))
    
    def activate_derivation(self, z):
        da_dz = self.sigmoid_derivation(z)
        return da_dz
    
    @staticmethod
    def sigmoid_derivation(z):
        sigmoid_d = np.multiply(FullyConnectedNetwork.sigmoid(z), 1 - FullyConnectedNetwork.sigmoid(z))
        return sigmoid_d
    
    def cal_loss_derivation(self, y_pred, y_real):
        d_l = y_pred - y_real
        return d_l=


## 具有完善功能的全连接网络

In [7]:
class DNN(object):
    def __init__(self, shape, activation, loss_function='binary_cross_entrophy', **paras):
        self.shape = tuple(shape) # 全连接网络的机构
        self.deep = len(shape) # 网络的层数
        self.layer_id = tuple(range(self.deep))
        self.activation = {id : activation[id - 1] for id in self.layer_id[1:]}  # 从第二层开始有激活函数
        self.loss_function = loss_function
        if self.loss_function == 'softmax_ce':
            self.activation[self.layer_id[-1]] = 'softmax'

        self.weights = {i:np.matrix(np.random.randn(self.shape[i], self.shape[i - 1])) \
            for i in self.layer_id[1:]} # 随机初始化weights
        self.bias = {i:np.matrix(np.random.randn(self.shape[i], 1)) for i in self.layer_id[1:]}
        self.d_z = {i:None for i in self.layer_id[1:]}
        self.a = {i:None for i in self.layer_id}
        self.z = {i:None for i in self.layer_id[1:]}
        self.y_pred = None
    
    # 前向传播
    def forward_propagation(self, train_data_mat):
        self.a[0] = train_data_mat  # n*bs
        for i in self.layer_id[1:]:
            self.z[i] = self.weights[i] * self.a[i - 1] + self.bias[i]  # z=wx + b # broadcast # wi*bs
            self.a[i] = self.activate(self.z[i], i)  # wi*bs
        
        y_pred_mat = np.matrix(self.a[self.layer_id[-1]])
        self.y_pred = y_pred_mat
        return y_pred_mat

    # 反向传播
    def back_propagation(self, train_label_mat, y_pred):
        d_l_a = self.cal_loss_derivation(y_pred, train_label_mat) # 计算loss
        d_l_z = np.multiply(self.activate_derivation(self.z[self.layer_id[-1]], \
            self.layer_id[-1]), d_l_a)
        self.d_z[self.layer_id[-1]] = d_l_z  # 获取输出层的dz
        for i in sorted(self.layer_id[1: -1], reverse=True):
            d_a_i = self.weights[i + 1].T * self.d_z[i + 1] # wixwi+1 * wi+1xb
            d_z_i = np.multiply(self.activate_derivation(self.z[i], i), d_a_i)
            self.d_z[i] = d_z_i
    
    # 更新权重
    def update_paras(self, alpha=2):
        for i in self.layer_id[1:]:
            w_gradient = self.d_z[i] * self.a[i - 1].T
            self.weights[i] = self.weights[i] - alpha * w_gradient

            b_gradient = self.d_z[i]
            self.bias[i] = self.bias[i] - alpha * np.sum(b_gradient, axis=1)
    
    def fit(self, train_data, train_label, epochs=100, batch_size=1, \
        learning_rate=0.01, decay=1e-6):  # s*n
        data_index = list(range(train_data.shape[0]))
        for epoch in range(epochs):
            np.random.shuffle(data_index)
            for i in range(math.floor(train_data.shape[0] / batch_size) + 1):
                batch_index = data_index[batch_size * i : batch_size * (i + 1)]  # bs*n
                if len(batch_index) > 0:
                    alpha = max((0, learning_rate - epoch * decay))
                    batch_data_mat = np.matrix(train_data[batch_index].T)  # n*bs
                    batch_label_mat = np.matrix(train_label[batch_index].T)  # y*bs

                    y_pred = self.forward_propagation(batch_data_mat)
                    self.back_propagation(batch_label_mat, y_pred)
                    self.update_paras(alpha)

    def predict(self, test_data):
        # 将向量转换为矩阵便于计算
        a = np.matrix(test_data.T)
        for i in self.layer_id[1:]:
            z = self.weights[i] * a + self.bias[i] # z=wx + b
            a = self.activate(z, i)
        return a
    
    def activate(self, z, layer_id):
        activation_method = self.activation[layer_id]
        if activation_method == 'sigmoid':
            a = self.sigmoid(z)
        elif activation_method == 'tanh':
            a = self.tanh(z)
        elif activation_method == 'relu':
            a = self.relu(z)
        elif activation_method == 'softmax':
            a = self.softmax(z)
        return a
    
    def activate_derivation(self, z, layer_id):
        activation_method = self.activation[layer_id]
        if activation_method == 'sigmoid':
            da_dz = self.sigmoid_derivation(z)
        elif activation_method == 'tanh':
            da_dz = self.tanh_derivation(z)
        elif activation_method == 'relu':
            da_dz = self.relu_derivation(z)
        elif activation_method == 'softmax':
            da_dz = 1  # 在损失函数中直接计算好da_dz
        else:
            return None
        return da_dz
    
    @staticmethod
    def sigmoid(mat):
        mat = mat.copy()
        x = np.array(mat).ravel()                                # 铺平
        y = []
        for i in range(len(x)):
            if  x[i] >= 0:
                y.append(1 / (1 + np.exp(-x[i])))
            else:
                y.append(np.exp(x[i]) / (1 + np.exp(x[i])))      # 当某一个元素小于0时，用另一个公式计算，解决上溢问题
        return np.matrix(np.array(y).reshape(mat.shape))
    
    @staticmethod
    def sigmoid_derivation(z):
        z = z.copy()
        sigmoid_d = np.multiply(DNN.sigmoid(z), 1 - DNN.sigmoid(z))
        return sigmoid_d
    
    @staticmethod
    def tanh(mat):
        mat = mat.copy()
        result = (np.exp(mat) - np.exp(- mat)) / (np.exp(mat) + np.exp(- mat))
        return result

    @staticmethod
    def tanh_derivation(z):
        z = z.copy()
        result = 1 - np.power(DNN.tanh(z), 2)
        return result

    @staticmethod
    def relu(mat):
        mat = mat.copy()
        mat[mat < 0] = 0
        return mat
    
    @staticmethod
    def relu_derivation(z):
        z = z.copy()
        z[z < 0] = 0
        z[z >= 0] = 1
        return z
    
    @staticmethod
    def softmax(mat):
        mat = mat.copy()
        mat = np.exp(mat)
        mat_sum = np.sum(mat, axis=0)
        result = np.divide(mat + 1e-7, mat_sum + 1e-7)
        return result
    
    @staticmethod
    def softmax_derivation(z):
        pass
    
    def cal_loss_derivation(self, y_pred, y_real):
        if self.loss_function == 'binary_cross_entrophy':
            d_l = self.binary_cross_entrophy_derivation(y_pred, y_real)
        elif self.loss_function == 'cross_entrophy':
            d_l = self.cross_entrophy_derivation(y_pred, y_real)
        elif self.loss_function == 'softmax_ce':
            d_l = self.softmax_ce_derivation(y_pred, y_real)
        else:
            return None
        return d_l
    
    @staticmethod
    def binary_cross_entrophy_derivation(y_pred, y_real):
        result = (- np.divide(y_real + 1e-7, y_pred + 1e-7)) + \
            (np.divide(1 - y_real - 1e-7, 1 - y_pred - 1e-7))
        return result
    
    @staticmethod
    def cross_entrophy_derivation(y_pred, y_real):
        result = - np.divide(y_real + 1e-7, y_pred + 1e-7)
        return result
    
    @staticmethod
    def softmax_ce_derivation(y_pred, y_real):
        result = y_pred - y_real
        return result

### 测试数据

In [None]:
def cat_accuracy(y_pred, y_real):
    right_num = 0
    for i in range(y_pred.shape[1]):
        i_pred = np.argmax(y_pred.T[i])
        i_real = np.argmax(y_real.T[i])
        if i_pred == i_real:
            right_num += 1
    return right_num / y_pred.shape[1]

In [2]:
'''
导入数据集
'''
filename_ptn = re.compile('(\d)_\d+.txt')
train_path = './Data/trainingDigits'
test_path = './Data/testDigits'
x_train = []
y_train = []
x_test = []
y_test = []

for filename in os.listdir(train_path):
    file = filename_ptn.search(filename)
    if file:
        file_path = os.path.join(train_path, filename)
        with open(file_path, 'r+') as f:
            x_train.append(f.read())
        y_train.append(file.group(1))

for filename in os.listdir(test_path):
    file = filename_ptn.search(filename)
    if file:
        file_path = os.path.join(test_path, filename)
        with open(file_path, 'r+') as f:
            x_test.append(f.read())
        y_test.append(file.group(1))


print('共有手写数字训练集：{}组\n'.format(len(x_train)))
print('共有手写数字测试集：{}组'.format(len(x_test)))

共有手写数字训练集：1934组

共有手写数字测试集：946组


In [3]:
'''
基于全连接网络对手写数字数据进行适当的处理
'''
def proceed_ann_x(x_lst):
    x_lst = x_lst.copy()
    for i, x in enumerate(x_lst):
        digit = ''.join(x.split('\n'))
        x_lst[i] = np.array([int(px) for px in digit]).reshape((1, -1))
    x_array = np.concatenate(x_lst)
    return x_array

x_train_ann = proceed_ann_x(x_train)
x_test_ann = proceed_ann_x(x_test)

print('训练集特征的维度：{}'.format(x_train_ann.shape))

训练集特征的维度：(1934, 1024)


In [4]:
'''
对labels进行处理
'''
def proceed_y(y_lst):
    for i, y in enumerate(y_lst):
        label = np.zeros((1, 10))
        label[0, int(y)] = 1
        y_lst[i] = label
    y_array = np.concatenate(y_lst)
    return y_array

y_train = proceed_y(y_train)
y_test = proceed_y(y_test)

print('训练集标签集的维度：{}'.format(y_train.shape))

训练集标签集的维度：(1934, 10)


In [27]:
def cat_accuracy(y_pred, y_real):
    right_num = 0
    for i in range(y_pred.shape[1]):
        i_pred = np.argmax(y_pred.T[i])
        i_real = np.argmax(y_real.T[i])
        if i_pred == i_real:
            right_num += 1
    return right_num / y_pred.shape[1]

In [5]:
'''
K-Fold交叉验证
'''
def k_fold(data, labels, k, random_shuffle=True):
    data, labels = data.copy(), labels.copy()
    data_index = list(range(data.shape[0]))

    if random_shuffle == True:
        np.random.shuffle(data_index)
    
    for i in range(k):
        fold_size = int(np.floor(data.shape[0] / k) + 1)
        test_index = data_index[fold_size * i:fold_size * (i + 1)]
        train_index = data_index[:fold_size * i] + data_index[fold_size * (i + 1):]
        yield data[train_index], labels[train_index], data[test_index], labels[test_index]


In [35]:
model = DNN((1024, 128, 32, 10), ('sigmoid', 'sigmoid', 'softmax'), loss_function='softmax_ce')
model.fit(x_train_ann, y_train, epochs=50, learning_rate=0.01, decay=1e-6, batch_size=50)

y_pred = model.predict(x_train_ann)
cat_acc = cat_accuracy(y_pred, y_train.T)
print('经过100epochs的迭代：')
print('该神经网络在训练集上的准确率为：{:.02f}%'.format(cat_acc * 100))

print('----------------------------------------')
y_pred = model.predict(x_test_ann)
cat_acc = cat_accuracy(y_pred, y_test.T)
print('经过100epochs的迭代：')
print('该神经网络在测试集上的准确率为：{:.02f}%'.format(cat_acc * 100))

经过100epochs的迭代：
该神经网络在训练集上的准确率为：99.90%
----------------------------------------
经过100epochs的迭代：
该神经网络在测试集上的准确率为：90.70%


In [37]:
fold_num = 10
accuracy = 0
for x_t, y_t, x_v, y_v in k_fold(x_train_ann, y_train, fold_num):
    model = DNN((1024, 128, 32, 10), ('sigmoid', 'sigmoid', 'softmax'), loss_function='softmax_ce')
    x_t, y_t, x_v, y_v = x_t, y_t, x_v, y_v
    model.fit(x_t, y_t, epochs=50, learning_rate=0.01, decay=1e-6, batch_size=50)
    pred_result = model.predict(x_v)
    accuracy += cat_accuracy(pred_result, np.matrix(y_v).T)

print('经过K Fold交叉验证：')
print('K: {}'.format(fold_num))
print('准确率：{:.02f}%'.format(accuracy / fold_num * 100))

经过K Fold交叉验证：
K: 10
准确率：90.18%
