In [57]:
import os, sys
import numpy as np
import random
import math

In [58]:
def sigmoid(x):
    res = 1 / (1.0 + np.exp(-x))
    return np.clip(res, 1e-8, 1-(1e-8))


def rand_arr(a, b, *args):
    # generate a random array in the range [a, b) 
    np.random.seed(0)
    return np.random.rand(*args) * (b - a) + a

In [59]:
class LossLayer:
    @classmethod
    def loss(self, pred, label):
        return (pred[0] - label) **2
    
    @classmethod
    def bottom_diff(self, pred, label):
        diff = np.zeros_like(pred)
        diff[0] = 2 * (pred[0] - label)
        return diff

    

class LstmParam:
    def __init__(self, mem_cell_ct, x_dim):
        self.mem_cell_ct = mem_cell_ct
        self.x_dim = x_dim
        concat_len = x_dim + mem_cell_ct
        
        # initialize gate weght
        self.wg = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
        self.wi = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
        self.wf = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
        self.wo = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
        
        # initialize gate bias
        self.bg = rand_arr(-0.1, 0.1, mem_cell_ct)
        self.bi = rand_arr(-0.1, 0.1, mem_cell_ct)
        self.bf = rand_arr(-0.1, 0.1, mem_cell_ct)
        self.bo = rand_arr(-0.1, 0.1, mem_cell_ct)
        
        # initialize diff of loss
        self.wg_diff = np.zeros((mem_cell_ct, concat_len))
        self.wi_diff = np.zeros((mem_cell_ct, concat_len))
        self.wf_diff = np.zeros((mem_cell_ct, concat_len))
        self.wo_diff = np.zeros((mem_cell_ct, concat_len))
        self.bg_diff = np.zeros(mem_cell_ct)
        self.bi_diff = np.zeros(mem_cell_ct)
        self.bf_diff = np.zeros(mem_cell_ct)
        self.bo_diff = np.zeros(mem_cell_ct)
        
    def apply_diff(self, lr = 1):
        # update the weight & bias
        self.wg -= lr * self.wg_diff
        self.wi -= lr * self.wi_diff
        self.wf -= lr * self.wf_diff
        self.wo -= lr * self.wo_diff
        
        self.bg -= lr * self.bg_diff
        self.bi -= lr * self.bi_diff
        self.bf -= lr * self.bf_diff
        self.bo -= lr * self.bo_diff
        
        # reset the diff to zero
        self.wg_diff = np.zeros_like(self.wg)
        self.wi_diff = np.zeros_like(self.wi)
        self.wf_diff = np.zeros_like(self.wf)
        self.wo_diff = np.zeros_like(self.wo)
        
        self.bg_diff = np.zeros_like(self.bg)
        self.bg_diff = np.zeros_like(self.bg)
        self.bg_diff = np.zeros_like(self.bg)
        self.bg_diff = np.zeros_like(self.bg)
        # np.zeros_like:Return an array of zeros with the same shape and type as a given array.

In [60]:
class LstmState:
    def __init__(self, mem_cell_ct, x_dim):
        self.g = np.zeros(mem_cell_ct) # candidates vector
        self.i = np.zeros(mem_cell_ct) # input gate
        self.f = np.zeros(mem_cell_ct) # forget gate
        self.o = np.zeros(mem_cell_ct) # output gate
        self.s = np.zeros(mem_cell_ct) # inside state
        self.h = np.zeros(mem_cell_ct) # actual output
        
        self.bottom_diff_h = np.zeros_like(self.h)
        self.bottom_diff_s = np.zeros_like(self.s)
        self.bottom_diff_x = np.zeros(x_dim)

In [61]:
class LstmNode:
    def __init__(self, lstm_param, lstm_state):
        self.state = lstm_state # store reference to parameters and to activations
        self.param = lstm_param
        self.x = None
        self.xc = None
        
    def bottom_data_is(self, x, s_prev = None, h_prev = None):
        # if this if the first lstm node in the network
        if s_prev is None: s_prev = np.zeros_like(self.state.s)
        if h_prev is None: h_prev = np.zeros_like(self.state.h)
        
        # store data for backprogation
        self.s_prev = s_prev
        self.h_prev = h_prev
        
        xc = np.hstack((x, h_prev)) # xc(t) = [x(t), h(t-1)]
        # Stack arrays in sequence horizontally (column wise).
        
        # update the gate weight
        self.state.g = np.tanh(np.dot(self.param.wg, xc) + self.param.bg) # candidates vector
        self.state.i = sigmoid(np.dot(self.param.wi, xc) + self.param.bi) # input gate
        self.state.f = sigmoid(np.dot(self.param.wf, xc) + self.param.bf) # froget gate
        self.state.o = sigmoid(np.dot(self.param.wo, xc) + self.param.bo) # output gate
        
        # through 4 gate to generate output
        self.state.s = self.state.g * self.state.i + s_prev *self.state.f # inside state
        self.state.h = self.state.s * self.state.o # actual output
        
        self.x = x
        self.xc = xc
        
        
    def top_diff_is(self, top_diff_h, top_diff_s):
        ds = self.state.o * top_diff_h + top_diff_s
        do = self.state.s * top_diff_h
        di = self.state.g * ds
        dg = self.state.i * ds
        df = self.s_prev * ds
        
        # diffs w.r.t vector inside sigma / tanh function
        di_input = (1. - self.state.i) * self.state.i * di
        df_input = (1. - self.state.f) * self.state.i * df
        do_input = (1. - self.state.o) * self.state.i * do
        dg_input = (1. - self.state.g ** 2) * dg
        
        # input layer error
        self.param.wi_diff += np.outer(di_input, self.xc)
        self.param.wf_diff += np.outer(df_input, self.xc)
        self.param.wo_diff += np.outer(do_input, self.xc)
        self.param.wg_diff += np.outer(dg_input, self.xc)
        self.param.bi_diff += di_input
        self.param.bf_diff += df_input
        self.param.bo_diff += do_input
        self.param.bg_diff += dg_input
        
        # bottom error
        dxc = np.zeros_like(self.xc)
        dxc += np.dot(self.param.wi.T, di_input)
        dxc += np.dot(self.param.wf.T, df_input)
        dxc += np.dot(self.param.wo.T, do_input)
        dxc += np.dot(self.param.wg.T, dg_input)
        
        # store bottom error
        self.state.bottom_diff_s = ds * self.state.f
        self.state.bottom_diff_x = dxc[:self.param.x_dim]
        self.state.bottom_diff_h = dxc[self.param.x_dim:]

In [62]:
class LstmNetwork():
    def __init__(self, lstm_param):
        self.lstm_param = lstm_param
        self.lstm_node_list = [] # input state
        self.x_list = [] # input sequence
        
    def y_list_is(self, y_list, loss_layer):
        # update label by loss
        assert len(y_list) == len(self.x_list) # if condition is false, show error
        idx = len(self.x_list) - 1
        
        # calculate the loss
        loss = loss_layer.loss(self.lstm_node_list[idx].state.h, y_list[idx])
        diff_h = loss_layer.bottom_diff(self.lstm_node_list[idx].state.h, y_list[idx])
        
        # here s is not affecting loss due to h(t+1), hence we set equal to zero
        diff_s = np.zeros(self.lstm_param.mem_cell_ct)
        self.lstm_node_list[idx].top_diff_is(diff_h, diff_s)
        idx -= 1
        
        while idx >= 0:
            loss += loss_layer.loss(self.lstm_node_list[idx].state.h, y_list[idx])
            diff_h = loss_layer.bottom_diff(self.lstm_node_list[idx].state.h, y_list[idx])
            diff_h += self.lstm_node_list[idx + 1].state.bottom_diff_h
            
            diff_s = self.lstm_node_list[idx+1].state.bottom_diff_s
            self.lstm_node_list[idx].top_diff_is(diff_h, diff_s)
            idx -= 1
        return loss
    
    
    def x_list_clear(self):
        self.x_list = []
        
    def x_list_add(self, x):
        self.x_list.append(x)
        if len(self.x_list) > len(self.lstm_node_list):
            lstm_state = LstmState(self.lstm_param.mem_cell_ct, self.lstm_param.x_dim)
            # initialize gate value
            self.lstm_node_list.append(LstmNode(self.lstm_param, lstm_state))
            # initialize LstmNode param
            
        idx = len(self.x_list) - 1
        if idx == 0:
            self.lstm_node_list[idx].bottom_data_is(x)
        else:
            s_prev = self.lstm_node_list[idx - 1].state.s
            h_prev = self.lstm_node_list[idx - 1].state.h
            self.lstm_node_list[idx].bottom_data_is(x, s_prev, h_prev)
            # generate a output through 4 gate
            
        

# Run Model

In [72]:
#reload(sys)
#sys.setdefaultencoding('utf-8')

np.random.seed(0)
def Samples(x_dim, N):
    ylabels = [np.random.uniform(-0.5,0.5) for i in range(N)]
    xinputs = [np.random.random(x_dim) for i in ylabels]
    return xinputs, ylabels

if __name__  == "__main__":
    x_dim = 10     # dimension of input data
    maxiter = 500
    N=100 # number of data
    
    x_list, y_list = Samples(x_dim, N)
    # generating data
    
    mem_cell_ct = 10 # dimension of Storage unit
    concat_len = x_dim + mem_cell_ct
    
    # initializing weight
    lstm_param = LstmParam(mem_cell_ct, x_dim)
    lstm_net = LstmNetwork(lstm_param)
    
    # Training model 
    for cur_iter in range(maxiter):
        ypredlist = []
        for ind in range(len(y_list)):
            lstm_net.x_list_add(x_list[ind])
            
            ypredlist.append((ind, lstm_net.lstm_node_list[ind].state.h[0]))
            
        loss = lstm_net.y_list_is(y_list, LossLayer)
        lstm_param.apply_diff(lr=0.1)
        lstm_net.x_list_clear()
        
        if (cur_iter)%50==0:
            print('cur iter:', cur_iter)
            print('loss:', loss)
    
    

cur iter: 0
loss: 9.39479643568
cur iter: 50
loss: 8.38579906229
cur iter: 100
loss: 8.38579906229
cur iter: 150
loss: 8.38579906229


KeyboardInterrupt: 