In [None]:
# -*- coding: utf-8 -*-
# 目的：讀取各表 Parquet，外連接對齊為 feat_6h，並生成 label_6h

from pathlib import Path
from typing import Dict
import pandas as pd
import numpy as np

# Name of the timestamp column that every table is keyed and joined on.
TIME_COL = "time"

# Directory holding the raw 6h Parquet inputs, and the directory created for
# processed outputs.
RAW_DIR, OUT_DIR = Path("./data/parquet_raw_6h"), Path("./data/processed_6h")

# Create the output directory (and any missing parents) up front; a no-op when
# it already exists.
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ========== 1) Load ==========
# Logical table name -> Parquet file under RAW_DIR. The table name is also
# used below as the prefix for that table's columns.
files = {
    "price": RAW_DIR / "price_ohlcv_6h.parquet",
    "agg_oi": RAW_DIR / "agg_oi_6h.parquet",
    "funding": RAW_DIR / "funding_6h.parquet",
    "liq_agg": RAW_DIR / "liq_agg_6h.parquet",
    "lsr": RAW_DIR / "lsr_6h.parquet",
    "taker_flow": RAW_DIR / "taker_flow_6h.parquet",
}
dfs: Dict[str, pd.DataFrame] = {}

for k, p in files.items():
    df = pd.read_parquet(p)
    if TIME_COL not in df.columns:
        # No explicit time column: take the index as the timestamp and expose
        # it as TIME_COL whatever the index was named (an unnamed index would
        # otherwise surface as "index", a named one under its own name).
        df = df.rename_axis(TIME_COL).reset_index()
    # Normalise timestamps to tz-aware UTC so all tables join on equal keys.
    df[TIME_COL] = pd.to_datetime(df[TIME_COL], utc=True)
    # Stable sort: rows sharing a timestamp keep their file order, which makes
    # keep="last" deterministic (the default quicksort may reorder ties).
    df = df.sort_values(TIME_COL, kind="stable").drop_duplicates(TIME_COL, keep="last")
    # Prefix columns with the table name to avoid clashes across tables;
    # the time key and "_ts_utc" keep their original names.
    keep = {TIME_COL, "_ts_utc"}
    rename_map = {c: f"{k}_{c}" for c in df.columns if c not in keep}
    df = df.rename(columns=rename_map)
    dfs[k] = df

# ========== 2) Outer-join all tables into feat_6h ==========
# Temporary suffix for a right-hand column whose name already exists in the
# accumulated frame; such columns are folded into the existing one and dropped
# immediately, so the suffix never appears in the final result.
_DUP_SUFFIX = "__dup"

feat = None
for df in dfs.values():
    if feat is None:
        feat = df
        continue
    # Columns left un-prefixed on purpose (e.g. "_ts_utc") may exist in several
    # tables. With the default "_x"/"_y" suffixes they would be split into
    # "_ts_utc_x"/"_ts_utc_y" and collide again on a later merge (recent pandas
    # raises MergeError for the resulting duplicate names). Coalesce them
    # instead: keep the accumulated value and fill gaps from the new table.
    shared = [c for c in df.columns if c != TIME_COL and c in feat.columns]
    feat = feat.merge(df, on=TIME_COL, how="outer", suffixes=("", _DUP_SUFFIX))
    for c in shared:
        feat[c] = feat[c].combine_first(feat[c + _DUP_SUFFIX])
    feat = feat.drop(columns=[c + _DUP_SUFFIX for c in shared])

# Chronological order with a clean 0..n-1 index.
feat = feat.sort_values(TIME_COL).reset_index(drop=True)

# Imputation for numeric columns: forward-fill, then replace whatever is still
# missing with 0 so that no NaN reaches the downstream model.
# NOTE(review): after a forward fill only the rows *before* a column's first
# observation can still be NaN, so the zeros land at the start of a series
# (not its tail). This also applies to the price columns -- confirm that a
# price of 0 is acceptable there.
num_cols = feat.select_dtypes(include=[np.number]).columns.tolist()
feat[num_cols] = feat[num_cols].ffill().fillna(0.0)

# ========== 3) Labels ==========
# Labels are built from the price table's open/close columns:
#   y_dir_6h  : 1 if the NEXT bar's close is above THIS bar's open, else 0.
#   y_tail_6h : 1 if this bar's return r_6h is an extreme event, i.e. at or
#               below the 5% quantile or at or above the 95% quantile.
close_col = "price_close"
open_col = "price_open"

if close_col not in feat.columns or open_col not in feat.columns:
    raise ValueError("價量表缺少 price_open/price_close 欄位，無法建立標籤。")

# Open-to-close return of the current bar. A zero open (possible after the
# zero fill above) yields +/-inf; store it as NaN so it neither ends up in the
# saved features as inf nor distorts the quantile thresholds below.
feat["r_6h"] = (feat[close_col] / feat[open_col] - 1.0).replace(
    [np.inf, -np.inf], np.nan
)

# Quantiles skip NaN. Comparisons against NaN are False, so rows without a
# valid return are labelled 0 for the tail event.
# NOTE(review): thresholds are computed over the whole sample, so they use
# information from later rows -- confirm this is acceptable for training.
q05 = feat["r_6h"].quantile(0.05)
q95 = feat["r_6h"].quantile(0.95)
y_tail = ((feat["r_6h"] <= q05) | (feat["r_6h"] >= q95)).astype(int)

# Direction label: next bar's close versus this bar's open.
next_close = feat[close_col].shift(-1)
y_dir = (next_close > feat[open_col]).astype("float32")
# Rows with no next close (always the last row) have no defined label: mark
# them NaN so they are dropped below. Masking on the missing value, rather
# than writing to position -1, also works when the frame is empty.
y_dir = y_dir.where(next_close.notna())

label = pd.DataFrame({
    TIME_COL: feat[TIME_COL],
    "y_dir_6h": y_dir,
    "y_tail_6h": y_tail.astype("float32"),
}).dropna(subset=["y_dir_6h"]).reset_index(drop=True)

# ========== 4) Save ==========
# NOTE(review): OUT_DIR is created at the top of the script but the outputs are
# written directly under ./data -- confirm which location is intended.
feat_out = Path("./data/feat_6h.parquet")
label_out = Path("./data/label_6h.parquet")

# Write both frames first (without the pandas index), then report.
for frame, target in ((feat, feat_out), (label, label_out)):
    frame.to_parquet(target, index=False)

print(f"[OK] feat -> {feat_out}")
print(f"[OK] label -> {label_out}")
