In [41]:
import numpy as np

# Explanation

(1)hidden_size: output dimension 

-> result of hidden_state: h_t.shape = (hidden_size, 1)

(2)vstack: stack in row direction

 -> rsult of vstack: concat_vector.shape = (input_size + hidden_size , 1)

* but it can only same size col.





In [42]:
class activation_function:
    def __init__(self):
        pass

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    def tanh(self, x):
        return np.tanh(x)

class LSTM(activation_function):
    def __init__(self, hidden_size, concat_size):
        # z  = alpha(w @ [h, x] + b) 
        # alpha = activation function
        self.hidden_size = hidden_size
        self.concat_size = concat_size
        self.w_f = np.random.randn(hidden_size, concat_size)
        self.b_f = np.zeros((hidden_size, 1))
        self.w_o = np.random.randn(hidden_size, concat_size)
        self.b_o = np.zeros((hidden_size, 1))
        self.w_c = np.random.randn(hidden_size, concat_size)
        self.b_c = np.zeros((hidden_size, 1))
        self.w_i = np.random.randn(hidden_size, concat_size)
        self.b_i = np.zeros((hidden_size, 1))
        self.cache = None
    
    def forget_gate(self, concat_vector):
        value = (self.w_f @ concat_vector) + self.b_f
        f_t = self.sigmoid(value)
        return f_t
    
    def input_gate(self, concat_vector):
        value = (self.w_i @ concat_vector) + self.b_i
        i_t = self.sigmoid(value)
        return i_t
    
    def candidate_memory(self, concat_vector):
        value = (self.w_c @ concat_vector) + self.b_c
        c_t_hat = np.tanh(value)
        return c_t_hat
    
    def output_gate(self ,concat_vector):
        value = (self.w_o @ concat_vector) + self.b_o
        o_t = self.sigmoid(value)
        return o_t

    def cell_state_update(self, c_prev ,concat_vector):
        c_t_hat = self.candidate_memory(concat_vector)
        f_t = self.forget_gate(concat_vector)
        i_t = self.input_gate(concat_vector)
        c_t = (f_t * c_prev) + (i_t * c_t_hat)
        return c_t
    
    #print summarized information
    def hidden_state(self, c_t, concat_vector):
        h_t = self.output_gate(concat_vector) * np.tanh(c_t)
        return h_t
    
    def forward(self, h_prev, x_t , c_prev):
        concat_vector = np.vstack((h_prev, x_t))
        c_t = self.cell_state_update(c_prev, concat_vector)
        h_t = self.hidden_state(c_t, concat_vector)
        self.cache = {
            'h_prev': h_prev,
            'x_t': x_t,
            'c_prev': c_prev,
            'f_t': self.forget_gate(concat_vector),
            'i_t': self.input_gate(concat_vector),
            'c_t_hat': self.cell_state_update(c_prev, concat_vector),
            'o_t': self.output_gate(concat_vector),
            'c_t': c_t,
            'h_t': h_t,
            'concat': concat_vector
        }
        return h_t, c_t
    
    def backward(self, dh, dc_next=None):
        # 가져오기
        f_t = self.cache['f_t']
        i_t = self.cache['i_t']
        c_t_hat = self.cache['c_t_hat']
        o_t = self.cache['o_t']
        c_t = self.cache['c_t']
        c_prev = self.cache['c_prev']
        h_prev = self.cache['h_prev']
        x_t = self.cache['x_t']
        concat = self.cache['concat']

        # c_t에 대한 미분 (from dh)
        dtanh = o_t * (1 - np.tanh(c_t)**2)
        dc = dh * dtanh  # ∂L/∂c_t

        if dc_next is not None:
            dc += dc_next  # c_t는 이전 time step에도 영향을 주기 때문에 합산

        # 각 게이트에 대한 미분
        do = dh * np.tanh(c_t)
        do_raw = do * o_t * (1 - o_t)

        df = dc * c_prev
        df_raw = df * f_t * (1 - f_t)

        di = dc * c_t_hat
        di_raw = di * i_t * (1 - i_t)

        dc_hat = dc * i_t
        dc_hat_raw = dc_hat * (1 - c_t_hat**2)

        # weight gradient 계산
        self.dw_f = df_raw @ concat.T
        self.db_f = df_raw

        self.dw_i = di_raw @ concat.T
        self.db_i = di_raw

        self.dw_c = dc_hat_raw @ concat.T
        self.db_c = dc_hat_raw

        self.dw_o = do_raw @ concat.T
        self.db_o = do_raw

        # 다음 타임스텝으로 넘겨줄 gradient
        dconcat = (self.w_f.T @ df_raw +
                self.w_i.T @ di_raw +
                self.w_c.T @ dc_hat_raw +
                self.w_o.T @ do_raw)  # shape: (hidden + input, 1)

        dh_prev = dconcat[:self.hidden_size, :]
        dx = dconcat[self.hidden_size:, :]
        dc_prev = dc * f_t

        return dh_prev, dc_prev

    def update(self, lr = 0.01):
        self.w_f -= lr * self.dw_f
        self.b_f -= lr * self.db_f

        self.w_i -= lr * self.dw_i
        self.b_i -= lr * self.db_i

        self.w_c -= lr * self.dw_c
        self.b_c -= lr * self.db_c

        self.w_o -= lr * self.dw_o
        self.b_o -= lr * self.db_o

        

# example_data_set

(1)lstm only get 3_dimension data_set
    
    -> we need to data process

(2) because data is just an array of numbers,

* we need to create a rule to input into lstm.

    def create_sequences divides an array of numbers into time_steps,

    and creates multiple samples.

In [43]:
train_set = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
test_set = [1, 2, 3, 4, 5, 6]
train_set = np.array(train_set)
test_set = np.array(test_set)

def create_sequences(data, time_steps =6):
    x, y = [], []
    for i in range(len(data) - time_steps): #10 - 3 = 7
        seq_x = data[i:i + time_steps]
        #0~6, 1~7, 2~8, 3~9, 4~10 (sample)
        seq_y = data[i + time_steps]
        #7, 8, 9, 10 (correct label)
        x.append(seq_x)
        y.append(seq_y)
    return np.array(x), np.array(y)

#==================make matrix==================
train_data_x , train_data_y = create_sequences(train_set)
#==================check data shape=============
print(train_data_x.shape, train_data_y.shape)

#==================reshape 3_dimension==========
train_data_x = train_data_x.reshape((train_data_x.shape[0], train_data_x.shape[1], 1))
print(train_data_x.shape)

(4, 6) (4,)
(4, 6, 1)


# ex_1 basic lstm

- this result is not accurate. 

    so we need backward definition.

In [44]:
# input_size = 1
# hidden_size = 4
# lstm = LSTM(input_size, concat_size = input_size + hidden_size)

# h = np.zeros((hidden_size, 1))
# c = np.zeros((hidden_size, 1))

# last_seq = test_set[-3: ]
# for i in range(100):
#     for i in last_seq:
#         x_t = np.array([i])
#         h, c = lstm.forward(h, x_t, c)

# next_value = h.mean()
# print(next_value)



# Dense & backward

(1) dense class is reshape lstm result with scalar

(2) calculator class is 

In [45]:
class Dense:
    def __init__(self, input_size):
        self.w = np.random.randn(1, input_size) * 0.01
        self.b = np.zeros((1,1))
        self.lr = 0.01
    
    def forward(self, h):
        self.h = h
        y_pred = self.w @ h + self.b
        return y_pred
    
    def backward(self, dy):
        self.dw = dy @ self.h.T
        self.db = dy
        self.dh = self.w.T @ dy
        return self.dh
    
    def update(self):
        self.w -= self.lr * self.dw
        self.b -= self.lr * self.db

class calculator:
    def __init__(self):
        pass
    
    def mse_loss(self, y_pred, y_true):
        return np.mean((y_pred - y_true) ** 2)
    
    def mse_grad(self, y_pred, y_true):
        return 2 * (y_pred - y_true)
    
    

In [46]:
input_size = 1
hidden_size = 8
lstm = LSTM(hidden_size, concat_size = hidden_size + input_size)
dense = Dense(input_size = hidden_size)
cal = calculator()
epochs = 1000

for epoch in range(epochs):
    total_loss = 0
    for i in range(len(train_data_x)):
        x_seq = train_data_x[i]
        y_true = train_data_y[i]

        h = np.zeros((hidden_size, 1))
        c = np.zeros((hidden_size, 1))
        
        for t in range(x_seq.shape[0]):
            x_t = x_seq[t].reshape(-1,1)
            h, c = lstm.forward(h, x_t, c)

        y_pred = dense.forward(h)
        loss = cal.mse_loss(y_pred, y_true)
        dy = cal.mse_grad(y_pred, y_true)

        dh = dense.backward(dy)
        dh, dc = lstm.backward(dh)

        dense.update()
        lstm.update()

        total_loss += loss
    
    if epoch % 100 == 0:
        print(f"[{epoch}] Loss: {total_loss/len(train_data_x):.4f}")

test_input = test_set
h = np.zeros((hidden_size, 1))
c = np.zeros((hidden_size, 1))
for t in range(test_input.shape[0]):
    x_t = test_input[t].reshape(1,1)
    h, c = lstm.forward(h, x_t, c)

y_pred = dense.forward(h)
print(y_pred.item())



[0] Loss: 69.0937
[100] Loss: 0.9174
[200] Loss: 0.9844
[300] Loss: 0.7992
[400] Loss: 0.3992
[500] Loss: 0.3049
[600] Loss: 0.1400
[700] Loss: 0.1324
[800] Loss: 0.1130
[900] Loss: 0.1114
7.1891586492689585
