In [4]:
import numpy as np

In [5]:
class Relu():
    def forward(self, x):
        self.x = x
        return np.maximum(self.x, 0)

    def backward(self, eta):
        eta[self.x <= 0] = 0
        return eta

In [6]:
class sigmoid():
    def forward(self, x):
        return 1 / (1 + np.exp(-x))

    def backward(self, eta):
        return eta * (self.out * (1-self.out))

In [42]:
class conv():
    
    def __init__(self, filter_shape, stride=1, padding='SAME', bias=True, requires_grad=True):
        self.weight = parameter(np.random.randn(*filter_shape) * (2/reduce(lambda x,y:x*y, filter_shape[1:]))**0.5)  
        self.stride = stride
        self.padding = padding
        self.requires_grad = requires_grad
        self.output_channel = filter_shape[0]   
        self.input_channel = filter_shape[1]    
        self.filter_size = filter_shape[2]  
        if bias:
            self.bias = parameter(np.random.randn(self.output_channel))
        else:
            self.bias =None

    def image_to_col(self, x, filter_size_x, filter_size_y, stride):
        N, C, H, W = x.shape
        output_H, output_W = (H-filter_size_x)//stride + 1, (W-filter_size_y)//stride + 1
        out_size = output_H * output_W
        x_cols = np.zeros((out_size*N, filter_size_x*filter_size_y*C))
        for i in range(0, H-filter_size_x+1, stride):
            i_start = i * output_W
            for j in range(0, W-filter_size_y+1, stride):
                temp = x[:,:, i:i+filter_size_x, j:j+filter_size_y].reshape(N,-1)
                x_cols[i_start+j::out_size, :] = temp
        return x_cols
    
    def forward(self, input):

        # 边缘填充
        if self.padding == "VALID":
            self.x = input
        if self.padding == "SAME":
            p = self.filter_size // 2
            self.x = np.lib.pad(input, ((0,0),(0,0),(p,p),(p,p)), "constant")
        # 处理不能恰好的被卷积核的大小和选定的步长所整除的宽和高
        x_fit = (self.x.shape[2] - self.filter_size) % self.stride
        y_fit = (self.x.shape[3] - self.filter_size) % self.stride

        if self.stride > 1:
            if x_fit != 0:
                self.x = self.x[:, :, 0:self.x.shape[2] - x_fit, :]
            if y_fit != 0:
                self.x = self.x[:, :, :, 0:self.x.shape[3] - y_fit]

        N, _, H, W = self.x.shape
        O, C, K, K = self.weight.data.shape
        weight_cols = self.weight.data.reshape(O, -1).T
        x_cols = self.image_to_col(self.x, self.filter_size, self.filter_size, self.stride)
        result = np.dot(x_cols, weight_cols) + self.bias.data
        output_H, output_W = (H-self.filter_size)//self.stride + 1, (W-self.filter_size)//self.stride + 1
        result = result.reshape((N, result.shape[0]//N, -1)).reshape((N, output_H, output_W, O))
        return result.transpose((0, 3, 1, 2))


    def backward(self, eta, lr):

        if self.stride > 1:
            N, O, output_H, output_W = eta.shape
            inserted_H, inserted_W = output_H + (output_H-1)*(self.stride-1), output_W + (output_W-1)*(self.stride-1)
            inserted_eta = np.zeros((N, O, inserted_H, inserted_W))
            inserted_eta[:,:,::self.stride, ::self.stride] = eta
            eta = inserted_eta

        N, _, output_H, output_W = eta.shape
        self.b_grad = eta.sum(axis=(0,2,3))
        self.W_grad = np.zeros(self.weight.data.shape)      
        for i in range(self.filter_size):
            for j in range(self.filter_size):
                self.W_grad[:,:,i,j] = np.tensordot(eta, self.x[:,:,i:i+output_H,j:j+output_W], ([0,2,3], [0,2,3]))
        self.weight.data -= lr * self.W_grad / N
        self.bias.data -= lr * self.b_grad / N


        if self.padding == "VALID":
            p = self.filter_size - 1
            pad_eta = np.lib.pad(eta, ((0,0),(0,0),(p,p),(p,p)), "constant", constant_values=0)
            eta = pad_eta
        elif self.padding == "SAME":
            p = self.filter_size // 2
            pad_eta = np.lib.pad(eta, ((0, 0), (0, 0), (p, p), (p, p)), "constant", constant_values=0)
            eta = pad_eta

        _, C, _, _ = self.weight.data.shape
        weight_flip = np.flip(self.weight.data, (2,3))  
        weight_flip_swap = np.swapaxes(weight_flip, 0, 1)  
        weight_flip = weight_flip_swap.reshape(C, -1).T
        x_cols = self.image_to_col(eta, self.filter_size, self.filter_size, self.stride)
        result = np.dot(x_cols, weight_flip)
        N, _, H, W = eta.shape
        output_H, output_W = (H - self.filter_size) // self.stride + 1, (W - self.filter_size) // self.stride + 1
        result = result.reshape((N, result.shape[0] // N, -1)).reshape((N, output_H, output_W, C))
        self.weight.grad = result.transpose((0, 3, 1, 2))

        return self.weight.grad

In [43]:
class Dropout():
    def __init__(self, drop_rate=0.5, is_train=True):
        self.drop_rate = drop_rate
        self.is_train = is_train
        self.fix_value = 1 - drop_rate   

    def forward(self, x):
        if self.is_train==False:    
            return x
        else:            
            N, m = x.shape
            self.save_mask = np.random.uniform(0, 1, m) > self.drop_rate   
            return (x * self.save_mask) / self.fix_value


    def backward(self, eta):
        if self.is_train==False:
            return eta
        else:
            return eta * self.save_mask

In [44]:
class softmax():
    
    def calculate_loss(self, x, label):
        N, _ = x.shape
        self.label = np.zeros_like(x)
        for i in range(self.label.shape[0]):
            self.label[i, label[i]] = 1

        self.x = np.exp(x - np.max(x, axis=1)[:, np.newaxis])   
        sum_x = np.sum(self.x, axis=1)[:, np.newaxis]
        self.prediction = self.x / sum_x

        self.loss = -np.sum(np.log(self.prediction+1e-6) * self.label)  
        return self.loss / N

    def prediction_func(self, x):
        x = np.exp(x - np.max(x, axis=1)[:, np.newaxis])  
        sum_x = np.sum(x, axis=1)[:, np.newaxis]
        self.out = x / sum_x
        return self.out


    def gradient(self):
        self.eta = self.prediction.copy() - self.label
        return self.eta

In [45]:
class parameter():
    def __init__(self, w):
        self.data = w     
        self.grad = None  

In [46]:
from functools import reduce
class fc():
    def __init__(self, input_num, output_num, bias=True, requires_grad=True):
        self.input_num = input_num          
        self.output_num = output_num        
        self.requires_grad = requires_grad
        self.weight = parameter(np.random.randn(self.input_num, self.output_num) * (2/self.input_num**0.5))
        if bias:
            self.bias = parameter(np.random.randn(self.output_num))
        else:
            self.bias = None


    def forward(self, input):
        self.input_shape = input.shape    
        if input.ndim > 2:
            N, C, H, W = input.shape
            self.x = input.reshape((N, -1))
        elif input.ndim == 2:
            self.x = input
        else:
            print("fc.forward error")
        result = np.dot(self.x, self.weight.data)
        if self.bias is not None:
            result = result + self.bias.data
        return result


    def backward(self, eta, lr):
        N, _ = eta.shape
        next_eta = np.dot(eta, self.weight.data.T)
        self.weight.grad = np.reshape(next_eta, self.input_shape)

        
        x = self.x.repeat(self.output_num, axis=0).reshape((N, self.output_num, -1))
        self.W_grad = x * eta.reshape((N, -1, 1))
        self.W_grad = np.sum(self.W_grad, axis=0) / N
        self.b_grad = np.sum(eta, axis=0) / N


        self.weight.data -= lr * self.W_grad.T
        self.bias.data -= lr * self.b_grad

        return self.weight.grad

In [47]:
class Pooling():
    def __init__(self, kernel_size=(2, 2), stride=2, ):
        self.ksize = kernel_size
        self.stride = stride

    def forward(self, input):
        N, C, H, W = input.shape
        out = input.reshape(N, C, H//self.stride, self.stride, W//self.stride, self.stride)
        out = out.max(axis=(3,5))
        self.mask = out.repeat(self.ksize[0], axis=2).repeat(self.ksize[1], axis=3) != input
        return out

    def backward(self, eta):
        result = eta.repeat(self.ksize[0], axis=2).repeat(self.ksize[1], axis=3)
        result[self.mask] = 0
        return result

In [48]:
class BN():
    def __init__(self, channel, moving_decay=0.9, is_train=True):
        self.alpha = parameter(np.ones((channel,1,1)))
        self.beta = parameter(np.zeros((channel,1,1)))
        self.is_train = is_train
        self.eps = 1e-5 

        self.moving_mean = np.zeros((channel,1,1))
        self.moving_var = np.zeros((channel,1,1))
        self.moving_decay = moving_decay


    def forward(self, x, is_train=True):
        self.is_train = is_train
        N, C, H, W = x.shape
        self.x = x

        if self.is_train:       
            self.mean = np.mean(x, axis=(0,2,3))[:,np.newaxis, np.newaxis]
            self.var = np.var(x, axis=(0,2,3))[:,np.newaxis, np.newaxis]

            if (np.sum(self.moving_mean)==0) and (np.sum(self.moving_var)==0):
                self.moving_mean = self.mean
                self.moving_var = self.var
            else:
                self.moving_mean = self.moving_mean * self.moving_decay + (1-self.moving_decay) * self.mean
                self.moving_var = self.moving_var * self.moving_decay + (1-self.moving_decay) * self.var

            self.y = (x - self.mean) / np.sqrt(self.var + self.eps)
            return  self.alpha.data * self.y + self.beta.data
        else:   
            self.y = (x - self.moving_mean) / np.sqrt(self.moving_var + self.eps)
            return  self.alpha.data * self.y + self.beta.data


    def backward(self, eta, lr):
        N, _, H, W = eta.shape
        alpha_grad = np.sum(eta * self.y, axis=(0,2,3))
        beta_grad = np.sum(eta, axis=(0,2,3))

        yx_grad = (eta * self.alpha.data)
        ymean_grad = (-1.0 / np.sqrt(self.var +self.eps)) * yx_grad
        ymean_grad = np.sum(ymean_grad, axis=(2,3))[:,:,np.newaxis,np.newaxis] / (H*W)
        yvar_grad = -0.5*yx_grad*(self.x - self.mean) / (self.var+self.eps)**(3.0/2)
        yvar_grad = 2 * (self.x-self.mean) * np.sum(yvar_grad,axis=(2,3))[:,:,np.newaxis,np.newaxis] / (H*W)
        result = yx_grad*(1 / np.sqrt(self.var +self.eps)) + ymean_grad + yvar_grad


        self.alpha.data -= lr * alpha_grad[:,np.newaxis, np.newaxis] / N
        self.beta.data -=  lr * beta_grad[:, np.newaxis, np.newaxis] / N

        return result

In [49]:
class LeNet5():
    def __init__(self):
        self.conv1 = conv((6, 1, 5, 5), stride=1, padding='SAME', bias=True, requires_grad=True)
        self.pooling1 = Pooling(kernel_size=(2, 2), stride=2)
        self.BN1 = BN(6, moving_decay=0.9, is_train=True)
        self.relu1 = Relu()

        self.conv2 = conv((16, 6, 5, 5), stride=1, padding="VALID", bias=True, requires_grad=True)
        self.pooling2 = Pooling(kernel_size=(2, 2), stride=2)
        self.BN2 = BN(16, moving_decay=0.9, is_train=True)
        self.relu2 = Relu()

        self.conv3 = conv((120, 16, 5, 5), stride=1, padding="VALID", bias=True, requires_grad=True)

        self.fc4 = fc(120*1*1, 84, bias=True, requires_grad=True)
        self.relu4 = Relu()
        self.fc5 = fc(84, 10, bias=True, requires_grad=True)

        self.softmax = softmax()

    def forward(self, imgs, labels, is_train=True):
        x = self.conv1.forward(imgs)
        x = self.pooling1.forward(x)
        x = self.BN1.forward(x, is_train)
        x = self.relu1.forward(x)

        x = self.conv2.forward(x)
        x = self.pooling2.forward(x)
        x = self.BN2.forward(x, is_train)
        x = self.relu2.forward(x)

        x = self.conv3.forward(x)

        x = self.fc4.forward(x)
        x = self.relu4.forward(x)
        x = self.fc5.forward(x)

        loss = self.softmax.calculate_loss(x, labels)
        prediction = self.softmax.prediction_func(x)
        return loss, prediction


    def backward(self, lr):
        eta = self.softmax.gradient()

        eta = self.fc5.backward(eta, lr)
        eta = self.relu4.backward(eta)
        eta = self.fc4.backward(eta, lr)

        eta = self.conv3.backward(eta, lr)

        eta = self.relu2.backward(eta)  
        eta = self.BN2.backward(eta, lr)
        eta = self.pooling2.backward(eta)     
        eta = self.conv2.backward(eta, lr)

        eta = self.relu1.backward(eta)
        eta = self.BN1.backward(eta, lr)
        eta = self.pooling1.backward(eta)
        eta = self.conv1.backward(eta, lr)

In [50]:
def normalization(x):
    eps = 1e-5
    if x.ndim > 2:
        mean = np.mean(x, axis=(0, 2, 3))[:, np.newaxis, np.newaxis]
        var = np.var(x, axis=(0, 2, 3))[:, np.newaxis, np.newaxis]
        x = (x - mean) / np.sqrt(var + eps)
    else:
        mean = np.mean(x, axis=1)[:, np.newaxis]
        var = np.var(x, axis=1)[:, np.newaxis] + eps
        x = (x - mean) / np.sqrt(var)

    return x

In [51]:
import glob
import struct
import time
import os
def load_mnist(file_dir, is_images='True'):
    bin_file = open(file_dir, 'rb')
    bin_data = bin_file.read()
    bin_file.close()
    if is_images:
        fmt_header = '>iiii'
        magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
    else:
        fmt_header = '>ii'
        magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
        num_rows, num_cols = 1, 1
    data_size = num_images * num_rows * num_cols
    mat_data = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
    mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
    print('Load images from %s, number: %d, data shape: %s' % (file_dir, num_images, str(mat_data.shape)))
    return mat_data


def data_convert(x, y, m, k):
    x[x<=40]=0
    x[x>40]=1
    ont_hot_y = np.zeros((m,k))    
    for t in np.arange(0,m):
        ont_hot_y[t,y[t]]=1
    ont_hot_y=ont_hot_y.T
    return x, ont_hot_y


def load_data(mnist_dir, train_data_dir, train_label_dir, test_data_dir, test_label_dir):
    print('Loading MNIST data from files...')
    train_images = load_mnist(os.path.join(mnist_dir, train_data_dir), True)
    train_labels = load_mnist(os.path.join(mnist_dir, train_label_dir), False)
    test_images = load_mnist(os.path.join(mnist_dir, test_data_dir), True)
    test_labels = load_mnist(os.path.join(mnist_dir, test_label_dir), False)
    return train_images, train_labels, test_images, test_labels

mnist_dir = "mnist_data/"
train_data_dir = "train-images.idx3-ubyte"
train_label_dir = "train-labels.idx1-ubyte"
test_data_dir = "t10k-images.idx3-ubyte"
test_label_dir = "t10k-labels.idx1-ubyte"

train_images, train_labels, test_images, test_labels = load_data(mnist_dir, train_data_dir, train_label_dir, test_data_dir, test_label_dir)
print("Got data. ") 


            
        

Loading MNIST data from files...
Load images from mnist_data/train-images.idx3-ubyte, number: 60000, data shape: (60000, 784)
Load images from mnist_data/train-labels.idx1-ubyte, number: 60000, data shape: (60000, 1)
Load images from mnist_data/t10k-images.idx3-ubyte, number: 10000, data shape: (10000, 784)
Load images from mnist_data/t10k-labels.idx1-ubyte, number: 10000, data shape: (10000, 1)
Got data. 


In [53]:

batch_size = 64  
test_batch = 50  
epoch = 20
learning_rate = 1e-3

iterations_num = 0 

net = LeNet5()

for E in range(epoch):
    batch_loss = 0
    batch_acc = 0

    epoch_loss = 0
    epoch_acc = 0

    for i in range(train_images.shape[0] // batch_size):
        img = train_images[i*batch_size:(i+1)*batch_size].reshape(batch_size, 1, 28, 28)
        img = normalization(img)
        label = train_labels[i*batch_size:(i+1)*batch_size]
        loss, prediction = net.forward(img, label, is_train=True) 

        epoch_loss += loss
        batch_loss += loss
        for j in range(prediction.shape[0]):
            if np.argmax(prediction[j]) == label[j]:
                epoch_acc += 1
                batch_acc += 1

        net.backward(learning_rate)

        if (i+1)%50 == 0:
            print("   epoch:%5d , batch:%5d , acc:%.4f , loss:%.4f "
                  % (E+1, i+1, batch_acc/(batch_size*50), batch_loss/(batch_size*50)))
            

            batch_loss = 0
            batch_acc = 0



    print("--------------epoch:%5d , avg_acc:%.4f , avg_loss:%.4f---------------"
          % (E+1, epoch_acc/train_images.shape[0], epoch_loss/train_images.shape[0]))
    test_acc = 0
    for k in range(test_images.shape[0] // test_batch):
        img = test_images[k*test_batch:(k+1)*test_batch].reshape(test_batch, 1 ,28, 28)
        img = normalization(img)
        label = test_labels[k*test_batch:(k+1)*test_batch]
        _, prediction = net.forward(img, label, is_train=False)

        for j in range(prediction.shape[0]):
            if np.argmax(prediction[j]) == label[j]:
                test_acc += 1

    print("------------total_acc:%.4f---------------" % (test_acc / test_images.shape[0]))

   epoch:    1 , batch:   50 , avg_batch_acc:0.2122 , avg_batch_loss:0.0622 
   epoch:    1 , batch:  100 , avg_batch_acc:0.4009 , avg_batch_loss:0.0330 
   epoch:    1 , batch:  150 , avg_batch_acc:0.4713 , avg_batch_loss:0.0257 
   epoch:    1 , batch:  200 , avg_batch_acc:0.5969 , avg_batch_loss:0.0197 
   epoch:    1 , batch:  250 , avg_batch_acc:0.6175 , avg_batch_loss:0.0183 
   epoch:    1 , batch:  300 , avg_batch_acc:0.6884 , avg_batch_loss:0.0151 
   epoch:    1 , batch:  350 , avg_batch_acc:0.7266 , avg_batch_loss:0.0132 
   epoch:    1 , batch:  400 , avg_batch_acc:0.7378 , avg_batch_loss:0.0129 
   epoch:    1 , batch:  450 , avg_batch_acc:0.7741 , avg_batch_loss:0.0112 
   epoch:    1 , batch:  500 , avg_batch_acc:0.7575 , avg_batch_loss:0.0115 
   epoch:    1 , batch:  550 , avg_batch_acc:0.7994 , avg_batch_loss:0.0102 
   epoch:    1 , batch:  600 , avg_batch_acc:0.8087 , avg_batch_loss:0.0098 
   epoch:    1 , batch:  650 , avg_batch_acc:0.8153 , avg_batch_loss:0.0092 