# Experimentos (OR/Otimização) — Mean-CVaR + Turnover

Notebook central do artigo acadêmico (Pesquisa Operacional / Otimização).

O objetivo é gerar, **dentro do próprio notebook**, os principais artefatos do paper:

- **Tabela principal**: retorno/risco/CVaR/drawdown/turnover/custos.
- **Figura de trade-off**: retorno vs risco de cauda (CVaR).
- **Sensibilidade**: varredura de τ (limite de CVaR) e penalidade de turnover.
- **Estresse**: métricas por subperíodo (COVID/2022 etc.).
- **Diagnóstico do solver**: status, tempo e resíduos (importante em OR).

## Convenções (CVaR)

- No LP de mean-CVaR (`solve_cvar_lp`), o CVaR é modelado sobre **perdas** (valor positivo).
- Na avaliação OOS (`arara_quant.evaluation.oos`), `cvar_95` é a média dos **piores retornos** (valor negativo) e `cvar_95_annual = cvar_95 × √252`.
- Para o paper, use normalmente **perda**: `cvar_loss_95_annual = -cvar_95_annual`.

Referência: `docs/CVAR_CONVENTION.md`.


In [None]:
from __future__ import annotations

import json
import platform
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime
from itertools import product
from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

REPO_ROOT = Path.cwd().resolve()
if (REPO_ROOT / "src").exists():
    sys.path.insert(0, str(REPO_ROOT / "src"))

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)
np.set_printoptions(precision=6, suppress=True)
plt.rcParams["figure.figsize"] = (10, 4)


def _git_head() -> str:
    try:
        return subprocess.check_output(["git", "rev-parse", "--short", "HEAD"], text=True).strip()
    except Exception:
        return "unknown"


def _env_snapshot() -> dict[str, Any]:
    return {
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "git": _git_head(),
        "python": sys.version.split()[0],
        "platform": platform.platform(),
        "numpy": np.__version__,
        "pandas": pd.__version__,
    }


env = _env_snapshot()
env


## 1) Dados

Este notebook usa a matriz de retornos `data/processed/returns_arara.parquet`.

Se ainda não existir:
```bash
poetry run python scripts/run_01_data_pipeline.py --force-download --start 2010-01-01
```


In [None]:
returns_path = REPO_ROOT / "data/processed/returns_arara.parquet"
if not returns_path.exists():
    raise FileNotFoundError(
        f"Arquivo não encontrado: {returns_path}. Rode scripts/run_01_data_pipeline.py para gerar."
    )

returns_raw = pd.read_parquet(returns_path).sort_index().astype(float)

# Filtro mínimo de observações por ativo (alinha com scripts/research/run_cvar_tail_experiment.py)
TRAIN_WINDOW = 252
MIN_OBS = TRAIN_WINDOW + 60
valid_cols = [c for c in returns_raw.columns if returns_raw[c].count() >= MIN_OBS]

returns = returns_raw[valid_cols].fillna(0.0)

summary = {
    "rows": int(len(returns)),
    "assets": int(returns.shape[1]),
    "start": str(returns.index.min().date()),
    "end": str(returns.index.max().date()),
    "min_obs_filter": int(MIN_OBS),
    "missing_rate_raw": float(returns_raw.isna().mean().mean()),
}

summary


### 1.1) Sanity checks do dataset (para seção de dados)

- Número de ativos elegíveis
- Distribuição de observações por ativo
- Checagem de magnitude de retornos


In [None]:
obs_per_asset = returns_raw[valid_cols].count().sort_values(ascending=False)
describe = obs_per_asset.describe().to_frame(name="obs")
describe


In [None]:
fig, ax = plt.subplots()
ax.hist(obs_per_asset.values, bins=20, alpha=0.85)
ax.set_title("Observações por ativo (após filtro min_obs)")
ax.set_xlabel("# observações")
ax.set_ylabel("# ativos")
ax.grid(True, alpha=0.3)
plt.show()


In [None]:
q = returns.stack().quantile([0.001, 0.01, 0.5, 0.99, 0.999]).to_frame(name="return")
q


## 2) Desenho experimental (walk-forward)

Configuração base (padrões do repo):
- `train=252`, `test=21`, `purge=5`, `embargo=5`
- custos lineares: `30 bps` (aplicado no rebalance)
- box constraints: `0 ≤ w_i ≤ max_position`


In [None]:
TEST_WINDOW = 21
PURGE_WINDOW = 5
EMBARGO_WINDOW = 5

COSTS_BPS = 30.0
MAX_POSITION = 0.12

RANDOM_STATE = 42
BOOTSTRAP_ITERATIONS = 200  # use 0 para rodar mais rápido
CONFIDENCE = 0.95
BLOCK_SIZE = 21

pd.DataFrame(
    {
        "train_window": [TRAIN_WINDOW],
        "test_window": [TEST_WINDOW],
        "purge_window": [PURGE_WINDOW],
        "embargo_window": [EMBARGO_WINDOW],
        "costs_bps": [COSTS_BPS],
        "max_position": [MAX_POSITION],
        "bootstrap_iterations": [BOOTSTRAP_ITERATIONS],
        "block_size": [BLOCK_SIZE],
        "random_state": [RANDOM_STATE],
    }
)


## 3) Estratégias

Baselines (para comparação em OR):
- 1/N (`equal_weight`)
- Risk Parity (`risk_parity`)
- MV com shrink conservador (`shrunk_mv`)

Proposto:
- Mean-CVaR (LP) com **turnover cap/penalty**, em dois modos:
  1) `mean_cvar_target`: retorno-alvo + penalidade de CVaR
  2) `mean_cvar_limit`: limite duro de CVaR (perda) + objetivo retorno - λ·CVaR


In [None]:
from arara_quant.evaluation.oos import StrategySpec, compare_baselines, default_strategies, stress_test
from arara_quant.optimization.core.cvar_lp import CvarConfig, solve_cvar_lp


@dataclass(frozen=True)
class CvarSpec:
    name: str
    alpha: float
    risk_aversion: float
    max_position: float
    turnover_penalty: float
    turnover_cap: float | None
    target_return: float | None
    max_cvar_loss: float | None
    solver: str


def _equal_weight(train_returns: pd.DataFrame) -> pd.Series:
    cols = train_returns.columns
    if len(cols) == 0:
        return pd.Series(dtype=float)
    return pd.Series(1.0 / len(cols), index=cols, dtype=float)


def _prepare_window(data: pd.DataFrame) -> pd.DataFrame:
    # Mantém o padrão do script de pesquisa: se houver NaNs, remove linhas; se esvaziar, zera.
    cleaned = data.dropna(axis=0, how="any")
    if cleaned.empty:
        return data.fillna(0.0)
    return cleaned


def build_mean_cvar_strategy(spec: CvarSpec, *, diagnostics: list[dict[str, Any]]) -> StrategySpec:
    def builder(train_returns: pd.DataFrame, prev_weights: pd.Series | None) -> pd.Series:
        window = _prepare_window(train_returns)
        if window.empty:
            diagnostics.append({"status": "empty_window", "train_end": None})
            return _equal_weight(train_returns)

        expected = window.mean()
        assets = expected.index

        prev = pd.Series(0.0, index=assets, dtype=float)
        if prev_weights is not None:
            prev = prev_weights.reindex(assets).fillna(0.0).astype(float)

        config = CvarConfig(
            alpha=spec.alpha,
            risk_aversion=spec.risk_aversion,
            long_only=True,
            lower_bounds=pd.Series(0.0, index=assets, dtype=float),
            upper_bounds=pd.Series(spec.max_position, index=assets, dtype=float),
            turnover_penalty=spec.turnover_penalty,
            turnover_cap=spec.turnover_cap,
            previous_weights=prev,
            target_return=spec.target_return,
            max_cvar=spec.max_cvar_loss,
            solver=spec.solver,
            solver_kwargs={"max_iters": 10_000},
        )

        record: dict[str, Any] = {
            "train_end": str(window.index.max().date()) if len(window.index) else None,
            "n_assets": int(len(assets)),
            "n_scenarios": int(len(window)),
            "alpha": float(spec.alpha),
            "risk_aversion": float(spec.risk_aversion),
            "turnover_penalty": float(spec.turnover_penalty),
            "turnover_cap": None if spec.turnover_cap is None else float(spec.turnover_cap),
            "target_return": None if spec.target_return is None else float(spec.target_return),
            "max_cvar_loss": None if spec.max_cvar_loss is None else float(spec.max_cvar_loss),
        }

        try:
            result = solve_cvar_lp(window, expected, config)
            record.update(
                {
                    "status": result.summary.status,
                    "solver": result.summary.solver,
                    "runtime": result.summary.runtime,
                    "objective": result.summary.value,
                    "primal_residual": result.summary.primal_residual,
                    "dual_residual": result.summary.dual_residual,
                    "expected_return": result.expected_return,
                    "cvar_loss": result.cvar,
                    "var_loss": result.var,
                    "turnover": result.turnover,
                }
            )
            weights = result.weights.reindex(assets).fillna(0.0)
        except Exception as exc:
            record.update({"status": "exception", "error": str(exc)})
            diagnostics.append(record)
            return _equal_weight(train_returns)

        diagnostics.append(record)

        weights = weights.clip(lower=0.0, upper=spec.max_position)
        total = float(weights.sum())
        if total <= 0:
            return _equal_weight(train_returns)
        return weights / total

    return StrategySpec(spec.name, builder)


base_strategies = default_strategies(max_position=MAX_POSITION)
base_strategies = [s for s in base_strategies if s.name in {"equal_weight", "risk_parity", "shrunk_mv"}]

diag_target: list[dict[str, Any]] = []
diag_limit: list[dict[str, Any]] = []

spec_target = CvarSpec(
    name="mean_cvar_target",
    alpha=0.95,
    risk_aversion=2.5,
    max_position=MAX_POSITION,
    turnover_penalty=0.02,
    turnover_cap=0.20,
    target_return=0.0004,  # ~10% a.a. (aprox.)
    max_cvar_loss=None,
    solver="CLARABEL",
)

spec_limit = CvarSpec(
    name="mean_cvar_limit",
    alpha=0.95,
    risk_aversion=1.5,
    max_position=MAX_POSITION,
    turnover_penalty=0.02,
    turnover_cap=0.20,
    target_return=None,
    max_cvar_loss=0.06,  # CVaR(loss) diário <= 6%
    solver="CLARABEL",
)

strategies = [
    *base_strategies,
    build_mean_cvar_strategy(spec_target, diagnostics=diag_target),
    build_mean_cvar_strategy(spec_limit, diagnostics=diag_limit),
]

[s.name for s in strategies]


## 3.1) Variantes do mean-CVaR (paper) e regime-switching

O PDF propõe comparar duas formulações principais:

- **CVaR como restrição:** maximizar retorno sujeito a CVaR(loss) ≤ τ.
- **CVaR no objetivo:** maximizar retorno − λ·CVaR(loss).

E também um **regime-switching simples** (proxy interno por volatilidade; opcionalmente VIX):
- regime “normal” → MV (proxy) para capturar bull markets;
- regime “estresse” → mean-CVaR para proteção de cauda.


In [None]:
# Variantes do paper (além das duas já definidas acima)
diag_constraint: list[dict[str, Any]] = []
diag_objective: list[dict[str, Any]] = []

# (A) CVaR como restrição: maximize retorno sob CVaR(loss) <= tau
spec_constraint = CvarSpec(
    name="mean_cvar_constraint",
    alpha=0.95,
    risk_aversion=0.0,
    max_position=MAX_POSITION,
    turnover_penalty=0.02,
    turnover_cap=0.20,
    target_return=None,
    max_cvar_loss=0.06,
    solver="CLARABEL",
)

# (B) CVaR no objetivo: maximize retorno - lambda * CVaR(loss)
spec_objective = CvarSpec(
    name="mean_cvar_objective",
    alpha=0.95,
    risk_aversion=2.5,
    max_position=MAX_POSITION,
    turnover_penalty=0.02,
    turnover_cap=0.20,
    target_return=None,
    max_cvar_loss=None,
    solver="CLARABEL",
)

cvar_constraint = build_mean_cvar_strategy(spec_constraint, diagnostics=diag_constraint)
cvar_objective = build_mean_cvar_strategy(spec_objective, diagnostics=diag_objective)

# Regime-switching (proxy interno; opcional: VIX)
USE_VIX_SIGNAL = False
VIX_THRESHOLD = 30.0
VOL_THRESHOLD = 0.20  # 20% a.a. (proxy interno)

vix_close: pd.Series | None = None
if USE_VIX_SIGNAL:
    import yfinance as yf

    vix_df = yf.download(
        "^VIX",
        start=str(returns.index.min().date()),
        end=str(returns.index.max().date()),
        progress=False,
        auto_adjust=True,
    )
    if isinstance(vix_df.columns, pd.MultiIndex):
        if "Close" in vix_df.columns.get_level_values(0):
            vix_close = vix_df["Close"].iloc[:, 0]
    else:
        if "Close" in vix_df.columns:
            vix_close = vix_df["Close"]
    if vix_close is not None:
        vix_close = vix_close.dropna().sort_index()

regime_log: list[dict[str, Any]] = []

from arara_quant.estimators.cov import ledoit_wolf_shrinkage
from arara_quant.estimators.mu import shrunk_mean
from arara_quant.optimization.core.mv_qp import MeanVarianceConfig, solve_mean_variance


def _signal_from_train(train_returns: pd.DataFrame) -> tuple[str, float, bool]:
    train_end = train_returns.index.max()

    if vix_close is not None and not vix_close.empty:
        value = float(vix_close.asof(train_end))
        return "vix", value, value >= VIX_THRESHOLD

    proxy = (
        train_returns["SPY"]
        if "SPY" in train_returns.columns
        else train_returns.mean(axis=1)
    )
    vol = float(proxy.tail(21).std(ddof=1) * np.sqrt(252.0))
    return "realized_vol", vol, vol >= VOL_THRESHOLD


def _mv_shrunk_builder(train_returns: pd.DataFrame, prev_weights: pd.Series | None) -> pd.Series:
    assets = train_returns.columns
    mu_daily = shrunk_mean(train_returns, strength=0.5, prior=0.0)
    mu = mu_daily * 252.0

    cov_daily, _ = ledoit_wolf_shrinkage(train_returns)
    cov = cov_daily * 252.0

    prev = pd.Series(0.0, index=assets, dtype=float)
    if prev_weights is not None:
        prev = prev_weights.reindex(assets).fillna(0.0).astype(float)

    bounds = pd.Series(MAX_POSITION, index=assets, dtype=float)
    config = MeanVarianceConfig(
        risk_aversion=4.0,
        turnover_penalty=0.0,
        turnover_cap=None,
        lower_bounds=pd.Series(0.0, index=assets, dtype=float),
        upper_bounds=bounds,
        previous_weights=prev,
        cost_vector=None,
        solver="CLARABEL",
    )
    result = solve_mean_variance(mu, cov, config)

    weights = (
        result.weights.reindex(assets)
        .fillna(0.0)
        .clip(lower=0.0, upper=MAX_POSITION)
    )
    total = float(weights.sum())
    if total <= 0:
        return _equal_weight(train_returns)
    return weights / total


def _regime_switch(train_returns: pd.DataFrame, prev_weights: pd.Series | None) -> pd.Series:
    signal, value, stressed = _signal_from_train(train_returns)

    if stressed:
        chosen = "mean_cvar_constraint"
        weights = cvar_constraint.builder(train_returns, prev_weights)
    else:
        chosen = "shrunk_mv_proxy"
        weights = _mv_shrunk_builder(train_returns, prev_weights)

    regime_log.append(
        {
            "train_end": str(train_returns.index.max().date()),
            "signal": signal,
            "signal_value": float(value),
            "stressed": bool(stressed),
            "chosen": chosen,
        }
    )
    return weights


regime_switch = StrategySpec("regime_switch", _regime_switch)

# Estratégias usadas no run principal (tabela/figuras)
strategies = [
    *base_strategies,
    cvar_constraint,
    cvar_objective,
    build_mean_cvar_strategy(spec_target, diagnostics=diag_target),
    build_mean_cvar_strategy(spec_limit, diagnostics=diag_limit),
    regime_switch,
]

[s.name for s in strategies]


## 4) Resultado principal (Tabela do artigo)


In [None]:
oos = compare_baselines(
    returns,
    strategies=strategies,
    train_window=TRAIN_WINDOW,
    test_window=TEST_WINDOW,
    purge_window=PURGE_WINDOW,
    embargo_window=EMBARGO_WINDOW,
    costs_bps=COSTS_BPS,
    max_position=MAX_POSITION,
    bootstrap_iterations=BOOTSTRAP_ITERATIONS,
    confidence=CONFIDENCE,
    block_size=BLOCK_SIZE,
    random_state=RANDOM_STATE,
)

metrics = oos.metrics.copy()

# Transformações úteis para paper
metrics["cvar_loss_95_annual"] = -metrics["cvar_95_annual"].astype(float)
metrics["max_drawdown_abs"] = metrics["max_drawdown"].abs().replace(0.0, np.nan)
metrics["cvar_95_annual_abs"] = metrics["cvar_95_annual"].abs().replace(0.0, np.nan)
metrics["calmar"] = metrics["annualized_return"] / metrics["max_drawdown_abs"]
metrics["return_over_cvar"] = metrics["annualized_return"] / metrics["cvar_95_annual_abs"]

years = len(oos.returns) / 252.0
metrics["cost_bps_per_year"] = np.where(
    years > 0,
    metrics["total_cost"].astype(float) / years * 10_000.0,
    np.nan,
)
rebalances_per_year = 252.0 / TEST_WINDOW
metrics["turnover_per_year"] = metrics["avg_turnover"].astype(float) * rebalances_per_year

table_main = metrics[[
    "annualized_return",
    "volatility",
    "sharpe",
    "cvar_loss_95_annual",
    "max_drawdown",
    "turnover_per_year",
    "cost_bps_per_year",
    "calmar",
    "return_over_cvar",
    "sharpe_ci_low",
    "sharpe_ci_high",
]].copy()

table_main = table_main.sort_values(["return_over_cvar", "calmar"], ascending=False)
table_main


In [None]:
# Versão "paper-ready" (formatação)
table_paper = table_main.copy()

fmt_pct = lambda x: f"{x*100:,.2f}%" if pd.notna(x) else "-"
fmt_num = lambda x: f"{x:,.2f}" if pd.notna(x) else "-"
fmt_bps = lambda x: f"{x:,.1f}" if pd.notna(x) else "-"

table_paper["annualized_return"] = table_paper["annualized_return"].map(fmt_pct)
table_paper["volatility"] = table_paper["volatility"].map(fmt_pct)
table_paper["cvar_loss_95_annual"] = table_paper["cvar_loss_95_annual"].map(fmt_pct)
table_paper["max_drawdown"] = table_paper["max_drawdown"].map(fmt_pct)
table_paper["turnover_per_year"] = table_paper["turnover_per_year"].map(fmt_num)
table_paper["cost_bps_per_year"] = table_paper["cost_bps_per_year"].map(fmt_bps)
table_paper["sharpe"] = table_paper["sharpe"].map(fmt_num)
table_paper["calmar"] = table_paper["calmar"].map(fmt_num)
table_paper["return_over_cvar"] = table_paper["return_over_cvar"].map(fmt_num)
table_paper["sharpe_ci_low"] = table_paper["sharpe_ci_low"].map(fmt_num)
table_paper["sharpe_ci_high"] = table_paper["sharpe_ci_high"].map(fmt_num)

table_paper


In [None]:
# Tabela em LaTeX (cole direto no paper)
latex_cols = {
    "annualized_return": "Ret (a.a.)",
    "volatility": "Vol (a.a.)",
    "sharpe": "Sharpe",
    "cvar_loss_95_annual": "CVaR 95\\% (a.a., perda)",
    "max_drawdown": "MaxDD",
    "turnover_per_year": "Turnover/ano",
    "cost_bps_per_year": "Custo (bps/ano)",
    "calmar": "Calmar",
    "return_over_cvar": "Ret/CVaR",
}

table_paper[list(latex_cols.keys())].rename(columns=latex_cols).to_latex(
    escape=False,
    index=True,
    caption="OOS (walk-forward): métricas principais.",
    label="tab:oos_main",
)


## 4.1) Critério de “vitória econômica” (do PDF)

Regra prática proposta:
- se a variante mean-CVaR tiver retorno absoluto menor, mas **Calmar ≥ 1.5×** ou **Ret/CVaR ≥ 1.5×** vs baseline, considerar “vitória econômica”.


In [None]:
candidates = [name for name in metrics.index if name.startswith("mean_cvar") or name == "regime_switch"]
baselines = [name for name in metrics.index if name in {"shrunk_mv", "equal_weight", "risk_parity"}]

rows: list[dict[str, Any]] = []
for cand in candidates:
    for base in baselines:
        calmar_ratio = float(metrics.loc[cand, "calmar"] / metrics.loc[base, "calmar"]) if metrics.loc[base, "calmar"] else np.nan
        roc_ratio = float(metrics.loc[cand, "return_over_cvar"] / metrics.loc[base, "return_over_cvar"]) if metrics.loc[base, "return_over_cvar"] else np.nan
        ret_diff = float(metrics.loc[cand, "annualized_return"] - metrics.loc[base, "annualized_return"])
        rows.append(
            {
                "candidate": cand,
                "baseline": base,
                "ret_diff": ret_diff,
                "calmar_ratio": calmar_ratio,
                "ret_over_cvar_ratio": roc_ratio,
                "passes_1.5x": (calmar_ratio >= 1.5) or (roc_ratio >= 1.5),
            }
        )

victory = pd.DataFrame(rows).sort_values(["passes_1.5x", "ret_over_cvar_ratio"], ascending=False)
victory


## 4.2) Regime-switching: frequência e sinal

O paper precisa mostrar **quanto tempo** o sistema fica em regime de estresse e qual o sinal usado (VIX ou proxy interno).


In [None]:
regime_df = pd.DataFrame(regime_log)
if not regime_df.empty and "regime_switch" in oos.weights:
    dates = [ts for ts, _ in oos.weights["regime_switch"]]
    regime_df["rebalance_date"] = pd.to_datetime(dates[: len(regime_df)])
    regime_df["rebalance_date"] = regime_df["rebalance_date"].dt.tz_localize(None)
    regime_df = regime_df.sort_values("rebalance_date")

regime_df.head()


In [None]:
if regime_df.empty:
    print("Regime log vazio: rode o bloco principal antes.")
else:
    summary_regime = {
        "n_windows": int(len(regime_df)),
        "stressed_rate": float(regime_df["stressed"].mean()),
        "signal": str(regime_df["signal"].iloc[0]) if "signal" in regime_df.columns else "unknown",
        "signal_value_min": float(regime_df["signal_value"].min()),
        "signal_value_p95": float(regime_df["signal_value"].quantile(0.95)),
        "signal_value_max": float(regime_df["signal_value"].max()),
    }
    summary_regime


In [None]:
if not regime_df.empty:
    fig, ax = plt.subplots()
    ax.plot(regime_df["rebalance_date"], regime_df["signal_value"], lw=1.8)

    threshold = VIX_THRESHOLD if regime_df["signal"].iloc[0] == "vix" else VOL_THRESHOLD
    ax.axhline(threshold, color="red", linestyle="--", lw=1.2, label="threshold")

    stressed = regime_df[regime_df["stressed"]]
    ax.scatter(stressed["rebalance_date"], stressed["signal_value"], color="red", s=30, label="stressed")

    ax.set_title("Sinal de regime no rebalance (proxy interno ou VIX)")
    ax.set_ylabel("signal_value")
    ax.grid(True, alpha=0.3)
    ax.legend()
    plt.show()


### 4.3) Equity curve do regime-switch (com marcação de estresse)

Sombreia as janelas onde o sinal classificou o regime como estresse (proxy interno ou VIX).


In [None]:
if regime_df.empty or "regime_switch" not in oos.returns.columns:
    print("Sem dados para plotar regime_switch (rode o run principal antes).")
else:
    nav = (1.0 + oos.returns).cumprod()
    reg = regime_df.sort_values("rebalance_date").reset_index(drop=True)

    cols = [c for c in ["regime_switch", "mean_cvar_constraint", "shrunk_mv"] if c in nav.columns]
    fig, ax = plt.subplots()
    for c in cols:
        highlight = c == "regime_switch"
        ax.plot(
            nav.index,
            nav[c],
            label=c,
            lw=2.6 if highlight else 1.4,
            alpha=1.0 if highlight else 0.75,
        )

    dates = reg["rebalance_date"].tolist()
    for i, row in reg.iterrows():
        start = row["rebalance_date"]
        end = dates[i + 1] if i + 1 < len(dates) else nav.index.max()
        if bool(row.get("stressed", False)):
            ax.axvspan(start, end, color="red", alpha=0.08)

    ax.set_title("Equity curve OOS: regime-switch (stress shading)")
    ax.set_ylabel("NAV")
    ax.grid(True, alpha=0.3)
    ax.legend()
    plt.show()


## 5) Figura principal: trade-off retorno vs risco de cauda

Para OR, a leitura usual é que o controle de cauda implica trade-off. A figura abaixo posiciona cada estratégia.


In [None]:
plot_points = metrics.copy()
plot_points["cvar_loss_95_annual"] = -plot_points["cvar_95_annual"].astype(float)

fig, ax = plt.subplots()
for name, row in plot_points.iterrows():
    ax.scatter(row["cvar_loss_95_annual"], row["annualized_return"], s=90)
    ax.annotate(name, (row["cvar_loss_95_annual"], row["annualized_return"]), xytext=(5, 5), textcoords="offset points")

ax.set_xlabel("CVaR 95% anual (perda, +)")
ax.set_ylabel("Retorno anualizado")
ax.set_title("Trade-off: retorno vs risco de cauda (OOS)")
ax.grid(True, alpha=0.3)
plt.show()


## 5.1) Curvas de equity e drawdown (Figura do artigo)

Curvas OOS (NAV) e drawdown underwater para leitura visual do trade-off.


In [None]:
nav = (1.0 + oos.returns).cumprod()

fig, ax = plt.subplots()
for name in nav.columns:
    highlight = name.startswith("mean_cvar")
    ax.plot(
        nav.index,
        nav[name],
        label=name,
        lw=2.6 if highlight else 1.2,
        alpha=1.0 if highlight else 0.65,
    )

ax.set_title("Equity curve OOS (NAV, base=1)")
ax.set_ylabel("NAV")
ax.grid(True, alpha=0.3)
ax.legend(ncol=2, fontsize=8)
plt.show()


In [None]:
drawdown = nav.div(nav.cummax()) - 1.0

fig, ax = plt.subplots()
for name in drawdown.columns:
    highlight = name.startswith("mean_cvar")
    ax.plot(
        drawdown.index,
        drawdown[name],
        label=name,
        lw=2.6 if highlight else 1.2,
        alpha=1.0 if highlight else 0.65,
    )

ax.set_title("Drawdown OOS (underwater)")
ax.set_ylabel("drawdown")
ax.grid(True, alpha=0.3)
ax.legend(ncol=2, fontsize=8)
plt.show()


## 5.2) Turnover e custos (Figura do artigo)

Distribuição de turnover por rebalance e custo implícito (em bps) usando `costs_bps`.


In [None]:
turnover_df = pd.DataFrame({name: pd.Series(values) for name, values in oos.turnovers.items()})
cost_bps_df = turnover_df * COSTS_BPS

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

turnover_df.boxplot(ax=axes[0], rot=25)
axes[0].set_title("Turnover one-way por rebalance")
axes[0].set_ylabel("turnover")
axes[0].grid(True, axis="y", alpha=0.3)

cost_bps_df.boxplot(ax=axes[1], rot=25)
axes[1].set_title("Custo (bps) por rebalance")
axes[1].set_ylabel("bps")
axes[1].grid(True, axis="y", alpha=0.3)

plt.tight_layout()
plt.show()


## 5.3) Estabilidade de pesos (evidência operacional)

Além de turnover, é útil reportar estabilidade/complexidade da solução:
- **N_eff** (número efetivo de ativos): 1/∑w²
- **Max weight** (concentração)


In [None]:
def _weight_stats(w: pd.Series) -> dict[str, float]:
    weights = w.astype(float).fillna(0.0).clip(lower=0.0)
    total = float(weights.sum())
    if total > 0:
        weights = weights / total
    hhi = float((weights**2).sum())
    n_eff = (1.0 / hhi) if hhi > 0 else np.nan
    max_w = float(weights.max()) if not weights.empty else np.nan
    n_active = float((weights > 1e-6).sum())
    return {
        "n_active": n_active,
        "n_eff": float(n_eff),
        "max_weight": max_w,
    }


records: list[dict[str, Any]] = []
for strategy, snapshots in oos.weights.items():
    for rebalance_date, weights in snapshots:
        row = _weight_stats(weights)
        row["strategy"] = strategy
        row["rebalance_date"] = pd.to_datetime(rebalance_date).tz_localize(None)
        records.append(row)

weights_stats = pd.DataFrame(records)
weights_stats.head()


In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

weights_stats.boxplot(column="n_eff", by="strategy", ax=axes[0], rot=25)
axes[0].set_title("N_eff por rebalance")
axes[0].set_ylabel("N_eff")
axes[0].grid(True, axis="y", alpha=0.3)

weights_stats.boxplot(column="max_weight", by="strategy", ax=axes[1], rot=25)
axes[1].set_title("Max weight por rebalance")
axes[1].set_ylabel("max weight")
axes[1].grid(True, axis="y", alpha=0.3)

plt.suptitle("")
plt.tight_layout()
plt.show()


## 6) Sensibilidade (τ do CVaR vs penalidade de turnover)

Esta seção gera uma grade pequena *paper-friendly*.

Se quiser rodar uma grade maior, aumente os grids abaixo (pode demorar).

In [None]:
max_cvar_grid = [0.04, 0.05, 0.06, 0.07]
turnover_penalty_grid = [0.0, 0.01, 0.02]

records: list[dict[str, Any]] = []

for max_cvar_loss, turnover_penalty in product(max_cvar_grid, turnover_penalty_grid):
    tmp_diag: list[dict[str, Any]] = []
    tmp_spec = CvarSpec(
        name="mean_cvar_limit",
        alpha=0.95,
        risk_aversion=1.5,
        max_position=MAX_POSITION,
        turnover_penalty=float(turnover_penalty),
        turnover_cap=0.20,
        target_return=None,
        max_cvar_loss=float(max_cvar_loss),
        solver="CLARABEL",
    )
    spec = build_mean_cvar_strategy(tmp_spec, diagnostics=tmp_diag)

    run = compare_baselines(
        returns,
        strategies=[*base_strategies, spec],
        train_window=TRAIN_WINDOW,
        test_window=TEST_WINDOW,
        purge_window=PURGE_WINDOW,
        embargo_window=EMBARGO_WINDOW,
        costs_bps=COSTS_BPS,
        max_position=MAX_POSITION,
        bootstrap_iterations=0,
        random_state=RANDOM_STATE,
    )

    m = run.metrics.loc["mean_cvar_limit"].to_dict()
    m["max_cvar_loss"] = float(max_cvar_loss)
    m["turnover_penalty"] = float(turnover_penalty)
    m["cvar_loss_95_annual"] = -float(m.get("cvar_95_annual", 0.0))
    m["max_drawdown_abs"] = abs(float(m.get("max_drawdown", 0.0)))
    denom = abs(float(m.get("cvar_95_annual", 0.0)))
    m["return_over_cvar"] = (float(m.get("annualized_return", 0.0)) / denom) if denom else np.nan

    diag_df = pd.DataFrame(tmp_diag)
    m["solver_opt_rate"] = float((diag_df.get("status") == "optimal").mean()) if not diag_df.empty else np.nan
    m["solver_runtime_mean"] = float(diag_df.get("runtime", pd.Series(dtype=float)).mean()) if not diag_df.empty else np.nan

    records.append(m)

grid = pd.DataFrame(records)
grid = grid.sort_values(["return_over_cvar", "annualized_return"], ascending=False)
grid[[
    "max_cvar_loss",
    "turnover_penalty",
    "annualized_return",
    "sharpe",
    "cvar_95_annual",
    "max_drawdown",
    "avg_turnover",
    "return_over_cvar",
    "solver_opt_rate",
    "solver_runtime_mean",
]].head(12)


In [None]:
fig, ax = plt.subplots()
grid_plot = grid.copy()
grid_plot["cvar_loss_95_annual"] = -grid_plot["cvar_95_annual"].astype(float)

for penalty in sorted(grid_plot["turnover_penalty"].unique()):
    subset = grid_plot[grid_plot["turnover_penalty"] == penalty]
    ax.plot(
        subset["cvar_loss_95_annual"],
        subset["annualized_return"],
        marker="o",
        linestyle="-",
        label=f"turnover_penalty={penalty}",
        alpha=0.9,
    )

ax.set_xlabel("CVaR 95% anual (perda, +)")
ax.set_ylabel("Retorno anualizado")
ax.set_title("Sensibilidade: τ (CVaR) × penalidade de turnover")
ax.grid(True, alpha=0.3)
ax.legend()
plt.show()


### 6.1) Tabelas pivô e heatmap (τ × penalidade)

Centraliza, em formato de tabela/figura, como τ e η alteram retorno, CVaR e turnover.


In [None]:
pivot_ret = grid.pivot_table(
    index="max_cvar_loss", columns="turnover_penalty", values="annualized_return"
).sort_index()
pivot_turn = grid.pivot_table(
    index="max_cvar_loss", columns="turnover_penalty", values="avg_turnover"
).sort_index()
pivot_cvar_ann = (-grid.pivot_table(
    index="max_cvar_loss", columns="turnover_penalty", values="cvar_95_annual"
)).sort_index()
pivot_roc = grid.pivot_table(
    index="max_cvar_loss", columns="turnover_penalty", values="return_over_cvar"
).sort_index()

print("Retorno anualizado")
display(pivot_ret)
print("CVaR 95% anual (perda, +)")
display(pivot_cvar_ann)
print("Turnover médio por rebalance")
display(pivot_turn)
print("Ret/CVaR")
display(pivot_roc)


In [None]:
fig, ax = plt.subplots(figsize=(8, 4))
im = ax.imshow(pivot_roc.to_numpy(dtype=float), aspect="auto", origin="lower")

ax.set_xticks(range(len(pivot_roc.columns)))
ax.set_xticklabels([str(c) for c in pivot_roc.columns])
ax.set_yticks(range(len(pivot_roc.index)))
ax.set_yticklabels([str(i) for i in pivot_roc.index])

ax.set_xlabel("turnover_penalty (η)")
ax.set_ylabel("max_cvar_loss (τ, diário)")
ax.set_title("Heatmap: Ret/CVaR (maior é melhor)")
fig.colorbar(im, ax=ax)
plt.tight_layout()
plt.show()


In [None]:
fig, ax = plt.subplots()
for penalty in sorted(grid["turnover_penalty"].unique()):
    subset = grid[grid["turnover_penalty"] == penalty].sort_values("max_cvar_loss")
    ax.plot(
        subset["max_cvar_loss"],
        subset["avg_turnover"],
        marker="o",
        label=f"η={penalty}",
    )

ax.set_title("Sensibilidade de turnover ao limite de CVaR (τ)")
ax.set_xlabel("max_cvar_loss (τ, diário)")
ax.set_ylabel("avg_turnover (one-way)")
ax.grid(True, alpha=0.3)
ax.legend()
plt.show()


## 7) Estresse (Tabela por subperíodo)

Para OR, é importante mostrar o comportamento em regimes de crise.
Ajuste os períodos conforme o paper (e.g., COVID/2020, inflação/2022).


In [None]:
periods = {
    "COVID-19 (2020Q1)": ("2020-02-19", "2020-04-30"),
    "Inflation/Rate shock (2022)": ("2022-01-03", "2022-10-14"),
}

stress = stress_test(
    oos.returns,
    periods,
    bootstrap_iterations=0,
    random_state=RANDOM_STATE,
)

if not stress.empty:
    stress["cvar_loss_95_annual"] = (-stress["cvar_95"].astype(float)) * np.sqrt(252.0)
    stress["max_drawdown"] = stress["max_drawdown"].astype(float)
    stress = stress[[
        "period",
        "strategy",
        "annualized_return",
        "volatility",
        "sharpe",
        "cvar_loss_95_annual",
        "max_drawdown",
    ]].sort_values(["period", "strategy"])
stress


### 7.1) Turnover e custos por período (operabilidade)

O PDF sugere reportar turnover/custos por regime (crise vs normal). Aqui usamos os mesmos subperíodos para resumir a operabilidade.


In [None]:
turnover_by_date: dict[str, pd.Series] = {}
for strategy, snapshots in oos.weights.items():
    dates = [ts for ts, _ in snapshots]
    values = oos.turnovers.get(strategy, [])
    if not dates or len(values) != len(dates):
        continue
    turnover_by_date[strategy] = pd.Series(values, index=pd.to_datetime(dates)).sort_index()

turnover_by_date_df = pd.DataFrame(turnover_by_date).sort_index()
cost_bps_by_date_df = turnover_by_date_df * COSTS_BPS

records: list[dict[str, Any]] = []
for label, (start, end) in periods.items():
    tw = turnover_by_date_df.loc[str(start) : str(end)]
    cw = cost_bps_by_date_df.loc[str(start) : str(end)]
    for strategy in tw.columns:
        series = tw[strategy].dropna()
        if series.empty:
            continue
        cost_series = cw[strategy].dropna()
        records.append(
            {
                "period": label,
                "strategy": strategy,
                "n_rebalances": int(series.size),
                "turnover_mean": float(series.mean()),
                "turnover_p95": float(series.quantile(0.95)),
                "cost_bps_mean": float(cost_series.mean()) if not cost_series.empty else np.nan,
                "cost_bps_p95": float(cost_series.quantile(0.95)) if not cost_series.empty else np.nan,
            }
        )

ops_by_period = pd.DataFrame(records).sort_values(["period", "strategy"]).reset_index(drop=True)
ops_by_period


## 8) Diagnóstico do solver (tabela + figura)

Esta seção é um diferencial em OR: reporta estabilidade, status e tempo.


In [None]:
diag_df_target = pd.DataFrame(diag_target)
diag_df_limit = pd.DataFrame(diag_limit)
diag_df_constraint = pd.DataFrame(diag_constraint)
diag_df_objective = pd.DataFrame(diag_objective)

def summarize_diag(df: pd.DataFrame, *, label: str) -> pd.DataFrame:
    if df.empty:
        return pd.DataFrame({"label": [label], "n": [0]})
    status_counts = df["status"].value_counts(dropna=False).to_dict() if "status" in df.columns else {}
    optimal_rate = float((df.get("status") == "optimal").mean()) if "status" in df.columns else float("nan")
    runtime = df.get("runtime", pd.Series(dtype=float))
    return pd.DataFrame(
        {
            "label": [label],
            "n_windows": [int(len(df))],
            "optimal_rate": [optimal_rate],
            "runtime_mean_s": [float(runtime.mean()) if not runtime.empty else np.nan],
            "runtime_p95_s": [float(runtime.quantile(0.95)) if not runtime.empty else np.nan],
            "status_counts": [status_counts],
        }
    )


pd.concat(
    [
        summarize_diag(diag_df_constraint, label="mean_cvar_constraint"),
        summarize_diag(diag_df_objective, label="mean_cvar_objective"),
        summarize_diag(diag_df_target, label="mean_cvar_target"),
        summarize_diag(diag_df_limit, label="mean_cvar_limit"),
    ],
    ignore_index=True,
)


### 8.1) Aderência às restrições e taxa de binding (OR)

Para o artigo em OR, é útil reportar quantas janelas respeitam as restrições (CVaR/turnover) e quando elas ficam ativas (binding).


In [None]:
def _adherence(df: pd.DataFrame, *, label: str, tol: float = 1e-6) -> dict[str, Any]:
    if df.empty:
        return {"label": label, "n_windows": 0}

    status = df.get("status", pd.Series(dtype=str)).astype(str)
    optimal = status.str.startswith("optimal")

    out: dict[str, Any] = {
        "label": label,
        "n_windows": int(len(df)),
        "optimal_rate": float(optimal.mean()) if len(optimal) else np.nan,
    }

    if "turnover_cap" in df.columns and df["turnover_cap"].notna().any() and "turnover" in df.columns:
        cap = df["turnover_cap"].astype(float)
        val = df["turnover"].astype(float)
        mask = cap.notna() & val.notna()
        out["turnover_within_cap_rate"] = float((val[mask] <= cap[mask] + tol).mean()) if mask.any() else np.nan
        out["turnover_binding_rate"] = float((np.abs(val[mask] - cap[mask]) <= 1e-3).mean()) if mask.any() else np.nan
    else:
        out["turnover_within_cap_rate"] = np.nan
        out["turnover_binding_rate"] = np.nan

    if "max_cvar_loss" in df.columns and df["max_cvar_loss"].notna().any() and "cvar_loss" in df.columns:
        cap = df["max_cvar_loss"].astype(float)
        val = df["cvar_loss"].astype(float)
        mask = cap.notna() & val.notna()
        out["cvar_within_cap_rate"] = float((val[mask] <= cap[mask] + tol).mean()) if mask.any() else np.nan
        out["cvar_binding_rate"] = float((np.abs(val[mask] - cap[mask]) <= 1e-3).mean()) if mask.any() else np.nan
    else:
        out["cvar_within_cap_rate"] = np.nan
        out["cvar_binding_rate"] = np.nan

    runtime = df.get("runtime", pd.Series(dtype=float))
    out["runtime_mean_s"] = float(runtime.mean()) if not runtime.empty else np.nan
    out["runtime_p95_s"] = float(runtime.quantile(0.95)) if not runtime.empty else np.nan
    return out


adherence = pd.DataFrame(
    [
        _adherence(diag_df_constraint, label="mean_cvar_constraint"),
        _adherence(diag_df_objective, label="mean_cvar_objective"),
        _adherence(diag_df_target, label="mean_cvar_target"),
        _adherence(diag_df_limit, label="mean_cvar_limit"),
    ]
).set_index("label")

adherence


In [None]:
fig, ax = plt.subplots()

for label, df in [
    ("mean_cvar_constraint", diag_df_constraint),
    ("mean_cvar_objective", diag_df_objective),
    ("mean_cvar_target", diag_df_target),
    ("mean_cvar_limit", diag_df_limit),
]:
    if not df.empty and "runtime" in df.columns:
        ax.hist(df["runtime"].dropna().values, bins=20, alpha=0.5, label=label)

ax.set_title("Distribuição do tempo de solver (por janela)")
ax.set_xlabel("tempo (s)")
ax.set_ylabel("# janelas")
ax.grid(True, alpha=0.3)
ax.legend(fontsize=8)
plt.show()


## 9) Reprodutibilidade (snapshot)

Este bloco gera um JSON pequeno para copiar/colar no apêndice do paper.
Ele é exibido na saída (não escreve em disco).


In [None]:
bundle = {
    "env": env,
    "data": summary,
    "experiment": {
        "train_window": TRAIN_WINDOW,
        "test_window": TEST_WINDOW,
        "purge_window": PURGE_WINDOW,
        "embargo_window": EMBARGO_WINDOW,
        "costs_bps": COSTS_BPS,
        "max_position": MAX_POSITION,
        "strategies": [s.name for s in strategies],
        "bootstrap_iterations": BOOTSTRAP_ITERATIONS,
        "block_size": BLOCK_SIZE,
        "random_state": RANDOM_STATE,
        "cvar_convention": "optimizer models loss (+); evaluation reports tail returns (-), annualized via sqrt(252)",
    },
}

print(json.dumps(bundle, indent=2, sort_keys=True))
