In [3]:
import numpy as np
import time
import math
import os
import sys
import matplotlib.pyplot as plt
import pickle
import struct

from enum import Enum
from pathlib import Path

#### NetType为网络类型
+ Fitting为曲线拟合
+ BinaryClassifier为二分类
+ MultipleClassifier为多分类

In [None]:
class NetType(Enum):
    """Kind of task the network is trained for.

    Fitting: curve fitting (regression).
    BinaryClassifier: two-class classification.
    MultipleClassifier: multi-class classification.
    """
    # No trailing commas: "Fitting = 1," would make the value the tuple (1,)
    Fitting = 1
    BinaryClassifier = 2
    MultipleClassifier = 3

#### InitialMethod为初始化方法
+ Zero 初始化为0
+ Normal 为正态随机分布初始化
+ Xavier Xavier初始化方法来源于论文《[Understanding the difficulty of training deep feedforward neural networks](http://proceedings.mlr.press/v9/glorot10a/glorot10a.pdf?source=post_page---------------------------)》，主要的目标就是使得每一层输出的方差应该尽量相等。
+ MSRA MSRA初始化方法来源于论文《[Delving Deep into Rectifiers:Surpassing Human-Level Performance on ImageNet Classification](https://arxiv.org/pdf/1502.01852.pdf)》，是一个均值为 $0$ 方差为 $2/n$ 的高斯分布。

In [None]:
class InitialMethod(Enum):
    """Weight initialization strategy.

    Zero: all weights are 0.
    Normal: standard normal random values.
    Xavier: Xavier (Glorot) initialization.
    MSRA: MSRA (He) initialization.
    """
    # No trailing commas: "Zero = 0," would make the value the tuple (0,)
    Zero = 0
    Normal = 1
    Xavier = 2
    MSRA = 3

#### WeightsBias用于设置权重矩阵和偏移矩阵的值

In [None]:
class WeightsBias_1_0(object):
    """Weight matrix and bias vector of one fully connected layer.

    W has shape (n_input, n_output) and B has shape (1, n_output).  The
    initial values are cached in "<folder>/w_<in>_<out>_<method>_init.npz"
    so that later runs can start from the same parameters.
    """

    def __init__(self, n_input, n_output, init_method, eta):
        self.num_input = n_input        # number of inputs
        self.num_output = n_output      # number of outputs
        self.init_method = init_method  # initialization method
        self.eta = eta                  # learning rate
        self.initial_value_filename = str.format("w_{0}_{1}_{2}_init", self.num_input, self.num_output, self.init_method.name)

    def InitializeWeights(self, folder, create_new):
        """Create or reload W and B, then reset the gradients to zero.

        folder: directory that holds the cached initial values.
        create_new: True to always generate fresh values; False to reuse the
            cached file when it exists (fresh values are generated otherwise).
        """
        self.folder = folder
        if create_new:
            self.__CreateNew()
        else:
            self.__LoadExistingParameters()
        self.dW = np.zeros(self.W.shape)
        self.dB = np.zeros(self.B.shape)

    def __InitialValuePath(self):
        # Single place that builds the cache file name
        return str.format("{0}/{1}.npz", self.folder, self.initial_value_filename)

    @staticmethod
    def __ReadNpz(file_name):
        # np.load keeps the .npz archive open; the context manager closes it
        with np.load(file_name) as data:
            return data["weights"], data["bias"]

    def __CreateNew(self):
        self.W, self.B = WeightsBias_1_0.InitialParameters(self.num_input, self.num_output, self.init_method)
        self.__SaveInitialValue()

    def __LoadExistingParameters(self):
        if Path(self.__InitialValuePath()).exists():
            self.__LoadInitialValue()
        else:
            self.__CreateNew()

    def Update(self):
        """Apply one gradient-descent step: W -= eta * dW, B -= eta * dB."""
        self.W = self.W - self.eta * self.dW
        self.B = self.B - self.eta * self.dB

    def __SaveInitialValue(self):
        np.savez(self.__InitialValuePath(), weights=self.W, bias=self.B)

    def __LoadInitialValue(self):
        self.W, self.B = self.__ReadNpz(self.__InitialValuePath())

    def SaveResultValue(self, folder, name):
        """Write the current W and B to "<folder>/<name>.npz"."""
        file_name = str.format("{0}/{1}.npz", folder, name)
        np.savez(file_name, weights=self.W, bias=self.B)

    def LoadResultValue(self, folder, name):
        """Replace W and B with the arrays stored in "<folder>/<name>.npz"."""
        file_name = str.format("{0}/{1}.npz", folder, name)
        self.W, self.B = self.__ReadNpz(file_name)

    @staticmethod
    def InitialParameters(num_input, num_output, method):
        """Return (W, B) initialized according to `method`.

        W has shape (num_input, num_output); B is always zeros of shape
        (1, num_output).

        Raises:
            ValueError: if `method` is not a known InitialMethod member.
        """
        if method == InitialMethod.Zero:
            W = np.zeros((num_input, num_output))
        elif method == InitialMethod.Normal:
            W = np.random.normal(size=(num_input, num_output))
        elif method == InitialMethod.MSRA:
            # NOTE(review): the variance is scaled by num_output here; confirm
            # whether num_input (fan-in) was intended.
            W = np.random.normal(0, np.sqrt(2/num_output), size=(num_input, num_output))
        elif method == InitialMethod.Xavier:
            limit = np.sqrt(6/(num_output+num_input))
            W = np.random.uniform(-limit, limit, size=(num_input, num_output))
        else:
            # Previously fell through and failed with UnboundLocalError on W
            raise ValueError("unknown initialization method: {0}".format(method))
        B = np.zeros((1, num_output))
        return W, B

#### 损失函数
+ MSE 均方差损失函数(Mean Squared Error) $loss(w,b) = \frac{1}{2}(z_i - y_i)^2$
+ CE2 二分类交叉熵损失函数(Cross Entropy) $loss(w,b) = -[y\ln(a) + (1 - y)\ln(1 - a)]$
+ CE3 多分类交叉熵损失函数(Cross Entropy) $loss(w,b) = -\sum_{i = 1}^{m}y_i\ln(a_i)$

In [None]:
class LossFunction_1_1(object):
    """Loss functions for fitting, binary and multi-class networks.

    Every loss is summed over all elements and divided by the sample count.
    """

    # Smallest probability passed to log(); avoids 0 * log(0) = NaN when an
    # activation saturates to exactly 0 or 1.
    _EPS = 1e-12

    def __init__(self, net_type):
        self.net_type = net_type

    def CheckLoss(self, A, Y):
        """Return the loss matching self.net_type for predictions A and labels Y.

        The sample count is taken from Y.shape[0].

        Raises:
            ValueError: if self.net_type is not a known NetType member.
        """
        m = Y.shape[0]
        if self.net_type == NetType.Fitting:
            return self.MSE(A, Y, m)
        if self.net_type == NetType.BinaryClassifier:
            return self.CE2(A, Y, m)
        if self.net_type == NetType.MultipleClassifier:
            return self.CE3(A, Y, m)
        # Previously fell through and failed with UnboundLocalError on loss
        raise ValueError("unknown net type: {0}".format(self.net_type))

    def MSE(self, A, Y, count):
        """Mean squared error: sum((A - Y)^2) / count / 2."""
        diff = A - Y
        return np.multiply(diff, diff).sum()/count/2

    def CE2(self, A, Y, count):
        """Binary cross entropy: -sum(Y*ln(A) + (1-Y)*ln(1-A)) / count."""
        # Values strictly inside (_EPS, 1 - _EPS) are left untouched
        A = np.clip(A, self._EPS, 1 - self._EPS)
        negative_part = np.multiply(1 - Y, np.log(1 - A))
        positive_part = np.multiply(Y, np.log(A))
        return np.sum(-(negative_part + positive_part)) / count

    def CE3(self, A, Y, count):
        """Multi-class cross entropy: -sum(Y*ln(A)) / count."""
        # Only the lower bound matters here: ln(1) is already finite
        A = np.maximum(A, self._EPS)
        return np.sum(-np.multiply(Y, np.log(A)), keepdims=False) / count

#### 超参数

In [None]:
class HyperParameters_3_0(object):
    """Hyper-parameters of a network with two hidden layers."""

    def __init__(self, n_input, n_hidden1, n_hidden2, n_output, 
                 eta=0.1, max_epoch=10000, batch_size=5, eps = 0.1,
                 net_type = NetType.Fitting,
                 init_method = InitialMethod.Xavier):
        # Layer sizes
        self.num_input = n_input         # number of inputs
        self.num_hidden1 = n_hidden1     # neurons in the first hidden layer
        self.num_hidden2 = n_hidden2     # neurons in the second hidden layer
        self.num_output = n_output       # number of outputs

        # Optimisation settings
        self.eta = eta                   # learning rate
        self.max_epoch = max_epoch       # one epoch = one pass over the whole training set
        self.batch_size = batch_size     # samples used per training step

        # Network configuration
        self.net_type = net_type         # network type
        self.init_method = init_method   # weight initialization method
        self.eps = eps                   # epsilon

    def toString(self):
        """Return a short summary of batch size, learning rate and hidden sizes."""
        return "bz:{0},eta:{1},ne:{2}x{3}".format(
            self.batch_size, self.eta, self.num_hidden1, self.num_hidden2)

#### 分类函数

In [None]:
class CClassifier(object):
    """Base class of the output-layer classification functions."""

    def forward(self, z):
        # Nothing to compute in the base class
        return None


class Logistic(CClassifier):
    """Logistic (sigmoid) function for binary classification."""

    def forward(self, z):
        return 1.0 / (1.0 + np.exp(-z))


class Softmax(CClassifier):
    """Row-wise softmax for multi-class classification."""

    def forward(self, z):
        # Subtract each row's maximum before exponentiating
        row_max = np.max(z, axis=1, keepdims=True)
        exponentials = np.exp(z - row_max)
        row_sum = np.sum(exponentials, axis=1, keepdims=True)
        return exponentials / row_sum

#### 激活函数
+ z = 本层的 wx + b 计算值矩阵
+ a = 本层的激活函数输出值矩阵
+ delta = 上(后)层反传回来的梯度值矩阵

In [None]:
class CActivator(object):
    """Base class of the activation functions.

    z is the layer's pre-activation matrix, a is the activation output and
    delta is the gradient passed back from the following layer.
    """

    def forward(self, z):
        return None

    def backward(self, z, a, delta):
        return None


class Identity(CActivator):
    """Pass-through activation."""

    def forward(self, z):
        return z

    def backward(self, z, a, delta):
        # The incoming gradient is handed back unchanged, together with a
        return delta, a


class Sigmoid(CActivator):
    """Sigmoid activation: a = 1 / (1 + exp(-z))."""

    def forward(self, z):
        return 1.0 / (1.0 + np.exp(-z))

    def backward(self, z, a, delta):
        # Derivative expressed through the output: a * (1 - a)
        derivative = np.multiply(a, 1-a)
        return np.multiply(delta, derivative), derivative


class Tanh(CActivator):
    """Tanh activation computed as 2 / (1 + exp(-2z)) - 1."""

    def forward(self, z):
        return 2.0 / (1.0 + np.exp(-2*z)) - 1.0

    def backward(self, z, a, delta):
        # Derivative expressed through the output: 1 - a^2
        derivative = 1 - np.multiply(a, a)
        return np.multiply(delta, derivative), derivative


class Relu(CActivator):
    """ReLU activation: a = max(z, 0)."""

    def forward(self, z):
        return np.maximum(z, 0)

    def backward(self, z, a, delta):
        # 1 where z is strictly positive, 0 elsewhere
        derivative = (z > 0).astype(np.float64)
        return derivative * delta, derivative

#### 帮助类
+ 用于记录损失函数值及其对应的权重/迭代次数
+ 用于图形显示损失函数值历史记录

In [None]:
class TrainingHistory_2_3(object):
    """Records loss/accuracy at each checkpoint and plots the history."""

    def __init__(self):
        self.loss_train = []
        self.accuracy_train = []
        self.iteration_seq = []
        self.epoch_seq = []

        self.loss_val = []
        self.accuracy_val = []

    def Add(self, epoch, total_iteration, loss_train, accuracy_train, loss_vld, accuracy_vld):
        """Append one checkpoint to the history.

        Validation values are stored only when they are not None.
        Always returns False.
        """
        self.iteration_seq.append(total_iteration)
        self.epoch_seq.append(epoch)
        self.loss_train.append(loss_train)
        self.accuracy_train.append(accuracy_train)
        if loss_vld is not None:
            self.loss_val.append(loss_vld)
        if accuracy_vld is not None:
            self.accuracy_val.append(accuracy_vld)

        return False

    def _x_series(self, x):
        # Map the requested x-axis name to the recorded sequence
        if x == "iteration":
            return self.iteration_seq
        if x == "epoch":
            return self.epoch_seq
        # Previously an unknown name failed later with a NameError
        raise ValueError('x must be "iteration" or "epoch", got {0!r}'.format(x))

    @staticmethod
    def _plot_train_and_validation(axes, x_values, train_values, val_values):
        # Train is drawn first so the line colours stay as before
        p2, = axes.plot(x_values, train_values)
        p1, = axes.plot(x_values, val_values)
        axes.legend([p1,p2], ["validation","train"])

    def ShowLossHistory(self, params, x="epoch", xmin=None, xmax=None, ymin=None, ymax=None):
        """Show loss and accuracy curves side by side and return the title.

        params: object whose toString() provides the figure title.
        x: "epoch" or "iteration", selects the x axis.
        xmin/xmax/ymin/ymax: axis limits for the loss plot; applied when at
            least one of them is given.

        Raises:
            ValueError: if x is neither "epoch" nor "iteration".
        """
        x_values = self._x_series(x)
        plt.figure(figsize=(12,5))

        axes = plt.subplot(1,2,1)
        self._plot_train_and_validation(axes, x_values, self.loss_train, self.loss_val)
        axes.set_xlabel(x)
        axes.set_title("Loss")
        axes.set_ylabel("loss")
        if any(limit is not None for limit in (xmin, xmax, ymin, ymax)):
            axes.axis([xmin, xmax, ymin, ymax])

        axes = plt.subplot(1,2,2)
        self._plot_train_and_validation(axes, x_values, self.accuracy_train, self.accuracy_val)
        axes.set_xlabel(x)
        axes.set_title("Accuracy")
        axes.set_ylabel("accuracy")

        title = params.toString()
        plt.suptitle(title)
        plt.show()
        return title

    def ShowLossHistory4(self, axes, params, xmin=None, xmax=None, ymin=None, ymax=None):
        """Draw the per-epoch loss curves on the given axes and return the title.

        The axis limits are applied only when both xmin and ymin are given.
        """
        axes.plot(self.epoch_seq, self.loss_train)
        axes.plot(self.epoch_seq, self.loss_val)
        title = params.toString()
        axes.set_title(title)
        axes.set_xlabel("epoch")
        axes.set_ylabel("loss")
        if xmin is not None and ymin is not None:
            axes.axis([xmin, xmax, ymin, ymax])
        return title

    def GetEpochNumber(self):
        """Return the epoch of the most recent checkpoint."""
        return self.epoch_seq[-1]

    def GetLatestAverageLoss(self, count=10):
        """Return the mean of the last `count` validation losses.

        Fewer values are averaged when the history is shorter.  Returns NaN
        when there is nothing to average (this used to divide by zero).
        """
        total = len(self.loss_val)
        count = min(count, total)
        if count <= 0:
            return float("nan")
        return sum(self.loss_val[total-count:total])/count

    def Dump(self, file_name):
        """Pickle this history object to file_name."""
        with open(file_name, 'wb') as f:
            pickle.dump(self, f)

    @staticmethod
    def Load(file_name):
        """Return the history object pickled in file_name.

        SECURITY: pickle.load can execute arbitrary code; only load files
        that come from a trusted source.
        """
        with open(file_name, 'rb') as f:
            return pickle.load(f)

In [None]:
class DataReader_2_0(object):
    """Loads train/test sets from .npz files and prepares them for training.

    Each file must contain the arrays "data" and "label".  Samples are rows:
    the feature count is taken from data.shape[1].
    """

    def __init__(self, train_file, test_file):
        self.train_file_name = train_file
        self.test_file_name = test_file
        self.num_train = 0        # num of training examples
        self.num_test = 0         # num of test examples
        self.num_validation = 0   # num of validation examples
        self.num_feature = 0      # num of features
        self.num_category = 0     # num of categories
        self.XTrain = None        # training feature set
        self.YTrain = None        # training label set
        self.XTest = None         # test feature set
        self.YTest = None         # test label set
        self.XTrainRaw = None     # training feature set before normalization
        self.YTrainRaw = None     # training label set before normalization
        self.XTestRaw = None      # test feature set before normalization
        self.YTestRaw = None      # test label set before normalization
        self.XDev = None          # validation feature set
        self.YDev = None          # validation label set

    @staticmethod
    def __LoadNpz(file_name):
        # np.load keeps the archive open; the context manager closes it
        with np.load(file_name) as data:
            return data["data"], data["label"]

    # read data from file
    def ReadData(self):
        """Load the raw train and test sets.

        Until a Normalize* method is called the raw arrays double as the
        working sets, and the test set doubles as the validation set.

        Raises:
            FileNotFoundError: if the train or the test file does not exist.
        """
        if not Path(self.train_file_name).exists():
            raise FileNotFoundError("Cannot find train file!!!")
        self.XTrainRaw, self.YTrainRaw = self.__LoadNpz(self.train_file_name)
        assert(self.XTrainRaw.shape[0] == self.YTrainRaw.shape[0])
        self.num_train = self.XTrainRaw.shape[0]
        self.num_feature = self.XTrainRaw.shape[1]
        self.num_category = len(np.unique(self.YTrainRaw))
        # used as-is when no normalization is requested
        self.XTrain = self.XTrainRaw
        self.YTrain = self.YTrainRaw

        if not Path(self.test_file_name).exists():
            raise FileNotFoundError("Cannot find test file!!!")
        self.XTestRaw, self.YTestRaw = self.__LoadNpz(self.test_file_name)
        assert(self.XTestRaw.shape[0] == self.YTestRaw.shape[0])
        self.num_test = self.XTestRaw.shape[0]
        # used as-is when no normalization is requested
        self.XTest = self.XTestRaw
        self.YTest = self.YTestRaw
        # fallback in case no validation set is generated
        self.XDev = self.XTest
        self.YDev = self.YTest

    # merge train/test data first, normalize, then split again
    def NormalizeX(self):
        """Min-max normalize every feature over train and test data together."""
        x_merge = np.vstack((self.XTrainRaw, self.XTestRaw))
        x_merge_norm = self.__NormalizeX(x_merge)
        train_count = self.XTrainRaw.shape[0]
        self.XTrain = x_merge_norm[0:train_count,:]
        self.XTest = x_merge_norm[train_count:,:]

    def __NormalizeX(self, raw_data):
        # Work in float: an integer input used to be truncated on assignment
        raw = np.asarray(raw_data, dtype=np.float64)
        # per-feature statistics, i.e. one min/range per column
        min_values = np.min(raw, axis=0)
        ranges = np.max(raw, axis=0) - min_values
        # a constant feature has range 0; use 1 so it maps to 0 instead of NaN
        ranges[ranges == 0] = 1.0
        # row 0: min value, row 1: range value
        self.X_norm = np.vstack((min_values, ranges))
        return (raw - min_values) / ranges

    def NormalizeY(self, nettype, base=0):
        """Prepare the labels for the given network type.

        Fitting: min-max normalize train and test labels together.
        BinaryClassifier: map the labels to 0/1.
        MultipleClassifier: one-hot encode, `base` being the smallest label.
        """
        if nettype == NetType.Fitting:
            y_merge = np.vstack((self.YTrainRaw, self.YTestRaw))
            y_merge_norm = self.__NormalizeY(y_merge)
            train_count = self.YTrainRaw.shape[0]
            self.YTrain = y_merge_norm[0:train_count,:]
            self.YTest = y_merge_norm[train_count:,:]
        elif nettype == NetType.BinaryClassifier:
            # NOTE(review): `base` is taken as the negative label and
            # base + 1 as the positive one, mirroring the one-hot offset;
            # confirm against the data sets in use.
            self.YTrain = self.__ToZeroOne(self.YTrainRaw, positive_label=base + 1, negative_label=base)
            self.YTest = self.__ToZeroOne(self.YTestRaw, positive_label=base + 1, negative_label=base)
        elif nettype == NetType.MultipleClassifier:
            self.YTrain = self.__ToOneHot(self.YTrainRaw, base)
            self.YTest = self.__ToOneHot(self.YTestRaw, base)

    def __NormalizeY(self, raw_data):
        assert(raw_data.shape[1] == 1)
        self.Y_norm = np.zeros((2,1))
        min_value = np.min(raw_data)
        value_range = np.max(raw_data) - min_value
        if value_range == 0:
            # constant labels: avoid dividing by zero
            value_range = 1.0
        # min value
        self.Y_norm[0, 0] = min_value
        # range value
        self.Y_norm[1, 0] = value_range
        return (raw_data - min_value) / self.Y_norm[1, 0]

    def DeNormalizeY(self, predict_data):
        """Map normalized predictions back to the original label scale."""
        real_value = predict_data * self.Y_norm[1,0] + self.Y_norm[0,0]
        return real_value

    def __ToOneHot(self, Y, base=0):
        count = Y.shape[0]
        one_hot = np.zeros((count, self.num_category))
        # the label in column 0, shifted by base, selects the hot column
        columns = Y[:, 0].astype(int) - base
        one_hot[np.arange(count), columns] = 1
        return one_hot

    # for binary classifier
    # if use tanh function, need to set negative_value = -1
    def __ToZeroOne(self, Y, positive_label=1, negative_label=0, positiva_value=1, negative_value=0):
        # `self` was missing from the signature, so the call in NormalizeY
        # bound the reader object itself to Y.
        converted = np.zeros_like(Y)
        labels = Y[:, 0]
        converted[labels == positive_label, 0] = positiva_value
        # written last so the negative label wins if both labels are equal
        converted[labels == negative_label, 0] = negative_value
        return converted

    # normalize data by specified range and min_value
    def NormalizePredicateData(self, X_predicate):
        """Normalize new samples with the statistics stored by NormalizeX.

        X_predicate holds one sample per row and one feature per column, the
        same layout NormalizeX works on.
        """
        # The old loop indexed rows with the feature index, which mixed up
        # samples and features.
        samples = np.asarray(X_predicate, dtype=np.float64)
        return (samples - self.X_norm[0]) / self.X_norm[1]

    # need explicitly call this function to generate validation set
    def GenerateValidationSet(self, k = 10):
        """Move the first num_train / k training samples to the validation set."""
        self.num_validation = (int)(self.num_train / k)
        self.num_train = self.num_train - self.num_validation
        # validation set
        self.XDev = self.XTrain[0:self.num_validation]
        self.YDev = self.YTrain[0:self.num_validation]
        # train set
        self.XTrain = self.XTrain[self.num_validation:]
        self.YTrain = self.YTrain[self.num_validation:]

    def GetValidationSet(self):
        return self.XDev, self.YDev

    def GetTestSet(self):
        return self.XTest, self.YTest

    # get one batch of training samples
    def GetBatchTrainSamples(self, batch_size, iteration):
        """Return rows [iteration*batch_size, (iteration+1)*batch_size) of the train set."""
        start = iteration * batch_size
        end = start + batch_size
        batch_X = self.XTrain[start:end,:]
        batch_Y = self.YTrain[start:end,:]
        return batch_X, batch_Y

    def Shuffle(self):
        """Shuffle the training samples, keeping features and labels aligned."""
        # One shared index permutation keeps X and Y in step.  The previous
        # version reseeded the global RNG with one of only 100 seeds, which
        # limited the possible orderings and reset NumPy's random state.
        order = np.random.permutation(self.XTrain.shape[0])
        self.XTrain = self.XTrain[order]
        self.YTrain = self.YTrain[order]

In [None]:
# Default image/label file paths; MnistImageDataReader.__init__ copies them
# onto each reader instance.
# NOTE(review): the "-10" suffix presumably marks a reduced data set — confirm.
train_image_file = './train-images-10'
train_label_file = './train-labels-10'
test_image_file = './test-images-10'
test_label_file = './test-labels-10'

class MnistImageDataReader(DataReader_2_0):
    """Reads image/label files in the big-endian binary layout parsed below.

    Image file: a 4-byte field (skipped), then image count, row count and
    column count as 4-byte unsigned integers, then one byte per pixel.
    Label file: a 4-byte field (skipped), the label count as a 4-byte
    unsigned integer, then one byte per label.
    """

    # mode: "image"=Nx1x28x28,  "vector"=1x784
    def __init__(self, mode="image"):
        # Initialise the data-set attributes (XTrain, XDev, ...) declared by
        # the base class; they used to be missing until some method set them.
        super().__init__(train_image_file, test_image_file)
        self.train_image_file = train_image_file
        self.train_label_file = train_label_file
        self.test_image_file = test_image_file
        self.test_label_file = test_label_file
        self.num_example = 0
        self.num_feature = 0
        self.num_category = 0
        self.num_validation = 0
        self.num_test = 0
        self.num_train = 0
        self.mode = mode    # image or vector

    def ReadLessData(self, count):
        """Like ReadData, but keep only the first `count` training samples."""
        self.__LoadRaw(count)

    def ReadData(self):
        """Load the raw train/test images and labels and update the counters."""
        self.__LoadRaw()

    def __LoadRaw(self, count=None):
        # Shared body of ReadData / ReadLessData
        self.XTrainRaw = self.__ReadImageFile(self.train_image_file)
        self.YTrainRaw = self.__ReadLabelFile(self.train_label_file)
        self.XTestRaw = self.__ReadImageFile(self.test_image_file)
        self.YTestRaw = self.__ReadLabelFile(self.test_label_file)

        if count is not None:
            self.XTrainRaw = self.XTrainRaw[0:count]
            self.YTrainRaw = self.YTrainRaw[0:count]

        self.num_example = self.XTrainRaw.shape[0]
        self.num_category = (np.unique(self.YTrainRaw)).shape[0]
        self.num_test = self.XTestRaw.shape[0]
        self.num_train = self.num_example
        if self.mode == "vector":
            self.num_feature = 784
        self.num_validation = 0

    # output array: num_images * channel * 28 * 28
    # due to gray image instead of color, so channel = 1
    def __ReadImageFile(self, image_file_name):
        with open(image_file_name, "rb") as f:
            # header: first field is skipped, the rest are the dimensions
            _, num_images, num_rows, num_cols = struct.unpack(">4I", f.read(16))
            expected = num_images * num_rows * num_cols
            # decode all pixels at once instead of one image per unpack call
            pixels = np.frombuffer(f.read(expected), dtype=np.uint8)
        if pixels.size != expected:
            raise ValueError("image file is truncated: " + str(image_file_name))
        # N x 1 x rows x cols, as float64 like the previous np.empty buffer
        return pixels.reshape((num_images, 1, num_rows, num_cols)).astype(np.float64)

    def __ReadLabelFile(self, lable_file_name):
        with open(lable_file_name, "rb") as f:
            # header: first field is skipped, the second is the label count
            _, num_labels = struct.unpack(">2I", f.read(8))
            labels = np.frombuffer(f.read(num_labels), dtype=np.uint8)
        if labels.size != num_labels:
            raise ValueError("label file is truncated: " + str(lable_file_name))
        # N x 1
        return labels.astype(np.float64).reshape((num_labels, 1))

    def NormalizeX(self):
        """Min-max normalize the train and the test images independently."""
        self.XTrain = self.__NormalizeData(self.XTrainRaw)
        self.XTest = self.__NormalizeData(self.XTestRaw)

    def __NormalizeData(self, XRawData):
        # One global min/max over the whole array, not per feature
        x_max = np.max(XRawData)
        x_min = np.min(XRawData)
        return (XRawData - x_min)/(x_max-x_min)

    @staticmethod
    def __Flatten(batch):
        # One row per sample; the explicit size also works for an empty batch
        return batch.reshape(batch.shape[0], int(np.prod(batch.shape[1:])))

    def GetBatchTrainSamples(self, batch_size, iteration):
        """Return training batch number `iteration` in the layout of self.mode."""
        start = iteration * batch_size
        end = start + batch_size
        batch_X = self.XTrain[start:end]
        batch_Y = self.YTrain[start:end]

        if self.mode == "vector":
            return batch_X.reshape(-1, 784), batch_Y
        elif self.mode == "image":
            return batch_X, batch_Y

    # recommend not use this function in DeepLearning
    def GetValidationSet(self):
        """Return the whole validation set in the layout of self.mode."""
        batch_X = self.XDev
        batch_Y = self.YDev
        if self.mode == "vector":
            return self.__Flatten(batch_X), batch_Y
        elif self.mode == "image":
            return batch_X, batch_Y

    def GetTestSet(self):
        """Return the whole test set in the layout of self.mode."""
        if self.mode == "vector":
            return self.__Flatten(self.XTest), self.YTest
        elif self.mode == "image":
            return self.XTest, self.YTest

    def GetBatchTestSamples(self, batch_size, iteration):
        """Return test batch number `iteration` in the layout of self.mode."""
        start = iteration * batch_size
        end = start + batch_size
        batch_X = self.XTest[start:end]
        batch_Y = self.YTest[start:end]

        if self.mode == "vector":
            # Use the real row count: the last batch can be shorter than
            # batch_size, which made reshape(batch_size, -1) fail.
            return self.__Flatten(batch_X), batch_Y
        elif self.mode == "image":
            return batch_X, batch_Y

    # suggest to call this function for each epoch
    def Shuffle(self):
        """Shuffle the training set in place and return (XTrain, YTrain)."""
        # One shared index permutation keeps images and labels aligned.  The
        # previous version reseeded the global RNG with one of only 100
        # seeds, which limited the possible orderings.
        order = np.random.permutation(self.XTrain.shape[0])
        self.XTrain = self.XTrain[order]
        self.YTrain = self.YTrain[order]
        return self.XTrain, self.YTrain

#### 定义神经元
- Layers - 神经网络各层的容器，按添加顺序维护一个列表
- Parameters - 基本参数，包括普通参数和超参
- Loss Function - 提供计算损失函数值，存储历史记录并最后绘图的功能
- LayerManagement() - 添加神经网络层
- ForwardCalculation() - 调用各层的前向计算方法
- BackPropagation() - 调用各层的反向传播方法
- PreUpdateWeights() - 预更新各层的权重参数
- UpdateWeights() - 更新各层的权重参数
- Train() - 训练
- SaveWeights() - 保存各层的权重参数
- LoadWeights() - 加载各层的权重参数
#### 前向传播
$$Z1 = X \cdot W1 + B1 \tag{1}$$
$$A1 = Sigmoid(Z1) \tag{2}$$
$$Z2 = A1 \cdot W2 + B2 \tag{3}$$
$$A2 = Tanh(Z2) \tag{4}$$
$$Z3 = A2 \cdot W3  + B3 \tag{5}$$
$$A3 = Softmax(Z3) \tag{6}$$
#### 反向传播
$$dZ3 = A3-Y \tag{7}$$
$$dW3 = A2^T \cdot dZ3 \tag{8}$$
$$dB3=dZ3 \tag{9}$$
$$dA2 = dZ3 \cdot W3^T \tag{10}$$
$$dZ2 = dA2 \odot (1-A2 \odot A2) \tag{11}$$
$$dW2 = A1^T \cdot dZ2 \tag{12}$$
$$dB2 = dZ2 \tag{13}$$
$$dA1 = dZ2 \cdot W2^T \tag{14}$$
$$dZ1 = dA1 \odot A1 \odot (1-A1) \tag{15}$$
$$dW1 = X^T \cdot dZ1 \tag{16}$$
$$dB1 = dZ1 \tag{17}$$

In [None]:
class NeuralNet_3_0(object):
    """Three-layer network: Sigmoid and Tanh hidden layers plus an output layer.

    The output activation follows hp.net_type: Logistic for binary
    classification, Softmax for multi-class, identity otherwise.  Weights
    are kept in a sub-folder of the current directory named after the model.
    """

    def __init__(self, hp, model_name):
        # Parameters
        self.hp = hp
        self.model_name = model_name
        self.subfolder = os.getcwd() + "/" + self.__create_subfolder()
        print(self.subfolder)

        # Layers
        self.wb1 = WeightsBias_1_0(self.hp.num_input, self.hp.num_hidden1, self.hp.init_method, self.hp.eta)
        self.wb1.InitializeWeights(self.subfolder, False)
        self.wb2 = WeightsBias_1_0(self.hp.num_hidden1, self.hp.num_hidden2, self.hp.init_method, self.hp.eta)
        self.wb2.InitializeWeights(self.subfolder, False)
        self.wb3 = WeightsBias_1_0(self.hp.num_hidden2, self.hp.num_output, self.hp.init_method, self.hp.eta)
        self.wb3.InitializeWeights(self.subfolder, False)

    def __create_subfolder(self):
        # Returns the relative folder name, creating the folder if needed.
        # Without a model name the current directory itself is used; this
        # used to return None and break the path concatenation in __init__.
        if self.model_name is None:
            return ""
        path = self.model_name.strip().rstrip("/")
        if path:
            os.makedirs(path, exist_ok=True)
        return path

    # ForwardCalculation
    def forward(self, batch_x):
        """Run the forward pass; the result is stored in self.output."""
        self.Z1 = np.dot(batch_x, self.wb1.W) + self.wb1.B     #(1)
        self.A1 = Sigmoid().forward(self.Z1)                   #(2)
        self.Z2 = np.dot(self.A1, self.wb2.W) + self.wb2.B     #(3)
        self.A2 = Tanh().forward(self.Z2)                      #(4)
        self.Z3 = np.dot(self.A2, self.wb3.W) + self.wb3.B     #(5)
        if self.hp.net_type == NetType.BinaryClassifier:       #(6)
            self.A3 = Logistic().forward(self.Z3)              # .
        elif self.hp.net_type == NetType.MultipleClassifier:   # .
            self.A3 = Softmax().forward(self.Z3)               # .
        else:                                                  # .
            self.A3 = self.Z3                                  #(6)

        self.output = self.A3

    # BackPropagation
    def backward(self, batch_x, batch_y):
        """Compute dW/dB of all three layers, averaged over the batch.

        Must be called after forward() on the same batch.
        """
        m = batch_x.shape[0]
        dZ3 = self.output - batch_y                            #(7)
        self.wb3.dW = np.dot(self.A2.T, dZ3)/m                 #(8)
        self.wb3.dB = np.sum(dZ3, axis=0, keepdims=True)/m     #(9)
        dA2 = np.dot(dZ3, self.wb3.W.T)                        #(10)
        dZ2,_ = Tanh().backward(None, self.A2, dA2)            #(11)
        self.wb2.dW = np.dot(self.A1.T, dZ2)/m                 #(12)
        self.wb2.dB = np.sum(dZ2, axis=0, keepdims=True)/m     #(13)
        dA1 = np.dot(dZ2, self.wb2.W.T)                        #(14)
        dZ1,_ = Sigmoid().backward(None, self.A1, dA1)         #(15)
        self.wb1.dW = np.dot(batch_x.T, dZ1)/m                 #(16)
        self.wb1.dB = np.sum(dZ1, axis=0, keepdims=True)/m     #(17)

    # UpdateWeights
    def update(self):
        """Apply the stored gradients to all three layers."""
        self.wb1.Update()
        self.wb2.Update()
        self.wb3.Update()

    def inference(self, x):
        """Return the network output for x."""
        self.forward(x)
        return self.output

    # Train
    def train(self, dataReader, checkpoint, need_test):
        """Train with mini-batch gradient descent.

        dataReader: provides Shuffle, GetBatchTrainSamples, GetValidationSet,
            GetTestSet and num_train.
        checkpoint: fraction of an epoch between two loss/accuracy checks.
        need_test: when True, evaluate on the test set after training.

        Stops early once the validation loss is <= hp.eps.  The final
        weights are saved when training ends.
        """
        t0 = time.time()
        self.loss_trace = TrainingHistory_2_3()
        self.loss_func = LossFunction_1_1(self.hp.net_type)
        if self.hp.batch_size == -1:
            # -1 means full-batch training
            self.hp.batch_size = dataReader.num_train
        max_iteration = math.ceil(dataReader.num_train / self.hp.batch_size)
        # At least 1: a small checkpoint fraction used to truncate to 0 and
        # make the modulo below divide by zero.
        checkpoint_iteration = max(1, int(max_iteration * checkpoint))
        need_stop = False
        for epoch in range(self.hp.max_epoch):
            dataReader.Shuffle()
            for iteration in range(max_iteration):
                # get x and y value for one batch
                batch_x, batch_y = dataReader.GetBatchTrainSamples(self.hp.batch_size, iteration)
                # get z from x,y
                self.forward(batch_x)
                # calculate gradient of w and b
                self.backward(batch_x, batch_y)
                # update w,b
                self.update()

                total_iteration = epoch * max_iteration + iteration
                if (total_iteration+1) % checkpoint_iteration == 0:
                    need_stop = self.CheckErrorAndLoss(dataReader, batch_x, batch_y, epoch, total_iteration)
                    if need_stop:
                        break
            if need_stop:
                break
        self.SaveResult()

        t1 = time.time()
        print("time used:", t1 - t0)

        if need_test:
            print("testing...")
            # Test prints the accuracy itself; printing its result again
            # used to output "None".
            self.Test(dataReader)

    def CheckErrorAndLoss(self, dataReader, train_x, train_y, epoch, total_iteration):
        """Record loss/accuracy on the given batch and on the validation set.

        Returns True when training should stop (validation loss <= hp.eps).
        """
        print("epoch=%d, total_iteration=%d" %(epoch, total_iteration))

        # calculate train loss
        self.forward(train_x)
        loss_train = self.loss_func.CheckLoss(self.output, train_y)
        accuracy_train = self.__CalAccuracy(self.output, train_y)
        print("loss_train=%.6f, accuracy_train=%f" %(loss_train, accuracy_train))

        # calculate validation loss
        vld_x, vld_y = dataReader.GetValidationSet()
        self.forward(vld_x)
        loss_vld = self.loss_func.CheckLoss(self.output, vld_y)
        accuracy_vld = self.__CalAccuracy(self.output, vld_y)
        print("loss_valid=%.6f, accuracy_valid=%f" %(loss_vld, accuracy_vld))

        need_stop = self.loss_trace.Add(epoch, total_iteration, loss_train, accuracy_train, loss_vld, accuracy_vld)
        if loss_vld <= self.hp.eps:
            need_stop = True
        return need_stop

    def Test(self, dataReader):
        """Print and return the accuracy on the test set."""
        x,y = dataReader.GetTestSet()
        self.forward(x)
        correct = self.__CalAccuracy(self.output, y)
        print(correct)
        # Previously nothing was returned, so callers received None
        return correct

    def __CalAccuracy(self, a, y):
        # Fitting: R^2 score; classifiers: fraction of correct predictions
        assert(a.shape == y.shape)
        m = a.shape[0]
        if self.hp.net_type == NetType.Fitting:
            var = np.var(y)
            mse = np.sum((a-y)**2)/m
            r2 = 1 - mse / var
            return r2
        elif self.hp.net_type == NetType.BinaryClassifier:
            b = np.round(a)
            r = (b == y)
            correct = np.sum(r)
            return correct/m
        elif self.hp.net_type == NetType.MultipleClassifier:
            ra = np.argmax(a, axis=1)
            ry = np.argmax(y, axis=1)
            r = (ra == ry)
            correct = np.sum(r)
            return correct/m

    # SaveWeights
    def SaveResult(self):
        """Save the weights of all layers as wb1/wb2/wb3 in the model folder."""
        self.wb1.SaveResultValue(self.subfolder, "wb1")
        self.wb2.SaveResultValue(self.subfolder, "wb2")
        self.wb3.SaveResultValue(self.subfolder, "wb3")

    # LoadWeights
    def LoadResult(self):
        """Load the weights of all layers from wb1/wb2/wb3 in the model folder."""
        self.wb1.LoadResultValue(self.subfolder, "wb1")
        self.wb2.LoadResultValue(self.subfolder, "wb2")
        self.wb3.LoadResultValue(self.subfolder, "wb3")

    def ShowTrainingHistory(self, xcoord):
        """Plot the recorded history; xcoord is "epoch" or "iteration"."""
        self.loss_trace.ShowLossHistory(self.hp, xcoord)

    def GetTrainingTrace(self):
        return self.loss_trace

    def GetEpochNumber(self):
        return self.loss_trace.GetEpochNumber()

    def GetLatestAverageLoss(self, count=10):
        return self.loss_trace.GetLatestAverageLoss(count)

    def DumpLossHistory(self, filename):
        return self.loss_trace.Dump(filename)

#### __main__

In [None]:
if __name__ == '__main__':
    # Load the images as flat vectors, normalize, one-hot encode the labels,
    # then split off a validation set.
    dataReader = MnistImageDataReader(mode="vector")
    dataReader.ReadData()
    dataReader.NormalizeX()
    dataReader.NormalizeY(NetType.MultipleClassifier, base=0)
    dataReader.Shuffle()
    dataReader.GenerateValidationSet(k=12)

    # Network shape: the input/output sizes come from the data
    n_input, n_output = dataReader.num_feature, dataReader.num_category
    n_hidden1, n_hidden2 = 64, 16

    # Training settings
    eta, eps = 0.2, 0.01
    batch_size, max_epoch = 128, 20

    hp = HyperParameters_3_0(
        n_input, n_hidden1, n_hidden2, n_output,
        eta=eta, max_epoch=max_epoch, batch_size=batch_size, eps=eps,
        net_type=NetType.MultipleClassifier,
        init_method=InitialMethod.Xavier)

    # Check loss/accuracy twice per epoch and test once training is done
    net = NeuralNet_3_0(hp, "MNIST_64_16")
    net.train(dataReader, checkpoint=0.5, need_test=True)
    net.ShowTrainingHistory(xcoord="epoch")