# 재현성 구현

In [None]:
import torch
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import random
import os

def reset_seeds(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

# 데이터 불러오기

In [None]:
import pandas_datareader.data as web

In [None]:
df = web.DataReader('005930', 'naver', start='2023-01-01', end='2023-12-31') # 종목번호: 005930
df.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-01-02,55500,56100,55200,55500,10031448
2023-01-03,55400,56000,54500,55400,13547030
2023-01-04,55700,58000,55600,57800,20188071
2023-01-05,58200,58800,57600,58200,15682826
2023-01-06,58300,59400,57900,59000,17334989


In [None]:
df = df.astype(int)
df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 245 entries, 2023-01-02 to 2023-12-28
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   Open    245 non-null    int64
 1   High    245 non-null    int64
 2   Low     245 non-null    int64
 3   Close   245 non-null    int64
 4   Volume  245 non-null    int64
dtypes: int64(5)
memory usage: 11.5 KB


In [None]:
data = df.to_numpy()
data

array([[   55500,    56100,    55200,    55500, 10031448],
       [   55400,    56000,    54500,    55400, 13547030],
       [   55700,    58000,    55600,    57800, 20188071],
       ...,
       [   76100,    76700,    75700,    76600, 13164909],
       [   76700,    78000,    76500,    78000, 20651042],
       [   77700,    78500,    77500,    78500, 17797536]])

In [None]:
mins= data.min(axis = 0)
mins

array([  55400,   56000,   54500,   55400, 5824628])

In [None]:
sizes = data.max(axis = 0) - mins
sizes

array([   22300,    22500,    23000,    23100, 24191593])

In [None]:
def transform_data(data, mins, sizes, seq_len = 10, pred_len = 5):
  data = (data -mins) / sizes # 스케일링
  x_list = []
  y_list = []

  for i in range(seq_len, data.shape[0] + 1-pred_len): # 마지막까지 실행해야하므로 1-pred_len
    x = data[i-seq_len:i] # 처음부터 10개씩 입력데이터 뽑기
    y = data[i:i+pred_len, 3] # seq_len 이후 5일치, 종가만 뽑기
    x_list.append(x)
    y_list.append(y)

  x_arr = np.array(x_list)
  y_arr = np.array(y_list)
  return x_arr, y_arr

In [None]:
df.tail()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-12-21,74600,75000,74300,75000,13478766
2023-12-22,75800,76300,75400,75900,14515608
2023-12-26,76100,76700,75700,76600,13164909
2023-12-27,76700,78000,76500,78000,20651042
2023-12-28,77700,78500,77500,78500,17797536


In [None]:
x_arr, y_arr = transform_data(data, mins, sizes)
x_arr.shape, y_arr.shape

((231, 10, 5), (231, 5))

In [None]:
y_arr[-1]

array([0.84848485, 0.88744589, 0.91774892, 0.97835498, 1.        ])

In [None]:
x_arr.shape # batch, seq, features

(231, 10, 5)

# 데이터셋 구현

In [None]:
class FinanceDataset(torch.utils.data.Dataset):
  def __init__(self, x, y=None):
    self.x = x
    self.y = y

  def __len__(self):
    return len(self.x)

  def __getitem__(self, idx):
    item = {}
    item['x'] = torch.Tensor(self.x[idx])

    if self.y is not None:
      item['y'] = torch.Tensor(self.y[idx])
    return item

- 결과 확인하기

In [None]:
dt = FinanceDataset(x_arr, y_arr)
dt[10]

{'x': tensor([[0.2646, 0.2489, 0.2739, 0.2468, 0.1742],
         [0.2601, 0.2444, 0.2652, 0.2424, 0.1656],
         [0.2377, 0.2222, 0.2348, 0.2165, 0.2381],
         [0.2287, 0.2444, 0.2565, 0.2641, 0.2887],
         [0.3004, 0.2800, 0.2870, 0.2771, 0.1580],
         [0.3632, 0.3422, 0.3696, 0.3463, 0.4546],
         [0.3767, 0.3511, 0.3826, 0.3680, 0.3081],
         [0.4036, 0.4000, 0.4087, 0.3983, 0.5347],
         [0.4260, 0.3956, 0.3739, 0.3420, 0.6271],
         [0.3632, 0.3422, 0.2826, 0.2424, 0.9889]]),
 'y': tensor([0.2771, 0.3506, 0.3636, 0.2684, 0.2814])}

In [None]:
dl = torch.utils.data.DataLoader(dt, batch_size = 1, shuffle = False)
batch = next(iter(dl))
batch # batch, seq, feature 차원 포함

{'x': tensor([[[0.0045, 0.0044, 0.0304, 0.0043, 0.1739],
          [0.0000, 0.0000, 0.0000, 0.0000, 0.3192],
          [0.0135, 0.0889, 0.0478, 0.1039, 0.5937],
          [0.1256, 0.1244, 0.1348, 0.1212, 0.4075],
          [0.1300, 0.1511, 0.1478, 0.1558, 0.4758],
          [0.1928, 0.2089, 0.2217, 0.2294, 0.5297],
          [0.2152, 0.2267, 0.2348, 0.2165, 0.3735],
          [0.2511, 0.2311, 0.2522, 0.2208, 0.2681],
          [0.2556, 0.2311, 0.2348, 0.2208, 0.4249],
          [0.2287, 0.2311, 0.2565, 0.2338, 0.2764]]]),
 'y': tensor([[0.2468, 0.2424, 0.2165, 0.2641, 0.2771]])}

In [None]:
batch['x'].shape

torch.Size([1, 10, 5])

# LSTM Layer
- input_size: 입력 데이터의 피처의 수
- hidden_size: hidden state의 출력 피처의 수
- num_layers: RNN layer의 수(기본값 1)
- batch_first: 입력데이터의 batch 차원 첫번째 여부(기본값 False)
  - True: (batch, seq, feature)
  - False: (seq, batch, feature)
- bidirectional: 양방향 여부(기본값 False)

In [None]:
SEED = 42

- 단방향

In [None]:
reset_seeds(SEED)
lstm_layer = torch.nn.LSTM(x_arr.shape[2], 16, batch_first = True)
output,(hn,cn) =lstm_layer(batch['x']) # 사실상 cn은 거의 의미 없음

In [None]:
output.shape # batch, seq, feature

torch.Size([1, 10, 16])

In [None]:
hn.shape # nlayer, batch, feature

torch.Size([1, 1, 16])

In [None]:
cn.shape # nlayer, batch, feature

torch.Size([1, 1, 16])

In [None]:
output[:,-1]

tensor([[ 0.1014, -0.0274,  0.0516, -0.0582,  0.0056, -0.1212,  0.1720, -0.0478,
         -0.0135, -0.1039, -0.0913, -0.0600,  0.0283, -0.0171,  0.0297, -0.1611]],
       grad_fn=<SelectBackward0>)

In [None]:
hn

tensor([[[ 0.1014, -0.0274,  0.0516, -0.0582,  0.0056, -0.1212,  0.1720,
          -0.0478, -0.0135, -0.1039, -0.0913, -0.0600,  0.0283, -0.0171,
           0.0297, -0.1611]]], grad_fn=<StackBackward0>)

- num_layers = 2 > 단방향

In [None]:
reset_seeds(SEED)
lstm_layer = torch.nn.LSTM(x_arr.shape[2], 16, batch_first = True, num_layers = 2)
output,(hn,cn) =lstm_layer(batch['x']) # cn은 거의 활용 안함 > _로 받기도함
output.shape, hn.shape, cn.shape

(torch.Size([1, 10, 16]), torch.Size([2, 1, 16]), torch.Size([2, 1, 16]))

In [None]:
hn[-1].shape

torch.Size([1, 16])

In [None]:
hn.permute(1,0,2).flatten(1).shape

torch.Size([1, 32])

- bidirectional = True > 양방향

In [None]:
reset_seeds(SEED)
lstm_layer = torch.nn.LSTM(x_arr.shape[2], 16, batch_first = True, bidirectional = True)
output,(hn,cn) =lstm_layer(batch['x'])
output.shape, hn.shape, cn.shape

(torch.Size([1, 10, 32]), torch.Size([2, 1, 16]), torch.Size([2, 1, 16]))

# GRU Layer

- 단방향

In [None]:
gru_layer = torch.nn.GRU(x_arr.shape[2], 16, batch_first = True)
output, hn = gru_layer(batch['x'])
output.shape, hn.shape

(torch.Size([1, 10, 16]), torch.Size([1, 1, 16]))

- num_layers = 3 > 단방향

In [None]:
gru_layer = torch.nn.GRU(x_arr.shape[2], 16, batch_first = True, num_layers= 3)
output, hn = gru_layer(batch['x'])
output.shape, hn.shape

(torch.Size([1, 10, 16]), torch.Size([3, 1, 16]))

- bidirectional = True (양방향)

In [None]:
gru_layer = torch.nn.GRU(x_arr.shape[2], 16, batch_first = True, bidirectional = True)
output, hn = gru_layer(batch['x'])
output.shape, hn.shape

(torch.Size([1, 10, 32]), torch.Size([2, 1, 16]))

- num_layers = 3, bidirectional = True > 양방향

In [None]:
# 양방향 + 다중 layer > 더욱 의미 있는 hidden_layer 출력
gru_layer = torch.nn.GRU(x_arr.shape[2], 16, batch_first = True, num_layers= 3, bidirectional = True)
output, hn = gru_layer(batch['x'])
output.shape, hn.shape

(torch.Size([1, 10, 32]), torch.Size([6, 1, 16]))

# 신경망 모델클래스 구현(단방향)
- num_layers, bidirectional 아닌 경우

- RNN(lstm. gru) layer에 출력 텐서들의 각 shape의 차원 의미 숙지 필요

- 모델 클래스 다양하게 만들어 보기
  - 양방향 옵션
  - num_layers 옵션 > 2 이상값

In [None]:
class Net(torch.nn.Module):
  def __init__(self, n_features, hidden_size, pred_len):
    super().__init__()
    self.lstm_layer = torch.nn.LSTM(n_features, hidden_size, batch_first = True) # LSTM
    self.seq = torch.nn.Sequential(
        torch.nn.Linear(hidden_size, hidden_size // 2),
        torch.nn.BatchNorm1d(hidden_size // 2),
        torch.nn.LeakyReLU(),
        torch.nn.Linear(hidden_size // 2, pred_len)
    )

  def forward(self,x):
    output, (hn,cn) = self.lstm_layer(x) # output, cn 사용 안하므로 _로 받아도됨

    # n_layer, batch, feature > batch, feature
    return self.seq(hn[-1])

In [None]:
model = Net(x_arr.shape[2], 16, 5)
model(torch.Tensor(x_arr[:2])) # BatchNorm1d이므로

tensor([[ 0.4416,  0.1971,  0.0530, -0.1133, -0.2685],
        [ 0.1077,  0.0405,  0.0292, -0.2949,  0.0714]],
       grad_fn=<AddmmBackward0>)

- 학습데이터 train loop함수 구현

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
  model.train() # 학습 모드 전환
  epoch_loss = 0
  for batch in dataloader: # 배치단위 데이터 반환
    pred = model(batch['x'].to(device)) # 예측
    loss = loss_fn(pred, batch['y'].to(device)) # 손실함수로 계산

    optimizer.zero_grad() # 이전 경사 누적되는 걸 방지하기 위해 기울기 0으로 초기화
    loss.backward() # 역전파
    optimizer.step() # 가중치 업데이트
    epoch_loss += loss.item() # epoch loss를 계산하기 위해 배치 loss 모두 합치기

  epoch_loss /= len(dataloader) # 평균내서 epoch loss 구하기
  return epoch_loss

- 테스트데이터 test loop함수 구현

In [None]:
@torch.no_grad() # with 사용과 같은 의미
def test_loop(dataloader, model, loss_fn, device):
  epoch_loss = 0
  model.eval() # 평가모드
  pred_list = []
  for batch in dataloader:
    pred = model(batch['x'].to(device))
    if batch.get('y') is not None: # 검증데이터일 경우, y키에 텐서가 있을 경우만 loss계산
      loss = loss_fn(pred, batch['y'].to(device))
      epoch_loss += loss.item()


    pred = pred.to('cpu').numpy()
    pred_list.append(pred)

  epoch_loss /= len(dataloader)
  pred = np.concatenate(pred_list)

  return epoch_loss, pred

- 조합 후 KFold학습 수행

In [None]:
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold

n_splits = 5 # K-Fold의 k값
n_features = x_arr.shape[2]
hidden_size = 16 # rnn layer의 hidden_size(출력데이터 피처 개수)
pred_len = y_arr.shape[1] # 예측 길이(output layer노드 수)
batch_size = 32
epochs = 1000
cv = KFold(n_splits, random_state= SEED, shuffle = True)
loss_fn = torch.nn.MSELoss()
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
is_holdout = False
reset_seeds(SEED)
score_list= []

for i, (tri,vai) in enumerate(cv.split(x_arr)): # cv 학습 진행
  # 학습데이터
  train_dt = FinanceDataset(x_arr[tri], y_arr[tri])
  train_dl = torch.utils.data.DataLoader(train_dt, batch_size = batch_size, shuffle = True)

  # 검증데이터
  valid_dt = FinanceDataset(x_arr[vai], y_arr[vai])
  valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size = batch_size, shuffle = False)

  model = Net(n_features, hidden_size, pred_len).to(device)
  optimizer = torch.optim.Adam(model.parameters())


  best_score = np.inf
  patience = 0
  for _ in tqdm(range(epochs)):
    train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
    valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

    pred = pred * sizes[3] + mins[3] # 민맥스 결과값을 원상태로 돌려놓기 > 종가이므로 인덱싱 '3'
    true = y_arr[vai] * sizes[3] + mins[3] # 검증용 정답데이터
    score = mean_absolute_error(true, pred)

    patience += 1
    if score < best_score: # error > 작을수록 좋으므로
      patience = 0
      best_score = score
      torch.save(model.state_dict(), f'model_{i}.pt') # 가중치 저장하기

    if patience == 5:
      break
  print(f'Fold-{i} MAE: {best_score}')
  score_list.append(best_score)
  if is_holdout:
    break

  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-0 MAE: 1746.3080618351064


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-1 MAE: 5854.015319293479


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-2 MAE: 5590.911277173914


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-3 MAE: 5366.902598505435


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-4 MAE: 2288.0849864130437


In [None]:
np.mean(score_list)

4169.2444486441955

# 신경망 모델클래스 구현(단방향)
- num_layers = 2 이상

In [None]:
class Net(torch.nn.Module):
  def __init__(self, n_features, hidden_size, pred_len):
    super().__init__()
    self.lstm_layer = torch.nn.LSTM(n_features, hidden_size, num_layers= 3,batch_first= True)
    self.seq = torch.nn.Sequential(
        torch.nn.Linear(hidden_size, hidden_size // 2),
        torch.nn.BatchNorm1d(hidden_size // 2),
        torch.nn.LeakyReLU(),
        torch.nn.Linear(hidden_size // 2, pred_len)
    )

  def forward(self,x):
    output, (hn,cn) = self.lstm_layer(x) # output, cn 사용 안하므로 _로 받아도됨

    # n_layer, batch, feature > batch, feature
    return self.seq(hn[-1])

- 학습데이터 train loop함수 구현

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
  model.train() # 학습 모드 전환
  epoch_loss = 0
  for batch in dataloader: # 배치단위 데이터 반환
    pred = model(batch['x'].to(device)) # 예측
    loss = loss_fn(pred, batch['y'].to(device)) # 손실함수로 계산

    optimizer.zero_grad() # 이전 경사 누적되는 걸 방지하기 위해 기울기 0으로 초기화
    loss.backward() # 역전파
    optimizer.step() # 가중치 업데이트
    epoch_loss += loss.item() # epoch loss를 계산하기 위해 배치 loss 모두 합치기

  epoch_loss /= len(dataloader) # 평균내서 epoch loss 구하기
  return epoch_loss

- 테스트데이터 test loop함수 구현

In [None]:
@torch.no_grad() # with 사용과 같은 의미
def test_loop(dataloader, model, loss_fn, device):
  epoch_loss = 0
  model.eval() # 평가모드
  pred_list = []
  for batch in dataloader:
    pred = model(batch['x'].to(device))
    if batch.get('y') is not None: # 검증데이터일 경우, y키에 텐서가 있을 경우만 loss계산
      loss = loss_fn(pred, batch['y'].to(device))
      epoch_loss += loss.item()


    pred = pred.to('cpu').numpy()
    pred_list.append(pred)

  epoch_loss /= len(dataloader)
  pred = np.concatenate(pred_list)

  return epoch_loss, pred

- 조합 후 KFold학습 수행

In [None]:
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold

n_splits = 5 # K-Fold의 k값
n_features = x_arr.shape[2]
hidden_size = 16 # rnn layer의 hidden_size(출력데이터 피처 개수)
pred_len = y_arr.shape[1] # 예측 길이(output layer노드 수)
batch_size = 32
epochs = 1000
cv = KFold(n_splits, random_state= SEED, shuffle = True)
loss_fn = torch.nn.MSELoss()
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
is_holdout = False
reset_seeds(SEED)
score_list= []

for i, (tri,vai) in enumerate(cv.split(x_arr)): # cv 학습 진행
  # 학습데이터
  train_dt = FinanceDataset(x_arr[tri], y_arr[tri])
  train_dl = torch.utils.data.DataLoader(train_dt, batch_size = batch_size, shuffle = True)

  # 검증데이터
  valid_dt = FinanceDataset(x_arr[vai], y_arr[vai])
  valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size = batch_size, shuffle = False)

  model = Net(n_features, hidden_size, pred_len).to(device)
  optimizer = torch.optim.Adam(model.parameters())


  best_score = np.inf
  patience = 0
  for _ in tqdm(range(epochs)):
    train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
    valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

    pred = pred * sizes[3] + mins[3] # 민맥스 결과값을 원상태로 돌려놓기 > 종가이므로 인덱싱 '3'
    true = y_arr[vai] * sizes[3] + mins[3] # 검증용 정답데이터
    score = mean_absolute_error(true, pred)

    patience += 1
    if score < best_score: # error > 작을수록 좋으므로
      patience = 0
      best_score = score
      torch.save(model.state_dict(), f'model_{i}.pt') # 가중치 저장하기

    if patience == 5:
      break
  print(f'Fold-{i} MAE: {best_score}')
  score_list.append(best_score)
  if is_holdout:
    break

  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-0 MAE: 4251.56017287234


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-1 MAE: 6204.345397418479


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-2 MAE: 6259.24699388587


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-3 MAE: 3774.161684782609


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-4 MAE: 3503.9489470108697


In [None]:
np.mean(score_list)

4798.652639194033

# 신경망 모델클래스 구현(양방향)
- bidirectional = True

In [None]:
class Net(torch.nn.Module):
    def __init__(self, n_features, hidden_size, pred_len):
        super().__init__()
        self.lstm_layer = torch.nn.LSTM(
            n_features, hidden_size, batch_first=True, bidirectional=True
        )
        self.seq = torch.nn.Sequential(
            torch.nn.Linear(hidden_size * 2, hidden_size),  # hidden_size * 2로 수정
            torch.nn.LeakyReLU(),
            torch.nn.Linear(hidden_size, pred_len)
        )

    def forward(self, x):
        # LSTM의 output
        output, (hn, cn) = self.lstm_layer(x)

        # Bidirectional LSTM의 forward, backward hidden state 결합
        # hn의 형상: (num_layers * 2, batch_size, hidden_size)
        # Forward 방향: hn[-2], Backward 방향: hn[-1]
        forward_hn = hn[-2, :, :]  # Forward 방향의 마지막 layer
        backward_hn = hn[-1, :, :]  # Backward 방향의 마지막 layer
        combined_hn = torch.cat((forward_hn, backward_hn), dim=1)  # (batch_size, 2 * hidden_size)

        return self.seq(combined_hn)


- 학습데이터 train loop함수 구현

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
  model.train() # 학습 모드 전환
  epoch_loss = 0
  for batch in dataloader: # 배치단위 데이터 반환
    pred = model(batch['x'].to(device)) # 예측
    loss = loss_fn(pred, batch['y'].to(device)) # 손실함수로 계산

    optimizer.zero_grad() # 이전 경사 누적되는 걸 방지하기 위해 기울기 0으로 초기화
    loss.backward() # 역전파
    optimizer.step() # 가중치 업데이트
    epoch_loss += loss.item() # epoch loss를 계산하기 위해 배치 loss 모두 합치기

  epoch_loss /= len(dataloader) # 평균내서 epoch loss 구하기
  return epoch_loss

- 테스트데이터 test loop함수 구현

In [None]:
@torch.no_grad() # with 사용과 같은 의미
def test_loop(dataloader, model, loss_fn, device):
  epoch_loss = 0
  model.eval() # 평가모드
  pred_list = []
  for batch in dataloader:
    pred = model(batch['x'].to(device))
    if batch.get('y') is not None: # 검증데이터일 경우, y키에 텐서가 있을 경우만 loss계산
      loss = loss_fn(pred, batch['y'].to(device))
      epoch_loss += loss.item()


    pred = pred.to('cpu').numpy()
    pred_list.append(pred)

  epoch_loss /= len(dataloader)
  pred = np.concatenate(pred_list)

  return epoch_loss, pred

- 조합 후 KFold학습 수행

In [None]:
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold

n_splits = 5 # K-Fold의 k값
n_features = x_arr.shape[2] # 입력데이터 피처 개수: 5
hidden_size = 16 # rnn layer의 hidden_size(출력데이터 피처 개수)
pred_len = y_arr.shape[1] # 예측 길이(output layer노드 수): 5
batch_size = 32
epochs = 1000
cv = KFold(n_splits, random_state= SEED, shuffle = True)
loss_fn = torch.nn.MSELoss()
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
is_holdout = False
reset_seeds(SEED)
score_list= []

for i, (tri,vai) in enumerate(cv.split(x_arr)): # cv 학습 진행
  # 학습데이터
  train_dt = FinanceDataset(x_arr[tri], y_arr[tri])
  train_dl = torch.utils.data.DataLoader(train_dt, batch_size = batch_size, shuffle = True)

  # 검증데이터
  valid_dt = FinanceDataset(x_arr[vai], y_arr[vai])
  valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size = batch_size, shuffle = False)

  model = Net(n_features, hidden_size, pred_len).to(device)
  optimizer = torch.optim.Adam(model.parameters())


  best_score = np.inf
  patience = 0
  for _ in tqdm(range(epochs)):
    train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
    valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

    pred = pred * sizes[3] + mins[3] # 민맥스 결과값을 원상태로 돌려놓기 > 종가이므로 인덱싱 '3'
    true = y_arr[vai] * sizes[3] + mins[3] # 검증용 정답데이터
    score = mean_absolute_error(true, pred)

    patience += 1
    if score < best_score: # error > 작을수록 좋으므로
      patience = 0
      best_score = score
      torch.save(model.state_dict(), f'model_{i}.pt') # 가중치 저장하기

    if patience == 5:
      break
  print(f'Fold-{i} MAE: {best_score}')
  score_list.append(best_score)
  if is_holdout:
    break

  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-0 MAE: 1086.6208111702126


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-1 MAE: 1490.3575407608694


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-2 MAE: 1211.153515625


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-3 MAE: 1276.2548573369565


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-4 MAE: 1132.4692255434784


In [None]:
np.mean(score_list)

1239.3711900873036

# 신경망 모델클래스 구현(양방향)
- bidirectional = True
- num_layers = 2 이상

In [None]:
class Net(torch.nn.Module):
  def __init__(self, n_features, hidden_size, pred_len):
    super().__init__()
    self.lstm_layer = torch.nn.LSTM(n_features, hidden_size, batch_first = True, bidirectional = True) # 양방향 적용
    self.fc_layer = torch.nn.Linear(hidden_size * 2, hidden_size) # 양방향이므로 hidden_size * 2
    self.batchn = torch.nn.BatchNorm1d(hidden_size)
    self.Leakyrelu = torch.nn.LeakyReLU()
    self.output_layer = torch.nn.Linear(hidden_size, pred_len) # 예측 일수: pred_len

  def forward(self, x):
    # output > batch, seq, feature
    # hn > nlayer, batch, feature
    output, (hn,cn) = self.lstm_layer(x)

    # nlayer, batch, feature > batch, nlayer, feature > batch, nlayer*feature(flatten)
    x = hn.permute(1,0,2).flatten(1)

    x = self.fc_layer(x)
    x = self.batchn(x)
    x = self.Leakyrelu(x)
    return self.output_layer(x)

- 학습데이터 train loop함수 구현

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
  model.train() # 학습 모드 전환
  epoch_loss = 0
  for batch in dataloader: # 배치단위 데이터 반환
    pred = model(batch['x'].to(device)) # 예측
    loss = loss_fn(pred, batch['y'].to(device)) # 손실함수로 계산

    optimizer.zero_grad() # 이전 경사 누적되는 걸 방지하기 위해 기울기 0으로 초기화
    loss.backward() # 역전파
    optimizer.step() # 가중치 업데이트
    epoch_loss += loss.item() # epoch loss를 계산하기 위해 배치 loss 모두 합치기

  epoch_loss /= len(dataloader) # 평균내서 epoch loss 구하기
  return epoch_loss

- 테스트데이터 test loop함수 구현

In [None]:
@torch.no_grad() # with 사용과 같은 의미
def test_loop(dataloader, model, loss_fn, device):
  epoch_loss = 0
  model.eval() # 평가모드
  pred_list = []
  for batch in dataloader:
    pred = model(batch['x'].to(device))
    if batch.get('y') is not None: # 검증데이터일 경우, y키에 텐서가 있을 경우만 loss계산
      loss = loss_fn(pred, batch['y'].to(device))
      epoch_loss += loss.item()


    pred = pred.to('cpu').numpy()
    pred_list.append(pred)

  epoch_loss /= len(dataloader)
  pred = np.concatenate(pred_list)

  return epoch_loss, pred

- 조합 후 KFold학습 수행

In [None]:
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold

n_splits = 5 # K-Fold의 k값
n_features = x_arr.shape[2] # 입력데이터 피처 개수
hidden_size = 16 # rnn layer의 hidden_size(출력데이터 피처 개수)
pred_len = y_arr.shape[1] # 예측 길이(output layer노드 수)
batch_size = 32
epochs = 1000
cv = KFold(n_splits, random_state= SEED, shuffle = True)
loss_fn = torch.nn.MSELoss()
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
is_holdout = False
reset_seeds(SEED)
score_list= []

for i, (tri,vai) in enumerate(cv.split(x_arr)): # cv 학습 진행
  # 학습데이터
  train_dt = FinanceDataset(x_arr[tri], y_arr[tri])
  train_dl = torch.utils.data.DataLoader(train_dt, batch_size = batch_size, shuffle = True)

  # 검증데이터
  valid_dt = FinanceDataset(x_arr[vai], y_arr[vai])
  valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size = batch_size, shuffle = False)

  model = Net(n_features, hidden_size, pred_len).to(device)
  optimizer = torch.optim.Adam(model.parameters())


  best_score = np.inf
  patience = 0
  for _ in tqdm(range(epochs)):
    train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
    valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

    pred = pred * sizes[3] + mins[3] # 민맥스 결과값을 원상태로 돌려놓기 > 종가이므로 인덱싱 '3'
    true = y_arr[vai] * sizes[3] + mins[3] # 검증용 정답데이터
    score = mean_absolute_error(true, pred)

    patience += 1
    if score < best_score: # error > 작을수록 좋으므로
      patience = 0
      best_score = score
      torch.save(model.state_dict(), f'model_{i}.pt') # 가중치 저장하기

    if patience == 5:
      break
  print(f'Fold-{i} MAE: {best_score}')
  score_list.append(best_score)
  if is_holdout:
    break

  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-0 MAE: 1747.718916223404


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-1 MAE: 3855.4385360054353


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-2 MAE: 2129.6684273097826


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-3 MAE: 3745.154279891304


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold-4 MAE: 2294.323454483695


In [None]:
np.mean(score_list)

2754.460722782724