In [17]:
import numpy as np

class Activation:
    
    def relu(self, x):
        return np.maximum(x, 0)
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    def tanh(self, x):
        return np.tanh(x)
    def linear(self, x):
        return x
    def softplus(self, x):
        return np.log(1+np.exp(x))
    def softmax(self, x):
        y = np.empty(shape=x.shape)
        for i in range(y.shape[0]):
            exps = np.exp(x[i] - x[i].max())
            y[i] = exps / np.sum(exps)
            
        return y       
        #only softmax is not element-wise in act functions
        #exps = np.exp(x - x.max())
        #return exps / np.sum(exps)

    def g_relu(self, x):
        return 1 * (x > 0)
    def g_sigmoid(self, x):
        return (1 - x) * x
    def g_tanh(self, x):
        return 1 - x*x
    def g_linear(self, x):
        return 1 * (x==x)
    def g_softplus(self, x):
        return self.sigmoid(x)
    def g_softmax(self, x):
        y = np.empty(shape=x.shape)
        for i in range(y.shape[0]):
            dx_ds = np.diag(x[i]) - np.dot(x[i].T, x[i])
            y[i] = dx_ds.sum(axis=0)
        
        return y 
        #only softmax is not element-wise in act functions
        #dx_ds = np.diag(x) - np.dot(x.T, x)
        #return dx_ds.sum(axis=0) 
    
    def __init__(self, act):
        funcs = {
            "TANH" : self.tanh,
            "SIGMOID" : self.sigmoid,
            "RELU" : self.relu,
            "LINEAR" : self.linear,
            "SOFTPLUS" : self.softplus,
            "SOFTMAX" : self.softmax
        }

        grads = {
            "TANH" : self.g_tanh,
            "SIGMOID" : self.g_sigmoid,
            "RELU" : self.g_relu,
            "LINEAR" : self.g_linear,
            "SOFTPLUS" : self.g_softplus,
            "SOFTMAX" : self.g_softmax
        }
        self.act = act
        self.func = funcs[act]
        self.grad = grads[act]
        
        return       
    
    def __str__(self):
        s = "\nActivation:" + self.act
        return s

In [18]:
import random

class LinearLayer:

    def init_weights(self, num):
        w=[]
        for i in range(num): 
            w.append(random.uniform(-1, 1))
        return w            

    # W.shape = (n_node, n_in+1)
    def __init__(self, n_in, n_node, W=None):
        self.n_node = n_node
        self.n_in = n_in
        # W includes b, so the input size is n_in + 1 (i.e., x0==1) 
        size = n_node * (n_in+1)
        if W == None:
            weights = self.init_weights(size)
            weights = np.array(weights).reshape(n_node, n_in+1)
            scale = np.sqrt(2./size)
            self.W = weights * scale
        else: 
            nn, ni = W.shape
            if nn != n_node or ni != n_in+1:
                raise ValueError("Incorrect weights input")
            else:
                self.W = W  # W.shape = (n_node, n_in+1)
        
        # seperately initialize b, sometimes benefiting RELU
        # self.W[:,n_in] = 0.1 
        # self.W[:,n_in] = np.zeros(n_node)
        
        self.X = None       # X.shape = (n_sample, n_in+1)
        self.signal = None  # signal.shape = (n_sample, n_node) = g_signal.shape
        self.Y = None       # Y.shape = (n_sample, n_node)

        self.g_Y = None     # g_Y.shape = (n_sample, n_node)
        self.g_W = None       # G.shape = (n_in+1, n_node)
        self.g_X = None     # g_X.shape = (n_sample, n_in)
        return
    
    # input: X.shape = (n_sample, n_in)
    def forward(self, X):
        n_sample, n_in = X.shape
        self.X = np.ones(shape=(n_sample, n_in+1))
        self.X[:,:n_in] = X  # add column (x0=1)
        self.signal = np.dot(self.X, self.W.T)
        return self.signal

    # input: g_signal.shape = (n_sample, n_node)
    def backward(self, g_signal):
        n_sample, n_node = g_signal.shape
        self.g_W = np.dot(self.X.T, g_signal)/n_sample
        g_X = np.dot(g_signal, self.W)  # W.shape = (n_sample, n_in+1)
        self.g_X = g_X[:,:self.n_in]  # remove column (meaningless gradient to x0=1)
        return self.g_X

    def update(self, learning_rate):
        #set_trace()
        self.W = self.W - self.g_W.T * learning_rate
        return
            
    def __str__(self):
        s = "\nX is:\n"+str(self.X)
        s += "\nW is:\n" + str(self.W)
        s += "\nY is:\n"+str(self.Y)
        s += "\ng_Y is:\n"+str(self.g_Y)
        s += "\ng_W is:\n"+str(self.g_W)
        s += "\ng_X is:\n"+str(self.g_X)
        return s
        

In [19]:
class NeuronLayer(LinearLayer):
        
    def __init__(self, n_in, n_node, W=None, act="RELU"):
        super().__init__(n_in, n_node, W=None)
        self.activation = Activation(act)
        return
        
    def forward(self, X):
        n_sample,n_in = X.shape
        if n_in != self.n_in: 
            raise ValueError("Incorrect data input", n_in, self.n_in)
        # linear part first
        signal = super().forward(X)
        # nonlinear
        self.Y = self.activation.func(signal) 
        
        return self.Y
      
    def backward(self, g_Y):
        n_sample, n_node = g_Y.shape
        if n_node != self.n_node: 
            raise ValueError("Incorrect data input")
        self.g_Y = g_Y

        # nonlinear
        g_signal = self.activation.grad(self.Y)
        g_signal = g_signal * g_Y
        
        # linear
        self.g_X = super().backward(g_signal)
        return self.g_X

    def update(self, learning_rate):
        super().update(learning_rate)
        return

    def __str__(self):
        s = super().__str__()
        s += str(self.activation)
        return s
            

In [20]:
class SoftMaxLayer(NeuronLayer):
        
    def __init__(self, n_in, n_node, W=None):
        super().__init__(n_in, n_node, W=None, act="SOFTMAX")
        self.predict = None
        self.truth = None
        return
        
    def forward(self, X):
        # hidden part first, computing softmax
        self.Y = super().forward(X)
        self.predict = self.Y.argmax(axis=1)
        return self.predict
      
    def backward(self, T):
        # softmax gradient, dL/ds (combining the cost and activation layer)
        n_sample = T.size
        self.truth = np.zeros(shape=(n_sample, self.n_node))
        self.truth[np.arange(n_sample), T] = 1
        g_signal = (self.Y - self.truth)
        # then linear layer
        self.g_X = LinearLayer.backward(self, g_signal)
        return self.g_X
    
    def update(self, learning_rate):
        super().update(learning_rate)
        return
    
    def __str__(self):
        s = super().__str__()
        s += "\nPrediction:\n" + str(self.predict)
        s += "\nTruth:\n" + str(self.truth)
        return s
            

In [21]:
class MLP:

    def __init__(self, n_in, n_node, n_layer=1, output_layer=None):   
        self.n_in = n_in
        self.n_node = n_node if (n_node != None) else (n_in+1)
        if n_layer == 0:
            raise ValueError("n_layer should not be 0.") 

        self.n_layer = n_layer
        self.hidden_layer = list() 
        self.hidden_layer.append(NeuronLayer(n_in, n_node))
        for i in range(1, n_layer):
            self.hidden_layer.append(NeuronLayer(n_node, n_node))

        if output_layer == None:
            raise ValueError("Output_layer should be assigned.") 
        
        self.output_layer = output_layer
        self.learning_rate = 0.1
        return
    
    def forward(self, X):
        #set_trace()
        for i in range(self.n_layer):
            X = self.hidden_layer[i].forward(X)

        self.output_layer.forward(X)
        return
    
    def backward(self, Y):
        #set_trace()
        g_Y = self.output_layer.backward(Y)
        for i in range(self.n_layer-1, -1, -1):
            g_Y = self.hidden_layer[i].backward(g_Y)

        return
    
    def update(self):
        for i in range(self.n_layer):
            self.hidden_layer[i].update(self.learning_rate)

        self.output_layer.update(self.learning_rate)
        return
    
    def train_1batch(self, X, Y, learning_rate=0.1):
        if learning_rate != 0:
            self.learning_rate = learning_rate
            
        self.forward(X)
        self.backward(Y)
        self.update()
        return
    
    def predict_1sample(self, x):
        #set_trace()
        for i in range(self.n_layer):
            x = self.hidden_layer[i].forward(x)

        predict = self.output_layer.forward(x)
        return predict

In [22]:
import gzip
import struct

import matplotlib as mpl
import matplotlib.pyplot as plt

# pre-requirement: MNIST files from http://yann.lecun.com/exdb/mnist/ stored in local directory
class MnistInput:
    def __init__(self, data):
        if data == "training":
            zX = './train-images-idx3-ubyte.gz'
            zy = './train-labels-idx1-ubyte.gz'
        elif data == "testing":
            zX = './t10k-images-idx3-ubyte.gz'
            zy = './t10k-labels-idx1-ubyte.gz'
        else: raise ValueError("Incorrect data input")
        
        self.zX = zX
        self.zy = zy
        return
    
    def read(self, num):

        zX = self.zX
        zy = self.zy
        with gzip.open(zX) as fX, gzip.open(zy) as fy:
            magic, nX, rows, cols = struct.unpack(">IIII", fX.read(16))
            magic, ny = struct.unpack(">II", fy.read(8))
            if nX != ny: raise ValueError("Inconsistent data and label files")

            img_size = cols*rows
            if num <= 0 or num > nX: num = nX 
            for i in range(num):
                X = struct.unpack("B"*img_size, fX.read(img_size))
                y, = struct.unpack("B", fy.read(1))
                yield (X, y)
        return
    


In [23]:
from IPython.core.debugger import set_trace
class MNIST:

    def __init__(self, nn):
        self.nn = nn
        self.train_input = MnistInput("training")
        self.test_input = MnistInput("testing")
        self.n_epoch = 0
        return
       
    def train_in_batch(self, n_sample, batch_size=100, epochs=500, learning_rate=0.1):
        learning_rate /= (1 + self.n_epoch*0.1)
        self.n_epoch += 1
        n_ep = 0
        n_x = 0 
        X = []
        Y = []
        batch_size = batch_size if batch_size > 0 else 100
        epochs = epochs if epochs > 0 else 500
        for x, y in self.train_input.read(n_sample):
            n_x += 1
            X.append(x)
            Y.append(y)
            if n_x >= batch_size:
                n_ep += 1
                X = np.array(X)/255
                Y = np.array(Y)
                self.nn.train_1batch(X, Y, learning_rate)
                if  n_ep >= epochs: return

                n_x = 0
                X = []
                Y = []                
        return
    
    def test(self, n_sample):
        correct = 0
        total = 0
        for x, y in self.test_input.read(n_sample):
            X = np.array(x).reshape(1, len(x)) / 255
            predict = self.nn.predict_1sample(X)
            correct += 1 * (predict[0] == y)
            total += 1
            #print("\nPredict:", predict)
            #plt.imshow(x, cmap=mpl.cm.Greys)
            #plt.show()
        accuracy = correct/total
        return accuracy

In [24]:
def run_mnist():
    output_layer = SoftMaxLayer(28*28+1, 10)
    mlp = MLP(28*28, 28*28+1, output_layer = output_layer)
    mnist = MNIST(mlp)
    for i in range(10):
        mnist.train_in_batch(-1,100,500,1)
        accuracy = mnist.test(-1)
        print("\nEpoch {} accuracy: {}".format(i, accuracy))

In [25]:
# learn the index of number "3".
def run_test():
    out = SoftMaxLayer(4,3)
    mlp = MLP(3,4, output_layer = out)
    X = [
        [1,2,3],
        [2,1,3],
        [3,1,2],
        [3,2,1],
        [1,3,2],
        [2,3,1]
        ]
    Y = [2,2,0,0,1,1]
    X = np.array(X)
    Y = np.array(Y)

    for j in range(160):
        mlp.train_1batch(X, Y)

    '''
    for j in range(40):
        for i in range(X.shape[0]):
            mlp.train_1batch(np.array([X[i]]), np.array([Y[i]]))
    '''

    X = np.array([[1,2,3],[2,3,1],[3,2,2]])
    for i in range(X.shape[0]):
        predict = mlp.predict_1sample(np.array([X[i]]))
        print("\nPredict: ", X[i], predict[0])


In [None]:
def is_main_module():
    return __name__ == '__main__' and '__file__' not in globals()

In [None]:
if is_main_module():
    run_test()
    run_mnist()


Predict:  [1 2 3] 2

Predict:  [2 3 1] 1

Predict:  [3 2 2] 0

Epoch 0 accuracy: 0.9561

Epoch 1 accuracy: 0.9643

Epoch 2 accuracy: 0.9708

Epoch 3 accuracy: 0.9752

Epoch 4 accuracy: 0.9761

Epoch 5 accuracy: 0.9769

Epoch 6 accuracy: 0.9775
