In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

In [10]:
# 데이터셋 정의
class BitcoinDataset(Dataset):
    def __init__(self, data, sequence_length):
        self.data = data
        self.sequence_length = sequence_length

    def __len__(self):
        return len(self.data) - self.sequence_length

    def __getitem__(self, idx):
        seq = self.data[idx:idx + self.sequence_length]
        target = self.data[idx + self.sequence_length, -1]  # Close 가격 예측
        return torch.tensor(seq, dtype=torch.float32), torch.tensor(target, dtype=torch.float32)

In [3]:
# Transformer 모델 정의
class TransformerModel(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.input_proj = nn.Linear(input_dim, model_dim)
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1000, model_dim))  # 위치 인코딩
        self.transformer = nn.Transformer(
            d_model=model_dim,
            nhead=num_heads,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dropout=dropout
        )
        self.fc_out = nn.Linear(model_dim, 1)

    def forward(self, src):
        src = self.input_proj(src) + self.positional_encoding[:, :src.size(1), :]
        src = src.permute(1, 0, 2)  # (Batch, Seq, Feature) -> (Seq, Batch, Feature)
        output = self.transformer(src, src)
        output = output[-1]  # 마지막 시점의 출력만 사용
        return self.fc_out(output)


In [18]:
# 하이퍼파라미터 설정
sequence_length = 30  # 1시간 시퀀스
input_dim = 6  # OHLCV 데이터
model_dim = 64
num_heads = 4
num_layers = 2
batch_size = 32
epochs = 50

data = pd.read_csv('/workspace/BTCUSDT/BTCUSDT-1d-2021.csv', index_col=0)
data = data[['Open', 'High', 'Low', 'Close', 'Volume']]
print(data)
dataset = BitcoinDataset(data, sequence_length)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
print(dataloader)
# 모델 초기화
model = TransformerModel(input_dim, model_dim, num_heads, num_layers)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# 모델 학습
for epoch in range(epochs):
    model.train()
    for seq, target in dataloader:
        optimizer.zero_grad()
        pred = model(seq)
        loss = criterion(pred.squeeze(), target)
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}")

print("모델 학습 완료!")

                Open      High       Low     Close         Volume
Open time                                                        
2021-01-01  28923.63  29600.00  28624.57  29331.69   54182.925011
2021-01-02  29331.70  33300.00  28946.53  32178.33  129993.873362
2021-01-03  32176.45  34778.11  31962.99  33000.05  120957.566750
2021-01-04  33000.05  33600.00  28130.00  31988.71  140899.885690
2021-01-05  31989.75  34360.00  29900.00  33949.53  116049.997038
...              ...       ...       ...       ...            ...
2021-12-27  50775.48  52088.00  50449.00  50701.44   28792.215660
2021-12-28  50701.44  50704.05  47313.01  47543.74   45853.339240
2021-12-29  47543.74  48139.08  46096.99  46464.66   39498.870000
2021-12-30  46464.66  47900.00  45900.00  47120.87   30352.295690
2021-12-31  47120.88  48548.26  45678.00  46216.93   34937.997960

[365 rows x 5 columns]
<torch.utils.data.dataloader.DataLoader object at 0x7fff1153e0d0>




KeyError: (172, -1)

In [None]:
import matplotlib.pyplot as plt

data = pd.read_csv('/workspace/BTCUSDT/BTCUSDT-1h-2022.csv', index_col=0)
test_data = data[['Open', 'High', 'Low', 'Close', 'Volume']]
test_dataset = BitcoinDataset(test_data, sequence_length)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# 모델 테스트 및 평가
def test_model(model, dataloader):
    model.eval()
    predictions, actuals = [], []
    with torch.no_grad():
        for seq, target in dataloader:
            pred = model(seq).squeeze().item()
            predictions.append(pred)
            actuals.append(target.item())
    return predictions, actuals

# 테스트 실행
predictions, actuals = test_model(model, test_dataloader)

# 정확도 계산
mse = np.mean((np.array(predictions) - np.array(actuals))**2)
rmse = np.sqrt(mse)
print(f"Test MSE: {mse:.4f}, RMSE: {rmse:.4f}")

# 매수/매도 시뮬레이션 및 수익률 계산
def simulate_trading(predictions, actuals):
    capital = 10000  # 초기 자본
    position = 0  # 현재 보유 비트코인
    capital_history = [capital]

    for i in range(1, len(predictions)):
        # 간단한 전략: 예측 상승 시 매수, 예측 하락 시 매도
        if predictions[i] > actuals[i-1]:  # 상승 예측
            if capital > 0:
                position += capital / actuals[i]
                capital = 0
        elif predictions[i] < actuals[i-1]:  # 하락 예측
            if position > 0:
                capital += position * actuals[i]
                position = 0
        # 자본 기록
        capital_history.append(capital + position * actuals[i])

    return capital_history

capital_history = simulate_trading(predictions, actuals)

# 결과 시각화
plt.figure(figsize=(12, 6))

# 예측 vs 실제 값
plt.subplot(2, 1, 1)
plt.plot(actuals, label="Actual Prices")
plt.plot(predictions, label="Predicted Prices")
plt.title("Bitcoin Price Prediction")
plt.legend()

# 수익률 그래프
plt.subplot(2, 1, 2)
plt.plot(capital_history, label="Capital Over Time")
plt.title("Trading Simulation Performance")
plt.legend()

plt.tight_layout()
plt.show()
