In [0]:
from tensorflow.python.keras.layers import Input, RNN , Layer, Dense
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.optimizers import adam 
import tensorflow as tf 
import numpy as np 
import pandas as pd 

[https://investing.com](https://www.investing.com/equities/kb-financial-group-historical-data)


위 링크에서 KB금융그룹의 과거 주가 데이터를 CSV 파일로 다운로드한 뒤 `kb_stock_data.csv` 라는 이름으로 저장한다.
![Imgur](https://i.imgur.com/k2F31Df.png)

In [0]:
# Load the closing-price data.
# The CSV is parsed once and reused for both the preview and the column
# extraction (previously the file was read from disk twice).
raw_stock_df = pd.read_csv('kb_stock_data.csv')
print(raw_stock_df.head())
stock_data = raw_stock_df['Price']

# String to float
def _fn(str_):
    return float(str_.replace(',', '').replace('.',''))
# Convert every price entry to a float, producing a plain Python list.
stock_data = [_fn(raw_price) for raw_price in stock_data]

In [0]:
# Number of consecutive prices fed to the model for each prediction.
n_time_step = 5

# Reorder the stock data chronologically (2017 -> 2019, per the original note).
rev_stock_data = list(reversed(stock_data))

# Split the series into a training part and a held-out part.
# The original note says the December 2019 data is used as the test set;
# presumably that month is the last 19 rows of the series.
n_test = 19
test_dataset = rev_stock_data[-n_test:]
train_dataset = rev_stock_data[:-n_test]

def data_slicer(sample_array, n_time_step):
    """Cut a 1-D sequence into overlapping windows of fixed length.

    The window always advances by one element (stride 1)::

        sample_array = [1, 2, 3, 4, 5, 6, 7, 8, 9], n_time_step = 4

        [1, 2, 3, 4]
           [2, 3, 4, 5]
              [3, 4, 5, 6]
                 ...
                    [6, 7, 8, 9]

    Args:
        sample_array: 1-D sequence (list or array) supporting ``len`` and
            slicing.
        n_time_step: int, length of every window; must be >= 1.

    Returns:
        np.ndarray of shape ``(len(sample_array) - n_time_step + 1,
        n_time_step)``. When the input is shorter than one window, an empty
        array of shape ``(0, n_time_step)`` is returned so that callers can
        still slice columns with ``[:, :k]``.

    Raises:
        ValueError: if ``n_time_step`` is smaller than 1.
    """
    if n_time_step < 1:
        raise ValueError(
            'n_time_step must be >= 1, got {}'.format(n_time_step))

    # Number of positions the window can take along the sequence.
    n_move = len(sample_array) - n_time_step + 1
    if n_move <= 0:
        # Not enough samples for a single window: keep the result 2-D.
        return np.empty((0, n_time_step))

    return np.asarray(
        [sample_array[idx:idx + n_time_step] for idx in range(n_move)])



In [0]:
# Slice the training series into windows of n_time_step inputs + 1 target.
sliced_train = data_slicer(train_dataset, n_time_step + 1)

# Split every window: the first n_time_step values are the inputs,
# the remaining last value is the target.
train_xs = sliced_train[:, :n_time_step]
train_ys = sliced_train[:, n_time_step:]

# The RNN model needs a 3-D tensor, so add a trailing feature axis.
train_xs = np.expand_dims(np.array(train_xs), axis=-1)

# Min-max normalization. The statistics are taken from the training inputs
# and are applied unchanged to the targets.
train_max = np.max(train_xs)
train_min = np.min(train_xs)
norm_train_xs = (train_xs - train_min ) / (train_max - train_min)
norm_train_ys = (train_ys - train_min ) / (train_max - train_min)


# Normalization check. It is printed explicitly: a bare tuple expression in
# the middle of a cell is evaluated and silently discarded.
print(norm_train_ys.min(), norm_train_ys.max(), norm_train_xs.min(), norm_train_xs.max())
print(train_xs.shape, train_ys.shape)

In [0]:
# Build (input window, next-value target) pairs from the held-out test series.
sliced_test = data_slicer(test_dataset, n_time_step + 1)

# The first n_time_step columns are the inputs; the final column is the target.
test_xs, test_ys = sliced_test[:, :n_time_step], sliced_test[:, n_time_step:]

# The RNN model needs a 3-D tensor, so add a trailing feature axis.
test_xs = np.expand_dims(np.array(test_xs), axis=-1)

# Normalize with the TRAIN min/max so that the test data share the scale the
# model is trained on.
train_range = train_max - train_min
norm_test_xs = (test_xs - train_min) / train_range
norm_test_ys = (test_ys - train_min) / train_range

# Normalization check: value ranges first, then array shapes.
print(norm_test_ys.min(), norm_test_ys.max(), norm_test_xs.min(), norm_test_xs.max())
print(norm_test_xs.shape, norm_test_ys.shape)

In [0]:
class RNNCell(Layer):
    """Simple recurrent cell meant to be wrapped by the Keras ``RNN`` layer.

    A single step computes::

        a_t = tanh(x_t @ w_x + a_{t-1} @ w_h + b)

    and ``a_t`` serves both as the step output and as the next state.

    Args:
        n_units: int, size of the hidden state (and of the step output).
        **kwargs: forwarded unchanged to the ``Layer`` constructor.
    """

    def __init__(self, n_units, **kwargs):
        # The Keras RNN wrapper requires the cell to expose `state_size`.
        self.state_size = n_units
        self.n_units = n_units
        super(RNNCell, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the input kernel, the recurrent kernel and the bias.

        Args:
            input_shape: shape of the per-step input; only its last entry
                (the feature dimension) is used.
        """
        self.w_x = self.add_weight(name='w_x',
                        shape=(input_shape[-1], self.n_units),
                        initializer=tf.initializers.glorot_normal())
        self.w_h = self.add_weight(name='w_h',
                        shape=(self.n_units, self.n_units),
                        initializer=tf.initializers.orthogonal())
        # Trailing comma matters: `(self.n_units)` is just an int, whereas
        # the bias shape should be the 1-tuple `(n_units,)`.
        self.b = self.add_weight(name='b',
                        shape=(self.n_units,),
                        initializer=tf.initializers.zeros())
        super(RNNCell, self).build(input_shape)

    def call(self, inputs, states):
        """Run one time step.

        Args:
            inputs: input at the current time step.
            states: list of state tensors; only ``states[0]`` (the previous
                activation) is used.

        Returns:
            Tuple ``(output, [new_state])`` where both hold the same tensor.
        """
        prev_state = states[0]
        z = tf.matmul(inputs, self.w_x) + tf.matmul(prev_state, self.w_h) + self.b
        a = tf.tanh(z)
        return a, [a]

    def get_config(self):
        """Return the layer config including ``n_units`` so the cell can be
        re-created from its config (model saving / cloning)."""
        config = super(RNNCell, self).get_config()
        config['n_units'] = self.n_units
        return config
    

In [0]:
# Start from an empty graph so that re-running this cell does not pile up
# duplicate layers and variables.
tf.reset_default_graph()

# Input: one window of n_time_step prices with a single feature per step.
inputs = Input(shape=(n_time_step, 1))

# The recurrent layer returns [last_output, final_state] because
# return_state=True; only the last output feeds the regression head.
rnn_layer = RNN(RNNCell(n_units=300), return_state=True)
hidden = rnn_layer(inputs)
output_layer = Dense(1, activation='linear')
pred = output_layer(hidden[0])

model = Model(inputs, pred)

In [0]:
# Regression setup: mean-squared-error loss, Adam optimizer.
learning_rate = 0.0001
model.compile(optimizer=adam(learning_rate), loss='mse')

In [0]:
# Train on the normalized windows and their normalized targets.
model.fit(x=norm_train_xs, y=norm_train_ys, epochs=240, batch_size=128)

## Prediction on the Train Dataset

In [0]:
# Predict on the training windows (normalized scale) ...
y_norm_hat = model.predict(norm_train_xs)
# ... then undo the min-max scaling to get back to the original price scale.
y_hat = train_min + y_norm_hat * (train_max - train_min)

In [0]:
import matplotlib.pyplot as plt

# Overlay predicted and true training targets on a shared sample index.
train_index = np.arange(len(train_ys))
for series, tag in ((y_hat, 'pred'), (train_ys, 'true')):
    plt.scatter(train_index, series, s=3, alpha=0.5, label=tag)
plt.legend()


## Prediction on the Test Dataset

In [0]:
# Predict on the held-out windows (normalized scale) ...
test_norm_hat = model.predict(norm_test_xs)
# ... then undo the scaling using the TRAIN statistics, as for the inputs.
test_hat = train_min + test_norm_hat * (train_max - train_min)

In [0]:
# Overlay predicted and true test targets on a shared sample index.
test_index = np.arange(len(test_ys))
for series, tag in ((test_hat, 'pred'), (test_ys, 'true')):
    plt.scatter(test_index, series, s=16, alpha=0.5, label=tag)
plt.legend()
