# 주가 예측 모델 구축

## Dataset: 삼성전자 주가 데이터

## 1. 주가 데이터 로드

### FinanceDataReader는 주가 데이터를 편리하게 가져올 수 있는 파이썬 패키지임.

In [None]:
# pip install finance-datareader
# pip install bs4

In [None]:
# import FinanceDataReader as fdr

In [None]:
# 삼성전자(005930) 전체 (1998-08-21 ~ 현재)
# samsung = fdr.DataReader('005930')
# samsung.to_csv('./data/samsung_stock.csv')

### 컬럼 설명

- `Open`:   시가
- `High`:   고가
- `Low`:    저가
- `Close`:  종가
- `Volume`: 거래량
- `Change`: 전일 대비 등락률

In [None]:
# Load the price data straight from CSV when FinanceDataReader is not installed

import pandas as pd

# The 'Date' column becomes the row index (read as-is; no date parsing is requested).
samsung = pd.read_csv(
    './data/samsung_stock.csv',
    index_col='Date',
)

In [None]:
# Peek at the shape of the price DataFrame: first and last five rows

for preview in (samsung.head(5), samsung.tail(5)):
    display(preview)

In [None]:
# Visualize the closing price as a line chart with grid lines

samsung['Close'].plot(grid=True)

## 2. Data Preprocessing

In [None]:
# Training data: 2020-01-01 through 2021-12-31
# Test data:     2022-01-01 onward
# NOTE(review): the index is compared against date strings; this presumably
# relies on ISO-formatted (YYYY-MM-DD) index labels -- confirm against the CSV.

dates = samsung.index
train = samsung.loc[(dates >= '2020-01-01') & (dates < '2022-01-01')]
test = samsung.loc[dates >= '2022-01-01']

In [None]:
# Define X and y for modelling: y is Close; X is every remaining column after
# dropping both Close and Change (so Change is NOT used as an input feature).

feature_drop = ["Close", "Change"]

X_train_org = train.drop(columns=feature_drop)
y_train_org = train["Close"]

X_test_org = test.drop(columns=feature_drop)
y_test_org = test["Close"]

In [None]:
# Display the training features (train without the Close and Change columns)
X_train_org

In [None]:
# Display the training target (the Close column of train)
y_train_org

In [None]:
# Display the test features (test without the Close and Change columns)
X_test_org

In [None]:
# Display the test target (the Close column of test)
y_test_org

In [None]:
# Feature scaling: both scalers are fitted on the training split only and then
# applied unchanged to the test split.

from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Input features
scaler_X = MinMaxScaler()
scaler_X.fit(X_train_org)
X_train_arr = scaler_X.transform(X_train_org)
X_test_arr = scaler_X.transform(X_test_org)

# Target: the scaler expects a 2-D array, hence the reshape to a single column
scaler_y = MinMaxScaler()
scaler_y.fit(y_train_org.to_numpy().reshape(-1, 1))
y_train_arr = scaler_y.transform(y_train_org.to_numpy().reshape(-1, 1))
y_test_arr = scaler_y.transform(y_test_org.to_numpy().reshape(-1, 1))


In [None]:
# 시계열 형태의 데이터로 전환

import numpy as np

def build_timeseries_dataset(X, y, seq_length):
    """Slice aligned sequences into sliding windows with next-step targets.

    For every start position ``s`` the sample is ``X[s:s + seq_length]`` and
    its label is ``y[s + seq_length]``, i.e. the target that immediately
    follows the window.

    Args:
        X: indexable sequence of per-step inputs.
        y: indexable sequence of per-step targets.
        seq_length: number of consecutive steps in each window.

    Returns:
        Tuple ``(windows, targets)`` of numpy arrays holding
        ``len(X) - seq_length`` samples each (empty arrays when that
        count is not positive).
    """
    n_samples = len(X) - seq_length

    windows = [X[start:start + seq_length] for start in range(n_samples)]
    targets = [y[start + seq_length] for start in range(n_samples)]

    return np.array(windows), np.array(targets)

In [None]:
# Convert the scaled arrays into sliding-window (time-series) samples

# Number of consecutive rows fed to the model for each prediction
WINDOW_SIZE=10

X_train, y_train = build_timeseries_dataset(X_train_arr, y_train_arr, WINDOW_SIZE)
# Test windows must be paired with the *test* targets; pairing them with
# y_train_arr would attach training-period labels to test-period inputs.
X_test, y_test = build_timeseries_dataset(X_test_arr, y_test_arr, WINDOW_SIZE)


print('Training shape:', X_train.shape, y_train.shape)
print('Test shape:', X_test.shape, y_test.shape)

In [None]:
# Convert every numpy array to a float32 torch tensor

import torch

X_train, X_test = (torch.from_numpy(arr).float() for arr in (X_train, X_test))
y_train, y_test = (torch.from_numpy(arr).float() for arr in (y_train, y_test))

for split_name, features, targets in (
    ('Training', X_train, y_train),
    ('Test', X_test, y_test),
):
    print(split_name + ' shape:', features.shape, targets.shape)

In [None]:
# Dataset 정의

from torch.utils.data import Dataset

class Dataset(Dataset):
    """Pairs each input sample with its target for use with a DataLoader.

    Note that this class reuses (and therefore shadows) the name of the
    imported base class it derives from.
    """

    def __init__(self, X, Y):
        # Inputs and targets are stored as given; they are indexed in lockstep.
        self.X = X
        self.Y = Y

    def __len__(self):
        """Return the number of samples, taken from the inputs."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair stored at position ``idx``."""
        features = self.X[idx]
        target = self.Y[idx]
        return features, target
    
# Wrap the windowed tensors so a DataLoader can batch them
train_dataset = Dataset(X_train, y_train)
test_dataset = Dataset(X_test, y_test)


In [None]:
## TensorDataset 이용한 Dataset 정의

# from torch.utils.data import TensorDataset

# train_dataset = TensorDataset(X_train, y_train)
# test_dataset = TensorDataset(X_test, y_test)

In [None]:
# Build the DataLoaders

from torch.utils.data import DataLoader

# Training batches are reshuffled every epoch.
train_loader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
)
# Test batches keep their original order so that the concatenated predictions
# line up with the chronologically ordered true prices used for evaluation.
test_loader = DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=False,
)

## 3. Model Building

In [None]:
# 모델 정의

import torch.nn as nn

class Net(nn.Module):
    """Recurrent regressor: an RNN over the window followed by a two-layer head.

    Args:
        input_size: number of features per time step.
        rnn_h_size: hidden size of the recurrent layer.
        fnn_h_size: hidden size of the fully-connected head.
    """

    def __init__(self, input_size, rnn_h_size, fnn_h_size):
        super().__init__()

        # Recurrent layer. batch_first=True means inputs are laid out as
        # (batch, step, feature). nn.LSTM or nn.GRU can be substituted here
        # with the same constructor arguments.
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=rnn_h_size,
            num_layers=1,
            batch_first=True,
        )

        # Fully-connected head mapping the recurrent state to a single value
        self.fc1 = nn.Linear(rnn_h_size, fnn_h_size)
        self.fc2 = nn.Linear(fnn_h_size, 1)

        # Non-linearity applied between the two linear layers
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return one prediction per sequence in the batch ``x``."""
        # The second return value (final hidden state) is not needed.
        step_outputs, _ = self.rnn(x)

        # Only the output at the last time step feeds the head.
        last_step = step_outputs[:, -1]

        hidden = self.relu(self.fc1(last_step))
        return self.fc2(hidden)



In [None]:
# Model dimensions: the feature count comes from the last axis of X_train,
# and the recurrent hidden size is set equal to it.
input_size = X_train.shape[-1]
rnn_h_size = input_size
fnn_h_size = 128

net = Net(
    input_size=input_size,
    rnn_h_size=rnn_h_size,
    fnn_h_size=fnn_h_size,
)

## 4. Define a Loss Function and Optimizer

In [None]:
# Hyperparameter settings

learning_rate = 1e-4  # learning rate handed to the optimizer
num_epochs = 200  # number of passes over the training loader

In [None]:
# Loss function and optimizer

import torch.optim as optim

# Mean squared error between the network output and the (scaled) target
criterion = nn.MSELoss()
# Adam updates every parameter of net using the learning rate set above
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

## 5. Train the Network

In [None]:
from sklearn.metrics import mean_squared_error

In [None]:
# Train the RNN over several epochs; after each epoch, report the test RMSE
# measured in the original (un-scaled) price units.

for epoch in range(num_epochs):

    # ---- training pass: one optimizer step per mini-batch ----
    net.train()
    for i, data in enumerate(train_loader):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # ---- evaluation pass over the whole test set ----
    # The prediction buffer is created once per epoch, outside the batch loop.
    net.eval()
    pred_list = []

    # No gradients are needed while evaluating the model
    with torch.no_grad():
        for data in test_loader:
            sequences, _ = data

            # Collect the predictions for this batch
            pred = net(sequences)
            pred_list.append(pred)

    # Stack the per-batch predictions and map them back to price units
    pred_list = np.vstack(pred_list)
    pred_inverse = scaler_y.inverse_transform(pred_list)

    # The first WINDOW_SIZE test rows only serve as context for the first
    # prediction, so they have no matching prediction and are skipped.
    true_labels = y_test_org[WINDOW_SIZE:]

    # scikit-learn metrics take (y_true, y_pred)
    rmse = mean_squared_error(true_labels, pred_inverse)**0.5

    # `loss` is the loss of the last training batch of this epoch
    print('epoch: %d, iter: %d, loss: %.6f test rmse: %.6f'%((epoch+1), i, loss.item(), rmse))

    print('%d epoch processed...'%(epoch+1))

print('Finished Training')

In [None]:
# Save the trained model

# Only the parameters (state_dict) are written, not the Net object itself
PATH = './samsung_stock_rnn.pth'
torch.save(net.state_dict(), PATH)

In [None]:
# Rebuild the network with the same dimensions and load the saved parameters

net = Net(
    input_size=X_train.shape[-1],
    rnn_h_size=rnn_h_size,
    fnn_h_size=fnn_h_size,
)
net.load_state_dict(torch.load(PATH))

## 6. Test the Network on the Test Data

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [None]:
# RMSE, MAE and R2 over the whole test set

pred_list = []


# Training is finished, so the model is put in evaluation mode and no
# gradients are computed.
net.eval()
with torch.no_grad():
    for data in test_loader:
        sequences, _ = data

        # Collect the predictions for this batch
        pred = net(sequences)
        pred_list.append(pred)

# Stack the per-batch predictions and map them back to price units
pred_list = np.vstack(pred_list)
pred_inverse = scaler_y.inverse_transform(pred_list)

# The first WINDOW_SIZE test rows only serve as context for the first
# prediction, so they have no matching prediction and are skipped.
true_labels = y_test_org[WINDOW_SIZE:]

# scikit-learn metrics take (y_true, y_pred). R2 in particular is not
# symmetric, so swapping the arguments yields a different (wrong) score.
rmse = mean_squared_error(true_labels, pred_inverse)**0.5
mae = mean_absolute_error(true_labels, pred_inverse)
r2 = r2_score(true_labels, pred_inverse)

# The test split starts on 2022-01-01, so the messages state that date.
print('2022-01-01 부터 현재까지 주가에 대한 RMSE:', rmse)
print('2022-01-01 부터 현재까지 주가에 대한 MAE:', mae)
print('2022-01-01 부터 현재까지 주가에 대한 R2:', r2)

In [None]:
# Display the true closing prices the predictions are compared against
true_labels

In [None]:
from matplotlib import pyplot as plt

# Overlay predicted and true prices on a shared positional x-axis (0..n-1)
fig = plt.figure(figsize=(8, 3))

for series, series_label in ((pred_inverse, 'pred'), (true_labels, 'true')):
    positions = np.arange(len(series))
    plt.plot(positions, series, label=series_label)

plt.title("prediction result plot")

plt.legend()
plt.show()