In [1]:
### 라이브러리 ###
import tensorflow as tf
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from math import sqrt


ImportError: Could not find 'cudart64_100.dll'. TensorFlow requires that this DLL be installed in a directory that is named in your %PATH% environment variable. Download and install CUDA 10.0 from this URL: https://developer.nvidia.com/cuda-90-download-archive

In [None]:
### 하이퍼파라미터 ###
input_data_column_cnt = 5  # 입력 데이터의 컬럼 개수
output_data_column_cnt = 1  # 결과데이터의 컬럼 개수
seq_length = 120  # 시퀀스의 길이(시계열데이터 입력 개수)
rnn_cell_hidden_dim = 10  # 각 셀의 (hidden) 출력 크기
forget_bias = 1.0  # 망각편향(기본값 1.0)
num_stacked_layers = 3  # stacked LSTM layers 개수
keep_prob = 1.0  # dropout할 때 keep할 비율
epoch_num = 1500  # 에폭 횟수 (학습용 전체 데이터를 몇 회 반복해서 학습할 것인가 입력)
learning_rate = 0.05  # 학습률

In [None]:
### Function ###
# Normalization - 입력변수를 정규화함으로써, 학습성과 향상
def min_max_scaling(x):
    x_np = np.asarray(x)
    return (x_np - x_np.min()) / (x_np.max() - x_np.min())


# Reverse Normalization - 결과값 확인을 위해 정규화된 값을 역 정규화
def reverse_min_max_scaling(org_x, x):
    org_x_np = np.asarray(org_x)
    x_np = np.asarray(x)
    return (x_np * (org_x_np.max() - org_x_np.min())) + org_x_np.min()


# MAPE
def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100


In [None]:
# Seed - 딥러닝 모형의 일정한 결과값을 위해
tf.reset_default_graph()
tf.set_random_seed(777)

In [None]:
### Data Loading ###
path = '/'
index_file_name = 'kospi.xlsx'
encoding = 'euc-kr'  # 문자 인코딩
raw_dataframe = pd.read_excel(path + index_file_name, encoding=encoding)

In [None]:
df = raw_dataframe[['종가','시가','고가','저가','거래량']]

In [None]:
### Data preprocessing ###
del raw_dataframe['일']  # Date 열 제거

# del raw_dataframe['Time'] # Time 열 제거 - 분봉데이터의 경우
index_info = raw_dataframe.values[0:].astype(np.float)  # 지수&거래량 문자열을 부동소수점으로 변환

# 지수와 거래량 데이터 정규화 - Scale 차이로 인해 별도 정규화
index = index_info[:, :-1]
norm_index = min_max_scaling(index)

volume = index_info[:, -1:]
norm_volume = min_max_scaling(volume)

# 정규화가 끝난 후 데이터 합치기
x = np.concatenate((norm_index, norm_volume), axis=1)

# 종속 변수 선정 - 종가지수
y = x[:, [-2]]

# 입력변수와 종속변수가 들어갈 리스트 생성 후 반복문을 통해 데이터 삽입
dataX = []
dataY = []

for i in range(0, len(y) - seq_length):
    _x = x[i: i + seq_length]
    _y = y[i + seq_length]
    dataX.append(_x)
    dataY.append(_y)

In [None]:
# 학습용/테스트용 데이터 생성
train_size = int(len(dataY) * 0.8)
test_size = len(dataY) - train_size

# 데이터를 잘라 학습용 데이터 생성
trainX = np.array(dataX[0:train_size])
trainY = np.array(dataY[0:train_size])

# 데이터를 잘라 테스트용 데이터 생성
testX = np.array(dataX[train_size:len(dataX)])
testY = np.array(dataY[train_size:len(dataY)])

# 텐서플로우 플레이스홀더 생성
# 입력 X, 출력 Y를 생성
X = tf.placeholder(tf.float32, [None, seq_length, input_data_column_cnt])
Y = tf.placeholder(tf.float32, [None, 1])

# 검증용 측정지표를 산출하기 위한 targets, predictions를 생성
targets = tf.placeholder(tf.float32, [None, 1])
predictions = tf.placeholder(tf.float32, [None, 1])

In [None]:
###모델(LSTM 네트워크) 생성 ###
def lstm_cell():
    cell = tf.contrib.rnn.BasicLSTMCell(num_units=rnn_cell_hidden_dim,
                                        forget_bias=forget_bias, state_is_tuple=True, activation=tf.nn.softsign)
    if keep_prob < 1.0:
        cell = tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob=keep_prob)
    return cell


# num_stacked_layers개의 층으로 쌓인 Stacked LSTMs 생성
stackedLSTMs = [lstm_cell() for _ in range(num_stacked_layers)]
multi_cells = tf.contrib.rnn.MultiRNNCell(stackedLSTMs, state_is_tuple=True) if num_stacked_layers > 1 else lstm_cell()

# LSTM Cell 연결
hypothesis, _states = tf.nn.dynamic_rnn(multi_cells, X, dtype=tf.float32)

# 과거 여러 거래일의 주가를 이용해서 다음날의 주가 1개를 예측하기때문에 MANY-TO-ONE 형태
hypothesis = tf.contrib.layers.fully_connected(hypothesis[:, -1], output_data_column_cnt, activation_fn=tf.identity)

# 손실함수로 평균제곱오차를 사용한다
loss = tf.reduce_sum(tf.square(hypothesis - Y))

# 최적화함수로 AdamOptimizer를 사용한다
optimizer = tf.train.AdamOptimizer(learning_rate)
train = optimizer.minimize(loss)

# 학습용 데이터와 테스트용 데이터의 오류 기록을 위한 리스트 생성
rmse = tf.sqrt(tf.reduce_mean(tf.squared_difference(targets, predictions)))
train_error_summary = []
test_error_summary = []
test_predict = ''

# 텐서플로우 실행을 위한 세션 설정
sess = tf.Session()
sess.run(tf.global_variables_initializer())

# 학습 시작
start_time = datetime.datetime.now()  # 시작시간 기록
print('학습을 시작합니다')
for epoch in range(epoch_num):
    _, _loss = sess.run([train, loss], feed_dict={X: trainX, Y: trainY})
    if ((epoch + 1) % 100 == 0) or (epoch == epoch_num - 1):  # 100번째마다 또는 마지막 epoch인 경우
        # 학습용데이터로 rmse오차를 구한다
        train_predict = sess.run(hypothesis, feed_dict={X: trainX})
        train_error = sess.run(rmse, feed_dict={targets: reverse_min_max_scaling(index, trainY),
                                                predictions: reverse_min_max_scaling(index, train_predict)})
        train_error_summary.append(train_error)

        # 테스트용데이터로 rmse오차를 구한다
        test_predict = sess.run(hypothesis, feed_dict={X: testX})
        test_error = sess.run(rmse, feed_dict={targets: reverse_min_max_scaling(index, testY),
                                               predictions: reverse_min_max_scaling(index, test_predict)})
        test_error_summary.append(test_error)

        # 현재 오류 출력
        print("epoch: {}, train_error(A): {}, test_error(B): {}, B-A: {}".format(epoch + 1, train_error, test_error,
                                                                                 test_error - train_error))
end_time = datetime.datetime.now()  # 종료시간 기록
elapsed_time = end_time - start_time  # 경과시간 기록
print('elapsed_time:', elapsed_time)
print('elapsed_time per epoch:', elapsed_time / epoch_num)

# 정규화된 데이터를 역정규화
trainX = reverse_min_max_scaling(index, trainX)
trainY = reverse_min_max_scaling(index, trainY)
testX = reverse_min_max_scaling(index, testX)
testY = reverse_min_max_scaling(index, testY)
test_predict = reverse_min_max_scaling(index, test_predict)

# LSTM 모형 성능 평가
rmse = sqrt(mean_squared_error(testY, test_predict))
mape = mean_absolute_percentage_error(testY, test_predict)
print("rmse:", rmse)
print("mape:", mape)

# 결과 (.txt) 생성
path = 'C:/Users/CanTuna/Desktop/졸업 논문/'
title = ('LSTM' + '_' + str(seq_length) + '_' + str(rnn_cell_hidden_dim) + '_' + str(num_stacked_layers) + '_' + str(
    epoch_num) + '_' + str(learning_rate))
result = open(path + title + '.txt', 'wt')
result.write('testY' + '\t' + 'test_predict' + '\n')
for i in range(len(testY)):
    result.write(str(testY[i][0]) + '\t' + str(test_predict[i][0]) + '\n')
result.write('Total RMSE: ' + str(rmse) + '\n')
result.write('Total MAPE: ' + str(mape) + '\n')
result.close()

# 결과 그래프 출력
plt.figure(1)
plt.figure(figsize=(12, 6))
plt.plot(testY, 'r')
plt.plot(test_predict, 'b')
plt.xlabel('Time Period')
plt.ylabel('Index')
plt.show()
