In [10]:
import os
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torchinfo

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split  

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [12]:
df = pd.read_csv("dataset/005930.KS.csv")
df = df.set_index('Date')
df.drop(columns="Adj Close", inplace=True)
df.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2000-01-04,6000.0,6110.0,5660.0,6110.0,74195000
2000-01-05,5800.0,6060.0,5520.0,5580.0,74680000
2000-01-06,5750.0,5780.0,5580.0,5620.0,54390000
2000-01-07,5560.0,5670.0,5360.0,5540.0,40305000
2000-01-10,5600.0,5770.0,5580.0,5770.0,46880000


In [13]:
#### Dataset 구성
df_y = df['Close'].to_frame()
df_X = df
print(df_X.shape, df_y.shape)

(6122, 5) (6122, 1)


In [14]:
#### 데이터 전리리
X_scaler = StandardScaler()
y_scaler = MinMaxScaler()
X = X_scaler.fit_transform(df_X)
y = y_scaler.fit_transform(df_y)
print(X.shape, y.shape)

(6122, 5) (6122, 1)


In [22]:
### X: 50일치 데이터 , y: 51일째 주가
time_steps = 50
data_X = []
data_y = []

for idx in range(0, y.size - time_steps):
    _X = X[idx:time_steps+idx]
    _y = y[time_steps+idx]
    data_X.append(_X)
    data_y.append(_y)

In [24]:
np.shape(data_X), np.shape(data_y)

((6072, 50, 5), (6072, 1))

In [27]:
### Train / test set 분리
X_train, X_test, y_train, y_test = train_test_split(data_X, data_y, test_size=0.2)
print(np.shape(X_train), np.shape(X_test))
X_train, X_test, y_train, y_test = (
    np.array(X_train, dtype="float32"),
    np.array(X_test, dtype='float32'),
    np.array(y_train, dtype='float32'),
    np.array(y_test, dtype='float32')
)
print(X_train.dtype)

(4857, 50, 5) (1215, 50, 5)
float32


In [32]:
### Dataset, DataLoader 구성
train_set = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
test_set = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
print('Dataset : ' , len(train_set), len(test_set))
train_loader = DataLoader(train_set, batch_size=200, shuffle=True, drop_last=True)
test_loader = DataLoader(test_set, batch_size=200)
print(len(train_loader), len(test_loader))

Dataset :  4857 1215
24 7


In [41]:
#### 모델 정의
class StockPriceModel(nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, bidirectional=True, dropout_rate=0.3):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
            dropout=dropout_rate
        )
        self.dropout = nn.Dropout(dropout_rate)
        i_features = hidden_size * 2 if bidirectional else hidden_size
        self.lr = nn.Linear(i_features, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, X):
        X = X.transpose(1, 0)
        out, _ = self.lstm(X)
        last_out = self.dropout(out[-1])
        last_out = self.lr(last_out)
        return self.sigmoid(last_out)

In [42]:
model = StockPriceModel(
    input_size=5,
    hidden_size=32,
    num_layers=1,
    bidirectional=True,
    dropout_rate=0.3
)
model = model.to(device)

In [43]:
print(model)

StockPriceModel(
  (lstm): LSTM(5, 32, dropout=0.3, bidirectional=True)
  (dropout): Dropout(p=0.3, inplace=False)
  (lr): Linear(in_features=64, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [44]:
from torchinfo import summary
summary(model, (200, 50, 5), device=device)

Layer (type:depth-idx)                   Output Shape              Param #
StockPriceModel                          [200, 1]                  --
├─LSTM: 1-1                              [50, 200, 64]             9,984
├─Dropout: 1-2                           [200, 64]                 --
├─Linear: 1-3                            [200, 1]                  65
├─Sigmoid: 1-4                           [200, 1]                  --
Total params: 10,049
Trainable params: 10,049
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 99.85
Input size (MB): 0.20
Forward/backward pass size (MB): 5.12
Params size (MB): 0.04
Estimated Total Size (MB): 5.36

In [45]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

In [None]:
train_loss_list = []
test_loss_list = []

for epoch in range(100):
    model.train()
    train_loss = 0.0
    for X_train, y_train in train_loader:
        X_train, y_train = X_train.to(device), y_train.to(device)
        pred = model(X_train)
        loss = loss_fn(pred, y_train)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        train_loss += loss.item()
    train_loss_list.append(train_loss/len(train_loader))

    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        for X_test, y_test in test_loader:
            X_test, y_test = X_test.to(device), y_test.to(device)
            pred_test = model(X_test)
            test_loss += loss_fn(pred_test, y_test).item()
        test_loss_list.append(test_loss/len(test_loader))
    if epoch % 10 == 0 or epoch == (100-1):
        print(f"[{epoch}/100] train loss: {train_loss}, val loss: {test_loss}")

[0/100] train loss: 0.5435108325909823, val loss: 0.00893161806743592
