In [1]:
import numpy as np
import pandas as pd
import math
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split

In [2]:
data = pd.read_csv('data.csv')

In [3]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_diff(y):
    return y * (1 - y)

def tanh(x):
    return np.tanh(x)

def tanh_diff(y):
    return 1 - y * y

def create_sequences(data, window_size):
    x, y = [], []
    for i in data.index:
        if i + 10 + 1 > data.index[-1]:
            break
        x.append(data.iloc[i:i+10])
        y.append(data.iloc[i+10+1])
    return np.array(x), np.array(y)

In [28]:
class Parameter:
    def __init__(self, name, shape, weight_sd, c=0):
        self.name = name
        self.value = np.random.rand(shape[0], shape[1]) * weight_sd + c
        self.diff = np.zeros_like(self.value)
        self.m = np.zeros_like(self.value)

class LSTM:
    def __init__(self, learning_rate, max_iterations, time_step, input_shape, hidden_layers=10):
        self.learning_rate = learning_rate
        self.max_iterations = max_iterations
        self.time_step = time_step
        self.input_shape = input_shape
        self.hidden_layers = hidden_layers
        self.initialize_parameters()
        
    def initialize_parameters(self):
        # Parameters for Forgot gate
        self.W_F = Parameter('W_F', shape=(self.hidden_layers, self.hidden_layers+self.input_shape[0]), weight_sd=0.1, c=0.5)
        self.B_F = Parameter('B_F', shape=(self.hidden_layers, 1), weight_sd=0.1) 
        
        # Parameters for Input gate
        self.W_I = Parameter('W_I', shape=(self.hidden_layers, self.hidden_layers+self.input_shape[0]), weight_sd=0.1, c=0.5)
        self.B_I = Parameter('B_I', shape=(self.hidden_layers, 1), weight_sd=0.1)
        
        # Parameters for Ouput gate
        self.W_C = Parameter('W_C', shape=(self.hidden_layers, self.hidden_layers+self.input_shape[0]), weight_sd=0.1, c=0.5)
        self.B_C = Parameter('B_C', shape=(self.hidden_layers, 1), weight_sd=0.1)
        self.W_O = Parameter('W_O', shape=(self.hidden_layers, self.hidden_layers+self.input_shape[0]), weight_sd=0.1, c=0.5)
        self.B_O = Parameter('B_O', shape=(self.hidden_layers, 1), weight_sd=0.1)
        
        # Parameters for Output
        self.W_V = Parameter('W_V', shape=(self.input_shape[1], self.hidden_layers), weight_sd=0.1, c=0.5)
        self.B_V = Parameter('B_V', shape=(self.input_shape[1], 1), weight_sd=0.1)
        
    def forward(self, x):
        self.x = x
        h = [np.zeros((self.hidden_layers, 1)) for _ in range(self.time_step)]
        c = [np.zeros((self.hidden_layers, 1)) for _ in range(self.time_step)]
        for step in range(1, self.time_step):
            z = np.vstack((h[step-1], x[step]))
            f = sigmoid((self.W_F.value @ z) + self.B_F.value)
            i = sigmoid((self.W_I.value @ z) + self.B_I.value)
            c_bar = tanh((self.W_C.value @ z) + self.B_C.value)

            c[step] = (f * c[step-1]) + (i * c_bar)

            o = sigmoid((self.W_O.value @ z) + self.B_O.value)
            # print('o\t{}\nh\t{}\nc\t{}'.format(o.shape, h[step].shape, c[step].shape))
            h[step] = o * tanh(c[step])
        self.f, self.i, self.c_bar, self.c, self.o, self.h = f, i , c_bar, c, o, h
        v = self.W_V.value @ self.h[-1] + self.B_V.value
        
        return v
        
    
    def backward(self, y, y_pred):
        h_diff = [np.zeros((self.hidden_layers, 1)) for _ in range(self.time_step + 1)]
        c_diff = [np.zeros((self.hidden_layers, 1)) for _ in range(self.time_step + 1)]
        
        delta_e = y - y_pred
        
        self.W_V.diff = delta_e * self.h[-1].T
        self.B_V.diff = delta_e
        
        for step in reversed(range(self.time_step)):
            h_diff[step] = self.W_V.value.T @ delta_e + h_diff[step+1]
            o_diff = tanh(self.c[step]) * h_diff[step] * sigmoid_diff(self.h[step])
            # print('o_diff\t{}\nh_diff\t{}'.format(o_diff.shape, h_diff[step].shape))
            c_diff = self.o[step] * h_diff[step] * tanh_diff(self.c[step]) + c_diff[step+1]
            c_bar_diff = self.i[step] * c_diff[step] * tanh_diff(self.c_bar[step])
            i_diff = self.c_bar[step] * c_diff[step] * sigmoid_diff(self.i[step])
            f_diff = self.c[step-1] * c_diff[step] * sigmoid_diff(self.f[step])
            
            z = np.vstack((self.h[step-1], self.x[step]))
            
            self.W_F.diff += f_diff @ z.T
            self.B_F.diff += f_diff
            
            self.W_I.diff += i_diff @ z.T
            self.B_I.diff += i_diff
            self.W_O.diff += o_diff @ z.T
            self.B_O.diff += o_diff
            
            self.W_C.diff += c_diff @ z.T
            self.B_C.diff += c_diff
    
    def get_parameters(self):
        return [self.W_F, self.B_F, 
                self.W_I, self.B_I,
                self.W_C, self.B_C,
                self.W_O, self.B_O,
                self.W_V, self.B_V]
    
    def clear_gradients(self):
        for parameter in self.get_parameters():
            parameter.diff = np.zeros_like(parameter.value)
        
    def clip_gradients(self):
        for parameter in self.get_parameters():
            np.clip(parameter.diff, -1, 1, out=parameter.diff)
            
    def update_parameters(self):
        for parameter in self.get_parameters():
            if parameter.name is 'W_V':
                parameter.value += self.learning_rate * parameter.diff
            else:
                parameter.value += self.learning_rate * (parameter.diff/self.time_step)
            
    def fit(self, x, y):
        for epoch in range(self.max_iterations):
            loss = 0
            for i in range(len(x)):
                y_pred = self.forward(x[i]) 
                loss += ((y[i] - y_pred)**2)/2
                self.backward(y[i], y_pred)
                self.update_parameters()
                self.clear_gradients()
            print('{}/{} - loss : {}'.format(epoch, self.max_iterations, loss/len(x)))
            
    def predict(self, x):
        y_pred = []
        for i in range(len(x)):
            y_pred.append(self.forward(x[i]))
        return np.concatenate(y_pred)


In [5]:
input_data = data.iloc[:,[1]]
scaler = StandardScaler()
input_data = scaler.fit_transform(input_data)
input_data = pd.DataFrame(input_data)
x, y = create_sequences(input_data, 10)
x_train, x_test, y_train, y_test = train_test_split(x, y, train_size=0.7, random_state=5)

In [6]:
print('x_train\t{}'.format(x_train.shape))
print('x_test\t{}'.format(x_test.shape))
print('y_train\t{}'.format(y_train.shape))
print('y_test\t{}'.format(y_test.shape))

x_train	(2105, 10, 1)
x_test	(903, 10, 1)
y_train	(2105, 1)
y_test	(903, 1)


In [29]:
model = LSTM(learning_rate=0.01, max_iterations=13, time_step=6, input_shape=(1, 1), hidden_layers=10)

In [30]:
model.fit(x_train, y_train)

0/13 - loss : [[0.552002]]
1/13 - loss : [[0.4786441]]
2/13 - loss : [[0.21009338]]
3/13 - loss : [[0.06296894]]
4/13 - loss : [[0.01997713]]
5/13 - loss : [[0.05599385]]
6/13 - loss : [[0.00751818]]
7/13 - loss : [[0.04040022]]
8/13 - loss : [[0.01934626]]
9/13 - loss : [[0.06808437]]
10/13 - loss : [[0.0370212]]
11/13 - loss : [[0.01517296]]
12/13 - loss : [[0.01118506]]


In [31]:
y_pred = model.predict(x_test)

In [32]:
print(r2_score(y_test, y_pred))

0.9825568209872819
