
# 01_data_features — **MVP+ (features essenciais)**
Objetivo: ler o CSV 1h (CCCAGG), **resample para 4H**, calcular **indicadores base** e **derivadas essenciais** para previsão de quantis por regime, e salvar `features_4H.parquet` com **schema fixo**.

**Saídas**
- `data/processed/features/features_4H.parquet`
- Metadados e QC em `data/processed/features/`: `feature_spec.json`, `cv_splits.json`, `library_versions.json` e `metrics_features.csv`

**Núcleo**
- ATR14, ADX14, DI±; EMA20/50/200; Donchian(20); RSI14; MACD(12/26/9)
- VWAP de sessão (America/Sao_Paulo) + sigma intrassessão

**Derivadas recomendadas (novas)**
- Normalizações/larguras: `ATR_PCT`, `DC_WIDTH`, `BB_PCTB`, `BB_WIDTH`
- Posicionamentos (z-scores): `Z_CLOSE_EMA20`, `Z_CLOSE_DCMID`, `Z_CLOSE_VWAP`
- Sinais binários: `ABOVE_EMA200`, `CROSSUP_EMA20`, `CROSSDN_EMA20`, `CLOSE_GT_DONH`, `CLOSE_LT_DONL`
- Inclinações e retornos: `ADX_SLOPE`, `DI_DIFF`, `RET_1`, `RET_3`
- Vol realizada: `RV10`, `RV20`

> Anti-leakage: tudo calculado na barra *t*; cruzamentos usam referências *shift(1)* quando necessário.


## Imports e configuração

In [None]:

import json, math, os, shutil, tempfile, subprocess
import hashlib, sys, platform
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, Tuple, List, Dict

# Default paths (outputs named by the specification)
DATA_DIR = Path("../data")
RAW_CSV = str(DATA_DIR / "raw/BTCUSD_CCCAGG_1h.csv")
FEATURES_ROOT = DATA_DIR / "processed/features"
# NOTE(review): originally commented as "partitioned by dt=YYYY-MM-DD", but the
# save step in the main pipeline writes a single Parquet file at this path.
OUT_DATASET_DIR = FEATURES_ROOT / "features_4H.parquet"   # see note above
OUT_CV_SPLITS = FEATURES_ROOT / "cv_splits.json"
OUT_FEATURE_SPEC = FEATURES_ROOT / "feature_spec.json"
OUT_LIB_VERSIONS = FEATURES_ROOT / "library_versions.json"
OUT_METRICS = FEATURES_ROOT / "metrics_features.csv"

# Time parameters (4H)
TF = "4h"  # output bar size (lowercase alias to avoid pandas warnings)
ANCHOR_LABEL = "right"  # bars are stamped at their close (00,04,08,... UTC)
ANCHOR_CLOSED = "right"

# Window lengths expressed in 4H bars: 1 day, 3 days, 1 week, 2 weeks, 4 weeks
W_D1, W_D3, W_W1, W_W2, W_M1 = 6, 18, 42, 84, 168

# QC and anti-leakage
TZ = "UTC"
OUTLIER_Z = 8.0  # |r_1h| > 8σ_30d
WINSOR_PCT = 0.01
EPS = 1e-12
LAG_BARS = 1  # mandatory lag applied to modelable columns
EMBARGO_BARS = 42  # embargo used by the CV splits
PSI_WINDOW_DAYS = 30
ASSET = "BTCUSD"

# Classic indicators (lengths in 4H bars)
ADX_W2 = W_W2  # 2 weeks
BB_W_M1, BB_K = W_M1, 2.0
ATR_W_M1 = W_M1

# Preferred parquet engine
PARQUET_ENGINE = "pyarrow"


## Leitura do CSV 1h e *resample* para 4H

In [None]:
def load_ohlcv_1h(csv_path: str) -> pd.DataFrame:
    """Load an hourly OHLCV CSV into a time-indexed DataFrame.

    The timestamp column is the first of ``ts``, ``open_time``, ``timestamp``
    or ``time`` found in the file. Values are read as epoch numbers and are
    interpreted as seconds when the largest one is below 1e12, otherwise as
    milliseconds. The resulting index is timezone-naive (converted from UTC).

    Returns the ``open, high, low, close, volume`` columns sorted by time with
    duplicated timestamps collapsed to their last occurrence. A warning line
    is printed when more than 0.05% of the rows follow a gap longer than one
    hour; the data itself is not modified.

    Raises:
        ValueError: if no timestamp column or a required OHLCV column is missing.
    """
    raw = pd.read_csv(csv_path)

    # Accept several common names for the timestamp column; first match wins.
    candidates = ("ts", "open_time", "timestamp", "time")
    ts_col = next((name for name in candidates if name in raw.columns), None)
    if ts_col is None:
        raise ValueError("CSV deve conter uma coluna de timestamp: ts|open_time|timestamp|time")

    epoch = raw[ts_col].astype(float)
    unit = "s" if float(epoch.max()) < 1e12 else "ms"
    stamps = pd.to_datetime(epoch, unit=unit, utc=True).dt.tz_convert(None)

    required = ["open", "high", "low", "close", "volume"]
    for name in required:
        if name not in raw.columns:
            raise ValueError(f"Coluna obrigatória ausente no CSV: {name}")

    out = raw.set_index(stamps)[required].sort_index()
    keep = ~out.index.duplicated(keep="last")
    out = out[keep]

    # 1H contiguity check: spacing in hours between consecutive rows.
    step_hours = out.index.to_series().diff().dt.total_seconds().fillna(3600) / 3600
    excess_hours = (step_hours - 1.0).clip(lower=0)
    n_gaps = (excess_hours > 0).sum()
    gap_rate = float(n_gaps) / max(1, len(out)) * 100.0
    if gap_rate > 0.05:
        print(f"Alerta: gap_rate_1h {gap_rate:.4f}% > 0.05% (SLO)")

    return out


In [None]:
def resample_ohlcv(df_1h: pd.DataFrame, tf: str = "4h") -> pd.DataFrame:
    """Resample hourly OHLCV bars to ``tf`` bars and attach gap flags.

    Aggregations: open(first), high(max), low(min), close(last), volume(sum).
    Bar labelling/closure follow the module-level ``ANCHOR_LABEL`` and
    ``ANCHOR_CLOSED`` settings.

    Added columns:
        n_1h_missing: number of hourly rows absent from the bar.
        is_gap: 1 when at least one hourly row is absent, else 0.
        dollar_vol_4h: close * volume of the resampled bar.

    Bars whose high/low do not bracket open/close are dropped; this includes
    bars with no hourly data at all, because their NaN comparisons fail.

    Args:
        df_1h: hourly OHLCV frame with a DatetimeIndex.
        tf: target bar size; must be a fixed duration that ``pd.Timedelta``
            can parse (e.g. "4h").
    """
    # Hourly rows expected per output bar, derived from `tf` rather than
    # hard-coded, so the gap flags stay correct for other bar sizes.
    hours_per_bar = max(1, int(pd.Timedelta(tf) / pd.Timedelta(hours=1)))

    idx_full_1h = pd.date_range(df_1h.index.min(), df_1h.index.max(), freq="1h")
    df_full = df_1h.reindex(idx_full_1h)

    # Count the hourly rows actually present inside each output bar.
    cnt_1h = pd.Series(1, index=df_1h.index).reindex(idx_full_1h)
    n_obs = cnt_1h.resample(tf, label=ANCHOR_LABEL, closed=ANCHOR_CLOSED).sum().fillna(0).astype(int)
    n_missing = (hours_per_bar - n_obs).clip(lower=0)
    is_gap = (n_missing > 0).astype(int)

    agg = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
    df_tf = df_full.resample(tf, label=ANCHOR_LABEL, closed=ANCHOR_CLOSED).agg(agg)

    # OHLC sanity: high must be >= max(open, close) and low <= min(open, close).
    max_oc = pd.concat([df_tf["open"], df_tf["close"]], axis=1).max(axis=1)
    min_oc = pd.concat([df_tf["open"], df_tf["close"]], axis=1).min(axis=1)
    bad = (~(df_tf["high"] >= max_oc) | ~(df_tf["low"] <= min_oc))
    # Explicit copy so the column assignments below never target a slice view.
    df_tf = df_tf.loc[~bad].copy()

    df_tf["n_1h_missing"] = n_missing.reindex(df_tf.index).fillna(hours_per_bar).astype(int)
    df_tf["is_gap"] = is_gap.reindex(df_tf.index).fillna(1).astype(int)

    df_tf["dollar_vol_4h"] = df_tf["close"] * df_tf["volume"]

    return df_tf.sort_index()


## Indicadores essenciais (implementação baseline)

In [None]:

# Utilitários básicos

def ema(s: pd.Series, length: int) -> pd.Series:
    """Exponential moving average with ``span=length`` in recursive form (``adjust=False``)."""
    weighted = s.ewm(span=length, adjust=False)
    return weighted.mean()

def sma(s: pd.Series, length: int) -> pd.Series:
    """Simple moving average over ``length`` rows; partial windows are averaged too (``min_periods=1``)."""
    window = s.rolling(length, min_periods=1)
    return window.mean()

def rma(s: pd.Series, length: int) -> pd.Series:
    """Recursive exponential average with smoothing factor ``alpha = 1/length`` (``adjust=False``)."""
    alpha = 1 / length
    return s.ewm(alpha=alpha, adjust=False).mean()

def mad(s: pd.Series):
    """Median absolute deviation of ``s`` around its own median (unscaled)."""
    deviations = (s - s.median()).abs()
    return deviations.median()

# Volatilidades

def true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
    """Row-wise maximum of high-low, |high - previous close| and |low - previous close|.

    The first row has no previous close, so only high-low contributes there
    (the row-wise max skips NaN).
    """
    prior = close.shift(1)
    candidates = [
        high - low,
        (high - prior).abs(),
        (low - prior).abs(),
    ]
    return pd.concat(candidates, axis=1).max(axis=1)

def atr(high: pd.Series, low: pd.Series, close: pd.Series, length: int) -> pd.Series:
    """Average True Range: ``true_range`` smoothed by ``rma`` over ``length`` bars."""
    tr = true_range(high, low, close)
    return rma(tr, length)

def dmi_adx(high: pd.Series, low: pd.Series, close: pd.Series, length: int):
    """Directional movement indicators.

    Returns a tuple ``(plus_di, minus_di, adx)`` of Series aligned to ``high``.
    All smoothing uses ``rma`` with the given ``length``. Zero denominators
    are turned into NaN instead of producing infinities.
    """
    up_move = high.diff()
    down_move = -low.diff()

    # A move only counts when it is positive and larger than the opposite move.
    plus_dm = pd.Series(
        np.where((up_move > down_move) & (up_move > 0), up_move, 0.0),
        index=high.index,
    )
    minus_dm = pd.Series(
        np.where((down_move > up_move) & (down_move > 0), down_move, 0.0),
        index=high.index,
    )

    smoothed_tr = rma(true_range(high, low, close), length)
    plus_di = 100 * rma(plus_dm, length) / (smoothed_tr.replace(0, np.nan))
    minus_di = 100 * rma(minus_dm, length) / (smoothed_tr.replace(0, np.nan))

    di_total = (plus_di + minus_di).replace(0, np.nan)
    dx = 100 * (plus_di - minus_di).abs() / di_total
    return plus_di, minus_di, rma(dx, length)

def hurst_rs_proxy(series: pd.Series, window: int) -> pd.Series:
    """Rolling Hurst-exponent proxy from the rescaled range (R/S).

    For each window of ``n`` observations the demeaned values are cumulated,
    ``R`` is the range of that cumulative path and ``S`` the population
    standard deviation. The estimate is ``log(R/S) / log(n)``, which lands
    roughly in [0, 1]. The previous implementation returned the raw ``R/S``
    ratio, which is unbounded and scales with the window length, contradicting
    its documented range.

    Windows with fewer than 4 observations, with NaNs, or with zero
    range/dispersion (e.g. a constant window) yield NaN. Callers are expected
    to winsorize the result afterwards.
    """
    def _hurst(x: np.ndarray) -> float:
        n = len(x)
        if n < 4:
            return np.nan
        dev = x - x.mean()
        path = np.cumsum(dev)
        rng = path.max() - path.min()
        scale = dev.std(ddof=0)
        # Written as a positive test so NaN inputs fall through to NaN too.
        if not (rng > 0 and scale > 0):
            return np.nan
        return float(np.log(rng / scale) / np.log(n))

    return series.rolling(window, min_periods=4).apply(_hurst, raw=True)

def realized_cc(r4h: pd.Series, window: int) -> pd.Series:
    """Close-to-close realized variance: rolling sum of squared returns (needs at least 2 values)."""
    squared = r4h.pow(2)
    return squared.rolling(window, min_periods=2).sum()

def realized_pk(high: pd.Series, low: pd.Series, window: int) -> pd.Series:
    """Parkinson range-based variance: rolling sum of ``ln(high/low)^2 / (4 ln 2)``.

    A high/low ratio of exactly zero is treated as missing.
    """
    hl_ratio = (high / low).replace(0, np.nan)
    log_range_sq = np.log(hl_ratio) ** 2
    per_bar = log_range_sq / (4 * np.log(2))
    return per_bar.rolling(window, min_periods=2).sum()

def realized_rs(high: pd.Series, low: pd.Series, close: pd.Series, window: int) -> pd.Series:
    """Rolling sum of the two-term log-ratio product used here as the Rogers–Satchell estimator.

    Per bar: ``ln(H/C_prev) * ln(H/C) + ln(L/C_prev) * ln(L/C)``, where
    ``C_prev`` is the previous close. Ratios equal to zero are treated as missing.
    """
    prev_close = close.shift(1)

    def _log_ratio(num: pd.Series, den: pd.Series) -> pd.Series:
        return np.log((num / den).replace(0, np.nan))

    high_term = _log_ratio(high, prev_close) * _log_ratio(high, close)
    low_term = _log_ratio(low, prev_close) * _log_ratio(low, close)
    return (high_term + low_term).rolling(window, min_periods=2).sum()

def bipower_var(r: pd.Series, window: int) -> pd.Series:
    """Bipower variation: rolling sum of ``(pi/2) * |r_t| * |r_{t-1}|`` (needs at least 2 values)."""
    adjacent_product = r.abs() * r.shift(1).abs()
    scaled = (np.pi / 2.0) * adjacent_product
    return scaled.rolling(window, min_periods=2).sum()

def quarticity(r: pd.Series, window: int, n_in_window: int) -> pd.Series:
    """Realized quarticity: ``(n_in_window / 3)`` times the rolling sum of ``r**4``."""
    fourth_power_sum = r.pow(4).rolling(window, min_periods=2).sum()
    factor = n_in_window / 3.0
    return factor * fourth_power_sum

def robust_z(x: pd.Series, window: int) -> pd.Series:
    """Rolling robust z-score: ``(x - rolling median) / (1.4826 * (rolling MAD + EPS))``.

    Both rolling statistics require at least 5 observations.
    """
    def _window_mad(v: np.ndarray) -> float:
        return np.median(np.abs(v - np.median(v)))

    center = x.rolling(window, min_periods=5).median()
    spread = x.rolling(window, min_periods=5).apply(_window_mad, raw=True)
    denom = 1.4826 * (spread + EPS)
    return (x - center) / denom

def rank_pct(x: pd.Series, window: int) -> pd.Series:
    """Rolling percentile rank of the latest value within its trailing window.

    Each output is the ``rank(pct=True)`` of the window's last element among
    the window's values; at least 5 observations are required.
    """
    def _last_rank(v) -> float:
        if len(v) > 0:
            return pd.Series(v).rank(pct=True).iloc[-1]
        return np.nan

    return x.rolling(window, min_periods=5).apply(_last_rank, raw=False)


## Flags de outlier em 1H (o VWAP de sessão não faz parte da especificação final)

In [None]:

# VWAP de sessão não é necessário na especificação final; mantemos utilitários de outlier/z

def flag_outliers_1h(df_1h: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the hourly frame with outlier diagnostics added.

    Added columns:
        r_1h: hourly log return of ``close`` (non-positive closes become NaN).
        dollar_vol_1h: close * volume.
        is_outlier: 1 when ``|r_1h|`` exceeds ``OUTLIER_Z`` times the rolling
            720-hour standard deviation (at least 24 observations), else 0.

    Prices are never altered.
    """
    out = df_1h.copy()
    px = out["close"].where(out["close"] > 0)
    log_ret = np.log(px / px.shift(1))
    vol_30d = log_ret.rolling(24 * 30, min_periods=24).std(ddof=0)
    threshold = OUTLIER_Z * (vol_30d + EPS)

    out["r_1h"] = log_ret
    out["dollar_vol_1h"] = out["close"] * out["volume"]
    out["is_outlier"] = (log_ret.abs() > threshold).astype(int)
    return out


## Construção das features e flags

In [None]:


def build_features(df_4h: pd.DataFrame) -> pd.DataFrame:
    """Build the 4H feature set from OHLCV data.

    Reads ``close``, ``high``, ``low`` and ``dollar_vol_4h`` and the
    DatetimeIndex of ``df_4h``; every other input column is passed through.

    Output layout:
        * raw features computed at bar t,
        * ``<col>_robust_z`` / ``<col>_rank`` for a few key columns,
        * ``<col>_l1`` copies shifted by ``LAG_BARS`` for every modelable
          column (flags, calendar columns and OHLCV are not lagged),
        * ``asset`` and ``dt`` (calendar date string of the index).

    Leakage fix: ``corr_neg_2w`` used to correlate ``r_4h`` with
    ``rv_cc_1d.shift(-1)``, i.e. with the *next* bar's volatility, so the
    unlagged column contained future data and its ``_l1`` copy still contained
    bar-t information. It is now the correlation between the previous bar's
    return and the current bar's volatility, which is the same lead-lag pair
    but fully known at bar t (numerically the old series shifted by one bar).

    NOTE(review): winsorization bounds are quantiles of the whole sample, so
    clipping thresholds depend on future observations; consider fitting them
    on the training window only.
    """
    df = df_4h.copy()

    # --- Basic log returns (non-positive closes become NaN before the log) ---
    close = df["close"].where(df["close"] > 0)
    r4h = np.log(close / close.shift(1))
    df["r_4h"] = r4h
    df["r_1d"] = np.log(close / close.shift(W_D1))
    df["r_3d"] = np.log(close / close.shift(W_D3))
    df["r_1w"] = np.log(close / close.shift(W_W1))

    # --- Momentum normalised as EMA(return) / rolling std(return) ---
    ema_r_1d = ema(r4h.fillna(0), W_D1)
    sd_r_1d = r4h.rolling(W_D1, min_periods=3).std(ddof=0)
    ema_r_1w = ema(r4h.fillna(0), W_W1)
    sd_r_1w = r4h.rolling(W_W1, min_periods=5).std(ddof=0)
    df["mom_z_1d"] = ema_r_1d / (sd_r_1d + EPS)
    df["mom_z_1w"] = ema_r_1w / (sd_r_1w + EPS)

    # --- DMI/ADX over 2 weeks (only ADX is kept) ---
    _plus_di, _minus_di, adx_2w = dmi_adx(df["high"], df["low"], df["close"], ADX_W2)
    df["adx_2w"] = adx_2w

    # --- Hurst proxy over 1 week ---
    df["hurst_1w"] = hurst_rs_proxy(r4h.fillna(0), W_W1)

    # --- Realized volatility family ---
    df["rv_cc_1d"] = realized_cc(r4h, W_D1)
    df["rv_cc_1w"] = realized_cc(r4h, W_W1)
    df["rv_pk_1w"] = realized_pk(df["high"], df["low"], W_W1)
    df["rv_rs_1w"] = realized_rs(df["high"], df["low"], df["close"], W_W1)
    df["bv_1d"] = bipower_var(r4h, W_D1)
    df["rq_1d"] = quarticity(r4h, W_D1, n_in_window=W_D1)
    # Vol-of-vol: 2-week std of sqrt(rv_cc_1d)
    df["vov_2w"] = (df["rv_cc_1d"].clip(lower=0).pow(0.5)).rolling(W_W2, min_periods=5).std(ddof=0)

    # --- Semi-variances and leverage ratio ---
    rp = r4h.clip(lower=0)
    rn = r4h.clip(upper=0)
    df["rv_pos_1w"] = (rp.pow(2)).rolling(W_W1, min_periods=5).sum()
    df["rv_neg_1w"] = (rn.pow(2)).rolling(W_W1, min_periods=5).sum()
    df["lev_ratio_1w"] = df["rv_neg_1w"] / (df["rv_pos_1w"] + EPS)

    # --- Jumps and distribution shape ---
    kappa = 3.0
    # Daily jump flag: rv_cc_1d - bv_1d compared with an error bar built from rq_1d.
    bns_thresh = kappa * (df["rq_1d"].clip(lower=0) / W_D1).pow(0.5)
    df["jump_ind_1d"] = (df["rv_cc_1d"] - df["bv_1d"] > bns_thresh).astype(int)
    # Realized skewness / kurtosis of r4h over 1 week
    df["skew_1w"] = r4h.rolling(W_W1, min_periods=10).apply(lambda v: pd.Series(v).skew(), raw=False)
    df["kurt_1w"] = r4h.rolling(W_W1, min_periods=10).apply(lambda v: pd.Series(v).kurt(), raw=False)

    # --- Bollinger-style bands (1M) ---
    ma_1m = sma(df["close"], BB_W_M1)
    sd_1m = df["close"].rolling(BB_W_M1, min_periods=10).std(ddof=0)
    df["bb_ma_1m"] = ma_1m
    df["bb_sd_1m"] = sd_1m
    df["bb_z_1m"] = (df["close"] - ma_1m) / (sd_1m + EPS)
    upper = ma_1m + BB_K * sd_1m
    lower = ma_1m - BB_K * sd_1m
    df["bb_bw_1m"] = (upper - lower) / (ma_1m.abs() + EPS)

    # --- Keltner/ATR (1M) ---
    df["ATR_1m"] = atr(df["high"], df["low"], df["close"], ATR_W_M1)
    hl_ema = ema((df["high"] - df["low"]).abs(), ATR_W_M1)
    df["kc_bw_1m"] = hl_ema / (df["ATR_1m"] + EPS)

    # --- Bandwidth ranks and squeeze flag (rank columns are exposed explicitly) ---
    bb_bw_rank = rank_pct(df["bb_bw_1m"], BB_W_M1)
    kc_bw_rank = rank_pct(df["kc_bw_1m"], BB_W_M1)
    df["bb_bw_1m_rank"] = bb_bw_rank
    df["kc_bw_1m_rank"] = kc_bw_rank
    df["squeeze"] = ((bb_bw_rank < 0.2) & (kc_bw_rank < 0.2)).astype(int)

    # --- Price percentile within 1M ---
    df["pband_1m"] = rank_pct(df["close"], BB_W_M1)

    # --- Range compression: median range 1w vs 1m ---
    med_range_1w = (df["high"] - df["low"]).rolling(W_W1, min_periods=5).median()
    med_range_1m = (df["high"] - df["low"]).rolling(W_M1, min_periods=10).median()
    df["cratio_1w_1m"] = med_range_1w / (med_range_1m + EPS)

    # --- Liquidity and price impact ---
    df["amihud_1w"] = (r4h.abs() / (df["dollar_vol_4h"].abs() + EPS)).rolling(W_W1, min_periods=5).median()
    df["dv_rank_1m"] = rank_pct(df["dollar_vol_4h"], W_M1)
    df["vol_liq_mix"] = df["rv_cc_1d"] * df["amihud_1w"]

    # --- Calendar and seasonality ---
    idx = df.index
    df["dow"] = idx.weekday.astype("int8")
    df["hod"] = idx.hour.astype("int8")
    df["is_weekend"] = ((df["dow"] >= 5).astype("int8"))
    # Volatility seasonality: rv_cc_1d minus an EMA of rv_cc_1d restricted to the same weekday
    rv = df["rv_cc_1d"]
    ema_by_dow = pd.Series(index=rv.index, dtype=float)
    for d in range(7):
        mask = df["dow"] == d
        ema_by_dow[mask] = ema(rv.where(mask), W_W1)[mask]
    df["rv_dow_dev"] = rv - ema_by_dow

    # --- Lead-lag between returns and volatility (causal form) ---
    # Pair the previous bar's return with the current bar's volatility so the
    # value at bar t uses nothing later than t.
    corr_neg_2w = r4h.shift(1).rolling(W_W2, min_periods=10).corr(df["rv_cc_1d"])
    df["corr_neg_2w"] = corr_neg_2w

    # shock_vol_1m = (rv_cc_1d - EMA_1m(rv_cc_1d)) / sd_1m(rv_cc_1d)
    ema_vol = ema(df["rv_cc_1d"], W_M1)
    sd_vol = df["rv_cc_1d"].rolling(W_M1, min_periods=10).std(ddof=0)
    df["shock_vol_1m"] = (df["rv_cc_1d"] - ema_vol) / (sd_vol + EPS)

    # --- Robust scales and ranks for a few key columns ---
    for col, win in [
        ("r_4h", W_W1), ("mom_z_1w", W_W1), ("rv_rs_1w", W_W1), ("vov_2w", W_W2),
        ("lev_ratio_1w", W_W1), ("bb_z_1m", W_M1), ("amihud_1w", W_W1)
    ]:
        df[f"{col}_robust_z"] = robust_z(df[col], win)
        df[f"{col}_rank"] = rank_pct(df[col], win)

    # --- Winsorization at WINSOR_PCT / 1-WINSOR_PCT (after computation, before lagging) ---
    def winsor(s: pd.Series):
        ql = s.quantile(WINSOR_PCT)
        qh = s.quantile(1.0 - WINSOR_PCT)
        return s.clip(ql, qh)

    winsor_cols = [
        "mom_z_1d", "mom_z_1w", "adx_2w", "hurst_1w", "rv_cc_1d", "rv_cc_1w", "rv_rs_1w", "rv_pk_1w",
        "bv_1d", "rq_1d", "vov_2w", "rv_pos_1w", "rv_neg_1w", "lev_ratio_1w", "skew_1w", "kurt_1w",
        "bb_z_1m", "bb_bw_1m", "kc_bw_1m", "pband_1m", "cratio_1w_1m", "amihud_1w", "dv_rank_1m",
        "vol_liq_mix", "rv_dow_dev", "corr_neg_2w", "shock_vol_1m"
    ]
    for c in winsor_cols:
        if c in df.columns:
            df[c] = winsor(df[c])

    # --- Lag modelable columns by LAG_BARS (suffix _l1); skip flags, calendar and OHLCV ---
    EXCLUDE = {
        "ts", "open", "high", "low", "close", "volume", "asset",
        "is_gap", "n_1h_missing", "is_outlier",
        "dow", "hod", "is_weekend", "dt", "dt_month"
    }
    cols_modelaveis = [
        c for c in df.columns
        if c not in EXCLUDE
        and not c.endswith(("_l1", "_rank", "_robust_z"))
    ]
    # One concat instead of dozens of single-column inserts, which would
    # fragment the frame and trigger pandas' PerformanceWarning.
    lagged = df[cols_modelaveis].shift(LAG_BARS).add_suffix("_l1")
    df = pd.concat([df, lagged], axis=1)

    # --- Asset tag and date string used for partitioning ---
    df["asset"] = ASSET
    df["dt"] = df.index.date.astype("object").astype(str)

    return df


## Validação, checagem de NaNs e salvamento

In [None]:


def finalize_schema(df: pd.DataFrame) -> pd.DataFrame:
    """Apply final dtypes and column order to the feature frame.

    * Guarantees an explicit ``ts`` column (taken from the DatetimeIndex when
      absent) and a DatetimeIndex named ``ts``.
    * Casts the known float features and every float ``*_l1`` column to
      float32, and the flag/calendar columns to int8.
    * Orders columns as: base columns, remaining columns, ``*_l1`` columns,
      then ``is_outlier`` and ``dt``.

    ``is_outlier`` and ``dt`` are appended only when present; previously they
    were appended unconditionally, so a frame without them raised ``KeyError``.

    Raises:
        ValueError: if the frame has neither a ``ts`` column nor a DatetimeIndex.
    """
    out = df.copy()

    # Ensure an explicit 'ts' column while keeping a DatetimeIndex for time ops.
    if "ts" not in out.columns:
        if not isinstance(out.index, pd.DatetimeIndex):
            raise ValueError("Finalize_schema requer DatetimeIndex ou coluna 'ts'.")
        out = out.rename_axis("ts").reset_index()
    if not isinstance(out.index, pd.DatetimeIndex):
        out.index = pd.to_datetime(out["ts"], utc=False)
        out.index.name = "ts"

    # Fallback: derive the lagged correlation if only the raw column exists.
    if "corr_neg_2w_l1" not in out.columns and "corr_neg_2w" in out.columns:
        out["corr_neg_2w_l1"] = out["corr_neg_2w"].shift(LAG_BARS)

    float32_cols = [
        "r_4h", "r_1d", "r_3d", "r_1w", "mom_z_1d", "mom_z_1w", "adx_2w", "hurst_1w",
        "rv_cc_1d", "rv_cc_1w", "rv_rs_1w", "rv_pk_1w", "bv_1d", "rq_1d", "vov_2w",
        "rv_pos_1w", "rv_neg_1w", "lev_ratio_1w", "skew_1w", "kurt_1w", "bb_z_1m",
        "bb_bw_1m", "kc_bw_1m", "pband_1m", "cratio_1w_1m", "amihud_1w", "dv_rank_1m",
        "vol_liq_mix", "rv_dow_dev", "corr_neg_2w_l1", "shock_vol_1m"
    ]
    float32_cols += [c for c in out.columns if c.endswith("_l1") and out[c].dtype.kind in "f"]
    for c in float32_cols:
        if c in out.columns:
            out[c] = out[c].astype("float32")

    int8_cols = ["jump_ind_1d", "dow", "hod", "is_weekend", "is_gap", "n_1h_missing", "squeeze"]
    for c in int8_cols:
        if c in out.columns:
            out[c] = out[c].astype("int8")

    # Column ordering, with 'ts' first.
    cols = list(out.columns)
    l1 = [c for c in cols if c.endswith("_l1")]
    base = ["ts", "asset", "open", "high", "low", "close", "volume", "dollar_vol_4h", "is_gap", "n_1h_missing",
            "r_4h", "r_1d", "r_3d", "r_1w"]
    pinned = set(base + l1 + ["dt", "is_outlier"])
    others = [c for c in cols if c not in pinned]
    trailing = [c for c in ("is_outlier", "dt") if c in cols]
    ordered = [c for c in base if c in cols] + others + l1 + trailing
    return out[ordered]


## Run (opcional) — execute no seu ambiente

In [None]:

# Utilitários de validação, CPCV, especificação e métricas (repostos)

def validate_types_and_nans(df: pd.DataFrame) -> None:
    """Check that lagged features (``*_l1``) contain no NaN after the warm-up.

    The first 5% of rows are treated as warm-up and ignored. Frames without
    any ``*_l1`` column pass trivially.

    Raises:
        AssertionError: when NaNs remain after the warm-up. It is raised
            explicitly (not via ``assert``) so the check still runs under
            ``python -O``; the message lists the offending columns and counts.
    """
    lagged = [c for c in df.columns if c.endswith("_l1")]
    if not lagged:
        return
    warmup_cut = int(0.05 * len(df))
    nan_counts = df[lagged].iloc[warmup_cut:].isna().sum()
    offenders = nan_counts[nan_counts > 0]
    if not offenders.empty:
        detail = {str(col): int(cnt) for col, cnt in offenders.items()}
        raise AssertionError(f"NaNs presentes em features *_l1 após warmup: {detail}")


def drop_collinearity(df: pd.DataFrame, thresh: float = 0.95) -> list:
    """Select ``*_l1`` columns to drop because of high pairwise correlation.

    Columns are scanned in frame order; for each kept column, every later
    column whose absolute correlation with it exceeds ``thresh`` is marked for
    removal. NaN correlations never exceed the threshold.

    Returns:
        The names to remove, in frame column order. The previous version
        returned ``list(set)``, whose order varies between runs and made the
        logged feature spec non-reproducible.
    """
    cols = [c for c in df.columns if c.endswith("_l1")]
    if not cols:
        return []
    # Plain ndarray lookups avoid the per-element overhead of .iloc in the double loop.
    corr = df[cols].corr().abs().to_numpy()
    to_drop = set()
    for i, kept in enumerate(cols):
        if kept in to_drop:
            continue
        for j in range(i + 1, len(cols)):
            candidate = cols[j]
            if candidate in to_drop:
                continue
            if corr[i, j] > thresh:
                to_drop.add(candidate)
    return [c for c in cols if c in to_drop]


def compute_cv_splits(index: pd.DatetimeIndex, embargo: int = EMBARGO_BARS, n_folds: int = 5) -> dict:
    """Build contiguous time-block folds with an embargo on both sides of each test block.

    The index is cut into ``n_folds`` consecutive blocks of ``len(index) // n_folds``
    rows; the last block absorbs the remainder. Training positions are all rows
    at least ``embargo`` bars away from the test block.

    Returns:
        ``{"folds": [...]}`` where each fold carries absolute integer positions
        (``test_pos``, ``train_pos``), ISO timestamps of the test block's first
        and last rows, and the embargo used. Empty blocks are skipped.
    """
    n = len(index)
    if n == 0:
        return {"folds": []}

    fold_size = n // n_folds if n_folds > 0 else n
    folds = []
    for k in range(n_folds):
        lo = k * fold_size
        hi = n if k == n_folds - 1 else lo + fold_size
        if lo >= hi:
            continue
        before = range(0, max(0, lo - embargo))
        after = range(min(n, hi + embargo), n)
        folds.append({
            "fold": k,
            "test_pos": list(range(lo, hi)),
            "train_pos": [*before, *after],
            "ts_start": index[lo].isoformat(),
            "ts_end": index[hi - 1].isoformat(),
            "embargo_bars": embargo
        })
    return {"folds": folds}


def make_feature_spec(df: pd.DataFrame) -> dict:
    """Return the documentation spec for the key features.

    Each entry records formula, window, lag, scaling, winsor level, source
    columns and a shared UTC creation timestamp. ``df`` is currently unused.
    """
    created_at = datetime.now(timezone.utc).isoformat()

    # (name, formula, window, scaling, source columns)
    rows = [
        ("r_4h", "ln(C_t/C_{t-1})", None, None, ["close"]),
        ("mom_z_1w", "EMA_1w(r_4h)/sd_1w(r_4h)", W_W1, "z", None),
        ("rv_rs_1w", "Rogers–Satchell 1w", W_W1, None, ["high","low","close"]),
        ("vov_2w", "sd_2w(sqrt(rv_cc_1d))", W_W2, None, None),
        ("lev_ratio_1w", "rv_neg_1w/(rv_pos_1w+eps)", W_W1, None, None),
        ("bb_z_1m", "(C−MA_1m)/SD_1m", W_M1, "z", ["close"]),
        ("amihud_1w", "median(|r_4h|/(dollar_vol_4h+eps);1W)", W_W1, None, ["r_4h","dollar_vol_4h"]),
    ]

    spec: dict = {}
    for name, formula, window, scaling, src in rows:
        spec[name] = {
            "formula": formula,
            "window": window,
            "lag": 1,
            "scaling": scaling,
            "winsor": WINSOR_PCT,
            "source_cols": src or [],
            "created_at": created_at
        }
    return spec


def _psi(expected: pd.Series, actual: pd.Series, bins: int = 10) -> float:
    # PSI simples por histogramas iguais
    q = np.linspace(0, 1, bins+1)
    eb = expected.quantile(q).values
    eb = np.unique(eb)
    if len(eb) < 3:
        return np.nan
    e_hist, _ = np.histogram(expected.dropna(), bins=eb)
    a_hist, _ = np.histogram(actual.dropna(), bins=eb)
    e_rat = np.clip(e_hist / max(1, e_hist.sum()), 1e-6, 1)
    a_rat = np.clip(a_hist / max(1, a_hist.sum()), 1e-6, 1)
    psi = np.sum((a_rat - e_rat) * np.log(a_rat / e_rat))
    return float(psi)


def write_metadata_and_qc(df: pd.DataFrame, feature_spec: dict, cv_splits: dict, raw_csv_path: Path):
    """Write run metadata, the feature spec, CV splits and daily QC metrics.

    Files written under ``FEATURES_ROOT``:
        * ``OUT_LIB_VERSIONS``: library versions, platform, git SHA (None when
          git is unavailable), SHA-256 of the raw CSV (None when the file does
          not exist) and a run id.
        * ``OUT_FEATURE_SPEC`` and ``OUT_CV_SPLITS``: the given dicts as JSON.
        * ``OUT_METRICS``: one row per calendar date with the daily sums of
          ``is_gap`` and ``is_outlier`` and, from the 31st date on, the PSI of
          up to 20 ``*_robust_z`` columns against the previous 30 dates.

    Raises:
        ValueError: if ``df`` has neither a DatetimeIndex nor a ``ts`` column.
    """
    FEATURES_ROOT.mkdir(parents=True, exist_ok=True)

    def sha256_file(p: Path) -> str:
        digest = hashlib.sha256()
        with open(p, 'rb') as fh:
            # Stream in 1 MiB chunks so large CSVs are never fully loaded.
            while True:
                block = fh.read(1 << 20)
                if block == b'':
                    break
                digest.update(block)
        return digest.hexdigest()

    # Best effort: the git SHA is optional metadata.
    try:
        git_sha = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
    except Exception:
        git_sha = None

    raw_hash = sha256_file(raw_csv_path) if Path(raw_csv_path).exists() else None
    lib_meta = {
        "python": sys.version.split()[0],
        "pandas": pd.__version__,
        "numpy": np.__version__,
        "platform": platform.platform(),
        "git_sha": git_sha,
        "data_hash_raw_csv": raw_hash,
        "run_id": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + "_4h"
    }

    json_outputs = [
        (OUT_LIB_VERSIONS, lib_meta),
        (OUT_FEATURE_SPEC, feature_spec),
        (OUT_CV_SPLITS, cv_splits),
    ]
    for target, payload in json_outputs:
        with open(target, "w") as fh:
            json.dump(payload, fh, indent=2)

    # Daily metrics need a DatetimeIndex to derive the calendar date.
    df_day = df.copy()
    if not isinstance(df_day.index, pd.DatetimeIndex):
        if "ts" not in df_day.columns:
            raise ValueError("DataFrame para métricas deve ter DatetimeIndex ou coluna 'ts'.")
        df_day.index = pd.to_datetime(df_day["ts"])  # accepts tz-aware or naive
        df_day.index.name = "ts"
    df_day["date"] = df_day.index.date.astype("object").astype(str)

    rzs = [c for c in df.columns if c.endswith("_robust_z")]
    dates = sorted(df_day["date"].unique())
    metrics = []
    for pos, day in enumerate(dates):
        row = {"date": day}
        if pos >= 30:
            baseline = df_day["date"].isin(dates[pos-30:pos])
            current = df_day["date"] == day
            for col in rzs[:20]:
                row[f"psi30_{col}"] = _psi(df_day.loc[baseline, col], df_day.loc[current, col])
        same_day = df_day["date"] == day
        row["is_gap_sum"] = int(df_day.loc[same_day, "is_gap"].sum())
        row["is_outlier_sum"] = int(df_day.loc[same_day, "is_outlier"].sum())
        metrics.append(row)
    pd.DataFrame(metrics).to_csv(OUT_METRICS, index=False)


In [None]:

# Main pipeline (run in your own environment)

# 1) Read the 1H data and flag hourly outliers
_df1h = load_ohlcv_1h(RAW_CSV)
_df1h_qc = flag_outliers_1h(_df1h)

# 2) Anchored 4H resample, with gap flags and dollar_vol_4h
_df4h = resample_ohlcv(_df1h_qc[["open","high","low","close","volume"]], TF)

# 3) Aggregate is_outlier: a 4H bar is flagged if any of its 1H rows is an outlier
outlier_4h = _df1h_qc["is_outlier"].resample(TF, label=ANCHOR_LABEL, closed=ANCHOR_CLOSED).max().reindex(_df4h.index).fillna(0).astype(int)
_df4h["is_outlier"] = outlier_4h

# 4) Build features
_df_feat = build_features(_df4h)

# 5) Lagging of modelable columns happens inside build_features; light validation here
validate_types_and_nans(_df_feat)

# 6) Drop collinear *_l1 columns and log the removal in the feature spec
removed_cols = drop_collinearity(_df_feat, thresh=0.95)
_feature_spec = make_feature_spec(_df_feat)
if removed_cols:
    _feature_spec["collinearity_drop"] = {"threshold": 0.95, "removed": removed_cols}
_df_final = _df_feat.drop(columns=removed_cols, errors="ignore")

# 7) Final dtypes and column order
_df_final = finalize_schema(_df_final)

# 8) CV splits (computed from the DatetimeIndex before the index is reset for saving)
_cv = compute_cv_splits(_df_final.index, embargo=EMBARGO_BARS, n_folds=5)

# 9) Save a single Parquet file (not a dataset), with ts only as a column and a numeric index
FEATURES_ROOT.mkdir(parents=True, exist_ok=True)
feats_to_save = _df_final.copy()

# Avoid a duplicated ts: when it is both a column and the index name, keep only the column
if "ts" in feats_to_save.columns and feats_to_save.index.name == "ts":
    # Keep the ts column and reset to a numeric index
    feats_to_save = feats_to_save.reset_index(drop=True)
elif feats_to_save.index.name == "ts":
    # Promote the ts index to a column
    feats_to_save = feats_to_save.reset_index()

# Make ts tz-aware UTC
if "ts" in feats_to_save.columns:
    feats_to_save["ts"] = pd.to_datetime(feats_to_save["ts"], utc=True)

# Write to a temporary file first, then swap it into place
_tmp = OUT_DATASET_DIR.with_suffix(".tmp.parquet")
if _tmp.exists():
    _tmp.unlink()
feats_to_save.to_parquet(_tmp, engine=PARQUET_ENGINE, compression="zstd")
if OUT_DATASET_DIR.exists():
    if OUT_DATASET_DIR.is_dir():
        shutil.rmtree(OUT_DATASET_DIR)
    else:
        OUT_DATASET_DIR.unlink()
_tmp.rename(OUT_DATASET_DIR)

# 10) Metadata and metrics (uses _df_final, which still has the DatetimeIndex)
write_metadata_and_qc(_df_final, _feature_spec, _cv, Path(RAW_CSV))

# Quick inspection. The optional suffix is a non-capturing group `(?:_l1)?`;
# the original `(:?_l1)?` was a typo that matched an optional literal colon.
_df_final.tail(3), _df_final.filter(regex=r"(r_4h|mom_z_1w|rv_rs_1w|bb_z_1m|amihud_1w)(?:_l1)?$").tail(5)

In [None]:


# Quick conformity check: reports required columns that are absent from the
# final frame and any flag/calendar column that was lagged by mistake.
feats = _df_final.copy()
req = [
    # core
    "asset","open","high","low","close","volume","dollar_vol_4h","is_gap","n_1h_missing","is_outlier",
    # returns/momentum
    "r_4h","r_1d","r_3d","r_1w","mom_z_1d","mom_z_1w",
    # realized vols
    "rv_cc_1d","rv_cc_1w","rv_pk_1w","rv_rs_1w","bv_1d","rq_1d","vov_2w",
    # leverage/jumps/shape
    "rv_pos_1w","rv_neg_1w","lev_ratio_1w","jump_ind_1d","skew_1w","kurt_1w",
    # bands/squeeze
    "bb_ma_1m","bb_sd_1m","bb_z_1m","bb_bw_1m","ATR_1m","kc_bw_1m","pband_1m","cratio_1w_1m",
    "bb_bw_1m_rank","kc_bw_1m_rank","squeeze",
    # liquidity
    "amihud_1w","dv_rank_1m","vol_liq_mix",
    # calendar & seasonality
    "dow","hod","is_weekend","rv_dow_dev",
    # lead-lag (lagged versions)
    "corr_neg_2w_l1","shock_vol_1m_l1"
]
# Required columns not found in the frame.
missing = [c for c in req if c not in feats.columns]
# *_l1 columns whose base name is a flag/calendar column that must never be lagged.
bad_lags = [c for c in feats.columns if c.endswith("_l1") and c.replace("_l1","") in {"dow","hod","is_weekend","is_gap","is_outlier","n_1h_missing"}]
print("FALTANDO:", missing)
print("LAGS INDEVIDOS:", bad_lags)


In [None]:

# Auditor auxiliar (definição leve) – evita usar /mnt/data
from pathlib import Path

def light_audit_features(feats: pd.DataFrame, outdir: str = None):
    """Run a lightweight audit of the feature frame and persist it as JSON.

    Checks performed:
        1. Warn when the ``ts`` column is absent or the DatetimeIndex is tz-aware.
        2. Error (``ok`` becomes False) when a flag/calendar column has a
           lagged ``*_l1`` version.
        3. Warn about required columns that are absent.

    The report is written to ``<outdir>/features_notebook_audit.json``
    (default ``FEATURES_ROOT / "audit"``) and also returned as a dict with the
    keys ``ok``, ``errors``, ``warnings`` and ``notes``.
    """
    import json

    outdir = Path(outdir or (FEATURES_ROOT / "audit"))
    outdir.mkdir(parents=True, exist_ok=True)

    ok = True
    errors, warnings_, notes = [], [], []

    # 1) Time / index / ts checks
    if "ts" not in feats.columns:
        warnings_.append("Coluna 'ts' ausente – ideal para joins/Parquet.")
    if isinstance(feats.index, pd.DatetimeIndex) and feats.index.tz is not None:
        warnings_.append("DatetimeIndex tz-aware – padronizar coluna ts e índice numérico.")

    # 2) Flags and calendar columns must not have lagged copies
    never_lagged = {"dow","hod","is_weekend","is_gap","is_outlier","n_1h_missing"}
    bad_flags = []
    for col in feats.columns:
        if col.endswith("_l1") and col.replace("_l1","") in never_lagged:
            bad_flags.append(col)
    if bad_flags:
        errors.append({"bad_lags": bad_flags})
        ok = False

    # 3) Presence of key columns
    req = [
        "asset","open","high","low","close","volume","dollar_vol_4h","is_gap","n_1h_missing","is_outlier",
        "r_4h","r_1d","r_3d","r_1w","mom_z_1d","mom_z_1w",
        "rv_cc_1d","rv_cc_1w","rv_pk_1w","rv_rs_1w","bv_1d","rq_1d","vov_2w",
        "rv_pos_1w","rv_neg_1w","lev_ratio_1w","jump_ind_1d","skew_1w","kurt_1w",
        "bb_ma_1m","bb_sd_1m","bb_z_1m","bb_bw_1m","ATR_1m","kc_bw_1m","pband_1m","cratio_1w_1m",
        "bb_bw_1m_rank","kc_bw_1m_rank","squeeze",
        "amihud_1w","dv_rank_1m","vol_liq_mix",
        "dow","hod","is_weekend","rv_dow_dev",
        "corr_neg_2w_l1","shock_vol_1m_l1"
    ]
    present = set(feats.columns)
    missing = [c for c in req if c not in present]
    if missing:
        warnings_.append({"missing": missing})

    # 4) Persist the audit report as JSON
    report = {"ok": ok, "errors": errors, "warnings": warnings_, "notes": notes}
    (outdir / "features_notebook_audit.json").write_text(json.dumps(report, indent=2))
    return report


In [None]:
# Final status banner: prints a success message and lists whatever files
# currently exist in FEATURES_ROOT.
print("✅ Pipeline executado com sucesso!")
print("✅ Artefatos gerados:")
import os
for f in sorted(os.listdir(FEATURES_ROOT)):
    print(f"   - {f}")
    
# NOTE(review): the compliance lines below are fixed strings. They are not
# derived from the `missing` / `bad_lags` results of the conformity check, so
# they are printed even when that check reported problems.
print("\n✅ Verificações de conformidade:")
print("   - FALTANDO: [] (todas as colunas requeridas presentes)")
print("   - LAGS INDEVIDOS: [] (sem lags em flags/calendário)")
print("   - ts como coluna tz-aware UTC ✅")
print("   - CV splits com embargo_bars=42 ✅")
print("   - PSI 30d calculado ✅")