In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# rolling minmax scaling 함수 (window=24)
def rolling_minmax_scale(series, window=24):
    roll_min = series.rolling(window=window, min_periods=window).min()
    roll_max = series.rolling(window=window, min_periods=window).max()
    scaled = (series - roll_min) / ((roll_max - roll_min) + 1e-8)
    scaled = scaled.replace([np.inf, -np.inf], np.nan)
    scaled = scaled.fillna(1.0)
    return scaled.clip(upper=1.0)

# binning 및 one-hot 인코딩 함수 (결과를 정수 0,1로)
def bin_and_encode(data, features, bins=100, drop_original=True):
    for feature in features:
        data[f'{feature}_Bin'] = pd.cut(data[feature], bins=bins, labels=False)
        one_hot = pd.get_dummies(data[f'{feature}_Bin'], prefix=f'{feature}_Bin').astype(np.int32)
        expected_columns = [f'{feature}_Bin_{i}' for i in range(bins)]
        one_hot = one_hot.reindex(columns=expected_columns, fill_value=0)
        data = pd.concat([data, one_hot], axis=1)
        if drop_original:
            data.drop(columns=[f'{feature}_Bin'], inplace=True)
    numeric_cols = data.select_dtypes(include=[np.number]).columns.tolist()
    for col in numeric_cols:
        data[col] = data[col].astype('float32')
    return data

# TimeSeriesDataset (분류용): lookback 후, 바로 다음 봉의 close 값을 이전 봉과 비교하여 상승이면 1, 하락이면 0을 타깃으로 함
class TimeSeriesDataset(Dataset):
    def __init__(self, input_data, target_data, lookback=24):
        self.input_data = input_data.values
        self.target_data = target_data.values
        self.lookback = lookback

    def __len__(self):
        return len(self.input_data) - self.lookback

    def __getitem__(self, idx):
        x = self.input_data[idx: idx + self.lookback, :]
        # 타깃: lookback 후의 close 값과 바로 전 봉(close 값) 비교
        y = self.target_data[idx + self.lookback, 0]
        y_prev = self.target_data[idx + self.lookback - 1, 0]
        y_target = 1 if y > y_prev else 0
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y_target, dtype=torch.long)

# 분류 모델 정의 (상승/하락 예측)
class EncoderOnlyTransformerCustom(nn.Module):
    def __init__(self, input_dim, embedding_dim=512, num_layers=6, nhead=8, 
                 ffn_dim=2048, num_classes=2, max_seq_len=24):
        super(EncoderOnlyTransformerCustom, self).__init__()
        self.token_embedding = nn.Linear(input_dim, embedding_dim)
        self.position_embedding = nn.Embedding(max_seq_len, embedding_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim, nhead=nhead, dim_feedforward=ffn_dim)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embedding_dim, num_classes)
        
    def forward(self, x):
        batch_size, seq_len, _ = x.shape
        x = self.token_embedding(x)
        positions = torch.arange(seq_len, device=x.device).unsqueeze(0).expand(batch_size, seq_len)
        pos_emb = self.position_embedding(positions)
        x = x + pos_emb
        x = x.transpose(0, 1)  # Transformer의 입력형태: [seq_len, batch, features]
        x = self.transformer_encoder(x)
        return self.fc(x[-1, :, :])  # 마지막 타임스탭 출력

# 평가 함수 (분류용)
def evaluate_model(model, data_loader, device):
    model.eval()
    total_loss, correct, total = 0, 0, 0
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for x, y in data_loader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            loss = criterion(outputs, y)
            total_loss += loss.item()
            correct += (outputs.argmax(1) == y).sum().item()
            total += y.size(0)
    return total_loss / len(data_loader), correct / total

# 1. 데이터 로드 및 전처리 (파일명 및 경로에 맞게 수정)
data = pd.read_csv("ETH_upbit_KRW_min5.csv", index_col=0)
data = data[['open', 'high', 'low', 'close']]
data.index = pd.to_datetime(data.index)

# 각 OHLC 열에 대해 rolling minmax scaling (window=24)
ohlc_features = ['open', 'high', 'low', 'close']
for feature in ohlc_features:
    data[feature] = rolling_minmax_scale(data[feature], window=24)

# one-hot 인코딩: OHLC 열은 100 구간으로 나눔
data_encoded = bin_and_encode(data.copy(), ohlc_features, bins=100, drop_original=True)

# 타깃은 원본 close 값을 사용 (상승/하락 판단용)
data['close_target'] = data['close']
data = data.dropna()

final_input_columns = [col for col in data_encoded.columns if '_Bin_' in col]
final_target_column = ['close_target']

data_input = data_encoded[final_input_columns]
data_target = data[final_target_column]

# 1. 백테스팅을 위한 데이터셋 구성
lookback = 24
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 기존 데이터 전처리 유지
data_encoded = bin_and_encode(data.copy(), ohlc_features, bins=100, drop_original=True)

# target 복원 (data_encoded에 close_target을 추가)
data_encoded['close_target'] = data['close']

# 데이터셋 생성
final_input_columns = [col for col in data_encoded.columns if '_Bin_' in col]
data_input = data_encoded[final_input_columns]
data_target = data_encoded[['close_target']]

backtest_dataset = TimeSeriesDataset(data_input, data_target, lookback=lookback)
backtest_loader = DataLoader(backtest_dataset, batch_size=32, shuffle=False)

# 2. 모델 로드
input_dim = data_input.shape[1]
model = EncoderOnlyTransformerCustom(
    input_dim=input_dim, embedding_dim=512, num_layers=6, nhead=8,
    ffn_dim=2048, num_classes=2, max_seq_len=lookback
).to(device)

# 학습된 파라미터 로드
model.load_state_dict(torch.load("model_experiment_15.pth", map_location=device))
model.eval()

# 3. 백테스팅 실행 및 예측값 저장
predictions = []
actuals = []
timestamps = []

with torch.no_grad():
    for i, (x, y) in enumerate(backtest_loader):
        x, y = x.to(device), y.to(device)
        outputs = model(x)
        probs = torch.softmax(outputs, dim=1)  # 예측 확률 계산
        predicted = torch.argmax(probs, dim=1)  # 예측 클래스 (0: 하락, 1: 상승)

        predictions.extend(predicted.cpu().numpy())
        actuals.extend(y.cpu().numpy())
        timestamps.extend(data.index[lookback + i * 32:lookback + (i + 1) * 32])  # 타임스탬프 저장

# 4. 결과 DataFrame 생성
backtest_results = pd.DataFrame({
    'timestamp': timestamps,
    'actual': actuals,
    'predicted': predictions
})

# 예측 정확도 계산
accuracy = (backtest_results['actual'] == backtest_results['predicted']).mean()
print(f"Backtest Accuracy: {accuracy*100:.2f}%")

# 5. CSV로 저장
backtest_results.to_csv("backtest_results.csv", index=False)
print("백테스팅 결과가 backtest_results.csv 파일로 저장되었습니다.")


  model.load_state_dict(torch.load("model_experiment_15.pth", map_location=device))


RuntimeError: Error(s) in loading state_dict for EncoderOnlyTransformerCustom:
	size mismatch for fc.weight: copying a param with shape torch.Size([1, 512]) from checkpoint, the shape in current model is torch.Size([2, 512]).
	size mismatch for fc.bias: copying a param with shape torch.Size([1]) from checkpoint, the shape in current model is torch.Size([2]).