In [None]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
# Load the full table (the CSV is cp949-encoded) and keep only the rows whose
# '종목명' column equals 'KB금융' - the single series modelled in this notebook.
raw_df = pd.read_csv('./total_data.csv', encoding='cp949')
df = raw_df.loc[raw_df['종목명'] == 'KB금융'].copy()

# Fail early with a clear message instead of letting an empty frame reach the
# scaling / windowing code further down.
if df.empty:
  raise ValueError("no rows found for 종목명 == 'KB금융' in ./total_data.csv")

In [None]:
# Pull the '종가' column out as a plain NumPy array and report how many
# observations are available.
close_col = df['종가']
df_np = close_col.values
print(len(df_np))

987


In [None]:
# Sliding-window configuration used when slicing the series:
#   window_size  - how many consecutive past values form one model input
#   horizon_size - how many values right after the window are predicted
window_size, horizon_size = 50, 3


In [None]:
from sklearn.preprocessing import MinMaxScaler


def make_windows(series, window_size, horizon_size):
  """Slice a 1-D series into (input window, forecast horizon) pairs.

  Args:
    series: 1-D NumPy array of observations.
    window_size: number of consecutive values in each input window.
    horizon_size: number of values immediately following the window.

  Returns:
    Tuple ``(x, y)`` of NumPy arrays shaped ``(n, window_size)`` and
    ``(n, horizon_size)``, with
    ``n = len(series) - window_size - horizon_size + 1``.

  Raises:
    ValueError: if the series is too short to produce a single pair.
  """
  # "+ 1" keeps the final window whose horizon ends exactly at the last value.
  n_windows = len(series) - window_size - horizon_size + 1
  if n_windows <= 0:
    raise ValueError(
        f'series of length {len(series)} is too short for '
        f'window_size={window_size} and horizon_size={horizon_size}')

  x = np.stack([series[i:i + window_size] for i in range(n_windows)])
  y = np.stack([
      series[i + window_size:i + window_size + horizon_size]
      for i in range(n_windows)
  ])
  return x, y


# Scale the series to [0, 1].
# NOTE(review): the scaler is fit on the whole series, including the values
# that later land in the test split - consider fitting on the training part only.
# NOTE: df_np is overwritten with the scaled values, so re-running this cell
# refits `mm` on already-scaled data.
mm = MinMaxScaler()
df_np = mm.fit_transform(df_np.reshape(-1, 1)).reshape(-1)

train_x, train_y = make_windows(df_np, window_size, horizon_size)

# Convert from single contiguous arrays (not a Python list of arrays, which
# PyTorch warns is extremely slow) and cast to float32.
x_tensor = torch.from_numpy(train_x).float()
y_tensor = torch.from_numpy(train_y).float()
print(f'src shape : {x_tensor.shape}')
print(f'label shape : {y_tensor.shape}')

src shape : torch.Size([934, 50])
label shape : torch.Size([934, 3])


  x_tensor = torch.FloatTensor(train_x)


In [None]:
import math
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    The encoding table is stored as the buffer ``pe`` with shape
    ``(max_len, 1, d_model)``; even feature columns hold sines and odd
    columns hold cosines of ``position * div_term``.

    Args:
        d_model: size of the last (feature) dimension of the input.
        dropout: dropout probability applied after the encodings are added.
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns;
        # trimming the angles keeps the assignment shape-compatible.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Singleton middle axis so the table broadcasts over the batch axis.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``.

        The table is indexed by ``x.size(0)``, so the first axis of ``x`` is
        treated as the sequence position: ``x`` is ``(seq_len, batch, d_model)``.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [None]:
class Transformer(nn.Module):
  """Encoder-only Transformer followed by a two-layer feed-forward head.

  Args:
    d_model: size of the last input dimension (the feature width the encoder sees).
    n_head: number of attention heads in each encoder layer.
    num_enc: number of stacked encoder layers.
    horizon_size: width of the final output, i.e. how many values are
      predicted per sequence position. Defaults to 3.
  """

  def __init__(self, d_model, n_head, num_enc, horizon_size=3):
    super().__init__()

    # encoder layer parameters
    self.d_model = d_model
    self.n_head = n_head
    self.num_enc = num_enc
    self.horizon_size = horizon_size

    # Constructed but not applied in forward(). PositionalEncoding indexes its
    # table by x.size(0) (sequence-first), while the encoder below is
    # batch_first, so wiring it in would require transposing the input.
    self.pos_encoder = PositionalEncoding(d_model, 0.1)

    self.encoderBlock = nn.TransformerEncoderLayer(
        d_model=self.d_model,
        nhead=self.n_head,
        batch_first=True,
    )
    self.encoder = nn.TransformerEncoder(
        encoder_layer=self.encoderBlock,
        num_layers=self.num_enc,
    )

    # Feed-forward head: d_model -> d_model // 2 -> horizon_size.
    self.decoder = nn.Linear(d_model, d_model // 2)
    self.fc = nn.Linear(d_model // 2, horizon_size)
    self.relu = nn.ReLU()

  def forward(self, x):
    """Map ``x`` of shape (batch, seq, d_model) to (batch, seq, horizon_size)."""
    # The encoder output has the same shape as its input.
    out = self.encoder(x)

    # Project each position down to the forecast horizon.
    out = self.decoder(out)
    out = self.relu(out)
    out = self.fc(out)
    return out

In [None]:
# N-Beats loss
class sMAPE(nn.Module):
  """Symmetric mean absolute percentage error, scaled by 200.

  For each sample the loss is ``200 * mean(|s - t| / (|s| + |t|))`` over the
  last dimension; the result is the mean over all samples. The ratio is
  unchanged when prediction and target are multiplied by the same constant,
  so the loss does not depend on the scale of the series.

  Args:
    eps: lower bound applied to the denominator so that a prediction and a
      target that are both zero contribute 0 instead of NaN.
  """

  def __init__(self, eps=1e-8):
    super().__init__()
    self.eps = eps

  def forward(self, src, tgt):
    """Return the scalar sMAPE between ``src`` (prediction) and ``tgt``.

    Both tensors are flattened to (-1, horizon), where horizon is the size of
    the last dimension of ``src``, e.g. (700, 1, 3) -> (700, 3).
    """
    horizon = src.shape[-1]
    s = src.reshape(-1, horizon)
    t = tgt.reshape(-1, horizon)

    up = torch.abs(s - t)
    # Only denominators below eps are altered; all other values are untouched.
    down = (torch.abs(s) + torch.abs(t)).clamp_min(self.eps)

    per_sample = torch.mean(up / down, -1) * 200
    return torch.mean(per_sample)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Give every sample a singleton middle axis - (N, 1, window) for the inputs and
# (N, 1, horizon) for the labels - and move both tensors to the chosen device.
src_shape = (-1, 1, window_size)
label_shape = (-1, 1, horizon_size)

x_tensor = x_tensor.view(*src_shape).to(device)
y_tensor = y_tensor.view(*label_shape).to(device)

print(f'src shape : {x_tensor.shape}')
print(f'label shape : {y_tensor.shape}')

src shape : torch.Size([934, 1, 50])
label shape : torch.Size([934, 1, 3])


In [None]:
# Optimisation settings: optimizer step size and the epoch count that bounds
# the training loop.
lr = 1e-3
num_epochs = 500

In [None]:
# model init
# Seed the RNG first so weight initialisation (and the dropout masks drawn
# during training) repeat across fresh-kernel runs. Some CUDA kernels may still
# be non-deterministic.
torch.manual_seed(42)

# d_model equals the window length: each window is fed as one feature vector.
model = Transformer(d_model=window_size, n_head=2, num_enc=2).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = sMAPE()

In [None]:
# train / test split: the first 700 windows (in the order they were generated)
# are used for training, everything after that for testing.
split_idx = 700
train_x, test_x = x_tensor[:split_idx], x_tensor[split_idx:]
train_y, test_y = y_tensor[:split_idx], y_tensor[split_idx:]

In [None]:
# Switch to training mode explicitly: the evaluation cell calls model.eval(),
# so re-running this cell afterwards would otherwise train with dropout off.
model.train()

# Full-batch training. The range runs to num_epochs inclusive so the final
# epoch index is also logged.
for epoch in range(num_epochs + 1):
  optimizer.zero_grad()
  pred = model(train_x)
  loss = criterion(pred, train_y)
  loss.backward()
  optimizer.step()

  if epoch % 100 == 0:
    print(f'Epoch : {epoch} , Loss : {loss.item()}')

Epoch : 0 , Loss : 182.57794189453125
Epoch : 100 , Loss : 11.58338737487793
Epoch : 200 , Loss : 8.638136863708496
Epoch : 300 , Loss : 7.706544399261475
Epoch : 400 , Loss : 6.611324787139893
Epoch : 500 , Loss : 6.965249538421631


In [None]:
# Evaluate on the held-out windows: eval() disables dropout, and no_grad()
# skips building the autograd graph, which is not needed for inference.
model.eval()
with torch.no_grad():
  y_pred = model(test_x)
  tot_loss = criterion(y_pred, test_y)
print(f'Loss : {tot_loss.item():.4f}')

Loss : 10.8210
