In [None]:
from enum import Enum

import numpy as np
from pathlib import Path
import math
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.colors import LogNorm

# Name of an .npz data file. No cell visible here reads this constant —
# presumably it is consumed by a data-reader defined elsewhere (TODO confirm).
file_name = "ch07.npz"

In [None]:
class NetType(Enum):
    """Task type a network is configured for.

    Other code in this notebook compares ``hp.net_type`` against these members
    to pick an activation and a loss function.

    All members carry plain integer values. Earlier revisions wrote three of
    them with a trailing comma (``Fitting = (1,)``), which silently made the
    value a 1-tuple while ``BinaryTanh`` stayed an int; the values are now
    uniform so ``NetType(2)`` and ``member.value`` behave consistently.
    """

    Fitting = 1
    BinaryClassifier = 2
    MultipleClassifier = 3
    BinaryTanh = 4


In [None]:

class HyperParameters(object):
    """Bundle of settings shared by the network and its training loop.

    Attributes (all stored verbatim from the constructor arguments):
        input_size / output_size: dimensions used to shape the weight matrix.
        eta: step size multiplied with the gradients on each update.
        max_epoch: upper bound on the number of training epochs.
        batch_size: samples per mini-batch; the training loop treats -1 as
            "use the whole training set".
        eps: training stops once the checked loss drops below this value.
        net_type: a ``NetType`` member describing the task.
    """

    def __init__(
        self,
        input_size,
        output_size,
        eta=0.1,
        max_epoch=1000,
        batch_size=5,
        eps=0.1,
        net_type=NetType.Fitting,
    ):
        # Network dimensions.
        self.input_size, self.output_size = input_size, output_size
        # Optimisation settings.
        self.eta, self.max_epoch, self.batch_size = eta, max_epoch, batch_size
        # Stop threshold and task type.
        self.eps, self.net_type = eps, net_type

    def toString(self):
        """Return a short label of the form ``bz:<batch_size>,eta:<eta>``."""
        return "bz:{0},eta:{1}".format(self.batch_size, self.eta)

class LossFunction(object):
    """Loss evaluation that dispatches on a ``NetType`` member.

    Args:
        net_type: the ``NetType`` member that decides which loss
            ``CheckLoss`` computes.
    """

    def __init__(self, net_type):
        # Was misspelled ``_init_``, so the constructor never ran and
        # ``self.net_type`` was never set.
        self.net_type = net_type

    def CheckLoss(self, A, Y):
        """Return the mean loss of predictions ``A`` against targets ``Y``.

        The sample count is taken from ``Y.shape[0]``.

        Raises:
            ValueError: if ``self.net_type`` is not a supported ``NetType``.
        """
        m = Y.shape[0]
        if self.net_type == NetType.Fitting:
            loss = self.MSE(A, Y, m)
        elif self.net_type == NetType.BinaryClassifier:
            loss = self.CE2(A, Y, m)
        elif self.net_type == NetType.BinaryTanh:
            loss = self.CE2_tanh(A, Y, m)
        elif self.net_type == NetType.MultipleClassifier:
            loss = self.CE3(A, Y, m)
        else:
            # Previously fell through and failed with UnboundLocalError.
            raise ValueError("unsupported net_type: {0}".format(self.net_type))
        return loss

    def MSE(self, A, Y, count):
        """Half mean squared error: ``sum((A - Y) ** 2) / count / 2``."""
        LOSS = (A - Y) ** 2
        loss = LOSS.sum() / count / 2
        return loss

    def CE2(self, A, Y, count):
        """Binary cross-entropy averaged over ``count`` samples.

        Computes ``-sum((1 - Y) * log(1 - A) + Y * log(A)) / count``.
        Assumes ``A`` lies strictly inside (0, 1); values of exactly 0 or 1
        make ``np.log`` return ``-inf``.
        """
        p1 = 1 - Y
        p2 = np.log(1 - A)
        p3 = np.log(A)

        p4 = np.multiply(p1, p2)
        p5 = np.multiply(Y, p3)

        LOSS = np.sum(-(p4 + p5))  # binary classification
        loss = LOSS / count
        return loss

    def CE3(self, A, Y, count):
        """Multi-class cross-entropy: ``-sum(Y * log(A)) / count``.

        Presumably ``Y`` is one-hot and ``A`` holds per-class probabilities
        (e.g. a softmax output) — TODO confirm against the caller.
        """
        p1 = np.log(A)
        p2 = np.multiply(Y, p1)
        LOSS = np.sum(-p2)
        loss = LOSS / count
        return loss

    def CE2_tanh(self, A, Y, count):
        """Cross-entropy variant for a tanh-activated binary classifier.

        Computes ``-sum((1 - Y) * log((1 - A) / 2) + (1 + Y) * log((1 + A) / 2))
        / count``. Assumes ``A`` lies in (-1, 1) and labels are -1/+1 —
        TODO confirm.
        """
        p = (1 - Y) * np.log((1 - A) / 2) + (1 + Y) * np.log((1 + A) / 2)
        LOSS = np.sum(-p)
        loss = LOSS / count
        return loss
        


In [None]:
class TrainingHistory_1_0(object):
    """Records loss (and optionally weights/bias) at training checkpoints.

    The four lists are kept the same length: entry ``i`` of each list belongs
    to the ``i``-th call of ``AddLossHistory``.
    """

    def __init__(self):
        self.iteration = []
        self.loss_history = []
        # GetLast reads these two lists; they were previously never created,
        # so GetLast always raised AttributeError.
        self.w_history = []
        self.b_history = []

    def AddLossHistory(self, iteration, loss, w=None, b=None):
        """Append one checkpoint.

        Args:
            iteration: x-axis value for the loss plot.
            loss: loss value at that iteration.
            w, b: optional parameter snapshots; stored as ``None`` when the
                caller does not supply them, so two-argument calls keep working.
        """
        self.iteration.append(iteration)
        self.loss_history.append(loss)
        self.w_history.append(w)
        self.b_history.append(b)

    def ShowLossHistory(self, hp, xmin=None, xmax=None, ymin=None, ymax=None):
        """Plot loss against iteration and return the plot title.

        The title comes from ``hp.toString()``. Axis limits are applied only
        when both ``xmin`` and ``ymin`` are given (same trigger as before).
        """
        plt.plot(self.iteration, self.loss_history)
        title = hp.toString()
        plt.title(title)
        plt.xlabel("iteration")
        plt.ylabel("loss")
        # Identity comparison is the correct None test; ``!= None`` can
        # misbehave for objects that overload ``!=`` (e.g. numpy arrays).
        if xmin is not None and ymin is not None:
            plt.axis([xmin, xmax, ymin, ymax])
        plt.show()
        return title

    def GetLast(self):
        """Return ``(loss, w, b)`` of the most recent checkpoint.

        Raises:
            IndexError: if no checkpoint has been recorded yet.
        """
        return self.loss_history[-1], self.w_history[-1], self.b_history[-1]

In [None]:
class NeuralNet(object):
    """Single-layer network (one weight matrix plus bias) trained with
    mini-batch gradient descent.

    Args:
        hp: hyper-parameter object; this class reads ``input_size``,
            ``output_size``, ``eta``, ``max_epoch``, ``batch_size``, ``eps``
            and ``net_type`` from it.
    """

    def __init__(self, hp):
        self.hp = hp
        # The weight matrix shape is fixed by the number of inputs and outputs.
        self.W = np.zeros((self.hp.input_size, self.hp.output_size))
        self.B = np.zeros((1, self.hp.output_size))

    @staticmethod
    def _checkpoint_interval(max_iteration, checkpoint):
        """Number of iterations between loss checks, never less than 1.

        ``int(max_iteration * checkpoint)`` truncates to 0 for small products
        (e.g. 5 * 0.1), which would make the modulo in ``train`` raise
        ZeroDivisionError; clamp to 1 so the loss is then checked every
        iteration instead.
        """
        return max(1, int(max_iteration * checkpoint))

    # Forward pass over a batch of samples.
    def __forwardBatch(self, batch_x):
        Z = np.dot(batch_x, self.W) + self.B
        if self.hp.net_type == NetType.BinaryClassifier:
            # NOTE(review): ``Logistic`` is not defined in the cells visible
            # here — presumably a sigmoid activation class defined elsewhere;
            # confirm it is in scope before training a binary classifier.
            A = Logistic().forward(Z)
            return A
        else:
            return Z

    def __backwardBatch(self, batch_x, batch_y, batch_z):
        """Return ``(dW, dB)`` averaged over the batch.

        ``batch_z`` is whatever ``__forwardBatch`` returned for ``batch_x``.
        """
        m = batch_x.shape[0]
        dZ = batch_z - batch_y
        dB = dZ.sum(axis=0, keepdims=True) / m
        dW = np.dot(batch_x.T, dZ) / m
        return dW, dB

    def __update(self, dW, dB):
        """Take one gradient-descent step of size ``hp.eta``."""
        self.W = self.W - self.hp.eta * dW
        self.B = self.B - self.hp.eta * dB

    def inference(self, x):
        """Return the network output for input batch ``x``."""
        return self.__forwardBatch(x)

    # Loss over the whole training set.
    def __checkLoss(self, dataReader):
        # NOTE(review): this is always half mean squared error, regardless of
        # hp.net_type — confirm that is intended for the binary classifier.
        X, Y = dataReader.GetWholeTrainSamples()
        m = X.shape[0]
        Z = self.__forwardBatch(X)
        LOSS = (Z - Y) ** 2
        loss = LOSS.sum() / m / 2
        return loss

    # Training loop.
    def train(self, dataReader, checkpoint, net_type=None):
        """Train until the checked loss drops below ``hp.eps`` or
        ``hp.max_epoch`` epochs have run, then plot the loss history and
        print the final ``W`` and ``B``.

        Args:
            dataReader: object providing ``num_train``, ``Shuffle()``,
                ``GetBatchTrainSamples(batch_size, iteration)`` and
                ``GetWholeTrainSamples()``.
            checkpoint: fraction of one epoch's iterations between loss
                checks (the interval is clamped to at least one iteration).
            net_type: unused; kept (now optional) so existing three-argument
                calls keep working. The task type is read from ``hp.net_type``.

        Side effect: when ``hp.batch_size`` is -1 it is overwritten with
        ``dataReader.num_train`` (full-batch training).
        """
        loss_history = TrainingHistory_1_0()
        # Start above any finite eps so the stop test cannot fire before the
        # first real loss check (the former constant 10 could, for eps > 10).
        loss = float("inf")
        if self.hp.batch_size == -1:
            self.hp.batch_size = dataReader.num_train
        max_iteration = math.ceil(dataReader.num_train / self.hp.batch_size)
        checkpoint_iteration = self._checkpoint_interval(max_iteration, checkpoint)

        for epoch in range(self.hp.max_epoch):
            print("epoch=%d" % epoch)
            dataReader.Shuffle()
            for iteration in range(max_iteration):
                # Fetch the mini-batch for this iteration.
                batch_x, batch_y = dataReader.GetBatchTrainSamples(
                    self.hp.batch_size, iteration
                )
                # Forward pass.
                batch_z = self.__forwardBatch(batch_x)
                # Gradients of W and B.
                dW, dB = self.__backwardBatch(batch_x, batch_y, batch_z)
                # Parameter update.
                self.__update(dW, dB)

                total_iteration = epoch * max_iteration + iteration
                if (total_iteration + 1) % checkpoint_iteration == 0:
                    loss = self.__checkLoss(dataReader)
                    print(epoch, iteration, loss, self.W, self.B)
                    loss_history.AddLossHistory(total_iteration, loss)
                    if loss < self.hp.eps:
                        break
            # Propagate the inner-loop stop to the epoch loop.
            if loss < self.hp.eps:
                break
        loss_history.ShowLossHistory(self.hp)
        print("W=", self.W)
        print("B=", self.B)