In [21]:
from mnist import MNIST
import numpy as np
import logging
import random


#setting logger---------------------------------------
def init_logger():
    """Return the 'NNET Debug' logger, configured to print DEBUG output to the console.

    ``logging.getLogger`` returns the same logger object for the same name, so
    calling this function repeatedly (for example by re-running the notebook
    cell) used to stack one extra console handler per call and duplicate every
    message. The handler is now attached only when the logger has none.

    Returns:
        logging.Logger: the shared 'NNET Debug' logger at DEBUG level.
    """
    logger = logging.getLogger('NNET Debug')
    logger.setLevel(logging.DEBUG)

    # Attach the console handler only once per process.
    if not logger.handlers:
        # Create a handler that writes to the console.
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # Define the output format used by the handler.
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s \n %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    return logger
#handling data---------------------------------------
class StructData:
    """Samples and labels of one data split, served as a circular sequence of batches.

    Args:
        data_set: a pair ``(samples, labels)``; both items are converted with
            ``np.array``. ``len(samples)`` defines the split size.

    Attributes are plain and public: ``size``, ``data``, ``label`` and
    ``curr_batch_start`` (index of the first sample of the next batch).
    """

    def __init__(self,data_set):
        self.size = len(data_set[0])
        self.data = np.array(data_set[0])
        self.label = np.array(data_set[1])
        self.curr_batch_start = 0

    def shuffle(self):
        """Shuffle samples and labels together, keeping each pair aligned.

        One shared permutation is applied to both arrays. Unlike gluing the
        labels onto the data as an extra column, this keeps the dtype of each
        array intact (integer labels stay usable as indices even when the
        samples are floats) and avoids building a temporary combined array.
        """
        order = np.random.permutation(self.size)
        self.data = self.data[order]
        self.label = self.label[order]

    #act like a circular list
    def get_next_batch(self,batch_size):
        """Return the next ``batch_size`` samples and labels, wrapping at the end.

        Args:
            batch_size: non-negative number of samples to return. It may exceed
                ``size``; samples are then repeated as the cursor wraps around.

        Returns:
            tuple: ``(data, label)`` with ``batch_size`` rows each.

        Raises:
            ValueError: if ``batch_size`` is negative or the split is empty.
        """
        if batch_size < 0:
            raise ValueError("batch_size must be non-negative, got {0}".format(batch_size))
        if self.size == 0:
            raise ValueError("cannot draw a batch from an empty data set")

        start = self.curr_batch_start
        stop = start + batch_size
        self.curr_batch_start = stop % self.size

        # Contiguous case (also covers batch_size == 0): plain slices suffice.
        if stop <= self.size:
            return self.data[start:stop], self.label[start:stop]

        # Wrapping case: modular indices handle any number of wrap-arounds.
        positions = np.arange(start, stop) % self.size
        return self.data[positions], self.label[positions]
#reading data---------------------------------------        
# Load both MNIST splits from the local 'mnist_data' directory and wrap them.
mndata = MNIST('mnist_data')
mnist_train = StructData(mndata.load_training())
mnist_train.shuffle()  # only the training split is shuffled at load time
mnist_test = StructData(mndata.load_testing())
#---------------------------------------
'''
logger = init_logger()
logger.info('Mnist data readed.')
logger.info('Train set size:{0},Test set size:{1}'.format(len(mnist_train.data),len(mnist_test.data)))
logger.debug('Data size:{0}'.format(len(mnist_test.data[0])))
logger.debug('Data:\n{0}'.format(mnist_test.data[0]))
logger.debug('Data label:{0}'.format(mnist_test.label[0]))
'''
#---------------------------------------
# Report split sizes plus the first training sample, one item per output line.
print(
    'Mnist data readed.',
    'Train set size:{0}, Test set size:{1}'.format(len(mnist_train.data),len(mnist_test.data)),
    'Data label:{0}'.format(mnist_train.label[0]),
    'Data size:{0}'.format(len(mnist_train.data[0])),
    'Data:\n{0}'.format(mnist_train.data[0]),
    sep='\n'
)

Mnist data readed.
Train set size:60000, Test set size:10000
Data label:6
Data size:784
Data:
[  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0  92 253 192
  12   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0 169 252 253 181  19   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0 169
 252 253 233  43   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   7 178 252 240  71   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
  57 252 252 140   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0 198 253 253 141   0   0   0   0
   0   0   0   0   0   0   0  

In [22]:
#network component defining---------------------------------
class AffineComponent:
    """Fully connected layer computing ``x . W + b`` with an in-place SGD update.

    Parameters start as small Gaussian noise (scale 0.01). The step size used
    by the update is read from the module-level name ``learning_rate``, which
    the training loop is expected to set before each backward pass.
    """

    def __init__(self,input_dim,output_dim):
        self.input_dim, self.output_dim = input_dim, output_dim
        # Weights are drawn before the bias so the random stream is consumed
        # in a fixed order.
        self.weights = np.random.randn(input_dim, output_dim) * 0.01
        self.bias = np.random.randn(1, output_dim) * 0.01

    def propagate(self,input_data):
        """Forward pass; the input is cached for the following backward pass."""
        feature_count = input_data.shape[1]
        assert feature_count == self.input_dim
        self.input_data = input_data
        projected = np.dot(input_data, self.weights)
        return projected + self.bias

    def back_propagate(self,derivative):
        """Return the gradient w.r.t. the layer input, then update the parameters."""
        assert derivative.shape[1] == self.output_dim
        # Computed with the pre-update weights, i.e. before __update touches them.
        upstream = np.dot(derivative, self.weights.T)
        self.__update(derivative)
        return upstream

    def __update(self,derivative):
        # The step size is the global learning_rate, which changes during
        # training (see LearningRateScheduler).
        bias_grad = derivative.sum(axis = 0, keepdims = True)
        weight_grad = self.input_data.T.dot(derivative)
        self.bias -= learning_rate * bias_grad
        self.weights -= learning_rate * weight_grad


class NolinearComponent:
    """Element-wise non-linearity layer; only ``"relu"`` is implemented.

    Args:
        dim: expected number of columns of the input and of the gradient.
        nolinear_type: name of the non-linearity. Any value other than
            ``"relu"`` is rejected when the layer is used.
    """

    def __init__(self,dim,nolinear_type):
        self.dim = dim
        self.nolinear_type = nolinear_type

    def propagate(self,input_data):
        """Apply the non-linearity; the input is cached for the backward pass.

        Raises:
            ValueError: if ``nolinear_type`` is not supported.
        """
        assert input_data.shape[1] == self.dim
        self.input_data = input_data
        if(self.nolinear_type == "relu"):
            return self.__relu(input_data)
        # Previously `assert false`, which raised NameError (`false` is not a
        # Python name) instead of reporting the real problem.
        raise ValueError("unsupported nonlinearity type: {0!r}".format(self.nolinear_type))

    def __relu(self,input_data):
        # Work on a copy: masking through an index would otherwise modify the
        # caller's array (and the cached input).
        output_data = input_data.copy()
        output_data[output_data < 0] = 0
        return output_data
    #----------------------------
    def back_propagate(self,derivative):
        """Return the gradient w.r.t. the layer input.

        Raises:
            ValueError: if ``nolinear_type`` is not supported.
        """
        assert derivative.shape[1] == self.dim
        if(self.nolinear_type == "relu"):
            return self.__back_relu(derivative)
        raise ValueError("unsupported nonlinearity type: {0!r}".format(self.nolinear_type))

    def __back_relu(self,derivative):
        # Same copy discipline as the forward pass: the incoming gradient
        # belongs to the caller and must not be modified in place.
        masked = derivative.copy()
        # Gradient is blocked where the forward input was negative.
        masked[self.input_data < 0] = 0
        return masked


class SoftmaxOutputComponent:
    """Softmax output layer paired with the cross-entropy gradient.

    Args:
        dim: expected number of columns (classes) of the input scores.
    """

    def __init__(self,dim):
        self.dim = dim

    def propagate(self,input_data):
        """Turn each row of scores into a probability distribution.

        The row maximum is subtracted before exponentiation. This leaves the
        softmax value unchanged mathematically but keeps ``np.exp`` from
        overflowing to ``inf`` (and the result from becoming ``nan``) for
        large scores.
        """
        assert input_data.shape[1] == self.dim
        self.input_data = input_data
        shifted = input_data - input_data.max(axis=1, keepdims=True)
        e_x = np.exp(shifted)
        return e_x / e_x.sum(axis=1, keepdims=True)

    def back_propagate(self,probs,label):
        """Gradient of the mean cross-entropy loss w.r.t. the softmax input.

        Args:
            probs: array of shape (batch, classes) as returned by ``propagate``.
            label: integer class index per row, with ``batch`` entries.

        Returns:
            ``(probs - one_hot(label)) / batch`` as a new array; ``probs`` is
            not modified.
        """
        assert probs.shape[0] == label.shape[0]
        batch_size = probs.shape[0]
        # Copy so the caller's probabilities stay intact.
        delta = probs.copy()
        # Use the `label` argument; the previous code read an unrelated global
        # name (`batch_label`) here.
        delta[np.arange(batch_size), label] -= 1
        return delta / batch_size

class LearningRateScheduler():
    """Compute the learning rate to use at a given training iteration.

    A large initial learning rate lets the model converge quickly at the
    beginning of training; decreasing it over time lets the network settle
    better in the final iterations.

    Args:
        begin_lr: learning rate returned at iteration 0.
        end_lr: learning rate returned at iteration ``total_iter``.
        scheduler_type: schedule name; only ``'linear'`` is implemented.
    """

    def __init__(self,begin_lr,end_lr,scheduler_type):
        self.begin_lr = begin_lr
        self.end_lr = end_lr
        self.scheduler_type = scheduler_type

    def caculate(self,curr_iter,total_iter):
        """Return the learning rate for ``curr_iter`` out of ``total_iter``.

        Raises:
            ValueError: if ``scheduler_type`` is not supported.
        """
        if(self.scheduler_type == 'linear'):
            return self.__linear(curr_iter,total_iter)
        # Previously `assert false`, which raised NameError (`false` is not a
        # Python name) instead of reporting the real problem.
        raise ValueError("unsupported scheduler type: {0!r}".format(self.scheduler_type))

    def __linear(self,curr_iter,total_iter):
        # Linear interpolation between begin_lr (curr_iter == 0) and
        # end_lr (curr_iter == total_iter).
        return ((total_iter - curr_iter) * self.begin_lr + curr_iter * self.end_lr)/total_iter

In [23]:
#network defining------------------------------------------------------------
# Layer stack used by network_propagate / network_backpropagate:
# 784 -> 100 -> 20 -> 10, with a ReLU after each of the first two affine layers.
# 784 is the per-sample data size printed when the data set is loaded; the final
# 10-wide affine output feeds the softmax output component.
dnn1_affine = AffineComponent(784,100)
dnn1_relu = NolinearComponent(100,"relu")
dnn2_affine = AffineComponent(100,20)
dnn2_relu = NolinearComponent(20,"relu")
dnn3_affine = AffineComponent(20,10)
output = SoftmaxOutputComponent(10)

def network_propagate(input_data):
    """Run a forward pass through the module-level layer stack.

    The input flows through the affine/ReLU layers in order and finally
    through the softmax ``output`` component, whose result is returned.
    """
    hidden_stack = (dnn1_affine, dnn1_relu, dnn2_affine, dnn2_relu, dnn3_affine)
    signal = input_data
    for layer in hidden_stack:
        signal = layer.propagate(signal)
    return output.propagate(signal)

def network_backpropagate(probs,batch_label):
    """Run a backward pass through the module-level layer stack.

    The gradient starts at the softmax ``output`` component and is handed
    through the layers in reverse forward order; each layer's
    ``back_propagate`` is called exactly once. Nothing is returned.
    """
    gradient = output.back_propagate(probs, batch_label)
    for layer in (dnn3_affine, dnn2_relu, dnn2_affine, dnn1_relu, dnn1_affine):
        gradient = layer.back_propagate(gradient)

def caculate_loss(probs,batch_label):
    """Mean negative log-probability assigned to the labelled class of each row.

    Args:
        probs: 2-D array with one row of class probabilities per sample.
        batch_label: integer class index for each row.

    Returns:
        The average of ``-log(probs[i, batch_label[i]])`` over all rows.
    """
    row_index = np.arange(probs.shape[0])
    picked = probs[row_index, batch_label]
    return np.mean(-np.log(picked), axis=0)


#network training------------------------------------------------------------
#the network is overfitting without normalization, may add later
# Training hyper-parameters: samples per step, number of steps, and the two
# end points of the learning-rate schedule.
batch_size = 1000
total_iter = 2000
init_lr = 0.02
end_lr = 0.001
# 'linear' interpolates from init_lr at step 0 to end_lr at step total_iter.
lr_scheduler = LearningRateScheduler(init_lr,end_lr,'linear')

# The range is inclusive of total_iter, so total_iter + 1 steps are run and the
# last one uses exactly end_lr.
for i in range(0,total_iter+1):
    # learning_rate must stay a module-level name: AffineComponent reads it as a
    # global when it updates its parameters.
    learning_rate = lr_scheduler.caculate(i,total_iter)
    # get_next_batch wraps around the training set like a circular list.
    batch_data, batch_label = mnist_train.get_next_batch(batch_size)
    probs = network_propagate(batch_data)
    # The loss is taken from the forward-pass probabilities, before the
    # parameters are updated by the backward pass below.
    loss = caculate_loss(probs, batch_label)
    # Report progress every 50 steps.
    if(i%50==0):
        print("iter:",i,"loss:",loss,"learning_rate:",learning_rate)
    network_backpropagate(probs,batch_label)

iter: 0 loss: 2.3071955731146465 learning_rate: 0.02
iter: 50 loss: 0.8715972956564584 learning_rate: 0.019524999999999997
iter: 100 loss: 0.3014861036324603 learning_rate: 0.01905
iter: 150 loss: 0.19811641597449864 learning_rate: 0.018574999999999998
iter: 200 loss: 0.17014129369546424 learning_rate: 0.0181
iter: 250 loss: 0.16213706785458323 learning_rate: 0.017625
iter: 300 loss: 0.16751993729225712 learning_rate: 0.01715
iter: 350 loss: 0.13726659522999804 learning_rate: 0.016675000000000002
iter: 400 loss: 0.12970179974529594 learning_rate: 0.0162
iter: 450 loss: 0.07907475207672139 learning_rate: 0.015725
iter: 500 loss: 0.07808314538949185 learning_rate: 0.01525
iter: 550 loss: 0.08658684442810474 learning_rate: 0.014775
iter: 600 loss: 0.10787833742941626 learning_rate: 0.0143
iter: 650 loss: 0.07770955952734199 learning_rate: 0.013824999999999999
iter: 700 loss: 0.09185342641594992 learning_rate: 0.013349999999999999
iter: 750 loss: 0.05515470804732235 learning_rate: 0.012875

In [25]:
#network testing------------------------------------------------------------
# Running totals over all evaluated test batches.
total_num = 0
correct_num = 0
total_loss = 0

# The test set is evaluated in test_batch_num equally sized batches. int()
# truncates, so if the set size is not a multiple of test_batch_num the
# remaining samples are not evaluated.
test_batch_num = 10
test_batch_size = int(mnist_test.size / test_batch_num)

mnist_test.shuffle()
# i starts at 1 so that total_loss / i below is the mean loss over the batches
# seen so far.
for i in range(1,test_batch_num+1):
    # Evaluate in batches to bound memory use; for a data set this small it is
    # not strictly needed.
    batch_data, batch_label = mnist_test.get_next_batch(test_batch_size)
    probs = network_propagate(batch_data)
    # Predicted class = index of the largest probability in each row, stored as
    # a column vector.
    prediction = probs.argmax(axis = 1).reshape(test_batch_size,1)
    loss = caculate_loss(probs, batch_label)

    total_num += test_batch_size
    # prediction.T is a single row, compared element-wise against batch_label
    # (assumed to be one label per sample) to count the matches.
    correct_num += np.sum(batch_label == prediction.T)
    total_loss += loss
    # Note: the value printed after 'total_loss:' is the running mean loss.
    print('total_num:', total_num, 'correct_num:', correct_num, 'total_loss:', total_loss / i)

# Final summary: accuracy over all evaluated samples and mean per-batch loss.
print('test finished, test sample:{0}, accurancy:{1}, average loss:{2}'.format(total_num, float(correct_num) / total_num, total_loss / test_batch_num))

total_num: 1000 correct_num: 976 total_loss: 0.07716743732753546
total_num: 2000 correct_num: 1953 total_loss: 0.07905484055412207
total_num: 3000 correct_num: 2931 total_loss: 0.07908418728302334
total_num: 4000 correct_num: 3908 total_loss: 0.07822225157425092
total_num: 5000 correct_num: 4888 total_loss: 0.07687986564714142
total_num: 6000 correct_num: 5873 total_loss: 0.07209125418882943
total_num: 7000 correct_num: 6846 total_loss: 0.07212297989483465
total_num: 8000 correct_num: 7815 total_loss: 0.07781544568994414
total_num: 9000 correct_num: 8782 total_loss: 0.08170254618881352
total_num: 10000 correct_num: 9754 total_loss: 0.08345664912738646
test finished, test sample:10000, accurancy:0.9754, average loss:0.08345664912738646
