<a href="https://colab.research.google.com/github/hjkwon-pknu/calculator/blob/main/Project25.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
import sys
import tempfile
import subprocess
from datetime import datetime, timedelta
from zipfile import ZipFile
from os import environ, getenv, makedirs, getcwd, walk, remove
from os.path import basename, join, exists, expanduser as home

def pip_install(package):
  subprocess.check_call([sys.executable, "-m", "pip", "install", package])

def pip_install_requirements(requirements_dir):
  subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", requirements_dir.rstrip(".txt")+".txt"])

## GitPython으로 git을 사용할 수 있도록 한다. 없다면 pip로 GitPython을 설치 한다.
try:
  from git import Repo
except:
  pip_install("GitPython")
  from git import Repo

## 캔들 데이터를 사용하기 위해서는 Pandas가 필요하다.
try:
  import pandas as pd
except:
  pip_install("pandas")
  import pandas as pd

## Pandas 짝꿍 Numpy가 필요하다.
try:
  import numpy as np
except:
  pip_install("numpy")
  import numpy as np

## Binance REST API로 다운로드된 데이터에서 부족한 부분만 가져올 수 있도록 requests를 사용하자.
try:
  import requests
except:
  pip_install("requests")
  import requests

## 바이낸스 퍼블릭 데이터 다운로드 소스코드를 Temp(임시폴더) 다운로드 받아서 사용하도록 한다.
repo_url = "https://github.com/binance/binance-public-data.git"
temp_path = tempfile.mkdtemp(prefix='candle_download_')

## git으로 소스코드를 임시폴더에 클론(다운로드) 시키고 위치를 저장해두자.
repo_path = Repo.clone_from(repo_url, temp_path)
WORK_PATH = repo_path.working_dir

## STORE_DIRECTORY 환경변수가 없으면 사용자폴더에 binance_data를 사용하도록 설정한다.
STORE_PATH = join(home('~'), "binance_data") if not "STORE_DIRECTORY" in environ.keys() else getenv("STORE_DIRECTORY")
environ["STORE_DIRECTORY"] = STORE_PATH

## 캔들 데이터를 다운로드 받는 download-kline.py를 실행한다.
def download_klines(cmd, args):
  subprocess.check_call(cmd + args)

## 저장할 위치가 없으면 만들어주고, download-kline.py에 다운로드 받을 코인 정보등을 입력한다.
def download_binance_datas(symbol="BTCUSDT", interval="1m"):
  # environ["STORE_DIRECTORY"] = "/Users/name/binance_data/"
  if not exists(STORE_PATH):
    makedirs(STORE_PATH)
  # Install requirements library
  pip_install_requirements(join(WORK_PATH, "python", "requirements.txt"))
  # configure download command,
  kline_cmd = [sys.executable, join(WORK_PATH, "python", "download-kline.py")]
  monthly_args = ["-t", "um", "-s", symbol, "-i", interval, "-skip-daily", "1", "-startDate", "2020-01-01"]
  daily_args = ["-t", "um", "-s", symbol, "-i", interval, "-skip-monthly", "1", "-startDate", f"{datetime.now().strftime('%Y-%m')}-01"]
  # excute download kline
  download_klines(kline_cmd, monthly_args)
  download_klines(kline_cmd, daily_args)

def klines_unzip(search_directory=STORE_PATH):
  search_directory = join(search_directory, 'data')
  for root, dirs, files in walk(search_directory):
    for file in files:
      if file.endswith('.zip'):
        zip_file_path = join(root, file)
        # 압축을 풀 디렉토리 선택 (zip 파일이 있는 폴더와 동일한 위치)
        extract_directory = root
        # 압축 파일 열기
        with ZipFile(zip_file_path, 'r') as zip_ref:
          # 압축 해제
          zip_ref.extractall(extract_directory)
        print(f'압축 해제: {zip_file_path} -> {extract_directory}')

def klines_history(search_directory=STORE_PATH):
  search_directory = join(search_directory, 'data')
  # 모든 CSV 파일을 저장할 데이터 프레임 초기화
  history = pd.DataFrame()
  for root, dirs, files in walk(search_directory):
    for file in sorted(files):
      if file.endswith('.csv') and file != f'{basename(root)}.csv':
        csv_file_path = join(root, file)
        # 해더가 있는지 확인하고 넘어가야함. 첫 번째 라인 확인
        with open(csv_file_path, 'r') as file:
          first_line = file.readline()
          # 컬럼 이름이 있는 경우 읽을때
          if 'open_time' in first_line or 'Open' in first_line or 'open' in first_line:
            print(f'DataFrame(Header exist): {csv_file_path}')
            df = pd.read_csv(csv_file_path)  # header=0 (기본값)
          # 컬럼 이름이 없는 경우 읽을때
          else:
            print(f'DataFrame(Header empty): {csv_file_path}')
            df = pd.read_csv(csv_file_path, header=None)
            df.columns = ['open_time', 'open','high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'count', 'taker_buy_volume', 'taker_buy_quote_volume', 'ignore']
        df = df.iloc[:, :6]
        df.columns = ['datetime', 'open','high', 'low', 'close', 'volume']
        history = pd.concat([history, df])
  history.index = pd.to_datetime(history['datetime'], unit='ms', utc=True)
  history = history.astype(float)
  history = history.tz_convert('Asia/Seoul')
  history = history.iloc[np.unique(history.index.values, return_index=True)[1]]
  history_file_path = join(STORE_PATH, f'{basename(root)}_history.csv')
  print("### Save History Klines/Candles (Download) : ", history_file_path)
  history.to_csv(history_file_path, index=False)
  return history, history_file_path, history['datetime'].iloc[-1]

if __name__ == "__main__":
  ## 바이낸스 비트코인 선물 1분지표 다운로드를 실행한다.
  download_binance_datas(symbol="BTCUSDT", interval="1m")
  ## 다운받은 zip파일들의 압축을 해제한다.
  klines_unzip(search_directory=STORE_PATH)
  ## csv파일들을 읽어 csv단일 파일로 저장한다.
  history_df, history_file_path, history_last_timestamp = klines_history(STORE_PATH)
  print(f"### Download Klines/Candles Count is: {len(history_df)}")
  print(f"### Last Klines/Candles Timestamp is: {history_last_timestamp}")

압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m-2025-11-07.zip -> /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m
압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m-2025-11-01.zip -> /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m
압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m-2025-11-02.zip -> /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m
압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m-2025-11-06.zip -> /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m
압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m-2025-11-08.zip -> /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m
압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m-2025-11-09.zip -> /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m
압축 해제: /root/binance_data/data/futures/um/daily/klines/BTCUSDT/1m/BTCUSDT-1m

In [3]:
# 파일명 예: ml_step1_baseline.py
import sys
import subprocess

# 의존성 자동 설치 함수
def pip_install(package):
  subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# pandas 자동 설치
try:
  import pandas as pd
except:
  print("pandas가 설치되지 않았습니다. 자동 설치 중...")
  pip_install("pandas")
  import pandas as pd

# numpy 자동 설치
try:
  import numpy as np
except:
  print("numpy가 설치되지 않았습니다. 자동 설치 중...")
  pip_install("numpy")
  import numpy as np

# scikit-learn 자동 설치
try:
  from sklearn.pipeline import Pipeline
  from sklearn.preprocessing import StandardScaler
  from sklearn.linear_model import LogisticRegression
  from sklearn.metrics import accuracy_score, balanced_accuracy_score, classification_report
except:
  print("scikit-learn이 설치되지 않았습니다. 자동 설치 중...")
  pip_install("scikit-learn")
  from sklearn.pipeline import Pipeline
  from sklearn.preprocessing import StandardScaler
  from sklearn.linear_model import LogisticRegression
  from sklearn.metrics import accuracy_score, balanced_accuracy_score, classification_report

FILE_PATH = r"/root/binance_data/1m_history.csv"  # 필요 시 경로 수정
USE_ROWS = 1_000_000  # 처음엔 50만~100만 행으로 시작

def load_data(path, use_rows=None):
  df = pd.read_csv(path, usecols=['datetime','open','high','low','close','volume'])
  if use_rows is not None and len(df) > use_rows:
    df = df.tail(use_rows)
  df = df.sort_values('datetime').reset_index(drop=True)
  return df

def make_features(df):
  # 수익률/변동성/거래량 기반의 가벼운 특성
  df['ret_1'] = df['close'].pct_change(1)
  df['ret_3'] = df['close'].pct_change(3)
  df['ret_5'] = df['close'].pct_change(5)
  df['hl_range'] = (df['high'] - df['low']) / df['close']
  df['vol_chg'] = df['volume'].pct_change().replace([np.inf, -np.inf], 0).fillna(0)
  df['ret_mean_10'] = df['ret_1'].rolling(10, min_periods=10).mean()
  df['ret_std_10']  = df['ret_1'].rolling(10, min_periods=10).std()
  # 타깃: 다음 1분이 오르면 1, 내리면 0 (동일가는 0으로 처리)
  df['y'] = (df['close'].shift(-1) > df['close']).astype(int)
  df = df.dropna().reset_index(drop=True)
  features = ['ret_1','ret_3','ret_5','hl_range','vol_chg','ret_mean_10','ret_std_10']
  return df, features

def time_split(df, test_ratio=0.2):
  n = len(df)
  split = int(n * (1 - test_ratio))
  train = df.iloc[:split].copy()
  test  = df.iloc[split:].copy()
  return train, test

def main():
  df = load_data(FILE_PATH, USE_ROWS)
  df, feats = make_features(df)
  train, test = time_split(df, test_ratio=0.2)
  X_tr, y_tr = train[feats].values, train['y'].values
  X_te, y_te = test[feats].values,  test['y'].values

  pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=300, class_weight='balanced'))
  ])
  pipe.fit(X_tr, y_tr)

  pred = pipe.predict(X_te)
  proba = pipe.predict_proba(X_te)[:,1]

  print("Accuracy:", accuracy_score(y_te, pred))
  print("Balanced Acc:", balanced_accuracy_score(y_te, pred))
  print(classification_report(y_te, pred, digits=4))

  # 간단 PnL(확률 임계값 0.55 이상/이하일 때만 매매)
  thr = 0.55
  signal = np.where(proba >= thr, 1, np.where(proba <= 1-thr, -1, 0))
  ret = test['close'].pct_change().fillna(0).values
  pnl = (signal[:-1] * ret[1:])  # 한 틱 뒤 반영
  print(f"Trades: {(signal!=0).sum()}, PnL(단순 합): {pnl.sum():.5f}")

if __name__ == "__main__":
  main()

Accuracy: 0.5162551625516255
Balanced Acc: 0.5106379503388222
              precision    recall  f1-score   support

           0     0.5236    0.6887    0.5949    103153
           1     0.5008    0.3325    0.3997     96845

    accuracy                         0.5163    199998
   macro avg     0.5122    0.5106    0.4973    199998
weighted avg     0.5125    0.5163    0.5004    199998

Trades: 450, PnL(단순 합): 0.15382


In [7]:
# ===== Step A. Setup & Load =====
import os, gc, math, random, sys, subprocess
import numpy as np
import pandas as pd

# 경로 자동 설정: /root ... 없으면 /content ...
ROOT_PATHS = ["/root/binance_data/1m_history.csv", "/content/binance_data/1m_history.csv"]
for p in ROOT_PATHS:
    if os.path.exists(p):
        FILE_PATH = p
        break
else:
    # 필요 시 직접 지정
    FILE_PATH = "/root/binance_data/1m_history.csv"

USE_ROWS = 1_000_000   # 메모리/속도에 맞춰 조절

# 재현성
SEED = 42
random.seed(SEED); np.random.seed(SEED)

def load_data(path, use_rows=None):
    usecols = ['datetime','open','high','low','close','volume']
    df = pd.read_csv(path, usecols=usecols)
    if use_rows is not None and len(df) > use_rows:
        df = df.tail(use_rows)
    # datetime이 문자열이면 정렬용으로 그대로 사용
    df = df.sort_values('datetime').reset_index(drop=True)
    # 안전 캐스팅
    for c in ['open','high','low','close','volume']:
        df[c] = pd.to_numeric(df[c], errors='coerce')
    df = df.dropna().reset_index(drop=True)
    return df

df = load_data(FILE_PATH, USE_ROWS)
print(df.head(), df.shape)

       datetime     open     high      low    close   volume
0  1.702733e+12  42438.4  42456.0  42438.4  42452.3  122.215
1  1.702733e+12  42452.3  42456.0  42452.3  42455.9   32.218
2  1.702733e+12  42456.0  42471.6  42434.6  42434.6  188.557
3  1.702733e+12  42434.6  42434.7  42354.5  42367.0  816.443
4  1.702733e+12  42367.0  42391.9  42358.9  42381.0  210.102 (1000000, 6)


In [17]:
# ===== Resample OHLCV from 1m to higher timeframe =====
import pandas as pd, numpy as np, os

# 1) 1분 CSV 읽기 (지금 쓰던 FILE_PATH 그대로)
df_raw = pd.read_csv(FILE_PATH, usecols=['datetime','open','high','low','close','volume'])

# 2) datetime을 실제 시각으로 변환해 인덱스 설정 (ms 기준)
#    (당신의 CSV는 ms 타임스탬프였죠: 1.702733e+12 형태)
dt = pd.to_datetime(df_raw['datetime'], unit='ms', utc=True)
df1m = df_raw.copy()
df1m.index = dt
df1m = df1m[['open','high','low','close','volume']].astype(float).sort_index()

def resample_ohlcv(df, rule='5T'):
    # rule: '5T'=5분, '15T'=15분, '1H'=1시간, '1D'=일봉
    o = df['open'].resample(rule).first()
    h = df['high'].resample(rule).max()
    l = df['low'].resample(rule).min()
    c = df['close'].resample(rule).last()
    v = df['volume'].resample(rule).sum()
    out = pd.concat([o,h,l,c,v], axis=1)
    out.columns = ['open','high','low','close','volume']
    out = out.dropna().reset_index()
    # Step A~E와 동일 포맷(열 이름 유지)
    out.rename(columns={'index':'datetime'}, inplace=True)
    # datetime을 ms 정수로 되돌려도 되고(완전 호환), string으로 둬도 Step B가 잘 동작합니다.
    out['datetime'] = (out['datetime'].view('int64') // 1_000_000).astype('int64')  # UTC ms
    return out

# === 여기서 원하는 주기로 바꾸세요 ===
TARGET_RULE = '5T'   # '5T'(5분), '15T', '1H', '1D' 중 택1
dfX = resample_ohlcv(df1m, TARGET_RULE)

# 저장 경로 생성 & 저장
base_dir = '/content/binance_data' if os.path.exists('/content') else '/root/binance_data'
os.makedirs(base_dir, exist_ok=True)
resampled_path = os.path.join(base_dir, f'BTCUSDT_{TARGET_RULE}.csv')
dfX.to_csv(resampled_path, index=False)

print("Saved:", resampled_path, "shape:", dfX.shape)

Saved: /content/binance_data/BTCUSDT_5T.csv shape: (616320, 6)


In [18]:
# ===== Step B. Features & Labels =====
# (RSI, EMA, MACD, Bollinger, ATR 근사 등 기본 지표를 numpy/pandas로 계산)

def ema(s, span):
    return s.ewm(span=span, adjust=False).mean()

def rsi(close, n=14):
    delta = close.diff()
    up = delta.clip(lower=0).rolling(n).mean()
    down = (-delta.clip(upper=0)).rolling(n).mean()
    rs = up / (down + 1e-12)
    return 100 - (100 / (1 + rs))

def atr_like(high, low, close, n=14):
    # 간단 TR: high-low, 롤링 평균
    tr = (high - low).abs()
    return tr.rolling(n).mean()

def macd(close, f=12, s=26, sig=9):
    ema_f = ema(close, f)
    ema_s = ema(close, s)
    macd_line = ema_f - ema_s
    signal = ema(macd_line, sig)
    hist = macd_line - signal
    return macd_line, signal, hist

def make_features_labels(df, h=3, deadzone_q=0.45):
    X = df.copy()

    # 수익률 기반
    X['ret_1'] = X['close'].pct_change(1)
    X['ret_3'] = X['close'].pct_change(3)
    X['ret_5'] = X['close'].pct_change(5)

    # 변동성/범위/거래량
    X['hl_range'] = (X['high'] - X['low']) / X['close']
    X['rv_10'] = X['ret_1'].rolling(10).std() * np.sqrt(10)
    X['vol_chg'] = X['volume'].pct_change().replace([np.inf, -np.inf], 0).fillna(0)
    X['z_close_50'] = (X['close'] / X['close'].rolling(50).mean()) - 1
    X['atr_14'] = atr_like(X['high'], X['low'], X['close'], 14)

    # 기술지표
    X['rsi_14'] = rsi(X['close'], 14)
    X['ema_20'] = ema(X['close'], 20)
    X['ema_50'] = ema(X['close'], 50)
    macd_line, macd_sig, macd_hist = macd(X['close'])
    X['macd'] = macd_line
    X['macd_sig'] = macd_sig
    X['macd_hist'] = macd_hist

    # 볼린저 밴드 폭
    ma20 = X['close'].rolling(20).mean()
    std20 = X['close'].rolling(20).std()
    X['bb_width'] = (2*std20) / (ma20 + 1e-12)

    # 레이블: h분 수익률, 데드존
    future_ret = X['close'].shift(-h) / X['close'] - 1
    lo = future_ret.quantile(deadzone_q)
    hi = future_ret.quantile(1 - deadzone_q)
    y3 = np.where(future_ret > hi, 1, np.where(future_ret < lo, -1, 0))  # 1:롱, -1:숏, 0:관망

    X = X.dropna().reset_index(drop=True)
    y3 = pd.Series(y3).iloc[len(pd.Series(y3)) - len(X):].reset_index(drop=True)  # 길이 맞춤

    feat_cols = [
        'ret_1','ret_3','ret_5','hl_range','rv_10','vol_chg','z_close_50','atr_14',
        'rsi_14','ema_20','ema_50','macd','macd_sig','macd_hist','bb_width'
    ]
    return X, feat_cols, y3

H = 3             # 예측 지평(horizon) 분
DEADZONE_Q = 0.45 # 0.45면 가운데 10%만 관망 제거 아닌 '관망'으로 둠
Xdf, FEATS, y3 = make_features_labels(df, h=H, deadzone_q=DEADZONE_Q)
print("Features:", len(FEATS), "rows:", len(Xdf), "label dist:", pd.Series(y3).value_counts())

Features: 15 rows: 999951 label dist: -1    449979
 1    449976
 0     99996
Name: count, dtype: int64


In [19]:
# ===== Step C. Sequences & Splits =====
from sklearn.preprocessing import StandardScaler

LOOKBACK = 32   # 지난 60분 시퀀스로 다음 h분 결과 학습
TEST_RATIO = 0.2
VAL_RATIO = 0.1  # train 내부에서 검증 분리

# 관망(0) 제외하고 '방향' 이진분류(롱=1, 숏=0)로 1차 접근
use_mask = (y3 != 0)
X_used = Xdf.loc[use_mask, FEATS].copy()
y_bin  = (y3[use_mask] == 1).astype(int).values

# 스케일러 (train 구간에만 적합하도록 분할 먼저)
n = len(X_used)
split_test = int(n * (1 - TEST_RATIO))
X_train_df = X_used.iloc[:split_test].copy()
X_test_df  = X_used.iloc[split_test:].copy()
y_train    = y_bin[:split_test]
y_test     = y_bin[split_test:]

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_df.values)
X_test_scaled  = scaler.transform(X_test_df.values)

def to_sequences(X_arr, y_arr, lookback):
    Xs, ys = [], []
    for i in range(lookback, len(X_arr)):
        Xs.append(X_arr[i-lookback:i, :])
        ys.append(y_arr[i])
    return np.array(Xs, dtype=np.float32), np.array(ys, dtype=np.int64)

X_tr_seq, y_tr_seq = to_sequences(X_train_scaled, y_train, LOOKBACK)
X_te_seq, y_te_seq = to_sequences(X_test_scaled,  y_test,  LOOKBACK)

# train 내부에서 val 분리
nv = int(len(X_tr_seq) * (1 - VAL_RATIO))
X_tr, X_val = X_tr_seq[:nv], X_tr_seq[nv:]
y_tr, y_val = y_tr_seq[:nv], y_tr_seq[nv:]

for name, arr in [('X_tr',X_tr),('X_val',X_val),('X_te',X_te_seq)]:
    print(name, arr.shape)
print('y dist train:', y_tr.mean(), 'val:', y_val.mean(), 'test:', y_te_seq.mean())
gc.collect()

X_tr (647938, 32, 15)
X_val (71994, 32, 15)
X_te (179959, 32, 15)
y dist train: 0.5006790773191262 val: 0.5005833819484957 test: 0.4973077200917987


3964

In [20]:
# ===== Step D. Model =====
# Colab에는 tensorflow 기본 포함 (필요 시 설치: pip install tensorflow==2.15.*)
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks

print(tf.__version__)
MODEL_TYPE = "GRU"  # "GRU" 또는 "LSTM"
UNITS = 64
DROPOUT = 0.2
LR = 1e-3
EPOCHS = 20
BATCH = 512

inp = layers.Input(shape=(LOOKBACK, len(FEATS)))
if MODEL_TYPE.upper() == "GRU":
    x = layers.GRU(UNITS, return_sequences=False)(inp)
else:
    x = layers.LSTM(UNITS, return_sequences=False)(inp)
x = layers.Dropout(DROPOUT)(x)
x = layers.Dense(64, activation='relu')(x)
out = layers.Dense(1, activation='sigmoid')(x)

model = models.Model(inp, out)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LR),
              loss='binary_crossentropy',
              metrics=['accuracy', tf.keras.metrics.AUC(name='auc')])

cb = [
    callbacks.EarlyStopping(patience=3, restore_best_weights=True, monitor='val_auc', mode='max'),
    callbacks.ReduceLROnPlateau(patience=2, factor=0.5, monitor='val_loss')
]

hist = model.fit(
    X_tr, y_tr,
    validation_data=(X_val, y_val),
    epochs=EPOCHS, batch_size=BATCH, verbose=1, callbacks=cb
)

print("Val best:", max(hist.history['val_auc']))

2.19.0
Epoch 1/20
[1m1266/1266[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m173s[0m 134ms/step - accuracy: 0.5098 - auc: 0.5130 - loss: 0.6942 - val_accuracy: 0.5086 - val_auc: 0.5177 - val_loss: 0.6930 - learning_rate: 0.0010
Epoch 2/20
[1m1266/1266[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m176s[0m 139ms/step - accuracy: 0.5140 - auc: 0.5195 - loss: 0.6926 - val_accuracy: 0.5112 - val_auc: 0.5176 - val_loss: 0.6928 - learning_rate: 0.0010
Epoch 3/20
[1m1266/1266[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m173s[0m 137ms/step - accuracy: 0.5157 - auc: 0.5227 - loss: 0.6923 - val_accuracy: 0.5124 - val_auc: 0.5183 - val_loss: 0.6927 - learning_rate: 0.0010
Epoch 4/20
[1m1266/1266[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m170s[0m 134ms/step - accuracy: 0.5172 - auc: 0.5237 - loss: 0.6923 - val_accuracy: 0.5125 - val_auc: 0.5181 - val_loss: 0.6927 - learning_rate: 0.0010
Epoch 5/20
[1m1266/1266[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m170s[0m 134ms/step - acc

In [21]:
# ===== Step E. Evaluate & Backtest =====
from sklearn.metrics import classification_report, accuracy_score, balanced_accuracy_score

# 테스트 예측
proba = model.predict(X_te_seq, batch_size=4096).ravel()
pred  = (proba >= 0.5).astype(int)

print("Test Acc:", accuracy_score(y_te_seq, pred))
print("Balanced Acc:", balanced_accuracy_score(y_te_seq, pred))
print(classification_report(y_te_seq, pred, digits=4))

# 백테스트: 원래 관망 제외 데이터의 테스트 구간에 해당하는 '종가'를 맞춰서 사용
# 테스트 세그먼트의 시작 index를 원본(관망제외, 스케일된 테이블) 기준으로 계산
te_start = split_test + LOOKBACK
close_used = X_used.index  # 관망 제외 DataFrame의 원래 인덱스
idx_close_slice = close_used[te_start: te_start + len(proba)]  # 길이 맞춤
close_series = df.loc[idx_close_slice, 'close'].reset_index(drop=True)

def backtest_from_proba(proba, close, thr=0.55, fee_bps=6.0):
    """
    thr: 매수>=thr, 매도<=1-thr, 그 외 0
    fee_bps: 편도 수수료(bp). 예: 6bp = 0.06%
    """
    close = pd.Series(close).reset_index(drop=True)
    ret = close.pct_change().fillna(0).values
    signal = np.where(proba >= thr, 1, np.where(proba <= 1-thr, -1, 0))
    pos = np.roll(signal, 1); pos[0] = 0

    trades = (pos != np.roll(pos,1)).astype(int); trades[0] = (pos[0] != 0)
    cost = trades * (fee_bps / 1e4)

    pnl = pos * ret - cost
    eq = (1 + pd.Series(pnl)).cumprod()
    sharpe = (pd.Series(pnl).mean() / (pd.Series(pnl).std() + 1e-12)) * math.sqrt(365*24*60)
    mdd = ((eq - eq.cummax())/eq.cummax()).min()
    return {
        "trades": int(trades.sum()),
        "net_return": float(eq.iloc[-1] - 1),
        "sharpe": float(sharpe),
        "max_drawdown": float(mdd),
        "final_equity": float(eq.iloc[-1]),
    }

BT = backtest_from_proba(proba, close_series.values, thr=0.58, fee_bps=6.0)
BT

# 1) 시퀀스 생성할 때 '정답 시점의 원본 인덱스'도 함께 반환
def to_sequences_with_index(X_arr, y_arr, base_index, lookback, stride=1):
    Xs, ys, idxs = [], [], []
    for i in range(lookback, len(X_arr), stride):
        Xs.append(X_arr[i-lookback:i, :])
        ys.append(y_arr[i])
        idxs.append(base_index[i])  # 이 시퀀스가 예측하는 '정답 시점'의 원본 인덱스
    return np.array(Xs, dtype=np.float32), np.array(ys, dtype=np.int64), np.array(idxs, dtype=np.int64)

# 2) (다시) Train/Test 분할 + 시퀀스 생성 (기존 파라미터 그대로 사용)
use_mask = (y3 != 0)
X_used = Xdf.loc[use_mask, FEATS].copy()
y_bin  = (y3[use_mask] == 1).astype(int).values
base_index = X_used.index.values  # 원본 df에서의 인덱스

n = len(X_used)
split_test = int(n * (1 - TEST_RATIO))
X_train_df = X_used.iloc[:split_test].copy()
X_test_df  = X_used.iloc[split_test:].copy()
y_train    = y_bin[:split_test]
y_test     = y_bin[split_test:]
base_idx_tr = base_index[:split_test]
base_idx_te = base_index[split_test:]

from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_df.values)
X_test_scaled  = scaler.transform(X_test_df.values)

# (stride=1로 정확도 우선; 느리면 2~3으로)
X_tr_seq, y_tr_seq, idx_tr_seq = to_sequences_with_index(X_train_scaled, y_train, base_idx_tr, LOOKBACK, stride=1)
X_te_seq, y_te_seq, idx_te_seq = to_sequences_with_index(X_test_scaled,  y_test,  base_idx_te, LOOKBACK, stride=1)

# train 내부에서 val 분리
VAL_RATIO = 0.1
nv = int(len(X_tr_seq) * (1 - VAL_RATIO))
X_tr, X_val = X_tr_seq[:nv], X_tr_seq[nv:]
y_tr, y_val = y_tr_seq[:nv], y_tr_seq[nv:]
idx_val_seq = idx_tr_seq[nv:]   # 검증 시퀀스의 '정답 시점' 원본 인덱스
print(X_tr.shape, X_val.shape, X_te_seq.shape)


[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 330ms/step
Test Acc: 0.5104662728732656
Balanced Acc: 0.5095566961152715
              precision    recall  f1-score   support

           0     0.5098    0.6785    0.5822     90464
           1     0.5117    0.3406    0.4090     89495

    accuracy                         0.5105    179959
   macro avg     0.5108    0.5096    0.4956    179959
weighted avg     0.5108    0.5105    0.4961    179959

(647938, 32, 15) (71994, 32, 15) (179959, 32, 15)


In [22]:
import numpy as np, pandas as pd, math

def backtest_from_proba(proba, close, thr=0.55, fee_bps=6.0, invert=False):
    close = pd.Series(close).reset_index(drop=True)
    ret = close.pct_change().fillna(0).values
    sig = np.where(proba >= thr, 1, np.where(proba <= 1-thr, -1, 0))
    if invert: sig = -sig
    pos = np.roll(sig, 1); pos[0] = 0
    trades = (pos != np.roll(pos,1)).astype(int); trades[0] = (pos[0] != 0)
    cost = trades * (fee_bps/1e4)
    pnl = pos * ret - cost
    eq = (1 + pd.Series(pnl)).cumprod()
    sharpe = (pd.Series(pnl).mean() / (pd.Series(pnl).std() + 1e-12)) * math.sqrt(365*24*60)
    mdd = ((eq - eq.cummax())/eq.cummax()).min()
    return {"trades": int(trades.sum()), "net_return": float(eq.iloc[-1]-1),
            "sharpe": float(sharpe), "max_drawdown": float(mdd), "final_equity": float(eq.iloc[-1])}

def scan_with_constraints(proba, close, fees=(4.0,6.0,8.0), thrs=None, invert_opts=(False, True),
                          objective="sharpe", min_trades=20, max_trades=None):
    if thrs is None: thrs = np.round(np.linspace(0.52, 0.66, 15), 3)
    best = None
    for inv in invert_opts:
        for f in fees:
            for t in thrs:
                m = backtest_from_proba(proba, close, thr=t, fee_bps=f, invert=inv)
                tr = m["trades"]
                if tr < min_trades:  # 무매매/과소매매 제거
                    continue
                if (max_trades is not None) and (tr > max_trades):
                    continue
                score = m["sharpe"] if objective=="sharpe" else m["net_return"]
                if (best is None) or (score > best["score"]):
                    m.update({"thr": float(t), "fee_bps": float(f), "invert": inv, "score": float(score)})
                    best = m
    return best

# 1) 검증 확률/가격
proba_val = model.predict(X_val, batch_size=4096).ravel()
close_val = df.loc[idx_val_seq, 'close'].reset_index(drop=True)

# 2) 검증에서 최적 설정(거래 최소 20건, Sharpe 최대)
best_val = scan_with_constraints(proba_val, close_val.values,
                                 thrs=np.round(np.linspace(0.50, 0.70, 41), 3),
                                 objective="sharpe", min_trades=20)
print("VAL best (constrained):", best_val)

# 3) 테스트에 동일 설정 적용
proba_te = model.predict(X_te_seq, batch_size=4096).ravel()
close_te  = df.loc[idx_te_seq,  'close'].reset_index(drop=True)

bt = backtest_from_proba(proba_te, close_te.values,
                         thr=best_val["thr"], fee_bps=best_val["fee_bps"], invert=best_val["invert"])
print("TEST (VAL-opt constrained):", bt)

[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 222ms/step
VAL best (constrained): {'trades': 20, 'net_return': -0.00773809773253753, 'sharpe': -4.59457976744704, 'max_drawdown': -0.011269994556993101, 'final_equity': 0.9922619022674625, 'thr': 0.595, 'fee_bps': 4.0, 'invert': False, 'score': -4.59457976744704}
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 222ms/step
TEST (VAL-opt constrained): {'trades': 46, 'net_return': -0.05466642524397314, 'sharpe': -1.8194765592249718, 'max_drawdown': -0.059367861170083525, 'final_equity': 0.9453335747560269}


In [23]:
import numpy as np, pandas as pd, math

def backtest_from_proba(proba, close, thr=0.55, fee_bps=6.0, invert=False):
    close = pd.Series(close).reset_index(drop=True)
    ret = close.pct_change().fillna(0).values
    sig = np.where(proba >= thr, 1, np.where(proba <= 1-thr, -1, 0))
    if invert:
        sig = -sig
    pos = np.roll(sig, 1); pos[0] = 0
    trades = (pos != np.roll(pos,1)).astype(int); trades[0] = (pos[0] != 0)
    cost = trades * (fee_bps/1e4)
    pnl = pos * ret - cost
    eq = (1 + pd.Series(pnl)).cumprod()
    sharpe = (pd.Series(pnl).mean() / (pd.Series(pnl).std() + 1e-12)) * math.sqrt(365*24*60)
    mdd = ((eq - eq.cummax())/eq.cummax()).min()
    return {"trades": int(trades.sum()), "net_return": float(eq.iloc[-1]-1),
            "sharpe": float(sharpe), "max_drawdown": float(mdd), "final_equity": float(eq.iloc[-1])}

def slice_close_for_seq(df, used_index, start_i, length):
    idx = used_index[start_i: start_i+length]
    return df.loc[idx, 'close'].reset_index(drop=True)

def scan_all(proba, close, fees=(4.0,6.0,8.0), thrs=None):
    if thrs is None:
        thrs = np.round(np.linspace(0.52, 0.66, 15), 3)
    best = None
    for inv in (False, True):
        for f in fees:
            for t in thrs:
                m = backtest_from_proba(proba, close, thr=t, fee_bps=f, invert=inv)
                m.update({"thr": float(t), "fee_bps": float(f), "invert": inv})
                if best is None or m["net_return"] > best["net_return"]:
                    best = m
    return best

# 테스트 구간 종가 추출 (Step C에서 만든 변수 이용)
te_start = split_test + LOOKBACK
close_te = slice_close_for_seq(df, X_used.index, te_start, len(proba))

best_test = scan_all(proba, close_te.values)
print("TEST best-by-net-return:", best_test)

TEST best-by-net-return: {'trades': 8, 'net_return': 0.04410212174441419, 'sharpe': 1.5248108046929756, 'max_drawdown': -0.0048732666979932615, 'final_equity': 1.0441021217444142, 'thr': 0.61, 'fee_bps': 4.0, 'invert': True}


In [24]:
# 검증 확률
proba_val = model.predict(X_val, batch_size=4096).ravel()

# 검증 구간 종가 슬라이스 (시퀀스 길이 맞춤)
val_start = split_test - (len(y_tr_seq) - len(y_tr)) + LOOKBACK
close_val = slice_close_for_seq(df, X_used.index, val_start, len(proba_val))

best_val = scan_all(proba_val, close_val.values)
print("VAL best-by-net-return:", best_val)

# 같은 설정으로 테스트에 적용
bt = backtest_from_proba(proba, close_te.values,
                         thr=best_val["thr"], fee_bps=best_val["fee_bps"], invert=best_val["invert"])
print("TEST with VAL-opt settings:", bt)

[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 217ms/step
VAL best-by-net-return: {'trades': 2, 'net_return': 0.0003022085869848201, 'sharpe': 1.0110373551704277, 'max_drawdown': -0.00039999999999987523, 'final_equity': 1.0003022085869848, 'thr': 0.61, 'fee_bps': 4.0, 'invert': True}
TEST with VAL-opt settings: {'trades': 8, 'net_return': 0.04410212174441419, 'sharpe': 1.5248108046929756, 'max_drawdown': -0.0048732666979932615, 'final_equity': 1.0441021217444142}


In [25]:
import numpy as np

# 1) invert=True는 고정, thr를 0.56~0.64 범위에서 탐색, 최소 트레이드 수 강제
def scan_fixed_invert(proba, close, fee_bps=4.0, thr_low=0.56, thr_high=0.64, steps=33,
                      min_trades=12, objective="sharpe"):
    thrs = np.round(np.linspace(thr_low, thr_high, steps), 3)
    best = None
    for t in thrs:
        m = backtest_from_proba(proba, close, thr=t, fee_bps=fee_bps, invert=True)
        if m["trades"] < min_trades:
            continue
        score = m["sharpe"] if objective=="sharpe" else m["net_return"]
        if (best is None) or (score > best["score"]):
            m.update({"thr": float(t), "fee_bps": float(fee_bps), "invert": True, "score": float(score)})
            best = m
    return best

# 2) 검증에서 고르고 → 테스트에 그대로 적용
proba_val = model.predict(X_val, batch_size=4096).ravel()
proba_te  = model.predict(X_te_seq, batch_size=4096).ravel()

close_val = df.loc[idx_val_seq, 'close'].reset_index(drop=True) if 'idx_val_seq' in globals() \
            else close_val  # F2-수정 버전이면 idx_* 사용
close_te  = df.loc[idx_te_seq,  'close'].reset_index(drop=True) if 'idx_te_seq' in globals() \
            else close_te

best_val_robust = scan_fixed_invert(
    proba_val, close_val.values,
    fee_bps=4.0, thr_low=0.56, thr_high=0.64, steps=33,
    min_trades=12, objective="sharpe"  # 필요시 "net_return"
)
print("VAL best (invert=True, min_trades=12):", best_val_robust)

bt_robust = backtest_from_proba(
    proba_te, close_te.values,
    thr=best_val_robust["thr"], fee_bps=best_val_robust["fee_bps"], invert=True
)
print("TEST (apply VAL-robust):", bt_robust)

# 3) 참고: 임계값에 따른 거래 수 곡선도 확인
def trades_vs_thr(proba, close, fee_bps=4.0, thr_low=0.54, thr_high=0.66, steps=25):
    thrs = np.round(np.linspace(thr_low, thr_high, steps), 3)
    rows = []
    for t in thrs:
        m = backtest_from_proba(proba, close, thr=t, fee_bps=fee_bps, invert=True)
        rows.append((t, m["trades"], m["sharpe"], m["net_return"]))
    for r in rows: print(r)
trades_vs_thr(proba_val, close_val.values, fee_bps=4.0, thr_low=0.56, thr_high=0.64, steps=9)

[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 410ms/step
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 381ms/step
VAL best (invert=True, min_trades=12): {'trades': 20, 'net_return': -0.0082191867247553, 'sharpe': -4.762392822519156, 'max_drawdown': -0.0082191867247553, 'final_equity': 0.9917808132752447, 'thr': 0.595, 'fee_bps': 4.0, 'invert': True, 'score': -4.762392822519156}
TEST (apply VAL-robust): {'trades': 46, 'net_return': 0.01693606239099754, 'sharpe': 0.6069303647212715, 'max_drawdown': -0.018512474662901755, 'final_equity': 1.0169360623909975}
(np.float64(0.56), 938, -46.76891249657162, -0.3088396248245383)
(np.float64(0.57), 372, -29.10154027294587, -0.1266318386626436)
(np.float64(0.58), 108, -15.976442337735506, -0.04423298173756529)
(np.float64(0.59), 28, -6.3928119029759705, -0.011781844514366702)
(np.float64(0.6), 12, -5.51890591869348, -0.004115397279843114)
(np.float64(0.61), 2, 1.5024412145668848, 0.0005973125030802606)
(np.float

In [26]:
def fine_scan(proba, close, fee_bps=4.0, thrs=np.round(np.arange(0.595, 0.6051, 0.001), 3)):
    rows = []
    for t in thrs:
        m = backtest_from_proba(proba, close, thr=t, fee_bps=fee_bps, invert=True)
        m.update({'thr': float(t)})
        rows.append(m)
        print(f"thr={t:.3f}  trades={m['trades']}  net={m['net_return']:.4f}  sharpe={m['sharpe']:.2f}")
    rows.sort(key=lambda x: x['sharpe'], reverse=True)   # 또는 'net_return'
    return rows[0]

best_fine = fine_scan(proba_val, close_val.values)   # 검증에서 고르고
print("VAL best (fine):", best_fine)

bt_fine = backtest_from_proba(proba, close_te.values, thr=best_fine['thr'], fee_bps=4.0, invert=True)
print("TEST (apply fine VAL thr):", bt_fine)

thr=0.595  trades=20  net=-0.0082  sharpe=-4.76
thr=0.596  trades=20  net=-0.0099  sharpe=-6.15
thr=0.597  trades=18  net=-0.0065  sharpe=-7.85
thr=0.598  trades=16  net=-0.0057  sharpe=-7.08
thr=0.599  trades=14  net=-0.0049  sharpe=-6.30
thr=0.600  trades=12  net=-0.0041  sharpe=-5.52
thr=0.601  trades=12  net=-0.0041  sharpe=-5.52
thr=0.602  trades=10  net=-0.0036  sharpe=-4.99
thr=0.603  trades=8  net=-0.0025  sharpe=-3.78
thr=0.604  trades=6  net=-0.0012  sharpe=-2.18
thr=0.605  trades=4  net=0.0000  sharpe=0.11
VAL best (fine): {'trades': 4, 'net_return': 4.837469130425376e-05, 'sharpe': 0.11457573403189662, 'max_drawdown': -0.0009483906760439091, 'final_equity': 1.0000483746913043, 'thr': 0.605}
TEST (apply fine VAL thr): {'trades': 14, 'net_return': 0.04175176316800955, 'sharpe': 1.4404445286722023, 'max_drawdown': -0.005324773134532057, 'final_equity': 1.0417517631680095}


In [27]:
for delta in [-0.0025, 0, 0.0025]:
    thr = round(best_fine['thr'] + delta, 3)
    m = backtest_from_proba(proba, close_te.values, thr=thr, fee_bps=4.0, invert=True)
    print(f"TEST thr={thr:.3f}: trades={m['trades']}, net={m['net_return']:.4f}, sharpe={m['sharpe']:.2f}")

TEST thr=0.603: trades=20, net=0.0286, sharpe=0.99
TEST thr=0.605: trades=14, net=0.0418, sharpe=1.44
TEST thr=0.607: trades=8, net=0.0441, sharpe=1.52


In [28]:
FINAL = {
    'timeframe': '5m',
    'model': 'GRU-1L',
    'lookback': 32,
    'label': {'H': H, 'deadzone_q': DEADZONE_Q},
    'invert': True,
    'fee_bps': 4.0,
    'thr': float(best_fine['thr'])
}
print(">>> FINAL CONFIG:", FINAL)

>>> FINAL CONFIG: {'timeframe': '5m', 'model': 'GRU-1L', 'lookback': 32, 'label': {'H': 3, 'deadzone_q': 0.45}, 'invert': True, 'fee_bps': 4.0, 'thr': 0.605}
