In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# 한글 폰트 설정
plt.rcParams['font.family'] = ['Malgun Gothic']
plt.rcParams['axes.unicode_minus'] = False

# GPU 사용 가능 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 디바이스: {device}")

사용 디바이스: cuda


In [3]:
def normalize_data(data):
    data_tensor = torch.FloatTensor(data) if not isinstance(data, torch.Tensor) else data
    data_min = torch.min(data_tensor)
    data_max = torch.max(data_tensor)
    normalized_data = (data_tensor - data_min) / (data_max - data_min)
    return normalized_data.numpy(), data_min.item(), data_max.item()

def denormalize_data(normalized_data, data_min, data_max):
    normalized_tensor = torch.FloatTensor(normalized_data) if not isinstance(normalized_data, torch.Tensor) else normalized_data
    denormalized = normalized_tensor * (data_max - data_min) + data_min
    return denormalized.numpy()

def calculate_mape(y_true, y_pred):
    y_true_tensor = torch.FloatTensor(y_true) if not isinstance(y_true, torch.Tensor) else y_true
    y_pred_tensor = torch.FloatTensor(y_pred) if not isinstance(y_pred, torch.Tensor) else y_pred
    mask = y_true_tensor != 0
    if mask.sum() == 0:
        return 0.0
    mape = torch.mean(torch.abs((y_true_tensor[mask] - y_pred_tensor[mask]) / y_true_tensor[mask])) * 100
    return mape.item()

def calculate_rmse(y_true, y_pred):
    y_true_tensor = torch.FloatTensor(y_true) if not isinstance(y_true, torch.Tensor) else y_true
    y_pred_tensor = torch.FloatTensor(y_pred) if not isinstance(y_pred, torch.Tensor) else y_pred
    mse = torch.mean((y_true_tensor - y_pred_tensor) ** 2)
    rmse = torch.sqrt(mse)
    return rmse.item()

def calculate_mae(y_true, y_pred):
    y_true_tensor = torch.FloatTensor(y_true) if not isinstance(y_true, torch.Tensor) else y_true
    y_pred_tensor = torch.FloatTensor(y_pred) if not isinstance(y_pred, torch.Tensor) else y_pred
    mae = torch.mean(torch.abs(y_true_tensor - y_pred_tensor))
    return mae.item()

In [None]:
# 1. 데이터 생성
# 2. 데이터를 전처리
# 3. 모델을 생성
# 4. 모델을 학습
# 5. 훈련과정을 시각화
# 6. 미래 예측
# 7. 미래 예측 시각화
# 8. 백테스트

In [None]:
# 1. 데이터 생성
# sample_data = create_data()
# 2. 데이터를 전처리
# data_cleanup = prepare(sample_data)
# 3. 모델을 생성
# model = create_model(rnn, hidden_layer=64, num_layers=2) # num_layers=1은 의미가 없다. fully-connected Linear모델과 다를 게 없다.
# 4. 모델을 학습
# result_train = train_model(model, epoch=10, lr=0.01)
# 5. 훈련과정을 시각화
# plot_train(result_train)
# 6. 미래 예측
# pred = forecast(model, data_cleanup, steps=10)
# 7. 미래 예측 시각화
# plot_pred(pred)
# 8. 백테스트
# backtest(model, data_cleanup)

In [4]:
# 시계열 데이터!
class TSDataset(Dataset):
    def __init__(self, data, seq_length):
        self.data = data # 이 데이터는 뭐야? Pandas의 DataFrame!
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data) - self.seq_length
    
    def __getitem__(self, index):
        # TODO: X, y값 확인해야 함
        # 동일한 값이 X, y에 할돵되는지
        X = self.data[index : index+self.seq_length] # 시계열
        y = self.data[index + self.seq_length] # 주어진 인덱스에서 ~ 까지의 데이터
        return torch.FloatTensor(X), torch.FloatTensor(y)

In [None]:
class SimpleRNN(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1, dropout=0.2): # RNN은 거의 확실하게 과적합이 되기 때문에 dropout줌
        super(SimpleRNN, self).__init__()
        self.hidden_size=hidden_size
        self.num_layers=num_layers

        self.rnn = nn.RNN(
            input_size=input_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) # 초기값 : 0
        #h0 = torch.randn(self.num_layers, x.size(0), self.hidden_size).to(x.device) # 초기값 : 은닉층(들)의 기울기 array - 초기값 모르면 랜덤!
        out, _ = self.rnn(input, h0) # out, h1 - num_layers가 더 있으면 h1 사용해서 다음 레이어(은닉층간이 아님, 다음 RNN 레이어 말하는 것, RNN 레이어 간)에 넘길텐데
        out = self.fc(out)
        return out