In [1]:
import numpy as np

In [2]:
class Tanh:
    def forward(self, x):
        return np.tanh(x)
    def backward(self, x, diff):
        output = np.tanh(x)
        return (1.0 - np.square(output)) * diff
    
class sigmoid:
    def forward(self, x):
        return 1.0/(1.0+np.exp(-x))
    def backward(self, x, diff):
        output = self.forward(x)
        return (1.0-output)*output*diff
    
class softmax:
    def predict(self, x):
        return np.exp(x)/np.sum(np.exp(x))
    def loss(self, x, y):
        probs = self.predict(x)
        return -np.log(probs[y])
    def diff(self, x, y):
        probs = self.predict(x)
        probs[y] -= 1.0
        return probs

In [3]:
class MultiplyGate:
    def forward(self, x, w):
        return np.dot(x, w)
    
    def backward(self, x, w, dz):
        dw = np.dot(x.T, dz)
        dx = np.dot(dz, w.T)
        return dw, dx
    
class AddGate:
    def forward(self, x1, x2):
        return x1 + x2
    
    def backward(self, x1, x2, dz):
        dx1 = np.dot(dz.T, np.ones([dz.shape[0], x1.shape[1]]))
        dx2 = np.dot(dz.T, np.ones([dz.shape[0], x2.shape[1]]))
        return dx1, dx2

In [4]:
mulGate = MultiplyGate()
addGate = AddGate()
activation = Tanh()

class RNNLayer:
    def forward(self, x, prev_s, U, W, V):
        self.mulu = mulGate.forward(x, U)
        self.mulw = mulGate.forward(x, prev_s)
        self.adduw = addGate.forward(self.mulu, self.mulw)
        self.state = activation.forward(self.adduw)
        self.mulo = mulGate.forward(self.state, V)
    
    def backward(self, x, prev_s, U, W, V, diff, dmulv):
        self.forward(x, prev_s, U, W, V)
        dV, dVx = mulGate.backward(x, V, dmulv)
        dadd = activation.backward(self.adduw, dVx)
        dmulu, dmulw = addGate.backward(self.mulu, dmulw, dadd)
        dU, dUx = mulGate.backward(self.x, U, dmulu)
        dW, dWx = mulGate.backward(prev_s, W, dmulw)
        return dU, dW, dV
        

In [5]:
output = softmax()

In [7]:
class RNN:
    def __init__(self, input_dim, hidden_nodes, output_dim, lr = 0.001, bptt_truncate = 4):
        self.input_dim = input_dim
        self.hidden_nodes = hidden_nodes
        self.output_dim = output_dim
        self.U = np.random.random([input_dim, hidden_nodes])*0.01
        self.W = np.random.random([hidden_nodes, hidden_nodes])*0.01
        self.V = np.random.random([hidden_nodes, output_dim])*0.01
        self.lr = lr
        self.bptt_truncate = bptt_truncate

    def forward(self, x):  
        # total number of time steps
        # each steps input a word
        self.time_steps = len(x)
        layers = []
        prev_s = np.zeros([self.hidden_nodes])
        for t in range(time_steps):
            layer = RNNLayer()
            input_vec = np.zeros(self.word_dim)
            input_vec[x[t]] = 1
            layer.forward(input_vec, prev_s, self.U, self.W, self.V)
            prev_s = layer.state
            layers.append(layer)
        return layers
    
    def backward(self, x, y):
        dU = np.zeros_like(self.U)
        dW = np.zeros_like(self.W)
        dV = np.zeros_like(self.V)
        layers = self.forward(x)
        for t in range(self.time_steps):
            dmulv = output.diff(layers[t],mulv, y[t])
            input_vec = np.zeros(self.word_dim)
            input_vec[x[t]] = 1
            dU_t, dW_t, dV_t = layers[t].backward(input_vec, prev_s, self.U, self.W, self.V, dmulv)
            for i in range(t-1,max(-1, t-self.bptt_truncate-1),-1):
                input_vec = np.zeros(self.word_dim)
                input_vec[x[i]] = 1
                prev_s_i = np.zeros(self.hidden_nodes) if i == 0 else layers[i-1].state
                dU_i, dW_i, dV_i = layers[i].backward(input_vec, prev_s_i, self.U, self.W, self.V, dmulv)
                dU_t += dU_i
                dW_t += dW_i
                dV_t += dV_i
            dU += dU_t
            dW += dW_t
            dV += dV_t
        return dU, dW, dV
    
    def sgd_optimizer(self, x, y, lr):
        dU, dW, dV = self.backward(x,y)
        self.U -= lr*dU
        self.W -= lr*dW
        self.V -= lr*dV
    
    def caculate_loss(self, x, y):
        loss = 0.0
        for example in range(len(y)):
            single_loss = 0.0
            for j in range(x.shape[1])
                layer = self.forward(x[example][j])
                single_loss += output.loss(layer.mulv, y[example][j])
            loss += (single_loss/x.shape[1])
            
    
    def train(self, x, y, lr=0.005, nepoch=100, evaluate_loss_after=5):     
        for epoch in nepoch:
            if epoch % evaluate_loss_after == 0:
                loss = self.caculate_loss(x,y)
                print("Epoch=%d   Loss=%f" % (epoch, loss))
            for i in range(len(y)):
                self.sgd_optimizer(x[i], y[i], lr)

In [118]:
def generate_data(binary_dim, largest_number, int2binary):
    a = np.random.randint(largest_number/2)
    b = np.random.randint(largest_number/2)
    c = a + b
    return a,b,c,int2binary[a], int2binary[b], int2binary[c]

int2binary = {}
binary_dim = 8
largest_number = pow(2, binary_dim)
binary = np.unpackbits(np.array([range(largest_number)],dtype=np.uint8).T,axis=1)
for i in range(largest_number):
    int2binary[i] = binary[i]    

rnn = RNN(2, 16, 1)
for j in range(10000):
    # a, b, c is the [1, binaray_dim] vector
    inta, intb, intc, a, b, c = generate_data(binary_dim, largest_number, int2binary)
    x = np.stack((a,b))
    y = np.array(c)
    # hidden layer : input + prev_hidden
    reverse_x = np.fliplr(x)
    reverse_y = y[::-1]
    states, output = rnn.forward(reverse_x)
    #loss = rnn.backward(reverse_x, reverse_y, states, output)
    if j%1000 == 0:
        print(inta,intb,c,"error: ",loss)
        print('predict: ',end="")
        for i in output[::-1]:
            if i >= 0.5:
                print("1",end=" ")
            else:
                print("0",end=" ")
        print("\n")