In [76]:
# -*- coding: utf-8 -*-
"""
ADR↔BDR ETL + baseline with a special end-of-day (EOD) rule:
 - BDR: base shift = -30min; last bar of each day (EOD) = -60min (to map 17:30→16:30 US)
 - ADR/FX: no shift
 - merge_asof with a tolerance (the join helper defaults to direction="backward")
 - ratio (OLS/median), theoretical price, spread, zscore
 - saves Parquet/CSV and plots the series
"""

from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ---------------- CONFIG ----------------
# Input workbooks: BDR history, underlying US history, and the USD/BRL series.
PATH_BDR_XLSX = Path("Hist_BDRs.xlsx")
PATH_US_XLSX  = Path("Hist_Origem_BDRs.xlsx")
PATH_FX_XLSX  = Path("dolar.xlsx")

# Output folders are created at import time (side effect on the filesystem).
OUT_DIR = Path("output")
(OUT_DIR / "pairs").mkdir(parents=True, exist_ok=True)
(OUT_DIR / "plots").mkdir(parents=True, exist_ok=True)

# BDR sheet name -> US sheet name; main() looks both up in the loaded workbooks.
PAIRS = {
    "AAPL34": "AAPL",
    "MSFT34": "MSFT",
    "NVDC34": "NVDA",
    "AMZO34": "AMZN",
    "GOGL34": "GOOGL",
    "M1TA34": "META",
    "TSLA34": "TSLA",
    "TSMC34": "TSM",
    "AVGO34": "AVGO",
    "BABA34": "BABA",
    "JDCO34": "JD",
}

# Alignment
SHIFT_BDR_BASE_MIN = -30      # general rule (match B3 :00 bars with US :30 bars)
SHIFT_BDR_EOD_MIN  = -60      # last bar of the day only (map 17:30→16:30)
ASOF_TOL           = "31min"  # merge_asof tolerance
EWMA_SPAN          = 60       # span passed to the EWMA mean/variance of the spread
RATIO_METHOD       = "ols"    # "ols" or "median"
MIN_OVERLAP        = 500      # NOTE(review): not referenced anywhere in the visible code
PLOT_TOP_N         = None     # None = plot every processed pair; or e.g. 5

# ---------------- HELPERS ----------------

def _ptbr_to_float(series: pd.Series) -> pd.Series:
    """Convert a column to float, accepting pt-BR formatted number strings.

    Numeric columns are simply cast to float. For non-numeric columns, any
    string containing a comma is treated as pt-BR notation: dots are removed
    (thousands separators) and the comma becomes the decimal point, so
    ``"1.234,56"`` becomes ``1234.56``. Strings without a comma are passed to
    the float conversion unchanged, so ``"12.5"`` still means 12.5.

    Raises:
        ValueError: if a value still cannot be parsed as a float (callers in
            this file catch this and fall back to ``pd.to_numeric``).
    """
    if pd.api.types.is_numeric_dtype(series):
        return series.astype(float)

    def _normalize(value):
        # Only strings need rewriting; None/NaN/numbers pass straight through.
        if isinstance(value, str):
            text = value.strip()
            if "," in text:
                text = text.replace(".", "").replace(",", ".")
            return text
        return value

    return series.map(_normalize).astype(float)

def load_bdr_sheets(path: Path) -> dict[str, pd.DataFrame]:
    """Load every BDR sheet of an Excel workbook into a tidy DataFrame.

    Sheets lacking a 'Data' or 'Fechamento' column are skipped. Each kept
    sheet is returned indexed by 'datetime' (parsed day-first, sorted
    ascending) with columns 'close_bdr' and 'volume_bdr'. 'volume_bdr' is
    NaN when the sheet has no 'Volume Financeiro' column.

    Numeric columns go through ``_ptbr_to_float``; if that fails the column
    is coerced with ``pd.to_numeric(errors='coerce')`` so a single bad cell
    becomes NaN instead of aborting the whole load. This matches what
    ``load_us_sheets`` and ``load_fx`` already do for their price columns.
    Rows with a missing datetime or close are dropped.

    Returns:
        Mapping of sheet name -> DataFrame.
    """
    def _to_float_or_nan(column: pd.Series) -> pd.Series:
        # Best-effort numeric conversion: unparseable cells end up as NaN.
        try:
            return _ptbr_to_float(column)
        except (ValueError, TypeError):
            return pd.to_numeric(column, errors='coerce')

    raw = pd.read_excel(path, sheet_name=None)
    out = {}
    for sheet, df in raw.items():
        if 'Data' not in df.columns or 'Fechamento' not in df.columns:
            continue
        df = df.rename(columns={
            'Data': 'datetime', 'Fechamento': 'close_bdr', 'Volume Financeiro': 'volume_bdr'
        })
        df['datetime'] = pd.to_datetime(df['datetime'], dayfirst=True)
        df['close_bdr'] = _to_float_or_nan(df['close_bdr'])
        if 'volume_bdr' in df:
            df['volume_bdr'] = _to_float_or_nan(df['volume_bdr'])
        else:
            df['volume_bdr'] = np.nan
        df = (df[['datetime', 'close_bdr', 'volume_bdr']]
                .dropna(subset=['datetime', 'close_bdr'])
                .sort_values('datetime')
                .set_index('datetime'))
        out[sheet] = df
    return out

def load_us_sheets(path: Path) -> dict[str, pd.DataFrame]:
    """Read every US-listed sheet of a workbook into a datetime-indexed frame.

    Only sheets that have both 'Date' and 'Last Price' columns are kept.
    The result per sheet has a sorted 'datetime' index and the columns
    'close_us' and 'volume_us' ('volume_us' is NaN when no 'Volume' column
    exists). Rows without a datetime or a close are discarded.
    """
    def _as_number(column):
        # Try the strict float cast first; otherwise coerce bad cells to NaN.
        try:
            return _ptbr_to_float(column)
        except Exception:
            return pd.to_numeric(column, errors='coerce')

    workbook = pd.read_excel(path, sheet_name=None)
    frames = {}
    for sheet_name, table in workbook.items():
        has_required = 'Date' in table.columns and 'Last Price' in table.columns
        if not has_required:
            continue

        table = table.rename(columns={'Date':'datetime','Last Price':'close_us','Volume':'volume_us'})
        table['datetime'] = pd.to_datetime(table['datetime'], dayfirst=True)
        table['close_us'] = _as_number(table['close_us'])
        table['volume_us'] = _as_number(table['volume_us']) if 'volume_us' in table else np.nan

        tidy = table[['datetime','close_us','volume_us']]
        tidy = tidy.dropna(subset=['datetime','close_us'])
        tidy = tidy.sort_values('datetime').set_index('datetime')
        frames[sheet_name] = tidy
    return frames

def load_fx(path: Path) -> pd.DataFrame:
    """Read the USD/BRL workbook into a datetime-indexed frame.

    The first sheet must contain 'Date' and 'Mid Price' columns. The result
    has a sorted 'datetime' index with columns 'usdxbrl' and 'volume_fx'
    ('volume_fx' is NaN when there is no 'Volume' column). Rows without a
    datetime or a rate are discarded.

    Raises:
        ValueError: if 'Date' or 'Mid Price' is missing.
    """
    def _as_number(column):
        # Try the strict float cast first; otherwise coerce bad cells to NaN.
        try:
            return _ptbr_to_float(column)
        except Exception:
            return pd.to_numeric(column, errors='coerce')

    fx = pd.read_excel(path)
    if not ('Date' in fx.columns and 'Mid Price' in fx.columns):
        raise ValueError("dolar.xlsx precisa conter colunas 'Date' e 'Mid Price'.")

    fx = fx.rename(columns={'Date':'datetime','Mid Price':'usdxbrl','Volume':'volume_fx'})
    fx['datetime'] = pd.to_datetime(fx['datetime'], dayfirst=True)
    fx['usdxbrl'] = _as_number(fx['usdxbrl'])
    fx['volume_fx'] = _as_number(fx['volume_fx']) if 'volume_fx' in fx else np.nan

    tidy = fx[['datetime','usdxbrl','volume_fx']]
    tidy = tidy.dropna(subset=['datetime','usdxbrl'])
    return tidy.sort_values('datetime').set_index('datetime')

def shift_bdr_index_with_eod_rule(df_bdr: pd.DataFrame,
                                  base_min: int = -30,
                                  eod_min: int = -60) -> pd.DataFrame:
    """
    Return a copy of df_bdr with its timestamps shifted.

    The latest timestamp of each calendar day is moved by eod_min minutes;
    every other timestamp is moved by base_min minutes. With the defaults
    this maps 17:30 (BR) → 16:30 (US) and the remaining :00 bars → :30.
    The result is sorted by the new index; the input frame is not modified.
    """
    shifted = df_bdr.copy()
    stamps = shifted.index

    # A Series holding the timestamps themselves, so they can be grouped by day.
    stamp_values = pd.Series(stamps, index=stamps)
    day_close = stamp_values.groupby(stamp_values.index.normalize()).transform('max')
    closes_the_day = (stamp_values == day_close).values

    regular = pd.DatetimeIndex(stamps + pd.to_timedelta(base_min, unit='m'))
    closing = stamps + pd.to_timedelta(eod_min, unit='m')

    # Pick the EOD offset for last-of-day rows and the base offset elsewhere.
    shifted.index = pd.DatetimeIndex(
        np.where(closes_the_day, closing.values, regular.values)
    )
    return shifted.sort_index()

def nearest_join(left: pd.DataFrame, right: pd.DataFrame, tolerance: str, direction: str = "backward") -> pd.DataFrame:
    """
    As-of join of two datetime-indexed frames via pd.merge_asof.

    With the default direction ("backward") each left row receives the most
    recent right observation at or before its timestamp, which avoids
    looking into the future. Matches further away than `tolerance` are left
    as NaN. The result is indexed by 'datetime'.
    """
    def _with_datetime_column(frame):
        # An unnamed index comes out of reset_index() as 'index'; normalize it.
        return frame.sort_index().reset_index().rename(columns={'index':'datetime'})

    merged = pd.merge_asof(
        _with_datetime_column(left),
        _with_datetime_column(right),
        on='datetime',
        direction=direction,
        tolerance=pd.to_timedelta(tolerance),
    )
    return merged.set_index('datetime')

def calibrate_ratio(bdr_close: pd.Series, adr_usd: pd.Series, usdxbrl: pd.Series, method="ols"):
    """Estimate the BDR ratio such that bdr ≈ ratio * (adr_usd * usdxbrl).

    Returns a tuple (ratio, ratio_sd, n):
      - ratio: median of the per-row ratios when method == "median";
        otherwise the least-squares slope through the origin,
        sum(den * bdr) / sum(den ** 2).
      - ratio_sd: sample standard deviation (ddof=1) of the per-row ratios.
      - n: number of finite per-row ratios.
    When no complete rows exist the result is (nan, nan, 0); when rows exist
    but no finite ratio can be formed it is (nan, nan, number_of_rows).
    """
    paired = pd.DataFrame({'bdr': bdr_close, 'den': adr_usd * usdxbrl}).dropna()
    if paired.empty:
        return np.nan, np.nan, 0

    # Zero denominators become NaN so they drop out instead of producing inf.
    safe_den = paired['den'].replace(0, np.nan)
    per_row = (paired['bdr'] / safe_den).replace([np.inf, -np.inf], np.nan).dropna()
    if per_row.empty:
        return np.nan, np.nan, int(len(paired))

    if method == "median":
        ratio = float(np.median(per_row.values))
    else:
        cross = (paired['den'] * paired['bdr']).sum()
        energy = (paired['den'] ** 2).sum()
        ratio = np.nan if energy == 0 else float(cross / energy)

    spread_of_ratios = float(per_row.std(ddof=1))
    return ratio, spread_of_ratios, int(len(per_row))

def add_theoretical_and_spread(df_pair: pd.DataFrame, ewma_span=60) -> pd.DataFrame:
    """Return a copy of df_pair with 'bdr_teo', 'spread' and 'zscore' columns.

    - bdr_teo = close_us * usdxbrl * ratio, where the ratio is read from
      ``attrs['ratio_used']`` of the frame (it must be set by the caller).
    - spread  = log(close_bdr) - log(bdr_teo)
    - zscore  = (spread - EWMA mean) / EWMA std, both with span=ewma_span;
      a zero std yields NaN rather than a division by zero.
    """
    enriched = df_pair.copy()
    ratio = enriched.attrs['ratio_used']

    enriched['bdr_teo'] = enriched['close_us'] * enriched['usdxbrl'] * ratio
    enriched['spread'] = np.log(enriched['close_bdr']) - np.log(enriched['bdr_teo'])

    ewm_mean = enriched['spread'].ewm(span=ewma_span).mean()
    ewm_std = np.sqrt(enriched['spread'].ewm(span=ewma_span).var())
    centered = enriched['spread'] - ewm_mean
    enriched['zscore'] = centered / ewm_std.replace(0, np.nan)
    return enriched

def plot_prices(df: pd.DataFrame, name: str, outdir: Path):
    """Save '<name>_prices.png' in outdir: BDR close against the theoretical price."""
    fig = plt.figure()
    ax = fig.gca()
    both_prices = df[['close_bdr','bdr_teo']].dropna()
    both_prices.plot(ax=ax)
    ax.set_title(f"{name} — BDR vs Teórico")
    ax.set_xlabel("Tempo")
    ax.set_ylabel("Preço (BRL)")
    fig.tight_layout()
    fig.savefig(outdir / f"{name}_prices.png")
    plt.close(fig)

def plot_spread(df: pd.DataFrame, name: str, outdir: Path):
    """Save '<name>_spread.png' in outdir: the log spread over time."""
    fig = plt.figure()
    ax = fig.gca()
    spread = df['spread'].dropna()
    spread.plot(ax=ax)
    ax.set_title(f"{name} — Spread (log BDR − log Teórico)")
    ax.set_xlabel("Tempo")
    ax.set_ylabel("Spread (log)")
    fig.tight_layout()
    fig.savefig(outdir / f"{name}_spread.png")
    plt.close(fig)

def plot_zscore(df: pd.DataFrame, name: str, outdir: Path):
    """Save '<name>_zscore.png' in outdir: spread z-score with lines at 0 and ±2."""
    fig = plt.figure()
    ax = fig.gca()
    zscore = df['zscore'].dropna()
    zscore.plot(ax=ax)
    ax.set_title(f"{name} — Z-Score do Spread (EWMA)")
    ax.set_xlabel("Tempo")
    ax.set_ylabel("Z-Score")
    # Reference levels, drawn in the same order as before: 0, +2, -2.
    for level in (0, 2, -2):
        ax.axhline(level)
    fig.tight_layout()
    fig.savefig(outdir / f"{name}_zscore.png")
    plt.close(fig)

# ---------------- MAIN ----------------




In [77]:
def estimate_bars_per_day_from_index(index: pd.DatetimeIndex) -> int:
    """Return the median number of bars per calendar day, truncated to int.

    An empty index yields 1.
    """
    if not len(index):
        return 1
    bars_per_session = pd.Series(index.normalize()).value_counts().to_numpy()
    return int(np.median(bars_per_session))

In [78]:
def diagnostics_quick(df_pair: pd.DataFrame):
    """Return a dict of quick sanity checks for a matched pair.

    Keys:
      - "corr_log": correlation between log(close_bdr) and
        log(close_us * usdxbrl) over rows where both are non-NaN
        (NaN when there are none).
      - "median_ratio": median of close_bdr / (close_us * usdxbrl), with
        infinities treated as missing.
      - "n": number of rows where both logs are non-NaN.
    """
    brl_value = df_pair['close_us'] * df_pair['usdxbrl']
    log_bdr = np.log(df_pair['close_bdr'])
    log_ref = np.log(brl_value)

    usable = log_bdr.notna() & log_ref.notna()
    if usable.any():
        corr = log_bdr[usable].corr(log_ref[usable])
    else:
        corr = np.nan

    inst_ratio = (df_pair['close_bdr'] / brl_value).replace([np.inf,-np.inf], np.nan)
    report = {}
    report["corr_log"] = float(corr)
    report["median_ratio"] = float(inst_ratio.median())
    report["n"] = int(usable.sum())
    return report

def choose_ratio_with_fallback(df_pair: pd.DataFrame, ratio_ols: float):
    """Pick the calibration ratio, falling back to the median if OLS is implausible.

    The per-row ratio close_bdr / (close_us * usdxbrl) is computed, with
    infinities and NaNs discarded. The OLS ratio is rejected in favour of the
    median of those ratios when the median is positive and the OLS value is
    non-finite, non-positive, or more than two orders of magnitude away from
    the median (below median/100 or above median*100).

    Returns:
        (ratio, ratio_sd, n, method) where ratio_sd is the sample standard
        deviation of the per-row ratios (0.0 for a single row), n is the
        number of finite per-row ratios and method is "ols" or
        "median_fallback". With no finite ratios the result is
        (ratio_ols, nan, 0, "ols").
    """
    denom = df_pair['close_us'] * df_pair['usdxbrl']
    ratio_series = (df_pair['close_bdr'] / denom).replace([np.inf, -np.inf], np.nan).dropna()
    if ratio_series.empty:
        return ratio_ols, np.nan, 0, "ols"

    n_obs = len(ratio_series)
    ratio_med = float(np.median(ratio_series))
    ratio_sd = float(ratio_series.std(ddof=1)) if n_obs > 1 else 0.0

    # NaN compares False against everything, so it must be tested explicitly;
    # otherwise a NaN OLS estimate would slip through labelled as "ols".
    ols_implausible = (
        not np.isfinite(ratio_ols)
        or ratio_ols <= 0
        or ratio_ols < ratio_med / 100.0
        or ratio_ols > ratio_med * 100.0
    )
    if ratio_med > 0 and ols_implausible:
        return ratio_med, ratio_sd, n_obs, "median_fallback"
    return ratio_ols, ratio_sd, n_obs, "ols"

def rank_score_stable(overlap, mae, ratio_sd, k1=1000.0, k2=100000.0):
    """Log-scale ranking score: more overlap is better, more error is worse.

    score = ln(1 + overlap) - ln(1 + k1 * mae) - ln(1 + k2 * ratio_sd),
    with negative mae / ratio_sd clamped to 0 before scaling.
    """
    reward = np.log1p(overlap)
    mae_penalty = np.log1p(k1 * max(mae, 0.0))
    dispersion_penalty = np.log1p(k2 * max(ratio_sd, 0.0))
    return reward - mae_penalty - dispersion_penalty

In [79]:
def backtest_spread(df: pd.DataFrame,
                    z_entry=2.0, z_exit=0.0, z_stop=3.0,
                    cost_bdr_bps=10.0, cost_synth_bps=2.0,
                    borrow_bdr_bps_day=40.0,
                    bars_per_day=None,
                    flat_at_eod=True,
                    min_hold=2,
                    no_entry_last_k_bars=2,
                    latency_bars=0):
    """
    Backtest a z-score mean-reversion rule on S = close_bdr - bdr_teo (BRL).

    Expects a datetime-indexed frame with 'close_bdr', 'bdr_teo' and 'zscore'.

    Entry (only when flat, and never in the last `no_entry_last_k_bars` bars
    of a day): z > +z_entry -> short S (-1); z < -z_entry -> long S (+1).
    With latency_bars > 0 the entry signal is read `latency_bars` bars back.

    Exit, checked in this order on every bar while in a position:
      1. last bar of the day, when flat_at_eod is set;
      2. |z| > z_stop (stop);
      3. a long exits when z >= z_exit, a short when z <= -z_exit, but only
         once the position has been held for at least `min_hold` bars.
    The end-of-day flatten and the stop are never delayed by min_hold.

    Costs: each unit of position change pays
    (cost_bdr_bps * close_bdr + cost_synth_bps * bdr_teo) / 1e4; while short
    on the previous bar, borrow_bdr_bps_day / bars_per_day (in bps of
    close_bdr) is charged per bar. bars_per_day defaults to the median bar
    count per day of the index.

    Returns:
        (out, summary): `out` has columns S, z, pnl, pnl_cum, pos on the
        sorted index; `summary` is a dict of aggregate metrics. Per-trade
        PnL runs from the entry bar through the exit bar, so it includes
        both the entry and the exit cost.
    """
    df = df.copy().sort_index()
    S = df['close_bdr'] - df['bdr_teo']
    z = df['zscore']
    n_bars = len(df)

    # Plain arrays keep the bar loop free of per-element .iloc overhead.
    s_arr = S.to_numpy(dtype=float)
    z_arr = z.to_numpy(dtype=float)
    bdr_arr = df['close_bdr'].to_numpy(dtype=float)
    syn_arr = df['bdr_teo'].to_numpy(dtype=float)

    idx = df.index
    day = idx.normalize()

    # A bar closes its day when the next bar belongs to another day; the very
    # last bar always does.
    last_of_day = np.ones(n_bars, dtype=bool)
    if n_bars > 1:
        last_of_day[:-1] = np.asarray(day[1:] != day[:-1])

    # Position of each bar inside its day (0..n-1) and the size of that day.
    day_s = pd.Series(day)
    bars_in_day = day_s.map(day_s.value_counts()).to_numpy()
    bar_num_in_day = day_s.groupby(day_s).cumcount().to_numpy()
    in_last_k = (bars_in_day - bar_num_in_day) <= no_entry_last_k_bars

    if bars_per_day is None:
        bars_per_day = estimate_bars_per_day_from_index(idx)
    borrow_per_bar = (borrow_bdr_bps_day / max(bars_per_day, 1)) / 1e4

    pos = np.zeros(n_bars, dtype=int)
    entry_idx = -np.ones(n_bars, dtype=int)
    costs = np.zeros(n_bars)
    pnl = np.zeros(n_bars)

    def should_exit(i, held_pos, opened_at):
        # Forced exits come first so min_hold can never block them.
        if flat_at_eod and last_of_day[i]:
            return True
        if abs(z_arr[i]) > z_stop:
            return True
        # Minimum holding period: suppress the z-based exit until it elapses.
        if min_hold > 0 and (i - opened_at) < min_hold:
            return False
        if held_pos > 0:
            return bool(z_arr[i] >= z_exit)
        return bool(z_arr[i] <= -z_exit)

    for i in range(1, n_bars):
        p_prev = pos[i-1]

        # Entry signal bar, optionally lagged to simulate execution latency.
        j = i - latency_bars if latency_bars > 0 else i

        if p_prev == 0:
            if j >= 0 and not in_last_k[i]:
                if z_arr[j] > z_entry:
                    pos[i] = -1
                    entry_idx[i] = i
                elif z_arr[j] < -z_entry:
                    pos[i] = +1
                    entry_idx[i] = i
        else:
            opened_at = entry_idx[i-1] if entry_idx[i-1] >= 0 else i - 1
            if should_exit(i, p_prev, opened_at):
                pos[i] = 0
                entry_idx[i] = -1
            else:
                pos[i] = p_prev
                entry_idx[i] = opened_at

        # Transaction cost on any change of position.
        delta = abs(pos[i] - p_prev)
        if delta > 0:
            trade_notional = (cost_bdr_bps * bdr_arr[i] + cost_synth_bps * syn_arr[i]) / 1e4
            costs[i] += delta * trade_notional

        # Borrow fee while carrying a short BDR leg.
        if p_prev < 0:
            costs[i] += borrow_per_bar * bdr_arr[i]

        # Period PnL: previous position times the change in S, net of costs.
        pnl[i] = p_prev * (s_arr[i] - s_arr[i-1]) - costs[i]

    # Series and aggregate metrics.
    pnl_cum = pnl.cumsum()
    ret = pd.Series(pnl, index=df.index, name='pnl')
    curve = pd.Series(pnl_cum, index=df.index, name='pnl_cum')

    ann_factor = np.sqrt(1638.0)  # ~6.5h*252
    std_ret = float(ret.std(ddof=1))
    sharpe = (float(ret.mean()) / (std_ret + 1e-12)) * ann_factor if std_ret > 0 else np.nan

    roll_max = curve.cummax()
    mdd = float((curve - roll_max).min()) if len(curve) else np.nan

    # Per-trade metrics. The previous-position array starts at 0 (no
    # wrap-around), so an exit can never precede the first entry.
    prev_pos = np.roll(pos, 1)
    if n_bars:
        prev_pos[0] = 0
    starts = np.where((prev_pos == 0) & (pos != 0))[0].tolist()
    ends = np.where((prev_pos != 0) & (pos == 0))[0].tolist()
    # A trade still open on the final bar has no exit; drop it from the stats.
    starts = starts[:len(ends)]

    # Slice from the entry bar itself: the entry cost is booked on bar s.
    trade_pnls = [float(pnl[s:e+1].sum()) for s, e in zip(starts, ends)]
    hold_bars = [int(e - s) for s, e in zip(starts, ends)]
    trades = len(trade_pnls)
    winrate = float(np.mean([p > 0 for p in trade_pnls])) if trades else np.nan
    expectancy = float(np.mean(trade_pnls)) if trades else np.nan
    med_hold = int(np.median(hold_bars)) if hold_bars else 0

    summary = {
        'bars': len(df), 'trades': trades,
        'winrate': winrate, 'expectancy_per_trade': expectancy,
        'median_hold_bars': med_hold,
        'sharpe_hourly_annualized': sharpe,
        'mdd': mdd,
        'pnl_total': float(curve.iloc[-1]) if len(curve) else 0.0,
        'costs_total': float(costs.sum()),
    }
    out = pd.DataFrame({'S': S, 'z': z, 'pnl': ret, 'pnl_cum': curve, 'pos': pos}, index=df.index)
    return out, summary

In [80]:
def grid_search_params(pairs_to_test,
                       z_entries=(1.5, 2.0, 2.5),
                       z_stops=(3.0, 3.5),
                       cost_sets=((10.0, 2.0), (20.0, 5.0), (30.0, 10.0)),
                       borrow_daily_bps=(0.0, 20.0, 50.0),
                       flat_at_eod=True):
    """
    Run backtest_spread over a parameter grid for each saved pair.

    pairs_to_test: iterable of (bdr, adr, parquet_path) tuples — e.g. the
    'processed' list built in main. Each parquet must carry a 'datetime'
    column, which becomes the index.

    Returns a DataFrame with one row per (pair, z_entry, z_stop, cost set,
    borrow) scenario holding the parameters plus the backtest summary,
    sorted by 'sharpe_hourly_annualized' descending. An empty DataFrame is
    returned when nothing was evaluated.
    """
    results = []
    for bdr, adr, pq in pairs_to_test:
        # Load the matched pair and restore its datetime index.
        pair_df = pd.read_parquet(pq)
        pair_df = pair_df.set_index(pd.to_datetime(pair_df['datetime'])).drop(columns=['datetime'])
        label = f"{bdr}_{adr}"

        # Bars per day drive the per-bar borrow cost.
        bars_day = estimate_bars_per_day_from_index(pair_df.index)

        scenarios = (
            (ze, zs, cbdr, csyn, brw)
            for ze in z_entries
            for zs in z_stops
            for cbdr, csyn in cost_sets
            for brw in borrow_daily_bps
        )
        for ze, zs, cbdr, csyn, brw in scenarios:
            _, bt_sum = backtest_spread(
                pair_df,
                z_entry=ze, z_exit=0.5, z_stop=zs,   # z_exit > 0 to avoid an immediate exit
                cost_bdr_bps=cbdr, cost_synth_bps=csyn,
                borrow_bdr_bps_day=brw,
                bars_per_day=bars_day,
                flat_at_eod=flat_at_eod,
                min_hold=2,
                no_entry_last_k_bars=2,
                latency_bars=0
            )
            scenario_row = {
                'pair': label,
                'z_entry': ze,
                'z_stop': zs,
                'cost_bdr_bps': cbdr,
                'cost_syn_bps': csyn,
                'borrow_bps_day': brw,
            }
            scenario_row.update(bt_sum)
            results.append(scenario_row)

    grid = pd.DataFrame(results)

    # Nothing ran (e.g. empty pairs_to_test): return as-is, no sort attempted.
    if grid.empty:
        return grid

    sort_key = 'sharpe_hourly_annualized'
    if sort_key not in grid.columns:
        return grid
    return grid.sort_values(sort_key, ascending=False)


In [81]:
def diagnostics_quick(df_pair: pd.DataFrame):
    """Quick sanity metrics for a matched pair.

    Returns a dict with "corr_log" (correlation of log(close_bdr) against
    log(close_us * usdxbrl) on rows where both are non-NaN, NaN if none),
    "median_ratio" (median of close_bdr / (close_us * usdxbrl), infinities
    ignored) and "n" (count of rows where both logs are non-NaN).
    """
    reference_brl = df_pair['close_us'] * df_pair['usdxbrl']
    log_bdr = np.log(df_pair['close_bdr'])
    log_reference = np.log(reference_brl)
    mask = log_bdr.notna() & log_reference.notna()

    corr = np.nan
    if mask.any():
        corr = log_bdr[mask].corr(log_reference[mask])

    ratio_inst = (df_pair['close_bdr'] / reference_brl).replace([np.inf,-np.inf], np.nan)
    return dict([
        ("corr_log", float(corr)),
        ("median_ratio", float(ratio_inst.median())),
        ("n", int(mask.sum())),
    ])

def choose_ratio_with_fallback(df_pair: pd.DataFrame, ratio_ols: float):
    """Return the ratio to use, replacing an implausible OLS value by the median.

    Per-row ratios close_bdr / (close_us * usdxbrl) are computed and
    non-finite values discarded. If their median is positive and the OLS
    ratio is non-finite, non-positive, below median/100 or above median*100,
    the median is used instead.

    Returns:
        (ratio, ratio_sd, n, method): ratio_sd is the sample standard
        deviation of the per-row ratios (0.0 when there is only one), n the
        number of finite per-row ratios, method "ols" or "median_fallback".
        Without any finite ratio the result is (ratio_ols, nan, 0, "ols").
    """
    denom = df_pair['close_us'] * df_pair['usdxbrl']
    ratio_series = (df_pair['close_bdr'] / denom).replace([np.inf, -np.inf], np.nan).dropna()
    if ratio_series.empty:
        return ratio_ols, np.nan, 0, "ols"

    n_obs = len(ratio_series)
    ratio_med = float(np.median(ratio_series))
    ratio_sd = float(ratio_series.std(ddof=1)) if n_obs > 1 else 0.0

    # A NaN OLS estimate fails every ordering comparison, so check finiteness
    # explicitly; otherwise NaN would be returned as a valid "ols" ratio.
    ols_implausible = (
        not np.isfinite(ratio_ols)
        or ratio_ols <= 0
        or ratio_ols < ratio_med / 100.0
        or ratio_ols > ratio_med * 100.0
    )
    if ratio_med > 0 and ols_implausible:
        return ratio_med, ratio_sd, n_obs, "median_fallback"
    return ratio_ols, ratio_sd, n_obs, "ols"

def rank_score_stable(overlap, mae, ratio_sd, k1=1000.0, k2=100000.0):
    """Ranking score on a log scale.

    ln(1 + overlap) - ln(1 + k1 * mae) - ln(1 + k2 * ratio_sd); negative
    mae / ratio_sd inputs are clamped to 0 first.
    """
    clamped_mae = max(mae, 0.0)
    clamped_sd = max(ratio_sd, 0.0)
    score = np.log1p(overlap) - np.log1p(k1 * clamped_mae)
    return score - np.log1p(k2 * clamped_sd)


In [82]:
def entries_near_eod_share(df_bt: pd.DataFrame, df_raw: pd.DataFrame, last_k=2) -> float:
    """
    Fraction of position entries that fall in the last `last_k` bars of a day.

    Days and bar positions come from df_raw's index; entries are the rows of
    df_bt['pos'] that are non-zero while the preceding row (cyclically, via
    np.roll) is zero. Both frames are expected to have the same length.
    Returns NaN when there are no entries.
    """
    sessions = pd.Series(df_raw.index.normalize())

    # Size of each row's day and the row's 0-based position within that day.
    session_size = sessions.map(sessions.value_counts()).to_numpy()
    ordinal_in_session = sessions.groupby(sessions).cumcount().to_numpy()
    bars_remaining = session_size - ordinal_in_session
    tail_mask = bars_remaining <= last_k

    held = df_bt['pos'].to_numpy()
    opened = (held != 0) & (np.roll(held, 1) == 0)

    total_entries = int(opened.sum())
    if total_entries == 0:
        return np.nan

    tail_entries = int((opened & tail_mask).sum())
    return tail_entries / total_entries


In [83]:
def main():
    """Run the full pipeline: load, align, calibrate, save, plot, backtest, grid search.

    Steps, in order:
      1. Load the BDR, US and FX workbooks.
      2. For each entry of PAIRS: shift the BDR timestamps, as-of join with
         the US and FX series, calibrate the ratio, add theoretical price /
         spread / z-score, and save the matched pair as parquet.
      3. Score and save a per-pair summary CSV.
      4. For each selected pair: plot, backtest, save the PnL curve.
      5. Save a backtest summary CSV and run a parameter grid search over
         every processed pair.
    Progress and results are printed to stdout; files go under OUT_DIR.
    """
    print("Lendo BDRs…")
    bdr_map = load_bdr_sheets(PATH_BDR_XLSX)
    print(f"BDRs lidas: {len(bdr_map)}")

    print("Lendo ADRs…")
    us_map  = load_us_sheets(PATH_US_XLSX)
    print(f"ADRs lidas: {len(us_map)}")

    print("Lendo FX (USDBRL)…")
    fx_df   = load_fx(PATH_FX_XLSX)

    rows_summary = []
    processed = []

    for bdr, adr in PAIRS.items():
        if bdr not in bdr_map:
            print(f"[WARN] BDR '{bdr}' não encontrado; pulando.")
            continue
        if adr not in us_map:
            print(f"[WARN] ADR '{adr}' não encontrado; pulando.")
            continue

        # Special BDR shift (-30min in general; -60min on the last bar of each day)
        df_bdr_shifted = shift_bdr_index_with_eod_rule(
            bdr_map[bdr], SHIFT_BDR_BASE_MIN, SHIFT_BDR_EOD_MIN
        )

        # As-of join BDR↔ADR↔FX (nearest_join is called with its default
        # direction, i.e. "backward", within ASOF_TOL)
        tmp = nearest_join(
            df_bdr_shifted[['close_bdr','volume_bdr']],
            us_map[adr][['close_us','volume_us']],
            ASOF_TOL
        )
        tmp = nearest_join(tmp, fx_df[['usdxbrl','volume_fx']], ASOF_TOL)
        tmp = tmp.dropna(subset=['close_bdr','close_us','usdxbrl']).sort_index()

        n_overlap = len(tmp)
        if n_overlap == 0:
            print(f"[INFO] Sem overlap para {bdr}↔{adr}.")
            continue

        # OLS ratio + fallback to the median (if needed)
        ratio_ols, ratio_sd_raw, n_ratio_raw = calibrate_ratio(
            tmp['close_bdr'], tmp['close_us'], tmp['usdxbrl'], method=RATIO_METHOD
        )
        ratio_used, ratio_sd, n_ratio, ratio_method = choose_ratio_with_fallback(tmp, ratio_ols)

        # Theoretical price, spread, zscore (the ratio is passed through attrs)
        tmp.attrs['ratio_used'] = ratio_used
        tmp = add_theoretical_and_spread(tmp, ewma_span=EWMA_SPAN)

        # Goodness-of-fit metrics of the theoretical price
        mae  = (tmp['close_bdr'] - tmp['bdr_teo']).abs().mean()
        mape = ((tmp['close_bdr'] - tmp['bdr_teo']).abs()
                / tmp['close_bdr'].replace(0, np.nan)).mean()

        rows_summary.append({
            'bdr': bdr,
            'adr': adr,
            'overlap_obs': n_overlap,
            'ratio_used': ratio_used,
            'ratio_sd': ratio_sd,
            'ratio_obs': n_ratio,
            'ratio_method': ratio_method,
            'mae_teorico': mae,
            'mape_teorico': mape,
        })

        # Save the matched pair as parquet
        out_path = OUT_DIR / "pairs" / f"{bdr}_{adr}.parquet"
        tmp.reset_index().rename(columns={'index':'datetime'}).to_parquet(out_path, index=False)
        processed.append((bdr, adr, out_path))
        print(f"[OK] {bdr}↔{adr}: {n_overlap} pts | ratio={ratio_used:.6f} ({ratio_method}) | salvo: {out_path.name}")

    # Summary + plots + backtest
    if rows_summary:
        df_sum = pd.DataFrame(rows_summary)

        # Stable (log-scale) score
        df_sum['score'] = df_sum.apply(
            lambda r: rank_score_stable(r['overlap_obs'], r['mae_teorico'], r['ratio_sd']),
            axis=1
        )
        df_sum = df_sum.sort_values('score', ascending=False)
        df_sum.to_csv(OUT_DIR / "summary_pairs.csv", index=False, float_format="%.8f")
        print("\n== Resumo dos pares (top 10 por score) ==")
        print(df_sum[['bdr','adr','overlap_obs','ratio_used','ratio_sd','mae_teorico','mape_teorico','ratio_method']].head(10))

        # Which pairs to plot/backtest
        plot_list = processed
        if PLOT_TOP_N is not None and PLOT_TOP_N > 0:
            top = set(tuple(x) for x in df_sum[['bdr','adr']].head(PLOT_TOP_N).to_records(index=False))
            plot_list = [p for p in processed if (p[0], p[1]) in top]

        # Plots + backtest per pair
        bt_rows = []
        for bdr, adr, pq in plot_list:
            df = pd.read_parquet(pq)
            df = df.set_index(pd.to_datetime(df['datetime'])).drop(columns=['datetime'])
            name = f"{bdr}_{adr}"

            # Quick diagnostics
            diag = diagnostics_quick(df)
            if not np.isfinite(diag['median_ratio']) or diag['median_ratio'] <= 0:
                print(f"[WARN][{name}] median_ratio inválido:", diag['median_ratio'])

            # Plots
            plot_prices(df, name, OUT_DIR / "plots")
            plot_spread(df, name, OUT_DIR / "plots")
            plot_zscore(df, name, OUT_DIR / "plots")

            # Backtest (adjust costs/borrow to your own conditions)
            bars_day = estimate_bars_per_day_from_index(df.index)
            bt_df, bt_sum = backtest_spread(
                df,
                z_entry=2.0, z_exit=0.5, z_stop=3.0,        # z_exit > 0 avoids an instant exit
                cost_bdr_bps=10.0, cost_synth_bps=2.0,
                borrow_bdr_bps_day=40.0,
                bars_per_day=bars_day,
                flat_at_eod=True,
                min_hold=2,                      # hold for at least 2 bars
                no_entry_last_k_bars=2,          # no entries in the last 2 bars
                latency_bars=0                   # try =1 if desired
            )
            share = entries_near_eod_share(bt_df, df, last_k=2)
            if np.isnan(share):
                print("% de entradas nas últimas 2 barras do dia: N/A (sem entradas)")
            else:
                print(f"% de entradas nas últimas 2 barras do dia: {share:.1%}")


            # Save the PnL curve
            bt_path = OUT_DIR / "pairs" / f"{name}_bt.parquet"
            bt_df.reset_index().rename(columns={'index':'datetime'}).to_parquet(bt_path, index=False)


            bt_rows.append({
                'bdr': bdr, 'adr': adr,
                **bt_sum,
                'corr_log': diag['corr_log'],
                'median_ratio_diag': diag['median_ratio']
            })
            print(f"[BT] {name}: sharpe={bt_sum['sharpe_hourly_annualized']:.2f} | pnl={bt_sum['pnl_total']:.2f} | trades={bt_sum['trades']}")

        if bt_rows:
            df_bt = pd.DataFrame(bt_rows).sort_values('sharpe_hourly_annualized', ascending=False)
            df_bt.to_csv(OUT_DIR / "summary_backtest.csv", index=False, float_format="%.6f")
            print("\n== Resultado do backtest (top 10 por Sharpe) ==")
            print(df_bt[['bdr','adr','sharpe_hourly_annualized','pnl_total','mdd','trades','winrate','corr_log']].head(10))
        # The grid search runs over every processed pair, not only plot_list.
        pairs_to_test = processed

        df_grid = grid_search_params(
            pairs_to_test=pairs_to_test,
            z_entries=(1.5, 2.0, 2.5),
            z_stops=(3.0, 3.5, 4.0),
            cost_sets=((10.0, 2.0), (20.0, 5.0)),
            borrow_daily_bps=(0.0, 20.0, 40.0),
            flat_at_eod=True
        )

        if not df_grid.empty:
            df_grid.to_csv(OUT_DIR / "summary_gridsearch.csv", index=False, float_format="%.6f")
            print("\n== Grid search (top 10 por Sharpe) ==")
            print(df_grid[['pair','z_entry','z_stop','cost_bdr_bps','cost_syn_bps','borrow_bps_day',
                        'sharpe_hourly_annualized','pnl_total','trades','winrate']].head(10))
        else:
            print("\n[GRID] Nenhum cenário foi avaliado (lista 'pairs_to_test' vazia?).")
    else:
        print("Nenhum par válido processado.")

# Run the pipeline only when this module is executed directly, not when imported.
if __name__ == "__main__":
    main()

Lendo BDRs…
BDRs lidas: 11
Lendo ADRs…
ADRs lidas: 11
Lendo FX (USDBRL)…
[OK] AAPL34↔AAPL: 12961 pts | ratio=0.049607 (ols) | salvo: AAPL34_AAPL.parquet
[OK] MSFT34↔MSFT: 11007 pts | ratio=0.041243 (ols) | salvo: MSFT34_MSFT.parquet
[OK] NVDC34↔NVDA: 8254 pts | ratio=0.020829 (ols) | salvo: NVDC34_NVDA.parquet
[OK] AMZO34↔AMZN: 9708 pts | ratio=0.050004 (ols) | salvo: AMZO34_AMZN.parquet
[OK] GOGL34↔GOOGL: 10794 pts | ratio=0.082713 (ols) | salvo: GOGL34_GOOGL.parquet
[OK] M1TA34↔META: 9707 pts | ratio=0.035643 (ols) | salvo: M1TA34_META.parquet
[OK] TSLA34↔TSLA: 8676 pts | ratio=0.031242 (ols) | salvo: TSLA34_TSLA.parquet
[OK] TSMC34↔TSM: 7540 pts | ratio=0.122508 (ols) | salvo: TSMC34_TSM.parquet
[OK] AVGO34↔AVGO: 5914 pts | ratio=0.014187 (ols) | salvo: AVGO34_AVGO.parquet
[OK] BABA34↔BABA: 8194 pts | ratio=0.035744 (ols) | salvo: BABA34_BABA.parquet
[OK] JDCO34↔JD: 5526 pts | ratio=0.166777 (ols) | salvo: JDCO34_JD.parquet

== Resumo dos pares (top 10 por score) ==
       bdr    ad