<a href="https://colab.research.google.com/github/codithj/stock_prediction_with_ml/blob/main/stock_predict.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**구글 드라이브 연동**

In [None]:
import os
# Show the kernel's current working directory.
print(os.getcwd())
# List directory contents through an IPython shell escape.
!ls


In [None]:
from google.colab import drive
# Mount Google Drive under /content/gdrive/; the CSV loaded later in this
# notebook is read from a path below this mount point.
drive.mount('/content/gdrive/')





**라이브러리 임포팅**

In [None]:
import os
import sys
import warnings
# Silence all warnings, but only when no warning options were supplied to the
# interpreter (sys.warnoptions is empty), so an explicit setting is respected.
if not sys.warnoptions:
  warnings.simplefilter('ignore')
import time
import tensorflow as tf
from sklearn.metrics import r2_score, mean_squared_error
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Apply seaborn's 'darkgrid' style to the plots drawn below.
sns.set_style('darkgrid')
# NOTE(review): pandas is already imported above; this repeat is redundant.
import pandas as pd

**S&P500 index 데이터 불러오기**


In [None]:

# Read the first five CSV columns, using column 0 as the index and asking
# pandas to parse that index as dates.
df = pd.read_csv(
    "/content/gdrive/My Drive/study/data/sp500_index.csv",
    usecols=[0, 1, 2, 3, 4],
    index_col=[0],
    parse_dates=True,
)

# Count of missing values per column, as a quick sanity check.
missing_per_column = df.isna().sum()
print(missing_per_column)

# Last rows of the frame, shown as the cell's rich output.
df.tail()

In [None]:
# Draw every loaded column on one wide Axes and decorate it through the
# Axes object returned by DataFrame.plot.
price_ax = df.plot(figsize=(20, 5))
price_ax.set_title('Prices for S&P500')
price_ax.set_ylabel('Close price ($)')
price_ax.legend(loc='best')
plt.show()

**훈련, 테스트, 검증 데이터 준비**


In [None]:
# Evaluation metric: MAPE
def mean_absolute_percentage_error(y_true, y_pred):
  """Return the mean absolute percentage error, in percent.

  Args:
    y_true: Array-like of reference values.
    y_pred: Array-like of predictions for the same samples.

  Returns:
    mean(|y_true - y_pred| / |y_true|) * 100 as a float.

  Note:
    Elements of ``y_true`` equal to zero are divided by as-is, so the result
    is inf/nan in that case.
  """
  y_true = np.asarray(y_true)
  y_pred = np.asarray(y_pred)

  # A 1-D target of shape (n,) combined with a column of predictions of shape
  # (n, 1) would broadcast to an (n, n) matrix and silently give a wrong
  # score. When both hold the same number of elements, compare them
  # element by element instead.
  if y_true.shape != y_pred.shape and y_true.size == y_pred.size:
    y_true = y_true.reshape(-1)
    y_pred = y_pred.reshape(-1)

  return np.mean(np.abs((y_true - y_pred) / y_true)) * 100


# Build sliding windows and split them into train / test / validation sets
def split_sequence(sequence, n_steps, train_size, test_fraction=0.5):
  """Turn a series into (window, next value) samples and split them in order.

  Each sample pairs ``n_steps`` consecutive values with the value that
  immediately follows them. Samples keep their original order: the first
  ``train_size`` share is the training set, and the remainder is divided
  into a test set followed by a validation set.

  Args:
    sequence: Indexable, sliceable series of values (e.g. a list).
    n_steps: Number of consecutive values per input window; expected to be a
      positive integer.
    train_size: Fraction of all samples assigned to the training set.
    test_fraction: Fraction of the non-training samples assigned to the test
      set; the rest becomes the validation set. Defaults to 0.5.

  Returns:
    Tuple ``(X_train, X_test, y_train, y_test, X_val, y_val)`` of numpy
    arrays.
  """
  # A window starting at i needs sequence[i + n_steps] as its target, so
  # valid start indices are 0 .. len(sequence) - n_steps - 1.
  n_windows = max(len(sequence) - n_steps, 0)
  X = [sequence[i:i + n_steps] for i in range(n_windows)]
  y = [sequence[i + n_steps] for i in range(n_windows)]

  # X and y always have the same length, so one set of boundaries serves both.
  n_train = int(len(X) * train_size)
  n_test = int(len(X[n_train:]) * test_fraction)
  test_end = n_train + n_test

  # Training data
  X_train = np.array(X[:n_train])
  y_train = np.array(y[:n_train])
  # Report the real training-set size (this label used to be printed with the
  # test-split size).
  print(f'훈련데이터 사이즈: {(len(X_train), len(y_train))}')

  # Test data
  X_test = np.array(X[n_train:test_end])
  y_test = np.array(y[n_train:test_end])

  # Validation data
  X_val = np.array(X[test_end:])
  y_val = np.array(y[test_end:])

  return X_train, X_test, y_train, y_test, X_val, y_val

In [None]:
# Input sequence: the closing prices as a plain list
raw_seq = list(df['Close'].values)
raw_seq_len = len(raw_seq)
# Describe the covered period, counting five rows per week
raw_seq_desc = f'{raw_seq_len} days, {raw_seq_len / 5} weeks'
print(f'입력 데이터의 기간:  {raw_seq_desc}')

# Number of time steps per input window
n_steps = 7

# Split into train / test / validation sets
X_train, X_test, y_train, y_test, X_val, y_val = split_sequence(
    raw_seq, n_steps, train_size=0.9)

# Overview of the resulting set sizes
dataset_sizes = tuple(
    len(part)
    for part in (X_train, X_test, X_val, y_train, y_test, y_val))
print('데이터 현황: ', dataset_sizes)

# Registry filled in later with saved model file paths
model_name = dict()

**Prediction with LSTM**

In [None]:
# Reshape from [samples, timesteps] into [samples, timesteps, features],
# matching the input_shape=(n_steps, n_features) declared on the LSTM layer.
n_features = 1
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], n_features))
X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], n_features))
X_val = X_val.reshape((X_val.shape[0], X_val.shape[1], n_features))

# LSTM model: one 200-unit LSTM layer followed by a single-output Dense layer
model_lstm = tf.keras.Sequential([
    tf.keras.layers.LSTM(200,
                         activation='relu',
                         input_shape=(n_steps, n_features)),
    tf.keras.layers.Dense(1)
])

model_lstm.compile(optimizer='adam', loss='mape')
# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
model_lstm.summary()

In [None]:
# Train the LSTM, evaluating on the validation set after each epoch.
# EarlyStopping is used with its default settings.
early_stopping = tf.keras.callbacks.EarlyStopping()

fit_model = model_lstm.fit(
  x=X_train,
  y=y_train,
  validation_data=(X_val, y_val),
  epochs=50,
  callbacks=[early_stopping],
  verbose=2)

In [None]:
# Prediction results on the test windows
yhat = model_lstm.predict(X_test, verbose=0)

# The model ends in Dense(1), so predictions come back as a column
# (samples, 1) while y_test is 1-D. Score against a flattened copy so the
# MAPE arithmetic compares element by element instead of broadcasting to a
# (samples, samples) matrix. yhat itself is left unchanged for later cells.
yhat_flat = np.asarray(yhat).reshape(-1)
r2 = round(r2_score(y_test, yhat_flat), 3)
mape = round(mean_absolute_percentage_error(y_test, yhat_flat), 3)

file_model = './models/lstm.h5'
model_name[r2] = file_model
print(f'모델 위치: {model_name}')
# Make sure the target directory exists; saving fails otherwise.
os.makedirs(os.path.dirname(file_model), exist_ok=True)
model_lstm.save(file_model)

print(f'결과 -> r2: {r2} MAPE: {mape}')

In [None]:
# Training vs. validation loss per epoch
loss_fig, loss_ax = plt.subplots(figsize=(20, 4))
loss_ax.plot(fit_model.history['loss'], label='train_loss')
loss_ax.plot(fit_model.history['val_loss'], label='val_loss')
loss_ax.legend(loc='best')
plt.show()

# Actual test targets (blue) against the model's predictions (red)
pred_fig, pred_ax = plt.subplots(figsize=(20, 4))
pred_ax.plot(y_test, color='b', label='Actual')
pred_ax.plot(yhat, color='r', label='Pred')
pred_ax.legend(loc='best')
plt.show()