In [1]:
import numpy as np

class RecurLayer:
    
    # n_iter controls internal recurrence within one sample. When it is 0, no internal recurrence.
    # External recurrence always happens between samples, if they are forwarded consecutively.
    # Backwarding/updating will reset the context.
    def __init__(self, n_x, n_h, n_iter=1, acti="TANH"):
        assert n_iter
        self.n_x = n_x
        self.n_h = n_h        
        
        self.wx = np.random.uniform(-1, 1, (n_h, n_x)) * 0.1       
        self.wh = np.ones(shape=(n_h, n_h)) 
        self.bh = np.random.uniform(-1, 1, (n_h, 1)) * 0.1

        # for debugging purpose
        #self.wx = np.ones(shape=(n_h, n_x))       
        #self.wh = np.ones(shape=(n_h, n_h)) 
        #self.bh = np.ones(shape=(n_h, 1)) 
        
        self.acti = Activation(acti)
        self.n_iter = n_iter
        self.n_recur = None
        # initialize input/output and gradients data structure
        self.reset_context()
        
    #              bh
    # x -> (wx) -> + -> a -> (acti) -> -> h     
    #              L- <-(wh)---------J 
    #                                   
    def forward_1sample(self, x):
        #set_trace()
        assert self.n_x == x.size
        self.X.append(x)
        
        s = np.dot(self.wx, x) 
        h = self.H[-1][-1]
        local_H = [] 
        # internal recurrence forward
        for i in range(self.n_iter):
            a = s + np.dot(self.wh, h) + self.bh
            h = self.acti.func(a)
            local_H.append(h)
        
        self.H.append(local_H)
        return h
    
    def forward(self, nx):
        self.n_recur = nx.shape[0]
        # external recurrent forward
        for i in range(self.n_recur):
            h = self.forward_1sample(nx[i])
        
        return h

    def backward_1sample(self, g_h):
        #set_trace()
        assert g_h.size == self.n_h
        x = self.X.pop()
        local_H = self.H.pop()
        # internal recurrence backward
        for i in reversed(range(self.n_iter)):
            h = local_H.pop()
            g_signal = self.acti.grad(h)
            g_a = g_signal * g_h
            self.g_bh += g_a
            self.g_wh += np.outer(g_a, h)
            self.g_wx += np.outer(g_a, x)
            self.g_x += np.dot(self.wx.T, g_a)
            g_h = np.dot(self.wh.T, g_a)
        
        return g_h

    def backward(self, g_h):
        assert g_h.size == self.n_h
        # external recurrent backward
        self.g_bh = self.g_wh = self.g_wx = self.g_x = 0
        for i in range(self.n_recur):
            g_h = self.backward_1sample(g_h)
        
        return self.g_x
        
    def update(self, learning=0.01):
        self.wx -= self.g_wx * learning
        self.wh -= self.g_wh * learning
        self.bh -= self.g_bh * learning
        init_h = np.zeros(shape=(self.n_h, 1))
        orig_h = self.H.pop().pop()
        assert (orig_h == init_h).all() and self.H == []
        self.reset_context()
        return
    
    def reset_context(self):
        self.X = []
        init_h = np.zeros(shape=(self.n_h, 1))
        self.H = [[init_h]]
        self.g_bh = self.g_wh = self.g_wx = self.g_x = 0
        self.n_recur = None
        return
        
    def __str__(self):
        s = "\nX is:\n"+str(self.X)
        s += "\nwx is:\n" + str(self.wx)
        s += "\nwh is:\n" + str(self.wh)
        s += "\nbh is:\n" + str(self.bh)
        s += "\nH is:\n"+str(self.H)
        s += "\ng_H is:\n"+str(self.g_h)
        s += "\ng_wx is:\n"+str(self.g_wx)
        s += "\ng_wh is:\n"+str(self.g_wh)
        s += "\ng_bh is:\n"+str(self.g_bh)
        s += "\ng_X is:\n"+str(self.g_x)
        
        return s
        
class RecurNet:
    
    def __init__(self, n_x, n_h, n_y, h_acti="TANH", y_acti="RELU", 
                 n_iter=1, outlayer=None, learning=0.01):

        n_h = n_h if n_h > 0 else (n_x + n_y) // 2
        self.n_x = n_x
        self.recur = RecurLayer(n_x, n_h, n_iter, h_acti)
        self.output = outlayer
        if outlayer is None:
            self.output = PercepLayer(n_h, n_y, y_acti)
        
        self.learning = learning
        return
    
    def forward(self, x):
        h = self.recur.forward(x)
        y = self.output.forward(h)
        return y
    
    def backward(self, g_y):
        g_h = self.output.backward(g_y)
        g_x = self.recur.backward(g_h)
        return g_x

    def update(self):
        self.output.update(self.learning)
        self.recur.update(self.learning)
        return
    
    def predict_1sample(self, x):
        n_recur = x.size // self.n_x
        x = x.reshape(n_recur, self.n_x, 1)
        y = self.forward(x)
        self.recur.reset_context()
        return y
    
    def train_1sample(self, x, label):
        #set_trace()
        n_recur = x.size // self.n_x
        x = x.reshape(n_recur, self.n_x, 1)
        self.forward(x)
        self.backward(label)
        self.update()
        return
        

In [2]:
%run 'multilayer-perceptron.ipynb'

In [6]:
def run_rnn_test():
    n_x=3; n_h=4; n_class=3
    outlayer = SoftMaxLayer(n_h, n_class)
    rnn = RecurNet(n_x, n_h, n_class, n_iter=1, outlayer=outlayer, h_acti="SIGMOID", learning=0.1)
    X = [
        [1,2,3],
        [2,1,3],
        [3,1,2],
        [3,2,1],
        [1,3,2],
        [2,3,1]
        ]
    Y = [2,2,0,0,1,1]
    X = np.array(X)
    Y = np.array(Y)

    for j in range(40):
#        if j % 10 == 0:
#            print(rnn.recur)
#            print(rnn.output)
            
        for i in range(X.shape[0]):
            rnn.train_1sample(X[i].reshape(-1,1), Y[i])

    X = np.array([[1,2,3],[2,3,1],[3,1,2]])
    for i in range(X.shape[0]):
        predict = rnn.predict_1sample(X[i].reshape(-1,1))
        print("\nPredict: ", X[i], predict)

if __name__  == '__main__':
    run_rnn_test()
    


Predict:  [1 2 3] 2

Predict:  [2 3 1] 1

Predict:  [3 1 2] 0


In [4]:
%run 'mnist.ipynb'
def run_rnn_mnist():
    n_x = 28; n_h= 28*3+1; n_class=10
    outlayer = SoftMaxLayer(n_h, n_class)
    rnn = RecurNet(n_x, n_h, n_class, n_iter=1,
                   outlayer=outlayer, h_acti="RELU", learning=0.001)
    mnist = MNIST(rnn, folder="../convolution-network")
    for i in range(5):
        mnist.train(-1)
        accuracy = mnist.test(-1)
        print("\nAccuracy of epoch {} is {}".format(i, accuracy))
    return mnist

mnist = None
if __name__  == '__main__':
    mnist = run_mlp_mnist()


Accuracy of epoch 0 is 0.955

Accuracy of epoch 1 is 0.9622

Accuracy of epoch 2 is 0.9681

Accuracy of epoch 3 is 0.97

Accuracy of epoch 4 is 0.9728
