In [None]:
import json

In [None]:
file_path = "/content/drive/MyDrive/우동협/공부/논문구현/RNNmodel/DB.json"

In [None]:
with open(file_path, "r") as file:
  data = json.load(file)
print(type(data))
print(data.keys())

df_train = data['train']
df_val = data['val']
df_test = data['test']
print(type(df_train))

<class 'dict'>
dict_keys(['train', 'val', 'test'])
<class 'list'>


In [None]:
# 리스트 데이터를 PyTorch Tensor로 변환
def preprocess_data(data):
    X = torch.tensor([[d["return"], d["S&P500_return"]] for d in data[:-1]], dtype=torch.float32)
    y = torch.tensor([d["return"] for d in data[1:]], dtype=torch.float32)
    # 입력 데이터를 torch 형태로 변환!
    # 리스트, 이중 리스트, 다차원 리스트, NumPy 배열을 입력하면 tensor형태로 변환한다.


    return X, y

# Train, Val, Test 데이터 변환
X_train, y_train = preprocess_data(df_train)
X_val, y_val = preprocess_data(df_val)
X_test, y_test = preprocess_data(df_test)

print("Train X:", X_train.shape, "y:", y_train.shape)
print("Val X:", X_val.shape, "y:", y_val.shape)
print("Test X:", X_test.shape, "y:", y_test.shape)

Train X: torch.Size([1665, 2]) y: torch.Size([1665])
Val X: torch.Size([204, 2]) y: torch.Size([204])
Test X: torch.Size([104, 2]) y: torch.Size([104])


### 데이터를 데이터로더 처리

## 개념
### Dataset
- __getitem__()을 이용해 데이터를 한 개씩 불러오기
- __len__()을 이용해 데이터 전체 크기 확인
- RNN 같은 시계열 모델에서는 시퀀스 형태의 데이터도 만들 수 있음
<br>
✅ 하지만 PyTorch의 DataLoader는 Dataset을 상속받은 객체를 필요로 해.<br>
✅ 위의 CustomDataset을 DataLoader에 넣으면 오류가 발생해.

### DataLoader
- 배치 학습(Batch Training) 지원 → 한 번에 여러 개의 데이터를 불러와 학습 속도를 높임.
- shuffle=True → 데이터를 랜덤하게 섞어서 학습 가능 (과적합 방지)
- num_workers → 데이터 로딩을 여러 개의 프로세스로 나눠서 빠르게 처리 가능

In [None]:
from torch.utils.data import Dataset, DataLoader
# Dataset: 데이터를 정의하고 관리하는 클래스
# DataLoader: Dataset에서 데이터를 배치(batch) 단위로 불러오는 도구


class ReturnDataset(Dataset):
    def __init__(self, X, y, sequence_length=5):
        self.X = X
        self.y = y
        self.sequence_length = sequence_length

    def __len__(self):
        return len(self.X) - self.sequence_length

    def __getitem__(self, idx):
        return (
            self.X[idx : idx + self.sequence_length],  # 시퀀스 입력
            self.y[idx + self.sequence_length - 1]  # 타겟 값
        )
    # (sequence_length, input_dim) : 데이터 단위 -> 해당 크기로 데이터를 전달
    # X_sample.shape = (5, 2)  # (시퀀스 길이, 입력 차원)
    # y_sample.shape = (1,)    # (예측할 값)

# Dataset 생성
sequence_length = 5
train_dataset = ReturnDataset(X_train, y_train, sequence_length)
val_dataset = ReturnDataset(X_val, y_val, sequence_length)
test_dataset = ReturnDataset(X_test, y_test, sequence_length)

# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)  # 검증 데이터는 순서 유지
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)  # 테스트 데이터도 순서 유지

# DataLoader는 배치 크기로 데이터를 가져온다.
# X_batch.shape = (16, sequence_length, input_dim)  # (배치 크기, 시퀀스 길이, 입력 차원)
# y_batch.shape = (16,)  # (배치 크기)


In [None]:
next(iter(train_loader))[1]

tensor([-0.1752,  3.0118, -0.6022, -0.3720, -0.3000,  0.0476, -0.3295,  0.1542,
        -0.1954, -0.1083, -0.2878, -0.5557, -0.3762, -1.0277, -0.3155, -0.2977])

## 모델

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class MyRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(MyRNN, self).__init__()
        self.rnn = CustomRNN(input_size, hidden_size, num_layers, batch_first=True)
        # nn 모듈에서 RNN 불러오기
        self.fc = nn.Linear(hidden_size, output_size)  # 최종 출력층
        # 원하는 task에 맞게 차원을 변경할 수 있는 레이어

    def forward(self, x):
        out, hidden = self.rnn(x)
        out = self.fc(out[:, -1, :])  # 마지막 타임스텝의 출력 사용
        return out


In [None]:
import torch.nn as nn
import torch.optim as optim

# 모델 초기화 (입력 크기 = 2, 은닉 크기 = 16, 층 개수 = 2, 출력 크기 = 1)
model = MyRNN(input_size=2, hidden_size=10, num_layers=2, output_size=1)

# 손실 함수 및 옵티마이저
criterion = nn.MSELoss()  # 회귀 문제이므로 MSELoss 사용
optimizer = optim.Adam(model.parameters(), lr=0.01)

# 학습 루프
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # 학습 모드
    train_loss = 0.0

    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch).squeeze()  # (batch, 1) → (batch,)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)

    # 검증 단계 (Val 데이터 활용)
    model.eval()  # 평가 모드
    val_loss = 0.0

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch).squeeze()
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()

    val_loss /= len(val_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")


Epoch [1/10], Train Loss: 0.5170, Val Loss: 2.0776
Epoch [2/10], Train Loss: 0.2300, Val Loss: 1.6095
Epoch [3/10], Train Loss: 0.1699, Val Loss: 1.4942
Epoch [4/10], Train Loss: 0.1335, Val Loss: 1.3099
Epoch [5/10], Train Loss: 0.1172, Val Loss: 1.2961
Epoch [6/10], Train Loss: 0.1099, Val Loss: 1.2747
Epoch [7/10], Train Loss: 0.1077, Val Loss: 1.2151
Epoch [8/10], Train Loss: 0.1109, Val Loss: 1.1964
Epoch [9/10], Train Loss: 0.1081, Val Loss: 1.1627
Epoch [10/10], Train Loss: 0.0974, Val Loss: 1.0815


In [None]:
# 테스트 데이터 평가
model.eval()
test_loss = 0.0

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch).squeeze()
        loss = criterion(outputs, y_batch)
        test_loss += loss.item()

test_loss /= len(test_loader)
print(f"Final Test Loss: {test_loss:.4f}")


Final Test Loss: 0.0890


## RNN github 코드 살펴보기

In [None]:
import torch
import torch.nn as nn

class CustomRNN(nn.Module):
    def __init__(
        self, input_size, hidden_size, num_layers=1, nonlinearity="tanh", bias=True, batch_first=True, dropout=0.0, bidirectional=False
    ):
        super(CustomRNN, self).__init__()

        self.input_size = input_size
        # 입력 벡터의 크기
        self.hidden_size = hidden_size
        # RNN 모델이 데이터를 처리할 때, Hidden state의 크기
        self.num_layers = num_layers
        # RNN 신경망을 위로 몇 층 쌓을지 결정
        self.nonlinearity = nonlinearity
        # 사용할 비선형 함수, tanh 함수와 ReLu 함수가 있다.
        self.bias = bias
        # 편향 사용 유무
        self.batch_first = batch_first

        self.dropout = dropout
        self.bidirectional = bidirectional

        if nonlinearity == "tanh":
            self.activation = torch.tanh
        elif nonlinearity == "relu":
            self.activation = torch.relu
        else:
            raise ValueError("Invalid nonlinearity. Choose either 'tanh' or 'relu'")

        # 가중치 및 바이어스 초기화
        self.weight_ih = nn.ParameterList([nn.Parameter(torch.randn(hidden_size, input_size if i == 0 else hidden_size)) for i in range(num_layers)])
        # nn.ParameterList([...])
        # torch.nn.Module에서 학습 가능한 파라미터 목록을 만들기 위해 사용됩니다.
        # 내부의 nn.Parameter들이 학습 가능한 텐서로 등록됩니다.

        # nn.Parameter()
        # 학습 가능한 텐서(가중치, 편향 등)를 선언할 때 사용.
        # requires_grad=True가 기본값으로 설정되어 있어서, 역전파(Gradient Descent) 과정에서 자동으로 업데이트됨.
        # nn.Parameter를 사용하면 model.parameters() 호출 시 자동으로 포함
        # nn.ParameterList와 함께 사용하여 여러 개의 가중치를 쉽게 관리 가능

        self.weight_hh = nn.ParameterList([nn.Parameter(torch.randn(hidden_size, hidden_size)) for _ in range(num_layers)])

        if bias:
            self.bias_ih = nn.ParameterList([nn.Parameter(torch.randn(hidden_size)) for _ in range(num_layers)])
            self.bias_hh = nn.ParameterList([nn.Parameter(torch.randn(hidden_size)) for _ in range(num_layers)])
        else:
            self.register_parameter('bias_ih', None)
            self.register_parameter('bias_hh', None)

            # PyTorch의 nn.Module에서 self.register_parameter(name, param) 함수는 모델의 학습 가능한 매개변수(파라미터)를 등록하는 역할
            # 'bias_ih'라는 이름을 가진 학습 가능한 매개변수를 등록하지만, 실제 값은 None으로 설정.
            # 즉, 이 RNN 모델에서는 bias_ih를 사용하지 않겠다는 의미

    def forward(self, x, hx=None):
      if self.batch_first:
          x = x.transpose(0, 1)  # (batch, seq_len, input_size) -> (seq_len, batch, input_size)

      seq_len, batch_size, _ = x.size()

      if hx is None:
          hx = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)

      h_prev = [hx[layer] for layer in range(self.num_layers)]  # 이전 hidden states 저장
      output = []

      for t in range(seq_len):
          h_new = []  # 새로운 hidden states 저장할 리스트
          for layer in range(self.num_layers):
              if layer == 0:
                  layer_input = x[t]  # 첫 번째 layer는 입력 데이터를 사용
              else:
                  layer_input = h_new[layer - 1]  # 이전 layer의 출력을 입력으로 사용

              new_h = self.activation(
                  layer_input @ self.weight_ih[layer].T + self.bias_ih[layer] +
                  h_prev[layer] @ self.weight_hh[layer].T + self.bias_hh[layer]
              )
              h_new.append(new_h)  # 새로운 hidden state 저장

          output.append(h_new[-1])  # 마지막 layer의 hidden state 저장
          h_prev = h_new  # 현재 hidden states를 다음 타임스텝에서 사용

      output = torch.stack(output)  # (seq_len, batch_size, hidden_size)

      if self.batch_first:
          output = output.transpose(0, 1)  # (batch, seq_len, hidden_size)

      return output, torch.stack(h_prev)  # 마지막 hidden state도 반환


In [None]:
num_layers = 1
for i in range(num_layers):
  print(i)

0
