# Instrução 1A-REV3 — Coleta direta Yahoo Chart → Bronze (dry_run)

In [8]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Instrução 1A-REV3 — Coleta direta Yahoo Chart → Bronze (dry_run)
# Regras:
# - Bloco único, auto-contido.
# - dry_run=True (sem persistência).
# - Provedores em ordem: Yahoo Chart -> yfinance -> Stooq.
# - Sem dados sintéticos.
# - Mensagens normativas: VALIDATION_ERROR / CHECKLIST_FAILURE.

import sys
import json
import time
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np
from pandas.tseries.offsets import BusinessDay as BDay

# =========================
# Parâmetros
# =========================
ROOT_DIR = Path("/home/wrm/BOLSA_2026").resolve()
DRY_RUN = False
TICKER = "^BVSP"

START_DATE_UTC = pd.Timestamp("2012-01-01", tz="UTC")
NOW_UTC = pd.Timestamp(datetime.now(timezone.utc))
END_DATE_UTC = NOW_UTC.normalize()  # 00:00 UTC de hoje
PERIOD2_NOW_UTC = NOW_UTC  # para Yahoo Chart, usar timestamp "agora"

PARQUET_TARGET = ROOT_DIR / "bronze" / "IBOV.parquet"
MANIFESTO_TARGET = ROOT_DIR / "manifestos" / "bronze_ibov_manifesto.csv"

EXPECTED_COLUMNS = ["date", "open", "high", "low", "close", "volume", "ticker"]
EXPECTED_DTYPES = {
    "date": "datetime64[ns]",
    "open": "float64",
    "high": "float64",
    "low": "float64",
    "close": "float64",
    "volume": "int64",
    "ticker": "string",
}

AGORA = datetime.now().astimezone()

# =========================
# Utils
# =========================
def print_section(title: str):
    print("\n" + "=" * 8 + f" {title} " + "=" * 8)

def dtypes_signature(df: pd.DataFrame) -> Dict[str, str]:
    return {c: str(df.dtypes[c]) for c in df.columns}

def percent_nulls(df: pd.DataFrame) -> Dict[str, float]:
    total = len(df)
    if total == 0:
        return {c: 100.0 for c in df.columns}
    return {c: float(df[c].isna().sum()) * 100.0 / float(total) for c in df.columns}

def to_unix_seconds(ts: pd.Timestamp) -> int:
    if ts.tzinfo is None:
        ts = ts.tz_localize("UTC")
    else:
        ts = ts.tz_convert("UTC")
    return int(ts.timestamp())

def bronze_normalize(
    df_pre: pd.DataFrame,
    ticker: str,
    start_utc: pd.Timestamp,
    end_utc: pd.Timestamp
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    df_pre: espera colunas ['date','open','high','low','close','volume'] (date pode ser datetime ou epoch já convertido)
    Retorna df_final no schema Bronze + contagens de limpeza.
    """
    df = df_pre.copy()

    # Garantir colunas
    for c in ["date", "open", "high", "low", "close", "volume"]:
        if c not in df.columns:
            raise RuntimeError(f"SCHEMA_ERROR: coluna ausente em df_pre: {c}")

    # Date -> datetime naive normalizado 00:00
    df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True).dt.tz_localize(None).dt.normalize()

    # Tipos numéricos
    for c in ["open", "high", "low", "close"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # Contagens antes da limpeza
    rows_before_cleaning = int(len(df))

    # Remover linhas com qualquer OHLC nulo
    mask_ohlc_notna = (~df["open"].isna()) & (~df["high"].isna()) & (~df["low"].isna()) & (~df["close"].isna())
    df = df[mask_ohlc_notna].copy()
    rows_after_cleaning = int(len(df))
    rows_dropped_ohlc = int(rows_before_cleaning - rows_after_cleaning)

    # Volume: NaN -> 0, int64
    df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype("int64")

    # Forçar dtype float64 para OHLC
    for c in ["open", "high", "low", "close"]:
        df[c] = df[c].astype("float64")

    # ticker
    df["ticker"] = pd.Series([ticker] * len(df), dtype="string").astype("string")

    # Filtrar intervalo [start, end]
    start_naive = start_utc.tz_convert(None).tz_localize(None) if start_utc.tzinfo is not None else start_utc
    end_naive = end_utc.tz_convert(None).tz_localize(None) if end_utc.tzinfo is not None else end_utc
    df = df[(df["date"] >= start_naive) & (df["date"] <= end_naive)].copy()

    # Ordenar, deduplicar por date
    df = df.sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)

    # Reordenar colunas
    df = df[["date", "open", "high", "low", "close", "volume", "ticker"]]

    stats = {
        "rows_before_cleaning": rows_before_cleaning,
        "rows_after_cleaning": rows_after_cleaning,
        "rows_dropped_ohlc": rows_dropped_ohlc,
    }
    return df, stats

# =========================
# Provedores
# =========================
def fetch_yahoo_chart_direct(
    ticker: str,
    start_utc: pd.Timestamp,
    period2_now_utc: pd.Timestamp,
    retries: int = 2,
    backoff_seconds: List[float] = [0.8, 1.6]
) -> Tuple[Optional[pd.DataFrame], Dict[str, int], List[Dict[str, Any]]]:
    """
    Coleta direto do endpoint Chart do Yahoo.
    Retorna (df_final, stats, attempts).
    """
    attempts: List[Dict[str, Any]] = []
    df_final: Optional[pd.DataFrame] = None
    stats: Dict[str, int] = {"rows_before_cleaning": 0, "rows_after_cleaning": 0, "rows_dropped_ohlc": 0}

    base_url = "https://query2.finance.yahoo.com/v8/finance/chart/%5EBVSP"
    params = {
        "period1": str(to_unix_seconds(start_utc)),
        "period2": str(to_unix_seconds(period2_now_utc)),
        "interval": "1d",
        "events": "history",
        "includeAdjustedClose": "false",
    }

    for i in range(retries):
        try:
            # Prefer requests se disponível; caso contrário, urllib
            try:
                import requests  # type: ignore
                r = requests.get(base_url, params=params, headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Python"}, timeout=5)
                status_code = r.status_code
                if status_code < 200 or status_code >= 400:
                    raise RuntimeError(f"HTTP_STATUS_{status_code}")
                data = r.json()
            except Exception as e_req:
                # fallback para urllib
                try:
                    from urllib.parse import urlencode
                    from urllib.request import Request, urlopen
                    url = base_url + "?" + urlencode(params)
                    req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Python"})
                    with urlopen(req, timeout=6) as resp:
                        status_code = getattr(resp, "status", 200)
                        raw = resp.read()
                    data = json.loads(raw.decode("utf-8"))
                except Exception as e_url:
                    raise RuntimeError(f"HTTP_ERROR: {e_req} | URLLIB_FALLBACK: {e_url}")

            # Parse esperado
            if "chart" not in data:
                raise RuntimeError("PARSE_ERROR: chave 'chart' ausente")
            chart = data["chart"]
            if chart.get("error"):
                raise RuntimeError(f"REMOTE_ERROR: {chart.get('error')}")
            results = chart.get("result", [])
            if not results:
                raise RuntimeError("PARSE_ERROR: 'result' vazio")
            res0 = results[0]
            ts = res0.get("timestamp", [])
            inds = res0.get("indicators", {})
            quotes = inds.get("quote", [])
            if not quotes:
                raise RuntimeError("PARSE_ERROR: 'quote[0]' ausente")
            q0 = quotes[0]
            opens = q0.get("open", [])
            highs = q0.get("high", [])
            lows = q0.get("low", [])
            closes = q0.get("close", [])
            vols = q0.get("volume", [])

            n = min(len(ts), len(opens), len(highs), len(lows), len(closes), len(vols))
            if n == 0:
                raise RuntimeError("DATA_EMPTY_ERROR: listas vazias")
            # Construir DataFrame posicional
            df_pre = pd.DataFrame({
                "date": pd.to_datetime(ts[:n], unit="s", utc=True),
                "open": opens[:n],
                "high": highs[:n],
                "low": lows[:n],
                "close": closes[:n],
                "volume": vols[:n],
            })
            # Normalizar Bronze com limpeza
            df_norm, stats = bronze_normalize(df_pre, ticker, START_DATE_UTC, END_DATE_UTC)
            attempts.append({"provider": "yahoo-chart", "attempt": i + 1, "ok": True, "rows": int(len(df_norm)), "exception_message": None})
            df_final = df_norm
            return df_final, stats, attempts
        except Exception as e:
            attempts.append({"provider": "yahoo-chart", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": str(e)})
            if i < retries - 1:
                time.sleep(backoff_seconds[min(i, len(backoff_seconds) - 1)])

    return None, stats, attempts

def fetch_with_yfinance(
    ticker: str,
    start_utc: pd.Timestamp,
    end_utc: pd.Timestamp,
    retries: int = 2,
    backoff_seconds: List[float] = [0.8, 1.6]
) -> Tuple[Optional[pd.DataFrame], Dict[str, int], List[Dict[str, Any]]]:
    attempts: List[Dict[str, Any]] = []
    stats: Dict[str, int] = {"rows_before_cleaning": 0, "rows_after_cleaning": 0, "rows_dropped_ohlc": 0}
    for i in range(retries):
        try:
            try:
                import yfinance as yf  # type: ignore
            except Exception as e_imp:
                attempts.append({"provider": "yfinance", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": f"IMPORT_ERROR: {e_imp}"})
                break
            try:
                start_str = start_utc.tz_localize(None).date().isoformat() if start_utc.tzinfo else start_utc.date().isoformat()
                end_inc = (end_utc + pd.Timedelta(days=1))  # end-exclusive
                end_str = end_inc.tz_localize(None).date().isoformat() if end_inc.tzinfo else end_inc.date().isoformat()
                df_raw = yf.download(
                    tickers=ticker,
                    start=start_str,
                    end=end_str,
                    interval="1d",
                    auto_adjust=False,
                    progress=False,
                    threads=True
                )
                if df_raw is None or df_raw.empty:
                    raise RuntimeError("DATA_EMPTY_ERROR: yfinance retornou vazio")
                # Mapear colunas
                df_raw = df_raw.copy()
                # Lidar com MultiIndex simples: se colunas são ('Open',), etc.
                if isinstance(df_raw.columns, pd.MultiIndex):
                    try:
                        df_raw.columns = [c[-1] if isinstance(c, tuple) else c for c in df_raw.columns.to_list()]
                    except Exception:
                        df_raw.columns = df_raw.columns.get_level_values(-1)
                rename_map = {"Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
                              "open": "open", "high": "high", "low": "low", "close": "close", "volume": "volume"}
                df_raw = df_raw.rename(columns=rename_map)
                need = {"open", "high", "low", "close", "volume"}
                if not need.issubset(set(df_raw.columns)):
                    missing = sorted(list(need - set(df_raw.columns)))
                    raise RuntimeError(f"SCHEMA_ERROR: faltam colunas em yfinance: {missing}")
                df_pre = df_raw.reset_index().rename(columns={"Date": "date", "Datetime": "date"})
                if "date" not in df_pre.columns:
                    # se índice for datetime e não houver 'date' após reset
                    df_pre = df_raw.copy()
                    df_pre["date"] = df_pre.index
                    df_pre = df_pre.reset_index(drop=True)
                df_pre = df_pre[["date", "open", "high", "low", "close", "volume"]]
                df_norm, stats = bronze_normalize(df_pre, ticker, START_DATE_UTC, END_DATE_UTC)
                attempts.append({"provider": "yfinance", "attempt": i + 1, "ok": True, "rows": int(len(df_norm)), "exception_message": None})
                return df_norm, stats, attempts
            except Exception as e_dl:
                attempts.append({"provider": "yfinance", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": str(e_dl)})
                if i < retries - 1:
                    time.sleep(backoff_seconds[min(i, len(backoff_seconds) - 1)])
        except Exception as e:
            attempts.append({"provider": "yfinance", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": str(e)})
            break
    return None, stats, attempts

def fetch_with_stooq(
    ticker: str,
    start_utc: pd.Timestamp,
    end_utc: pd.Timestamp,
    retries: int = 1
) -> Tuple[Optional[pd.DataFrame], Dict[str, int], List[Dict[str, Any]]]:
    attempts: List[Dict[str, Any]] = []
    stats: Dict[str, int] = {"rows_before_cleaning": 0, "rows_after_cleaning": 0, "rows_dropped_ohlc": 0}
    for i in range(retries):
        try:
            try:
                from pandas_datareader import data as dr  # type: ignore
            except Exception as e_imp:
                attempts.append({"provider": "stooq", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": f"IMPORT_ERROR: {e_imp}"})
                break
            try:
                candidates = [ticker, ticker.replace("^", ""), ticker.replace("^", "").lower()]
                df_raw = None
                last_exc = None
                for tk in candidates:
                    try:
                        df_raw = dr.DataReader(tk, "stooq", start=start_utc.tz_localize(None), end=end_utc.tz_localize(None))
                        if df_raw is not None and not df_raw.empty:
                            break
                    except Exception as e2:
                        last_exc = e2
                        continue
                if df_raw is None or df_raw.empty:
                    raise RuntimeError(f"STOOQ_EMPTY: {last_exc}") if last_exc else RuntimeError("STOOQ_EMPTY: retorno vazio")
                # Stooq costuma vir com colunas minúsculas ou 'Open/High/...'
                df_raw = df_raw.sort_index()
                if isinstance(df_raw.columns, pd.MultiIndex):
                    try:
                        df_raw.columns = [c[-1] if isinstance(c, tuple) else c for c in df_raw.columns.to_list()]
                    except Exception:
                        df_raw.columns = df_raw.columns.get_level_values(-1)
                rename_map = {"Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
                              "open": "open", "high": "high", "low": "low", "close": "close", "volume": "volume"}
                df_raw = df_raw.rename(columns=rename_map)
                need = {"open", "high", "low", "close", "volume"}
                if not need.issubset(set(df_raw.columns)):
                    missing = sorted(list(need - set(df_raw.columns)))
                    raise RuntimeError(f"SCHEMA_ERROR: faltam colunas em stooq: {missing}")
                df_pre = df_raw.copy()
                df_pre["date"] = df_pre.index
                df_pre = df_pre.reset_index(drop=True)
                df_pre = df_pre[["date", "open", "high", "low", "close", "volume"]]
                df_norm, stats = bronze_normalize(df_pre, ticker, START_DATE_UTC, END_DATE_UTC)
                attempts.append({"provider": "stooq", "attempt": i + 1, "ok": True, "rows": int(len(df_norm)), "exception_message": None})
                return df_norm, stats, attempts
            except Exception as e_dl:
                attempts.append({"provider": "stooq", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": str(e_dl)})
                break
        except Exception as e:
            attempts.append({"provider": "stooq", "attempt": i + 1, "ok": False, "rows": 0, "exception_message": str(e)})
            break
    return None, stats, attempts

# =========================
# Validações & Plano
# =========================
def validate_schema(df: pd.DataFrame) -> List[str]:
    erros = []
    if list(df.columns) != EXPECTED_COLUMNS:
        erros.append(f"VALIDATION_ERROR: schema de colunas incorreto. Esperado={EXPECTED_COLUMNS} Obtido={list(df.columns)}")
    dts = dtypes_signature(df)
    for c, dt_expected in EXPECTED_DTYPES.items():
        if c not in dts:
            erros.append(f"VALIDATION_ERROR: coluna ausente no DataFrame: {c}")
            continue
        got = dts[c]
        if c == "ticker":
            if not got.startswith("string"):
                erros.append(f"VALIDATION_ERROR: dtype incorreto para ticker. Esperado=string Obtido={got}")
        else:
            if got != dt_expected:
                erros.append(f"VALIDATION_ERROR: dtype incorreto para {c}. Esperado={dt_expected} Obtido={got}")
    if df["ticker"].isna().any():
        erros.append("VALIDATION_ERROR: ticker contém valores nulos (deve ser 0%).")
    return erros

def validate_quality(df: pd.DataFrame) -> List[str]:
    erros = []
    if len(df) < 2500:
        erros.append(f"VALIDATION_ERROR: cobertura insuficiente — linhas={len(df)} (< 2500)")
    pn = percent_nulls(df)
    for col in ["date", "close", "ticker"]:
        if round(pn.get(col, 100.0), 6) != 0.0:
            erros.append(f"VALIDATION_ERROR: % nulos em {col} deve ser 0%, obtido={pn.get(col, 100.0):.6f}%")
    dups = int(df.duplicated(subset=["date"]).sum())
    if dups != 0:
        erros.append(f"VALIDATION_ERROR: duplicatas por date detectadas (= {dups})")
    if not df["date"].is_monotonic_increasing:
        erros.append("VALIDATION_ERROR: coluna date não é monotônica crescente.")
    return erros

def validate_interval_with_tolerance(df: pd.DataFrame, start_utc: pd.Timestamp) -> Tuple[List[str], Dict[str, Any]]:
    erros = []
    if df.empty:
        return ["VALIDATION_ERROR: DataFrame vazio após ingestão."], {"date_min": None, "date_max": None, "start_verdict": "FAIL", "end_verdict": "FAIL"}
    dmin = pd.to_datetime(df["date"].min())
    dmax = pd.to_datetime(df["date"].max())
    required_start = start_utc.tz_convert(None).tz_localize(None) if start_utc.tzinfo else start_utc
    start_tol_max = (required_start + BDay(5)).to_pydatetime().date()
    start_ok = dmin <= pd.Timestamp(start_tol_max).to_pydatetime()
    if not start_ok:
        erros.append(f"VALIDATION_ERROR: date.min ({dmin.date().isoformat()}) > tolerância de início ({start_tol_max.isoformat()})")
    required_end_min = (pd.Timestamp(datetime.now(timezone.utc)).normalize() - pd.Timedelta(days=3)).tz_localize(None)
    end_ok = dmax >= required_end_min
    if not end_ok:
        erros.append(f"VALIDATION_ERROR: date.max ({dmax.date().isoformat()}) < requerido mínimo ({required_end_min.date().isoformat()}) (tolerância 3 dias)")
    info = {
        "date_min": dmin,
        "date_max": dmax,
        "required_start": required_start,
        "start_tolerance_max": pd.Timestamp(start_tol_max),
        "required_end_min": required_end_min,
        "start_verdict": "OK" if start_ok else "FAIL",
        "end_verdict": "OK" if end_ok else "FAIL",
    }
    return erros, info

def build_persistence_plan(df: pd.DataFrame) -> Dict[str, Any]:
    years = sorted(pd.to_datetime(df["date"]).dt.year.unique().tolist())
    partitions = [f"year={y}" for y in years]
    manifesto_header = ["timestamp", "ticker", "rows_total", "date_min", "date_max", "columns_json", "partitions_json", "target_path"]
    manifesto_row = [
        AGORA.isoformat(),
        TICKER,
        int(len(df)),
        str(pd.to_datetime(df["date"]).min()),
        str(pd.to_datetime(df["date"]).max()),
        json.dumps(EXPECTED_COLUMNS, ensure_ascii=False),
        json.dumps(partitions, ensure_ascii=False),
        str(PARQUET_TARGET),
    ]
    return {
        "parquet_target": str(PARQUET_TARGET),
        "partitions": partitions,
        "manifesto_path": str(MANIFESTO_TARGET),
        "manifesto_header": ",".join(manifesto_header),
        "manifesto_row_sample": ",".join([str(x) for x in manifesto_row]),
    }

# =========================
# Execução Principal
# =========================
def main():
    provider_attempts: List[Dict[str, Any]] = []
    erros_normativos: List[str] = []

    bronze_ibov: Optional[pd.DataFrame] = None
    used_provider: Optional[str] = None
    cleaning_stats: Dict[str, int] = {"rows_before_cleaning": 0, "rows_after_cleaning": 0, "rows_dropped_ohlc": 0}

    # P1: Yahoo Chart
    df_yc, stats_yc, attempts_yc = fetch_yahoo_chart_direct(TICKER, START_DATE_UTC, PERIOD2_NOW_UTC)
    provider_attempts.extend(attempts_yc)
    if df_yc is not None and not df_yc.empty:
        bronze_ibov = df_yc
        used_provider = "yahoo-chart"
        cleaning_stats = stats_yc
    else:
        # P2: yfinance (apenas se P1 falhar)
        df_yf, stats_yf, attempts_yf = fetch_with_yfinance(TICKER, START_DATE_UTC, END_DATE_UTC)
        provider_attempts.extend(attempts_yf)
        if df_yf is not None and not df_yf.empty:
            bronze_ibov = df_yf
            used_provider = "yfinance"
            cleaning_stats = stats_yf
        else:
            # P3: stooq (apenas se P1 e P2 falharem)
            df_stq, stats_stq, attempts_stq = fetch_with_stooq(TICKER, START_DATE_UTC, END_DATE_UTC)
            provider_attempts.extend(attempts_stq)
            if df_stq is not None and not df_stq.empty:
                bronze_ibov = df_stq
                used_provider = "stooq"
                cleaning_stats = stats_stq

    # Se todos falharem
    if bronze_ibov is None or bronze_ibov.empty:
        print_section("PROVEDORES E TENTATIVAS")
        print(json.dumps(provider_attempts, ensure_ascii=False, indent=2))
        # Selecionar a exceção mais informativa (última não-ok com mensagem)
        last_err = None
        for att in reversed(provider_attempts):
            if not att.get("ok") and att.get("exception_message"):
                last_err = att.get("exception_message")
                break
        print(f"VALIDATION_ERROR: PROVIDERS_EXHAUSTED — {last_err if last_err else 'sem mensagem detalhada.'}")
        print_section("CHECKLIST")
        checklist = {
            "provider_attempts_listed": "ok",
            "schema_columns_and_dtypes_exact": "falha",
            "interval_tolerance_verdicts": "falha",
            "quality_nulls_and_duplicates": "falha",
            "sample_head_tail_presented": "falha",
            "counts_included": "falha",
            "persistence_plan_simulated": "ok",
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- Rede pode estar bloqueada para Yahoo/Stooq? Há Proxy que devamos configurar?")
        print("- Deseja fornecer outro provedor (AlphaVantage/Polygon) com chave?")
        print("- Autoriza aumentar timeouts/backoff e tentar novamente?")
        return

    # Reforço de tipos/order e ticker
    bronze_ibov = bronze_ibov.copy()
    bronze_ibov["date"] = pd.to_datetime(bronze_ibov["date"], errors="coerce").dt.normalize()
    for c in ["open", "high", "low", "close"]:
        bronze_ibov[c] = pd.to_numeric(bronze_ibov[c], errors="coerce").astype("float64")
    bronze_ibov["volume"] = pd.to_numeric(bronze_ibov["volume"], errors="coerce").fillna(0).astype("int64")
    bronze_ibov["ticker"] = pd.Series([TICKER] * len(bronze_ibov), dtype="string").astype("string")
    bronze_ibov = bronze_ibov[EXPECTED_COLUMNS].sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)

    # Validações
    schema_errors = validate_schema(bronze_ibov)
    qual_errors = validate_quality(bronze_ibov)
    interval_errors, interval_info = validate_interval_with_tolerance(bronze_ibov, START_DATE_UTC)
    erros_normativos.extend(schema_errors + qual_errors + interval_errors)

    # Métricas
    total_linhas = int(len(bronze_ibov))
    dias_unicos = int(bronze_ibov["date"].nunique()) if total_linhas > 0 else 0
    dias_vol_zero = int((bronze_ibov["volume"] == 0).sum()) if total_linhas > 0 else 0
    pct_nulos = percent_nulls(bronze_ibov)
    dups_by_date = int(bronze_ibov.duplicated(subset=["date"]).sum())

    # Plano de persistência (simulado)
    persist_plan = build_persistence_plan(bronze_ibov)

    # Relatórios
    print_section("PROVEDOR E TENTATIVAS")
    print(json.dumps({"provider_used": used_provider, "rows_returned": total_linhas}, ensure_ascii=False, indent=2))
    print(json.dumps(provider_attempts, ensure_ascii=False, indent=2))

    print_section("SCHEMA (EXATO)")
    schema_out = {
        "columns_expected": EXPECTED_COLUMNS,
        "columns_obtained": list(bronze_ibov.columns),
        "dtypes_obtained": dtypes_signature(bronze_ibov),
        "nulls_percent": {k: round(v, 6) for k, v in pct_nulos.items()},
        "ticker_dtype_is_string": str(bronze_ibov.dtypes["ticker"]).startswith("string"),
        "ticker_nulls_percent": round(pct_nulos.get("ticker", 100.0), 6),
    }
    print(json.dumps(schema_out, ensure_ascii=False, indent=2))

    print_section("INTERVALO TEMPORAL (com tolerâncias)")
    interval_out = {
        "required_start": str(interval_info["required_start"]) if interval_info["date_min"] is not None else None,
        "start_tolerance_max": str(interval_info["start_tolerance_max"]) if interval_info["date_min"] is not None else None,
        "required_end_min": str(interval_info["required_end_min"]) if interval_info["date_max"] is not None else None,
        "date_min": str(pd.to_datetime(interval_info["date_min"])) if interval_info["date_min"] is not None else None,
        "date_max": str(pd.to_datetime(interval_info["date_max"])) if interval_info["date_max"] is not None else None,
        "start_verdict": interval_info.get("start_verdict", "FAIL"),
        "end_verdict": interval_info.get("end_verdict", "FAIL"),
    }
    print(json.dumps(interval_out, ensure_ascii=False, indent=2))

    print_section("QUALIDADE")
    qualidade_out = {
        "percent_nulls": {k: round(v, 6) for k, v in pct_nulos.items()},
        "duplicates_by_date": dups_by_date,
        "constraints": {
            "nulls_must_be_zero_in": {"date": True, "close": True, "ticker": True},
            "duplicates_by_date_must_be_zero": True,
            "min_rows_required": 2500,
            "date_monotonic_increasing": True
        }
    }
    print(json.dumps(qualidade_out, ensure_ascii=False, indent=2))

    print_section("AMOSTRA — HEAD(10)")
    print(bronze_ibov[["date", "close", "volume", "ticker"]].head(10).to_string(index=False))

    print_section("AMOSTRA — TAIL(10)")
    print(bronze_ibov[["date", "close", "volume", "ticker"]].tail(10).to_string(index=False))

    print_section("CONTAGENS")
    print(json.dumps({
        "rows_before_cleaning": cleaning_stats.get("rows_before_cleaning", 0),
        "rows_dropped_ohlc": cleaning_stats.get("rows_dropped_ohlc", 0),
        "rows_after_cleaning": cleaning_stats.get("rows_after_cleaning", 0),
        "unique_days": dias_unicos,
        "days_with_volume_zero": dias_vol_zero,
        "final_rows": total_linhas
    }, ensure_ascii=False, indent=2))

    print_section("PLANO DE PERSISTÊNCIA (SIMULADO)")
    print(json.dumps({
        "dry_run": DRY_RUN,
        "parquet_target": persist_plan["parquet_target"],
        "partitions": persist_plan["partitions"],
        "manifesto_path": persist_plan["manifesto_path"],
        "manifesto_header": persist_plan["manifesto_header"],
        "manifesto_row_sample": persist_plan["manifesto_row_sample"],
        "nota": "Nenhuma escrita realizada em dry_run=True."
    }, ensure_ascii=False, indent=2))

    # Erros normativos (se houver)
    if erros_normativos:
        print_section("ERROS NORMATIVOS")
        seen = set()
        ordered = []
        for e in erros_normativos:
            if e not in seen:
                seen.add(e)
                ordered.append(e)
        for e in ordered:
            if not (str(e).startswith("VALIDATION_ERROR") or str(e).startswith("CHECKLIST_FAILURE")):
                print(f"VALIDATION_ERROR: {e}")
            else:
                print(e)

    # Checklist
    print_section("CHECKLIST")
    schema_ok = (len(schema_errors := schema_errors if 'schema_errors' in locals() else validate_schema(bronze_ibov)) == 0)  # revalida se necessário
    interval_ok = (len(interval_errors) == 0 and interval_info.get("start_verdict") == "OK" and interval_info.get("end_verdict") == "OK")
    quality_ok = (len(qual_errors := qual_errors if 'qual_errors' in locals() else validate_quality(bronze_ibov)) == 0)
    sample_ok = (total_linhas > 0)
    counts_ok = True  # contagens sempre apresentadas
    attempts_ok = True
    plan_ok = True

    checklist = {
        "provider_attempts_listed": "ok" if attempts_ok else "falha",
        "schema_columns_and_dtypes_exact": "ok" if schema_ok else "falha",
        "interval_tolerance_verdicts": "ok" if interval_ok else "falha",
        "quality_nulls_and_duplicates": "ok" if quality_ok else "falha",
        "sample_head_tail_presented": "ok" if sample_ok else "falha",
        "counts_included": "ok" if counts_ok else "falha",
        "persistence_plan_simulated": "ok" if plan_ok else "falha",
    }
    print(json.dumps(checklist, ensure_ascii=False, indent=2))
    for k, v in checklist.items():
        if v != "ok":
            print(f"CHECKLIST_FAILURE: {k} não atendido.")

    # Estrutura do Resultado (info)
    print_section("ESTRUTURA DO RESULTADO (info)")
    resultado = {
        "ticker": TICKER,
        "periodo": {"start": str(START_DATE_UTC.tz_localize(None)), "end": str(END_DATE_UTC.tz_localize(None))},
        "dry_run": DRY_RUN,
        "timestamp_execucao": AGORA.isoformat(),
        "dataframe_name": "bronze_ibov",
        "columns": EXPECTED_COLUMNS,
        "dtypes": dtypes_signature(bronze_ibov),
        "provider_used": used_provider,
        "status": "sucesso" if not erros_normativos and all(v == "ok" for v in checklist.values()) else "falha"
    }
    print(json.dumps(resultado, ensure_ascii=False, indent=2))

if __name__ == "__main__":
    # Contrato
    # - Coleta direta Yahoo Chart (requests/stdlib) → yfinance → stooq (sem dados sintéticos)
    # - Normalização Bronze e validações: schema, qualidade, tolerâncias de calendário
    # - Planos de persistência (simulados), checklist e mensagens normativas
    main()


{
  "provider_used": "yahoo-chart",
  "rows_returned": 3400
}
[
  {
    "provider": "yahoo-chart",
    "attempt": 1,
    "ok": true,
    "rows": 3400,
    "exception_message": null
  }
]

{
  "columns_expected": [
    "date",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "ticker"
  ],
  "columns_obtained": [
    "date",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "ticker"
  ],
  "dtypes_obtained": {
    "date": "datetime64[ns]",
    "open": "float64",
    "high": "float64",
    "low": "float64",
    "close": "float64",
    "volume": "int64",
    "ticker": "string"
  },
  "nulls_percent": {
    "date": 0.0,
    "open": 0.0,
    "high": 0.0,
    "low": 0.0,
    "close": 0.0,
    "volume": 0.0,
    "ticker": 0.0
  },
  "ticker_dtype_is_string": true,
  "ticker_nulls_percent": 0.0
}

{
  "required_start": "2012-01-01 00:00:00",
  "start_tolerance_max": "2012-01-06 00:00:00",
  "required_end_min": "2025-09-16 00:00:00",
  "date_min": "2012-01-03 00

# Instrução 1B-RETRY — Persistir Bronze “valendo” (escrita real + manifesto)

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Instrução 1B-RETRY — Persistir Bronze “valendo” (escrita real + manifesto)
# Regras:
# - Bloco único, auto-contido.
# - dry_run desligado nesta célula (escrita real).
# - Usar bronze_ibov em memória; se ausente, reingestar silenciosamente (Yahoo Chart → yfinance → stooq).
# - Parquet particionado por year=YYYY, compressão snappy, overwrite-by-partition.
# - Manifesto: criar se faltar; adicionar linha com metadados (sem hashes).
# - Mensagens normativas: VALIDATION_ERROR / CHECKLIST_FAILURE.
# - Em 2 falhas consecutivas, parar e emitir dúvidas objetivas.

import os
import sys
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np

# =========================
# Parâmetros (SSOT)
# =========================
ROOT_DIR = Path("/home/wrm/BOLSA_2026").resolve()
PARQUET_TARGET = ROOT_DIR / "bronze" / "IBOV.parquet"  # diretório de dataset particionado (hive: year=YYYY)
MANIFESTO_PATH = ROOT_DIR / "manifestos" / "bronze_ibov_manifesto.csv"
TICKER = "^BVSP"
DRY_RUN = False  # escrita real nesta etapa

EXPECTED_COLUMNS = ["date", "open", "high", "low", "close", "volume", "ticker"]
AGORA_TZ = datetime.now().astimezone()
START_DATE_UTC = pd.Timestamp("2012-01-01", tz="UTC")
NOW_UTC = pd.Timestamp(datetime.now(timezone.utc))
END_DATE_UTC = NOW_UTC.normalize()

# =========================
# Utils
# =========================
def print_section(title: str):
    print("\n" + "=" * 8 + f" {title} " + "=" * 8)

def dtypes_signature(df: pd.DataFrame) -> Dict[str, str]:
    return {c: str(df.dtypes[c]) for c in df.columns}

def percent_nulls(df: pd.DataFrame) -> Dict[str, float]:
    total = len(df)
    if total == 0:
        return {c: 100.0 for c in df.columns}
    return {c: float(df[c].isna().sum()) * 100.0 / float(total) for c in df.columns}

def to_unix_seconds(ts: pd.Timestamp) -> int:
    if ts.tzinfo is None:
        ts = ts.tz_localize("UTC")
    else:
        ts = ts.tz_convert("UTC")
    return int(ts.timestamp())

def bronze_normalize(df_pre: pd.DataFrame, ticker: str) -> pd.DataFrame:
    # Espera colunas: date, open, high, low, close, volume
    need = {"date", "open", "high", "low", "close", "volume"}
    if not need.issubset(df_pre.columns):
        missing = sorted(list(need - set(df_pre.columns)))
        raise RuntimeError(f"SCHEMA_ERROR: colunas ausentes: {missing}")
    df = df_pre.copy()
    df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True).dt.tz_localize(None).dt.normalize()
    for c in ["open", "high", "low", "close"]:
        df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
    df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype("int64")
    df["ticker"] = pd.Series([ticker] * len(df), dtype="string").astype("string")
    # limpeza: remover OHLC nulos
    mask_ohlc = (~df["open"].isna()) & (~df["high"].isna()) & (~df["low"].isna()) & (~df["close"].isna())
    df = df[mask_ohlc].copy()
    # ordenar, deduplicar por date
    df = df.sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)
    # filtrar intervalo#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Instrução 1B-RETRY — Persistir Bronze “valendo” (escrita real + manifesto)
# Regras:
# - Bloco único, auto-contido.
# - dry_run desligado nesta célula (escrita real).
# - Usar bronze_ibov em memória; se ausente, reingestar silenciosamente (Yahoo Chart → yfinance → stooq).
# - Parquet particionado por year=YYYY, compressão snappy, overwrite-by-partition.
# - Manifesto: criar se faltar; adicionar linha com metadados (sem hashes).
# - Mensagens normativas: VALIDATION_ERROR / CHECKLIST_FAILURE.
# - Em 2 falhas consecutivas, parar e emitir dúvidas objetivas.

import os
import sys
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np

# =========================
# Parâmetros (SSOT)
# =========================
ROOT_DIR = Path("/home/wrm/BOLSA_2026").resolve()
PARQUET_TARGET = ROOT_DIR / "bronze" / "IBOV.parquet"  # diretório de dataset particionado (hive: year=YYYY)
MANIFESTO_PATH = ROOT_DIR / "manifestos" / "bronze_ibov_manifesto.csv"
TICKER = "^BVSP"
DRY_RUN = False  # escrita real nesta etapa

EXPECTED_COLUMNS = ["date", "open", "high", "low", "close", "volume", "ticker"]
AGORA_TZ = datetime.now().astimezone()
START_DATE_UTC = pd.Timestamp("2012-01-01", tz="UTC")
NOW_UTC = pd.Timestamp(datetime.now(timezone.utc))
END_DATE_UTC = NOW_UTC.normalize()

# =========================
# Utils
# =========================
def print_section(title: str):
    print("\n" + "=" * 8 + f" {title} " + "=" * 8)

def dtypes_signature(df: pd.DataFrame) -> Dict[str, str]:
    return {c: str(df.dtypes[c]) for c in df.columns}

def percent_nulls(df: pd.DataFrame) -> Dict[str, float]:
    total = len(df)
    if total == 0:
        return {c: 100.0 for c in df.columns}
    return {c: float(df[c].isna().sum()) * 100.0 / float(total) for c in df.columns}

def to_unix_seconds(ts: pd.Timestamp) -> int:
    if ts.tzinfo is None:
        ts = ts.tz_localize("UTC")
    else:
        ts = ts.tz_convert("UTC")
    return int(ts.timestamp())

def bronze_normalize(df_pre: pd.DataFrame, ticker: str) -> pd.DataFrame:
    # Espera colunas: date, open, high, low, close, volume
    need = {"date", "open", "high", "low", "close", "volume"}
    if not need.issubset(df_pre.columns):
        missing = sorted(list(need - set(df_pre.columns)))
        raise RuntimeError(f"SCHEMA_ERROR: colunas ausentes: {missing}")
    df = df_pre.copy()
    df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True).dt.tz_localize(None).dt.normalize()
    for c in ["open", "high", "low", "close"]:
        df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
    df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype("int64")
    df["ticker"] = pd.Series([ticker] * len(df), dtype="string").astype("string")
    # limpeza: remover OHLC nulos
    mask_ohlc = (~df["open"].isna()) & (~df["high"].isna()) & (~df["low"].isna()) & (~df["close"].isna())
    df = df[mask_ohlc].copy()
    # ordenar, deduplicar por date
    df = df.sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)
    # filtrar intervalo
    start_naive = START_DATE_UTC.tz_convert(None).tz_localize(None) if START_DATE_UTC.tzinfo else START_DATE_UTC
    end_naive = END_DATE_UTC.tz_convert(None).tz_localize(None) if END_DATE_UTC.tzinfo else END_DATE_UTC
    df = df[(df["date"] >= start_naive) & (df["date"] <= end_naive)].copy()
    # coluna e ordem final
    df = df[EXPECTED_COLUMNS]
    return df

# =========================
# Reingestão silenciosa (apenas se bronze_ibov não existir)
# =========================
def fetch_yahoo_chart_silent(ticker: str) -> Optional[pd.DataFrame]:
    try:
        base_url = "https://query2.finance.yahoo.com/v8/finance/chart/%5EBVSP"
        params = {
            "period1": str(to_unix_seconds(START_DATE_UTC)),
            "period2": str(to_unix_seconds(NOW_UTC)),
            "interval": "1d",
            "events": "history",
            "includeAdjustedClose": "false",
        }
        try:
            import requests  # type: ignore
            r = requests.get(base_url, params=params, headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Python"}, timeout=6)
            if r.status_code < 200 or r.status_code >= 400:
                return None
            data = r.json()
        except Exception:
            from urllib.parse import urlencode
            from urllib.request import Request, urlopen
            url = base_url + "?" + urlencode(params)
            req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Python"})
            with urlopen(req, timeout=8) as resp:
                raw = resp.read()
            import json as _json
            data = _json.loads(raw.decode("utf-8"))
        if "chart" not in data or not data["chart"].get("result"):
            return None
        res0 = data["chart"]["result"][0]
        ts = res0.get("timestamp", []) or []
        q = (res0.get("indicators", {}) or {}).get("quote", []) or []
        if not q:
            return None
        q0 = q[0]
        opens = q0.get("open", []) or []
        highs = q0.get("high", []) or []
        lows = q0.get("low", []) or []
        closes = q0.get("close", []) or []
        vols = q0.get("volume", []) or []
        n = min(len(ts), len(opens), len(highs), len(lows), len(closes), len(vols))
        if n == 0:
            return None
        df_pre = pd.DataFrame({
            "date": pd.to_datetime(ts[:n], unit="s", utc=True),
            "open": opens[:n],
            "high": highs[:n],
            "low": lows[:n],
            "close": closes[:n],
            "volume": vols[:n],
        })
        return bronze_normalize(df_pre, ticker)
    except Exception:
        return None

def fetch_yfinance_silent(ticker: str) -> Optional[pd.DataFrame]:
    try:
        try:
            import yfinance as yf  # type: ignore
        except Exception:
            return None
        start_str = START_DATE_UTC.tz_localize(None).date().isoformat() if START_DATE_UTC.tzinfo else START_DATE_UTC.date().isoformat()
        end_inc = (END_DATE_UTC + pd.Timedelta(days=1))
        end_str = end_inc.tz_localize(None).date().isoformat() if end_inc.tzinfo else end_inc.date().isoformat()
        df_raw = yf.download(tickers=ticker, start=start_str, end=end_str, interval="1d", auto_adjust=False, progress=False, threads=True)
        if df_raw is None or df_raw.empty:
            return None
        if isinstance(df_raw.columns, pd.MultiIndex):
            try:
                df_raw.columns = [c[-1] if isinstance(c, tuple) else c for c in df_raw.columns.to_list()]
            except Exception:
                df_raw.columns = df_raw.columns.get_level_values(-1)
        rename_map = {"Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
                      "open": "open", "high": "high", "low": "low", "close": "close", "volume": "volume"}
        df_raw = df_raw.rename(columns=rename_map)
        need = {"open", "high", "low", "close", "volume"}
        if not need.issubset(set(df_raw.columns)):
            return None
        df_pre = df_raw.reset_index().rename(columns={"Date": "date", "Datetime": "date"})
        if "date" not in df_pre.columns:
            df_pre = df_raw.copy()
            df_pre["date"] = df_pre.index
            df_pre = df_pre.reset_index(drop=True)
        df_pre = df_pre[["date", "open", "high", "low", "close", "volume"]]
        return bronze_normalize(df_pre, ticker)
    except Exception:
        return None

def fetch_stooq_silent(ticker: str) -> Optional[pd.DataFrame]:
    try:
        try:
            from pandas_datareader import data as dr  # type: ignore
        except Exception:
            return None
        candidates = [ticker, ticker.replace("^", ""), ticker.replace("^", "").lower()]
        df_raw = None
        last_exc = None
        for tk in candidates:
            try:
                df_raw = dr.DataReader(tk, "stooq", start=START_DATE_UTC.tz_localize(None), end=END_DATE_UTC.tz_localize(None))
                if df_raw is not None and not df_raw.empty:
                    break
            except Exception as e:
                last_exc = e
                continue
        if df_raw is None or df_raw.empty:
            return None
        df_raw = df_raw.sort_index()
        if isinstance(df_raw.columns, pd.MultiIndex):
            try:
                df_raw.columns = [c[-1] if isinstance(c, tuple) else c for c in df_raw.columns.to_list()]
            except Exception:
                df_raw.columns = df_raw.columns.get_level_values(-1)
        rename_map = {"Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
                      "open": "open", "high": "high", "low": "low", "close": "close", "volume": "volume"}
        df_raw = df_raw.rename(columns=rename_map)
        need = {"open", "high", "low", "close", "volume"}
        if not need.issubset(set(df_raw.columns)):
            return None
        df_pre = df_raw.copy()
        df_pre["date"] = df_pre.index
        df_pre = df_pre.reset_index(drop=True)[["date", "open", "high", "low", "close", "volume"]]
        return bronze_normalize(df_pre, ticker)
    except Exception:
        return None

def ensure_bronze_in_memory() -> Tuple[Optional[pd.DataFrame], List[str]]:
    msgs: List[str] = []
    g = globals()
    if "bronze_ibov" in g and isinstance(g["bronze_ibov"], pd.DataFrame):
        df = g["bronze_ibov"].copy()
        # Reforçar schema/dtypes
        try:
            df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
            for c in ["open", "high", "low", "close"]:
                df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
            df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype("int64")
            df["ticker"] = df["ticker"].astype("string")
            df = df[EXPECTED_COLUMNS].sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)
            return df, msgs
        except Exception as e:
            msgs.append(f"INFO: bronze_ibov em memória com inconsistências — {e}; reingestão silenciosa será tentada.")
    # Reingestão silenciosa
    for fn in (fetch_yahoo_chart_silent, fetch_yfinance_silent, fetch_stooq_silent):
        df = fn(TICKER)
        if df is not None and not df.empty:
            return df, msgs
    msgs.append("VALIDATION_ERROR: PROVIDERS_EXHAUSTED — não foi possível reingestar bronze_ibov.")
    return None, msgs

# =========================
# Pré-voo de qualidade
# =========================
def preflight_checks(df: pd.DataFrame) -> Tuple[bool, Dict[str, Any], List[str]]:
    errs: List[str] = []
    details: Dict[str, Any] = {}
    if df is None or df.empty:
        errs.append("VALIDATION_ERROR: DataFrame vazio.")
    else:
        pn = percent_nulls(df)
        details["percent_nulls"] = {k: round(v, 6) for k, v in pn.items()}
        for col in ["date", "close", "ticker"]:
            if round(pn.get(col, 100.0), 6) != 0.0:
                errs.append(f"VALIDATION_ERROR: % nulos em {col} deve ser 0%, obtido={pn.get(col, 100.0):.6f}%")
        dups = int(df.duplicated(subset=["date"]).sum())
        details["duplicates_by_date"] = dups
        if dups != 0:
            errs.append(f"VALIDATION_ERROR: duplicatas por date detectadas (= {dups})")
        if len(df) < 2500:
            errs.append(f"VALIDATION_ERROR: cobertura insuficiente — linhas={len(df)} (< 2500)")
        dmin = pd.to_datetime(df["date"]).min()
        dmax = pd.to_datetime(df["date"]).max()
        details["date_min"] = str(dmin)
        details["date_max"] = str(dmax)
        # Tolerâncias: início ≤ 2012-01-06; fim ≥ hoje(UTC) − 3d
        if dmin > pd.Timestamp("2012-01-06"):
            errs.append(f"VALIDATION_ERROR: date.min ({dmin.date().isoformat()}) > tolerância (2012-01-06)")
        required_end_min = (pd.Timestamp(datetime.now(timezone.utc)).normalize() - pd.Timedelta(days=3)).tz_localize(None)
        if dmax < required_end_min:
            errs.append(f"VALIDATION_ERROR: date.max ({dmax.date().isoformat()}) < requerido mínimo ({required_end_min.date().isoformat()}) (tolerância 3 dias)")
    return (len(errs) == 0), details, errs

# =========================
# Escrita Parquet particionado (overwrite-by-partition)
# =========================
def write_parquet_partitioned(df: pd.DataFrame, base_dir: Path, partition_col: str = "year") -> Tuple[bool, Dict[str, Any], List[str]]:
    """
    Escreve com pyarrow.parquet.write_to_dataset, compressão snappy,
    particionado por 'year', com existing_data_behavior='delete_matching' (overwrite-by-partition).
    Retorna (ok, summary, errors).
    """
    errors: List[str] = []
    summary: Dict[str, Any] = {"years_written": [], "files_per_partition": {}}
    try:
        import pyarrow as pa  # type: ignore
        import pyarrow.parquet as pq  # type: ignore
    except Exception as e:
        errors.append(f"VALIDATION_ERROR: MISSING_DEPENDENCY_PYARROW — {e}")
        return False, summary, errors

    try:
        base_dir.mkdir(parents=True, exist_ok=True)
        df2 = df.copy()
        years = pd.to_datetime(df2["date"]).dt.year.astype("int16")
        df2[partition_col] = years
        table = pa.Table.from_pandas(df2, preserve_index=False)
        # Escreve dataset
        pq.write_to_dataset(
            table=table,
            root_path=str(base_dir),
            partition_cols=[partition_col],
            compression="snappy",
            existing_data_behavior="delete_matching"  # overwrite-by-partition
        )
        # Sumário por partição escrita
        written_years = sorted(pd.unique(years).astype(int).tolist())
        summary["years_written"] = written_years
        files_per = {}
        for y in written_years:
            p = base_dir / f"{partition_col}={y}"
            cnt = 0
            if p.exists() and p.is_dir():
                for _, _, files in os.walk(p):
                    cnt += sum(1 for f in files if f.endswith(".parquet"))
            files_per[str(y)] = cnt
        summary["files_per_partition"] = files_per
        return True, summary, errors
    except Exception as e:
        errors.append(f"VALIDATION_ERROR: PARQUET_WRITE_ERROR — {e}")
        return False, summary, errors

# =========================
# Reabertura pós-escrita
# =========================
def reopen_dataset_summary(base_dir: Path) -> Tuple[Optional[pd.DataFrame], Dict[str, Any], List[str]]:
    errors: List[str] = []
    info: Dict[str, Any] = {"rows_total": 0, "min_date": None, "max_date": None}
    try:
        import pyarrow.dataset as ds  # type: ignore
        dataset = ds.dataset(str(base_dir), format="parquet")
        table = dataset.to_table()
        df = table.to_pandas()
    except Exception as e1:
        errors.append(f"READ_ERROR_PA_DS: {e1}")
        try:
            df = pd.read_parquet(str(base_dir))
        except Exception as e2:
            errors.append(f"READ_ERROR_PD_RP: {e2}")
            return None, info, errors
    # Normaliza e resume
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    info["rows_total"] = int(len(df))
    info["min_date"] = str(pd.to_datetime(df["date"]).min()) if "date" in df.columns and not df["date"].isna().all() else None
    info["max_date"] = str(pd.to_datetime(df["date"]).max()) if "date" in df.columns and not df["date"].isna().all() else None
    return df, info, errors

# =========================
# Manifesto (append ou create)
# =========================
def append_manifesto_row(
    manifesto_path: Path,
    ticker: str,
    df_written: pd.DataFrame,
    target_path: Path
) -> Tuple[bool, Optional[str], List[str]]:
    """
    Acrescenta uma linha ao manifesto (cria arquivo se não existir).
    Retorna (ok, csv_line_printed, errors)
    """
    errors: List[str] = []
    try:
        manifesto_path.parent.mkdir(parents=True, exist_ok=True)
        rows_total = int(len(df_written))
        date_min = str(pd.to_datetime(df_written["date"]).min())
        date_max = str(pd.to_datetime(df_written["date"]).max())
        columns_json = json.dumps(EXPECTED_COLUMNS, ensure_ascii=False)
        years = sorted(pd.to_datetime(df_written["date"]).dt.year.unique().astype(int).tolist())
        partitions = [f"year={y}" for y in years]
        partitions_json = json.dumps(partitions, ensure_ascii=False)
        header = ["timestamp", "ticker", "rows_total", "date_min", "date_max", "columns_json", "partitions_json", "target_path"]
        row = [
            AGORA_TZ.isoformat(),
            ticker,
            rows_total,
            date_min,
            date_max,
            columns_json,
            partitions_json,
            str(target_path),
        ]
        # Escrever (append se existir; senão criar com header)
        csv_line = ",".join([str(x) for x in row])
        if not manifesto_path.exists():
            with open(manifesto_path, "w", encoding="utf-8") as f:
                f.write(",".join(header) + "\n")
                f.write(csv_line + "\n")
        else:
            with open(manifesto_path, "a", encoding="utf-8") as f:
                f.write(csv_line + "\n")
        return True, csv_line, errors
    except Exception as e:
        errors.append(f"VALIDATION_ERROR: MANIFESTO_WRITE_ERROR — {e}")
        return False, None, errors

# =========================
# Execução Principal
# =========================
def main():
    consecutive_errors = 0

    # 1) Obter bronze_ibov (memória ou reingestão silenciosa)
    bronze_df, ensure_msgs = ensure_bronze_in_memory()
    if ensure_msgs:
        for m in ensure_msgs:
            print(m)
    if bronze_df is None or bronze_df.empty:
        consecutive_errors += 1
        if consecutive_errors >= 2:
            print_section("DÚVIDAS OBJETIVAS")
            print("- Não foi possível obter bronze_ibov em memória e reingestão falhou. Rede está disponível? Provedores autorizados?")
            print("- Deseja fornecer um caminho alternativo para leitura do Bronze antes da escrita?")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "falha",
            "parquet_write_summary": "falha",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 2) Pré-voo de qualidade
    ok_quality, details, q_errs = preflight_checks(bronze_df)
    print_section("PREFLIGHT QUALITY")
    print(json.dumps({"ok": ok_quality, "details": details, "errors": q_errs}, ensure_ascii=False, indent=2))
    if not ok_quality:
        for e in q_errs:
            print(e)
        print("VALIDATION_ERROR: Pré-condições não atendidas; escrita abortada.")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "falha",
            "parquet_write_summary": "falha",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 3) Escrita Parquet particionado (overwrite-by-partition)
    print_section("PARQUET WRITE")
    write_ok, write_summary, write_errs = write_parquet_partitioned(bronze_df, PARQUET_TARGET, partition_col="year")
    if write_errs:
        for e in write_errs:
            print(e)
    print(json.dumps({"ok": write_ok, "years_written": write_summary.get("years_written", []), "files_per_partition": write_summary.get("files_per_partition", {})}, ensure_ascii=False, indent=2))
    if not write_ok:
        consecutive_errors += 1
    else:
        consecutive_errors = 0

    if consecutive_errors >= 2:
        print("VALIDATION_ERROR: Falhas repetidas na escrita do Parquet.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- Podemos instalar/atualizar pyarrow para habilitar escrita particionada com snappy?")
        print("- Há permissões de escrita no diretório alvo?")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "ok",
            "parquet_write_summary": "falha",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 4) Pós-escrita: reabrir e verificar
    print_section("POST-WRITE VERIFICATION")
    df_reopen, reopen_info, reopen_errs = reopen_dataset_summary(PARQUET_TARGET)
    if reopen_errs:
        for e in reopen_errs:
            print(e)
    print(json.dumps(reopen_info, ensure_ascii=False, indent=2))
    if df_reopen is None or df_reopen.empty:
        consecutive_errors += 1
    else:
        consecutive_errors = 0

    if consecutive_errors >= 2:
        print("VALIDATION_ERROR: Reabertura pós-escrita falhou repetidamente.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- Podemos confirmar a instalação do engine Parquet (pyarrow) para leitura?")
        print("- O dataset contém arquivos corrompidos?")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "ok",
            "parquet_write_summary": "ok",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 5) Manifesto: criar se faltar e adicionar linha
    print_section("MANIFESTO APPEND")
    man_ok, man_line, man_errs = append_manifesto_row(MANIFESTO_PATH, TICKER, bronze_df, PARQUET_TARGET)
    if man_errs:
        for e in man_errs:
            print(e)
    if man_ok and man_line:
        print(man_line)
    else:
        print("VALIDATION_ERROR: manifesto não atualizado.")

    # 6) Checklist final
    print_section("CHECKLIST")
    checklist = {
        "preflight_quality_ok": "ok" if ok_quality else "falha",
        "parquet_write_summary": "ok" if write_ok else "falha",
        "post_write_verification": "ok" if (df_reopen is not None and reopen_info.get("rows_total", 0) > 0) else "falha",
        "manifesto_append_ok": "ok" if man_ok else "falha"
    }
    print(json.dumps(checklist, ensure_ascii=False, indent=2))
    for k, v in checklist.items():
        if v != "ok":
            print(f"CHECKLIST_FAILURE: {k} não atendido.")

if __name__ == "__main__":
    # Contrato:
    # - Obtém bronze_ibov (memória ou reingesta silenciosa), valida pré-voo,
    # - Escreve Parquet particionado (snappy, overwrite-by-partition),
    # - Reabre para verificar, e registra manifesto (append/gera).
    main()

# =========================
# Reingestão silenciosa (apenas se bronze_ibov não existir)
# =========================
def fetch_yahoo_chart_silent(ticker: str) -> Optional[pd.DataFrame]:
    try:
        base_url = "https://query2.finance.yahoo.com/v8/finance/chart/%5EBVSP"
        params = {
            "period1": str(to_unix_seconds(START_DATE_UTC)),
            "period2": str(to_unix_seconds(NOW_UTC)),
            "interval": "1d",
            "events": "history",
            "includeAdjustedClose": "false",
        }
        try:
            import requests  # type: ignore
            r = requests.get(base_url, params=params, headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Python"}, timeout=6)
            if r.status_code < 200 or r.status_code >= 400:
                return None
            data = r.json()
        except Exception:
            from urllib.parse import urlencode
            from urllib.request import Request, urlopen
            url = base_url + "?" + urlencode(params)
            req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Python"})
            with urlopen(req, timeout=8) as resp:
                raw = resp.read()
            import json as _json
            data = _json.loads(raw.decode("utf-8"))
        if "chart" not in data or not data["chart"].get("result"):
            return None
        res0 = data["chart"]["result"][0]
        ts = res0.get("timestamp", []) or []
        q = (res0.get("indicators", {}) or {}).get("quote", []) or []
        if not q:
            return None
        q0 = q[0]
        opens = q0.get("open", []) or []
        highs = q0.get("high", []) or []
        lows = q0.get("low", []) or []
        closes = q0.get("close", []) or []
        vols = q0.get("volume", []) or []
        n = min(len(ts), len(opens), len(highs), len(lows), len(closes), len(vols))
        if n == 0:
            return None
        df_pre = pd.DataFrame({
            "date": pd.to_datetime(ts[:n], unit="s", utc=True),
            "open": opens[:n],
            "high": highs[:n],
            "low": lows[:n],
            "close": closes[:n],
            "volume": vols[:n],
        })
        return bronze_normalize(df_pre, ticker)
    except Exception:
        return None

def fetch_yfinance_silent(ticker: str) -> Optional[pd.DataFrame]:
    try:
        try:
            import yfinance as yf  # type: ignore
        except Exception:
            return None
        start_str = START_DATE_UTC.tz_localize(None).date().isoformat() if START_DATE_UTC.tzinfo else START_DATE_UTC.date().isoformat()
        end_inc = (END_DATE_UTC + pd.Timedelta(days=1))
        end_str = end_inc.tz_localize(None).date().isoformat() if end_inc.tzinfo else end_inc.date().isoformat()
        df_raw = yf.download(tickers=ticker, start=start_str, end=end_str, interval="1d", auto_adjust=False, progress=False, threads=True)
        if df_raw is None or df_raw.empty:
            return None
        if isinstance(df_raw.columns, pd.MultiIndex):
            try:
                df_raw.columns = [c[-1] if isinstance(c, tuple) else c for c in df_raw.columns.to_list()]
            except Exception:
                df_raw.columns = df_raw.columns.get_level_values(-1)
        rename_map = {"Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
                      "open": "open", "high": "high", "low": "low", "close": "close", "volume": "volume"}
        df_raw = df_raw.rename(columns=rename_map)
        need = {"open", "high", "low", "close", "volume"}
        if not need.issubset(set(df_raw.columns)):
            return None
        df_pre = df_raw.reset_index().rename(columns={"Date": "date", "Datetime": "date"})
        if "date" not in df_pre.columns:
            df_pre = df_raw.copy()
            df_pre["date"] = df_pre.index
            df_pre = df_pre.reset_index(drop=True)
        df_pre = df_pre[["date", "open", "high", "low", "close", "volume"]]
        return bronze_normalize(df_pre, ticker)
    except Exception:
        return None

def fetch_stooq_silent(ticker: str) -> Optional[pd.DataFrame]:
    try:
        try:
            from pandas_datareader import data as dr  # type: ignore
        except Exception:
            return None
        candidates = [ticker, ticker.replace("^", ""), ticker.replace("^", "").lower()]
        df_raw = None
        last_exc = None
        for tk in candidates:
            try:
                df_raw = dr.DataReader(tk, "stooq", start=START_DATE_UTC.tz_localize(None), end=END_DATE_UTC.tz_localize(None))
                if df_raw is not None and not df_raw.empty:
                    break
            except Exception as e:
                last_exc = e
                continue
        if df_raw is None or df_raw.empty:
            return None
        df_raw = df_raw.sort_index()
        if isinstance(df_raw.columns, pd.MultiIndex):
            try:
                df_raw.columns = [c[-1] if isinstance(c, tuple) else c for c in df_raw.columns.to_list()]
            except Exception:
                df_raw.columns = df_raw.columns.get_level_values(-1)
        rename_map = {"Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
                      "open": "open", "high": "high", "low": "low", "close": "close", "volume": "volume"}
        df_raw = df_raw.rename(columns=rename_map)
        need = {"open", "high", "low", "close", "volume"}
        if not need.issubset(set(df_raw.columns)):
            return None
        df_pre = df_raw.copy()
        df_pre["date"] = df_pre.index
        df_pre = df_pre.reset_index(drop=True)[["date", "open", "high", "low", "close", "volume"]]
        return bronze_normalize(df_pre, ticker)
    except Exception:
        return None

def ensure_bronze_in_memory() -> Tuple[Optional[pd.DataFrame], List[str]]:
    msgs: List[str] = []
    g = globals()
    if "bronze_ibov" in g and isinstance(g["bronze_ibov"], pd.DataFrame):
        df = g["bronze_ibov"].copy()
        # Reforçar schema/dtypes
        try:
            df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
            for c in ["open", "high", "low", "close"]:
                df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
            df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype("int64")
            df["ticker"] = df["ticker"].astype("string")
            df = df[EXPECTED_COLUMNS].sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)
            return df, msgs
        except Exception as e:
            msgs.append(f"INFO: bronze_ibov em memória com inconsistências — {e}; reingestão silenciosa será tentada.")
    # Reingestão silenciosa
    for fn in (fetch_yahoo_chart_silent, fetch_yfinance_silent, fetch_stooq_silent):
        df = fn(TICKER)
        if df is not None and not df.empty:
            return df, msgs
    msgs.append("VALIDATION_ERROR: PROVIDERS_EXHAUSTED — não foi possível reingestar bronze_ibov.")
    return None, msgs

# =========================
# Pré-voo de qualidade
# =========================
def preflight_checks(df: pd.DataFrame) -> Tuple[bool, Dict[str, Any], List[str]]:
    errs: List[str] = []
    details: Dict[str, Any] = {}
    if df is None or df.empty:
        errs.append("VALIDATION_ERROR: DataFrame vazio.")
    else:
        pn = percent_nulls(df)
        details["percent_nulls"] = {k: round(v, 6) for k, v in pn.items()}
        for col in ["date", "close", "ticker"]:
            if round(pn.get(col, 100.0), 6) != 0.0:
                errs.append(f"VALIDATION_ERROR: % nulos em {col} deve ser 0%, obtido={pn.get(col, 100.0):.6f}%")
        dups = int(df.duplicated(subset=["date"]).sum())
        details["duplicates_by_date"] = dups
        if dups != 0:
            errs.append(f"VALIDATION_ERROR: duplicatas por date detectadas (= {dups})")
        if len(df) < 2500:
            errs.append(f"VALIDATION_ERROR: cobertura insuficiente — linhas={len(df)} (< 2500)")
        dmin = pd.to_datetime(df["date"]).min()
        dmax = pd.to_datetime(df["date"]).max()
        details["date_min"] = str(dmin)
        details["date_max"] = str(dmax)
        # Tolerâncias: início ≤ 2012-01-06; fim ≥ hoje(UTC) − 3d
        if dmin > pd.Timestamp("2012-01-06"):
            errs.append(f"VALIDATION_ERROR: date.min ({dmin.date().isoformat()}) > tolerância (2012-01-06)")
        required_end_min = (pd.Timestamp(datetime.now(timezone.utc)).normalize() - pd.Timedelta(days=3)).tz_localize(None)
        if dmax < required_end_min:
            errs.append(f"VALIDATION_ERROR: date.max ({dmax.date().isoformat()}) < requerido mínimo ({required_end_min.date().isoformat()}) (tolerância 3 dias)")
    return (len(errs) == 0), details, errs

# =========================
# Escrita Parquet particionado (overwrite-by-partition)
# =========================
def write_parquet_partitioned(df: pd.DataFrame, base_dir: Path, partition_col: str = "year") -> Tuple[bool, Dict[str, Any], List[str]]:
    """
    Escreve com pyarrow.parquet.write_to_dataset, compressão snappy,
    particionado por 'year', com existing_data_behavior='delete_matching' (overwrite-by-partition).
    Retorna (ok, summary, errors).
    """
    errors: List[str] = []
    summary: Dict[str, Any] = {"years_written": [], "files_per_partition": {}}
    try:
        import pyarrow as pa  # type: ignore
        import pyarrow.parquet as pq  # type: ignore
    except Exception as e:
        errors.append(f"VALIDATION_ERROR: MISSING_DEPENDENCY_PYARROW — {e}")
        return False, summary, errors

    try:
        base_dir.mkdir(parents=True, exist_ok=True)
        df2 = df.copy()
        years = pd.to_datetime(df2["date"]).dt.year.astype("int16")
        df2[partition_col] = years
        table = pa.Table.from_pandas(df2, preserve_index=False)
        # Escreve dataset
        pq.write_to_dataset(
            table=table,
            root_path=str(base_dir),
            partition_cols=[partition_col],
            compression="snappy",
            existing_data_behavior="delete_matching"  # overwrite-by-partition
        )
        # Sumário por partição escrita
        written_years = sorted(pd.unique(years).astype(int).tolist())
        summary["years_written"] = written_years
        files_per = {}
        for y in written_years:
            p = base_dir / f"{partition_col}={y}"
            cnt = 0
            if p.exists() and p.is_dir():
                for _, _, files in os.walk(p):
                    cnt += sum(1 for f in files if f.endswith(".parquet"))
            files_per[str(y)] = cnt
        summary["files_per_partition"] = files_per
        return True, summary, errors
    except Exception as e:
        errors.append(f"VALIDATION_ERROR: PARQUET_WRITE_ERROR — {e}")
        return False, summary, errors

# =========================
# Reabertura pós-escrita
# =========================
def reopen_dataset_summary(base_dir: Path) -> Tuple[Optional[pd.DataFrame], Dict[str, Any], List[str]]:
    errors: List[str] = []
    info: Dict[str, Any] = {"rows_total": 0, "min_date": None, "max_date": None}
    try:
        import pyarrow.dataset as ds  # type: ignore
        dataset = ds.dataset(str(base_dir), format="parquet")
        table = dataset.to_table()
        df = table.to_pandas()
    except Exception as e1:
        errors.append(f"READ_ERROR_PA_DS: {e1}")
        try:
            df = pd.read_parquet(str(base_dir))
        except Exception as e2:
            errors.append(f"READ_ERROR_PD_RP: {e2}")
            return None, info, errors
    # Normaliza e resume
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    info["rows_total"] = int(len(df))
    info["min_date"] = str(pd.to_datetime(df["date"]).min()) if "date" in df.columns and not df["date"].isna().all() else None
    info["max_date"] = str(pd.to_datetime(df["date"]).max()) if "date" in df.columns and not df["date"].isna().all() else None
    return df, info, errors

# =========================
# Manifesto (append ou create)
# =========================
def append_manifesto_row(
    manifesto_path: Path,
    ticker: str,
    df_written: pd.DataFrame,
    target_path: Path
) -> Tuple[bool, Optional[str], List[str]]:
    """
    Acrescenta uma linha ao manifesto (cria arquivo se não existir).
    Retorna (ok, csv_line_printed, errors)
    """
    errors: List[str] = []
    try:
        manifesto_path.parent.mkdir(parents=True, exist_ok=True)
        rows_total = int(len(df_written))
        date_min = str(pd.to_datetime(df_written["date"]).min())
        date_max = str(pd.to_datetime(df_written["date"]).max())
        columns_json = json.dumps(EXPECTED_COLUMNS, ensure_ascii=False)
        years = sorted(pd.to_datetime(df_written["date"]).dt.year.unique().astype(int).tolist())
        partitions = [f"year={y}" for y in years]
        partitions_json = json.dumps(partitions, ensure_ascii=False)
        header = ["timestamp", "ticker", "rows_total", "date_min", "date_max", "columns_json", "partitions_json", "target_path"]
        row = [
            AGORA_TZ.isoformat(),
            ticker,
            rows_total,
            date_min,
            date_max,
            columns_json,
            partitions_json,
            str(target_path),
        ]
        # Escrever (append se existir; senão criar com header)
        csv_line = ",".join([str(x) for x in row])
        if not manifesto_path.exists():
            with open(manifesto_path, "w", encoding="utf-8") as f:
                f.write(",".join(header) + "\n")
                f.write(csv_line + "\n")
        else:
            with open(manifesto_path, "a", encoding="utf-8") as f:
                f.write(csv_line + "\n")
        return True, csv_line, errors
    except Exception as e:
        errors.append(f"VALIDATION_ERROR: MANIFESTO_WRITE_ERROR — {e}")
        return False, None, errors

# =========================
# Execução Principal
# =========================
def main():
    consecutive_errors = 0

    # 1) Obter bronze_ibov (memória ou reingestão silenciosa)
    bronze_df, ensure_msgs = ensure_bronze_in_memory()
    if ensure_msgs:
        for m in ensure_msgs:
            print(m)
    if bronze_df is None or bronze_df.empty:
        consecutive_errors += 1
        if consecutive_errors >= 2:
            print_section("DÚVIDAS OBJETIVAS")
            print("- Não foi possível obter bronze_ibov em memória e reingestão falhou. Rede está disponível? Provedores autorizados?")
            print("- Deseja fornecer um caminho alternativo para leitura do Bronze antes da escrita?")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "falha",
            "parquet_write_summary": "falha",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 2) Pré-voo de qualidade
    ok_quality, details, q_errs = preflight_checks(bronze_df)
    print_section("PREFLIGHT QUALITY")
    print(json.dumps({"ok": ok_quality, "details": details, "errors": q_errs}, ensure_ascii=False, indent=2))
    if not ok_quality:
        for e in q_errs:
            print(e)
        print("VALIDATION_ERROR: Pré-condições não atendidas; escrita abortada.")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "falha",
            "parquet_write_summary": "falha",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 3) Escrita Parquet particionado (overwrite-by-partition)
    print_section("PARQUET WRITE")
    write_ok, write_summary, write_errs = write_parquet_partitioned(bronze_df, PARQUET_TARGET, partition_col="year")
    if write_errs:
        for e in write_errs:
            print(e)
    print(json.dumps({"ok": write_ok, "years_written": write_summary.get("years_written", []), "files_per_partition": write_summary.get("files_per_partition", {})}, ensure_ascii=False, indent=2))
    if not write_ok:
        consecutive_errors += 1
    else:
        consecutive_errors = 0

    if consecutive_errors >= 2:
        print("VALIDATION_ERROR: Falhas repetidas na escrita do Parquet.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- Podemos instalar/atualizar pyarrow para habilitar escrita particionada com snappy?")
        print("- Há permissões de escrita no diretório alvo?")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "ok",
            "parquet_write_summary": "falha",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 4) Pós-escrita: reabrir e verificar
    print_section("POST-WRITE VERIFICATION")
    df_reopen, reopen_info, reopen_errs = reopen_dataset_summary(PARQUET_TARGET)
    if reopen_errs:
        for e in reopen_errs:
            print(e)
    print(json.dumps(reopen_info, ensure_ascii=False, indent=2))
    if df_reopen is None or df_reopen.empty:
        consecutive_errors += 1
    else:
        consecutive_errors = 0

    if consecutive_errors >= 2:
        print("VALIDATION_ERROR: Reabertura pós-escrita falhou repetidamente.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- Podemos confirmar a instalação do engine Parquet (pyarrow) para leitura?")
        print("- O dataset contém arquivos corrompidos?")
        print_section("CHECKLIST")
        checklist = {
            "preflight_quality_ok": "ok",
            "parquet_write_summary": "ok",
            "post_write_verification": "falha",
            "manifesto_append_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        return

    # 5) Manifesto: criar se faltar e adicionar linha
    print_section("MANIFESTO APPEND")
    man_ok, man_line, man_errs = append_manifesto_row(MANIFESTO_PATH, TICKER, bronze_df, PARQUET_TARGET)
    if man_errs:
        for e in man_errs:
            print(e)
    if man_ok and man_line:
        print(man_line)
    else:
        print("VALIDATION_ERROR: manifesto não atualizado.")

    # 6) Checklist final
    print_section("CHECKLIST")
    checklist = {
        "preflight_quality_ok": "ok" if ok_quality else "falha",
        "parquet_write_summary": "ok" if write_ok else "falha",
        "post_write_verification": "ok" if (df_reopen is not None and reopen_info.get("rows_total", 0) > 0) else "falha",
        "manifesto_append_ok": "ok" if man_ok else "falha"
    }
    print(json.dumps(checklist, ensure_ascii=False, indent=2))
    for k, v in checklist.items():
        if v != "ok":
            print(f"CHECKLIST_FAILURE: {k} não atendido.")

if __name__ == "__main__":
    # Contrato:
    # - Obtém bronze_ibov (memória ou reingesta silenciosa), valida pré-voo,
    # - Escreve Parquet particionado (snappy, overwrite-by-partition),
    # - Reabre para verificar, e registra manifesto (append/gera).
    main()


{
  "ok": true,
  "details": {
    "percent_nulls": {
      "date": 0.0,
      "open": 0.0,
      "high": 0.0,
      "low": 0.0,
      "close": 0.0,
      "volume": 0.0,
      "ticker": 0.0
    },
    "duplicates_by_date": 0,
    "date_min": "2012-01-03 00:00:00",
    "date_max": "2025-09-19 00:00:00"
  },
  "errors": []
}

{
  "ok": true,
  "years_written": [
    2012,
    2013,
    2014,
    2015,
    2016,
    2017,
    2018,
    2019,
    2020,
    2021,
    2022,
    2023,
    2024,
    2025
  ],
  "files_per_partition": {
    "2012": 1,
    "2013": 1,
    "2014": 1,
    "2015": 1,
    "2016": 1,
    "2017": 1,
    "2018": 1,
    "2019": 1,
    "2020": 1,
    "2021": 1,
    "2022": 1,
    "2023": 1,
    "2024": 1,
    "2025": 1
  }
}

{
  "rows_total": 3400,
  "min_date": "2012-01-03 00:00:00",
  "max_date": "2025-09-19 00:00:00"
}

2025-09-19T10:24:34.637271-03:00,^BVSP,3400,2012-01-03 00:00:00,2025-09-19 00:00:00,["date", "open", "high", "low", "close", "volume", "ticker"],["ye

# Instrução 1B–MANIFESTO–FIX (recriar/append da linha ^BVSP)

In [11]:
# Instrução 1B–MANIFESTO–FIX (recriar/append da linha ^BVSP)
# Objetivo: Garantir que /home/wrm/BOLSA_2026/manifestos/bronze_ibov_manifesto.csv contenha a linha mais recente do ^BVSP,
# consistente com o dataset em /home/wrm/BOLSA_2026/bronze/IBOV.parquet.
# Disciplina: Um único bloco de código auto-contido. dry_run=False (vai escrever/append no CSV).
# Relatórios: imprimir dataset_found, manifesto_status, write_action, manifesto_tail. Mensagens normativas em caso de falha.

import os
import re
import sys
import io
import json
import csv
import traceback
from datetime import datetime, timezone

# Dependências opcionais
try:
    import pyarrow.dataset as ds
    import pyarrow as pa  # noqa: F401
except Exception:
    ds = None
try:
    import pandas as pd
except Exception:
    pd = None

# Parâmetros fixos
DATASET_PATH = "/home/wrm/BOLSA_2026/bronze/IBOV.parquet"
MANIFESTO_PATH = "/home/wrm/BOLSA_2026/manifestos/bronze_ibov_manifesto.csv"
TICKER = "^BVSP"
COLUMNS_JSON = json.dumps(["date","open","high","low","close","volume","ticker"], ensure_ascii=False)
TARGET_PATH = DATASET_PATH

def print_normative_error(msg: str):
    print(msg)
    sys.exit(1)

def safe_iso_date(ts) -> str:
    if ts is None:
        return ""
    if isinstance(ts, str):
        return ts
    try:
        # pandas Timestamp or datetime
        if hasattr(ts, "to_pydatetime"):
            ts = ts.to_pydatetime()
        if isinstance(ts, datetime):
            return ts.date().isoformat()
        # Fallback: str
        return str(ts)
    except Exception:
        return str(ts)

def read_dataset_summary(dataset_path: str):
    exists = os.path.exists(dataset_path)
    is_dir = os.path.isdir(dataset_path)
    partitions = []
    has_year_subdirs = False
    rows_total = None
    date_min = None
    date_max = None

    if is_dir:
        try:
            for name in os.listdir(dataset_path):
                full = os.path.join(dataset_path, name)
                if os.path.isdir(full) and re.fullmatch(r"year=\d{4}", name):
                    partitions.append(name)
            partitions = sorted(partitions)
            has_year_subdirs = len(partitions) > 0
        except Exception:
            # keep defaults; will be validated later
            pass

    # Tentar reabrir o dataset e computar contagens e extremos de data
    if exists and is_dir and has_year_subdirs:
        # Preferir pyarrow.dataset
        if ds is not None:
            try:
                dset = ds.dataset(dataset_path, format="parquet", partitioning="hive")
                # rows_total
                try:
                    rows_total = dset.count_rows()
                except Exception:
                    # Fallback: contar linhas via to_table apenas da coluna date
                    tbl = dset.to_table(columns=["date"])
                    rows_total = tbl.num_rows
                # Extremos de data
                tbl_date = dset.to_table(columns=["date"])
                if pd is None:
                    # Converter via pyarrow para Python nativo e calcular min/max
                    col = tbl_date.column("date")
                    # to_pylist pode ser grande; dataset diário é pequeno, ok
                    pylist = col.to_pylist()
                    # Filtrar None
                    vals = [v for v in pylist if v is not None]
                    if len(vals) == 0:
                        raise ValueError("Coluna 'date' vazia após filtragem.")
                    # Valores podem ser datetime ou int (epoch); normalizar
                    # pyarrow geralmente já entrega datetime
                    dmin = min(vals)
                    dmax = max(vals)
                    date_min = dmin
                    date_max = dmax
                else:
                    s = tbl_date.to_pandas(types_mapper=None)  # pandas Series se single column
                    if isinstance(s, pd.DataFrame):
                        # Garantir Series
                        if "date" in s.columns:
                            s = s["date"]
                        else:
                            # pegar primeira coluna
                            s = s.iloc[:, 0]
                    s = pd.to_datetime(s, utc=True, errors="coerce")
                    s = s.dropna()
                    if s.empty:
                        raise ValueError("Coluna 'date' sem valores válidos.")
                    date_min = s.min()
                    date_max = s.max()
            except Exception as e:
                # Fallback: pandas.read_parquet no diretório (requer pandas + engine disponível)
                if pd is None:
                    print_normative_error(f"VALIDATION_ERROR: falha ao abrir dataset com pyarrow.dataset e pandas ausente. Detalhe: {e}")
                try:
                    df = pd.read_parquet(dataset_path, columns=["date"])
                    if df.empty:
                        raise ValueError("Dataset lido via pandas está vazio.")
                    rows_total = len(df)
                    s = pd.to_datetime(df["date"], utc=True, errors="coerce").dropna()
                    if s.empty:
                        raise ValueError("Coluna 'date' sem valores válidos (pandas).")
                    date_min = s.min()
                    date_max = s.max()
                except Exception as e2:
                    print_normative_error(f"VALIDATION_ERROR: falha ao reabrir dataset com pandas.read_parquet. Detalhe: {e2}")
        else:
            # Sem pyarrow.dataset: usar pandas diretamente
            if pd is None:
                print_normative_error("VALIDATION_ERROR: nem pyarrow.dataset nem pandas disponíveis para reabrir dataset.")
            try:
                df = pd.read_parquet(dataset_path, columns=["date"])
                if df.empty:
                    raise ValueError("Dataset lido via pandas está vazio.")
                rows_total = len(df)
                s = pd.to_datetime(df["date"], utc=True, errors="coerce").dropna()
                if s.empty:
                    raise ValueError("Coluna 'date' sem valores válidos (pandas).")
                date_min = s.min()
                date_max = s.max()
            except Exception as e:
                print_normative_error(f"VALIDATION_ERROR: falha ao reabrir dataset com pandas.read_parquet. Detalhe: {e}")

    return {
        "path": dataset_path,
        "exists": exists,
        "is_dir": is_dir,
        "has_year_subdirs": has_year_subdirs,
        "partitions": partitions,
        "rows_total": int(rows_total) if rows_total is not None else None,
        "date_min": safe_iso_date(date_min),
        "date_max": safe_iso_date(date_max),
    }

def ensure_manifesto_dir(manifesto_path: str):
    d = os.path.dirname(manifesto_path)
    if d:
        os.makedirs(d, exist_ok=True)

def read_manifesto_status(manifesto_path: str):
    exists_before = os.path.exists(manifesto_path)
    had_row_for_vbsp_before = False
    last_line = ""
    last_row = None
    if exists_before:
        try:
            # Ler de forma robusta com pandas se disponível
            if pd is not None:
                mdf = pd.read_csv(manifesto_path, dtype=str)
                mdf = mdf.fillna("")
                if not mdf.empty:
                    had_row_for_vbsp_before = any(mdf["ticker"] == TICKER) if "ticker" in mdf.columns else False
                    # última linha como CSV string
                    last_row = mdf.iloc[-1].to_dict()
                    output = io.StringIO()
                    writer = csv.DictWriter(output, fieldnames=mdf.columns.tolist())
                    writer.writeheader()
                    writer.writerow(last_row)
                    last_line = output.getvalue().strip().splitlines()[-1]
                else:
                    had_row_for_vbsp_before = False
                    last_line = ""
            else:
                # Sem pandas: ler manualmente
                with open(manifesto_path, "r", encoding="utf-8") as f:
                    lines = [ln.rstrip("\n") for ln in f.readlines()]
                if len(lines) >= 2:
                    header = lines[0]
                    last_line = lines[-1]
                    try:
                        # testar presença de ticker em alguma linha
                        had_row_for_vbsp_before = any(TICKER in ln.split(",")[1:2] for ln in lines[1:])
                    except Exception:
                        had_row_for_vbsp_before = (TICKER in "\n".join(lines[1:]))
                else:
                    had_row_for_vbsp_before = False
                    last_line = ""
        except Exception:
            # Se falhar leitura, considerar inexistente para fins de fluxo seguro
            exists_before = os.path.exists(manifesto_path)
            had_row_for_vbsp_before = False
            last_line = ""
    return {
        "path": manifesto_path,
        "exists_before": exists_before,
        "had_row_for_^BVSP_before": had_row_for_vbsp_before,
        "last_line": last_line,
        "last_row_dict": last_row
    }

def append_or_create_manifesto(manifesto_path: str, row_dict: dict, manifesto_status: dict):
    ensure_manifesto_dir(manifesto_path)
    exists_before = manifesto_status["exists_before"]
    last_row_dict = manifesto_status.get("last_row_dict")
    last_line_prior = manifesto_status.get("last_line", "")
    # Decisão:
    # - Se não existe, criar arquivo com header + linha => created_file
    # - Se existe:
    #     - Se última linha já é do ^BVSP e contém mesmos valores-chave (rows_total, date_min, date_max, target_path), então skip
    #     - Caso contrário, append => appended_row
    action = None
    reason = None

    if not exists_before:
        # Criar novo
        fieldnames = ["timestamp","ticker","rows_total","date_min","date_max","columns_json","partitions_json","target_path"]
        try:
            with open(manifesto_path, "w", encoding="utf-8", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerow(row_dict)
            action = "created_file"
        except Exception as e:
            print_normative_error(f"VALIDATION_ERROR: falha ao criar manifesto. Detalhe: {e}")
    else:
        # Existe: decidir se precisa append
        need_append = True
        if last_row_dict is not None:
            last_ticker = last_row_dict.get("ticker", "")
            same_rows = str(last_row_dict.get("rows_total", "")) == str(row_dict.get("rows_total", ""))
            same_dmin = str(last_row_dict.get("date_min", "")) == str(row_dict.get("date_min", ""))
            same_dmax = str(last_row_dict.get("date_max", "")) == str(row_dict.get("date_max", ""))
            same_path = str(last_row_dict.get("target_path", "")) == str(row_dict.get("target_path", ""))
            if last_ticker == TICKER and same_rows and same_dmin and same_dmax and same_path:
                need_append = False
        # Se última linha não é do ^BVSP, garantir que a última linha após operação seja do ^BVSP => forçar append
        if last_row_dict is not None and last_row_dict.get("ticker", "") != TICKER:
            need_append = True

        if need_append:
            try:
                with open(manifesto_path, "a", encoding="utf-8", newline="") as f:
                    writer = csv.DictWriter(f, fieldnames=["timestamp","ticker","rows_total","date_min","date_max","columns_json","partitions_json","target_path"])
                    writer.writerow(row_dict)
                action = "appended_row"
            except Exception as e:
                print_normative_error(f"VALIDATION_ERROR: falha ao fazer append no manifesto. Detalhe: {e}")
        else:
            action = "skipped"
            reason = "última linha ^BVSP já reflete o estado atual"

    return action, reason

def read_manifesto_tail(manifesto_path: str):
    try:
        with open(manifesto_path, "r", encoding="utf-8") as f:
            lines = [ln.rstrip("\n") for ln in f.readlines()]
        if len(lines) == 0:
            return ""
        return lines[-1]
    except Exception:
        return ""

def main():
    # 1) Reabrir dataset físico e coletar resumo
    dataset_info = read_dataset_summary(DATASET_PATH)

    # 2) Validar existência e estrutura mínima
    if not dataset_info["exists"] or not dataset_info["is_dir"]:
        print(f"dataset_found: {json.dumps(dataset_info, ensure_ascii=False)}")
        print_normative_error("VALIDATION_ERROR: dataset não encontrado ou não é diretório.")
    if not dataset_info["has_year_subdirs"]:
        print(f"dataset_found: {json.dumps(dataset_info, ensure_ascii=False)}")
        print_normative_error("VALIDATION_ERROR: partições de ano não detectadas (year=YYYY).")
    if dataset_info["rows_total"] is None or dataset_info["rows_total"] <= 0:
        print(f"dataset_found: {json.dumps(dataset_info, ensure_ascii=False)}")
        print_normative_error("VALIDATION_ERROR: falha ao computar rows_total do dataset.")
    if not dataset_info["date_min"] or not dataset_info["date_max"]:
        print(f"dataset_found: {json.dumps(dataset_info, ensure_ascii=False)}")
        print_normative_error("VALIDATION_ERROR: falha ao computar extremos de data (date_min/date_max).")

    # 3) Construir linha para o manifesto
    now_iso = datetime.now(timezone.utc).isoformat()
    partitions_json = json.dumps(dataset_info["partitions"], ensure_ascii=False)
    row = {
        "timestamp": now_iso,
        "ticker": TICKER,
        "rows_total": str(dataset_info["rows_total"]),
        "date_min": dataset_info["date_min"],
        "date_max": dataset_info["date_max"],
        "columns_json": COLUMNS_JSON,
        "partitions_json": partitions_json,
        "target_path": TARGET_PATH,
    }

    # 4) Status atual do manifesto
    manifesto_status = read_manifesto_status(MANIFESTO_PATH)

    # 5) Escrever (criar/append/skip)
    action, reason = append_or_create_manifesto(MANIFESTO_PATH, row, manifesto_status)

    # 6) Checklist: dataset_found, manifesto_status, write_action, manifesto_tail
    print(f"dataset_found: {json.dumps({k: (v if k!='partitions' else None) for k,v in dataset_info.items() if k!='partitions'}, ensure_ascii=False)}")
    # Mostrar partitions em manifesto_status? Requisito pede apenas no row; aqui imprimimos status do manifesto
    ms_print = {
        "path": manifesto_status["path"],
        "exists_before": manifesto_status["exists_before"],
        "had_row_for_^BVSP_before": manifesto_status["had_row_for_^BVSP_before"]
    }
    print(f"manifesto_status: {json.dumps(ms_print, ensure_ascii=False)}")
    if action == "skipped" and reason:
        print(f"write_action: {action} (motivo: {reason})")
    else:
        print(f"write_action: {action}")

    tail = read_manifesto_tail(MANIFESTO_PATH)
    print(f"manifesto_tail: {tail}")

    # 7) Verificação final: última linha deve ser do ^BVSP
    try:
        # Extrair ticker da última linha
        # tail contém CSV; assumir segundo campo é 'ticker'
        # Se tiver header na última linha (arquivo minimal?), tratar
        if tail.strip() == "":
            print_normative_error("CHECKLIST_FAILURE: manifesto vazio após operação.")
        parts = next(csv.reader([tail]))
        # Detectar header acidental
        if parts and parts[0] == "timestamp":
            # Pegar penúltima linha se houver
            with open(MANIFESTO_PATH, "r", encoding="utf-8") as f:
                lines = [ln.rstrip("\n") for ln in f.readlines()]
            if len(lines) >= 2:
                tail = lines[-1]
                parts = next(csv.reader([tail]))
            else:
                print_normative_error("CHECKLIST_FAILURE: manifesto contém apenas header.")
        # ticker deve estar na coluna 2 (índice 1)
        if len(parts) < 2 or parts[1] != TICKER:
            print_normative_error("CHECKLIST_FAILURE: última linha do manifesto não é do ^BVSP.")
    except SystemExit:
        raise
    except Exception:
        print_normative_error("CHECKLIST_FAILURE: falha ao validar a última linha do manifesto.")

if __name__ == "__main__":
    try:
        main()
    except SystemExit:
        pass
    except Exception as e:
        # Mensagem normativa genérica
        msg = f"VALIDATION_ERROR: exceção não tratada. Detalhe: {e}\n{traceback.format_exc()}"
        print(msg)
        sys.exit(1)

dataset_found: {"path": "/home/wrm/BOLSA_2026/bronze/IBOV.parquet", "exists": true, "is_dir": true, "has_year_subdirs": true, "rows_total": 3400, "date_min": "2012-01-03", "date_max": "2025-09-19"}
manifesto_status: {"path": "/home/wrm/BOLSA_2026/manifestos/bronze_ibov_manifesto.csv", "exists_before": true, "had_row_for_^BVSP_before": false}
write_action: appended_row
manifesto_tail: 2025-09-19T13:25:01.095765+00:00,^BVSP,3400,2012-01-03,2025-09-19,"[""date"", ""open"", ""high"", ""low"", ""close"", ""volume"", ""ticker""]","[""year=2012"", ""year=2013"", ""year=2014"", ""year=2015"", ""year=2016"", ""year=2017"", ""year=2018"", ""year=2019"", ""year=2020"", ""year=2021"", ""year=2022"", ""year=2023"", ""year=2024"", ""year=2025""]",/home/wrm/BOLSA_2026/bronze/IBOV.parquet


# Instrução 1B–MANIFESTO–REPAIR — normalizar header + garantir linha ^BVSP (dry_run=False)


In [13]:
# Instrução 1B–MANIFESTO–REPAIR — normalizar header + garantir linha ^BVSP (dry_run=False)
import os
import sys
import csv
import io
import json
import re
from datetime import datetime, timezone

# Dependências opcionais para reabrir dataset
try:
    import pyarrow.dataset as ds
except Exception:
    ds = None
try:
    import pandas as pd
except Exception:
    pd = None

MANIFEST_PATH = "/home/wrm/BOLSA_2026/manifestos/bronze_ibov_manifesto.csv"
DATASET_PATH = "/home/wrm/BOLSA_2026/bronze/IBOV.parquet"
TICKER = "^BVSP"
EXPECTED_COLUMNS = ["date","open","high","low","close","volume","ticker"]
CANONICAL_COLS = [
    "timestamp","ticker","rows_total","date_min","date_max",
    "columns_json","partitions_json","target_path","hash_head20","hash_tail20"
]


def print_normative_error(msg: str):
    print(msg)
    sys.exit(1)


def safe_iso_date(ts) -> str:
    if ts is None:
        return ""
    if isinstance(ts, str):
        return ts
    try:
        if hasattr(ts, "to_pydatetime"):
            ts = ts.to_pydatetime()
        if isinstance(ts, datetime):
            return ts.date().isoformat()
        return str(ts)
    except Exception:
        return str(ts)


def probe_dataset(path: str):
    exists = os.path.exists(path)
    is_dir = os.path.isdir(path)
    partitions = []
    has_year_subdirs = False
    rows_total = None
    dmin = None
    dmax = None

    if is_dir:
        try:
            for name in os.listdir(path):
                full = os.path.join(path, name)
                if os.path.isdir(full) and re.fullmatch(r"year=\d{4}", name):
                    partitions.append(name)
            partitions.sort()
            has_year_subdirs = len(partitions) > 0
        except Exception:
            pass

    if exists and is_dir and has_year_subdirs:
        if ds is not None:
            try:
                dset = ds.dataset(path, format="parquet", partitioning="hive")
                try:
                    rows_total = dset.count_rows()
                except Exception:
                    rows_total = dset.to_table(columns=["date"]).num_rows
                tbl_date = dset.to_table(columns=["date"])  # may be large but manageable for daily data
                if pd is None:
                    col = tbl_date.column("date")
                    vals = [v for v in col.to_pylist() if v is not None]
                    if not vals:
                        raise ValueError("Coluna 'date' vazia.")
                    dmin, dmax = min(vals), max(vals)
                else:
                    s = tbl_date.to_pandas()
                    if isinstance(s, pd.DataFrame):
                        s = s["date"] if "date" in s.columns else s.iloc[:, 0]
                    s = pd.to_datetime(s, utc=True, errors="coerce").dropna()
                    if s.empty:
                        raise ValueError("Coluna 'date' sem valores válidos.")
                    dmin, dmax = s.min(), s.max()
            except Exception as e:
                if pd is None:
                    print_normative_error(f"VALIDATION_ERROR: falha ao reabrir dataset (pyarrow.dataset) e pandas ausente. Detalhe: {e}")
                try:
                    df = pd.read_parquet(path, columns=["date"])
                    if df.empty:
                        raise ValueError("Dataset vazio.")
                    rows_total = len(df)
                    s = pd.to_datetime(df["date"], utc=True, errors="coerce").dropna()
                    if s.empty:
                        raise ValueError("Coluna 'date' sem valores válidos (pandas).")
                    dmin, dmax = s.min(), s.max()
                except Exception as e2:
                    print_normative_error(f"VALIDATION_ERROR: falha ao reabrir dataset (pandas). Detalhe: {e2}")
        else:
            if pd is None:
                print_normative_error("VALIDATION_ERROR: nem pyarrow.dataset nem pandas disponíveis para reabrir dataset.")
            try:
                df = pd.read_parquet(path, columns=["date"])
                if df.empty:
                    raise ValueError("Dataset vazio.")
                rows_total = len(df)
                s = pd.to_datetime(df["date"], utc=True, errors="coerce").dropna()
                if s.empty:
                    raise ValueError("Coluna 'date' sem valores válidos (pandas).")
                dmin, dmax = s.min(), s.max()
            except Exception as e3:
                print_normative_error(f"VALIDATION_ERROR: falha ao reabrir dataset (pandas). Detalhe: {e3}")

    return {
        "path_exists": exists,
        "is_dir": is_dir,
        "has_year_subdirs": has_year_subdirs,
        "rows_total": (int(rows_total) if rows_total is not None else None),
        "date_min": safe_iso_date(dmin),
        "date_max": safe_iso_date(dmax),
        "partitions": partitions,
    }


# 1) Ler manifesto
exists_before = os.path.exists(MANIFEST_PATH)
manifest_before = {
    "exists": exists_before,
    "cols": [],
    "rows": 0,
    "had_ticker_col": False,
    "had_row_for_^BVSP": False,
}
added_header = False
added_hash_cols = False
appended_bvsp = False

if pd is None:
    print_normative_error("VALIDATION_ERROR: pandas não disponível para normalização do manifesto.")

if exists_before:
    try:
        dfm = pd.read_csv(MANIFEST_PATH, sep=",", header=0, dtype=str)
    except Exception as e:
        print_normative_error(f"VALIDATION_ERROR: falha ao ler manifesto com header=0. Detalhe: {e}")
    manifest_before["cols"] = dfm.columns.tolist()
    manifest_before["rows"] = int(len(dfm))
    manifest_before["had_ticker_col"] = ("ticker" in dfm.columns)
    if manifest_before["had_ticker_col"]:
        manifest_before["had_row_for_^BVSP"] = bool((dfm["ticker"].astype(str) == TICKER).any())
    else:
        # Reabrir com header=None e forçar schema canônico
        try:
            dfm = pd.read_csv(MANIFEST_PATH, sep=",", header=None, dtype=str, names=CANONICAL_COLS)
            added_header = True
            manifest_before["cols"] = dfm.columns.tolist()
            manifest_before["rows"] = int(len(dfm))
            manifest_before["had_ticker_col"] = True
            manifest_before["had_row_for_^BVSP"] = bool((dfm["ticker"].astype(str) == TICKER).any())
        except Exception as e:
            print_normative_error(f"VALIDATION_ERROR: falha ao reler manifesto com header=None. Detalhe: {e}")
else:
    # Criar DataFrame vazio com schema canônico
    dfm = pd.DataFrame(columns=CANONICAL_COLS)
    added_header = True

# 2) Padronizar tipos e colunas canônicas
for col in CANONICAL_COLS:
    if col not in dfm.columns:
        dfm[col] = ""
        if col in ("hash_head20", "hash_tail20"):
            added_hash_cols = True

# Se manifesto tinha colunas extras, manter apenas as canônicas
if dfm.columns.tolist() != CANONICAL_COLS:
    # Verifique se hash cols estavam ausentes
    for hc in ("hash_head20", "hash_tail20"):
        if hc not in dfm.columns:
            dfm[hc] = ""
            added_hash_cols = True
    dfm = dfm[CANONICAL_COLS]

# Cast básicos
dfm["ticker"] = dfm["ticker"].astype(str).fillna("")

# 3) Garantir linha ^BVSP (se ausente)
if not (dfm["ticker"] == TICKER).any():
    probe = probe_dataset(DATASET_PATH)
    # Imprimir probe já agora se faltar estrutura mínima
    if not (probe["path_exists"] and probe["is_dir"] and probe["has_year_subdirs"]):
        print(f"dataset_probe: {json.dumps({k: (v if k!='partitions' else probe['partitions']) for k,v in probe.items()}, ensure_ascii=False)}")
        print_normative_error("VALIDATION_ERROR: dataset indisponível para gerar linha do manifesto.")
    if probe["rows_total"] is None or probe["rows_total"] <= 0 or not probe["date_min"] or not probe["date_max"]:
        print(f"dataset_probe: {json.dumps({k: (v if k!='partitions' else probe['partitions']) for k,v in probe.items()}, ensure_ascii=False)}")
        print_normative_error("VALIDATION_ERROR: falha ao obter métricas do dataset (rows_total/date_min/date_max).")

    now_iso = datetime.now(timezone.utc).isoformat()
    row = {
        "timestamp": now_iso,
        "ticker": TICKER,
        "rows_total": str(probe["rows_total"]),
        "date_min": probe["date_min"],
        "date_max": probe["date_max"],
        "columns_json": json.dumps(EXPECTED_COLUMNS, ensure_ascii=False),
        "partitions_json": json.dumps(probe["partitions"], ensure_ascii=False),
        "target_path": DATASET_PATH,
        "hash_head20": "",
        "hash_tail20": "",
    }
    dfm = pd.concat([dfm, pd.DataFrame([row])], ignore_index=True)
    appended_bvsp = True
    # Ordenar por timestamp ascendente
    try:
        ts = pd.to_datetime(dfm["timestamp"], errors="coerce")
        order = ts.argsort(kind="mergesort")  # estável
        dfm = dfm.iloc[order].reset_index(drop=True)
    except Exception:
        # Se falhar parsing, deixa como está
        pass

# 4) Salvar sobrescrevendo
try:
    dfm.to_csv(MANIFEST_PATH, index=False)
except Exception as e:
    print_normative_error(f"VALIDATION_ERROR: falha ao salvar manifesto normalizado. Detalhe: {e}")

# Checklist
manifest_before_print = {
    "exists": manifest_before["exists"],
    "cols": manifest_before["cols"],
    "rows": manifest_before["rows"],
    "had_ticker_col": manifest_before["had_ticker_col"],
    "had_row_for_^BVSP": manifest_before["had_row_for_^BVSP"],
}
print(f"manifest_before: {json.dumps(manifest_before_print, ensure_ascii=False)}")

# Probe do dataset para checklist final
probe_final = probe_dataset(DATASET_PATH)
probe_print = {
    "path_exists": probe_final["path_exists"],
    "is_dir": probe_final["is_dir"],
    "has_year_subdirs": probe_final["has_year_subdirs"],
    "rows_total": probe_final["rows_total"],
    "date_min": probe_final["date_min"],
    "date_max": probe_final["date_max"],
}
print(f"dataset_probe: {json.dumps(probe_print, ensure_ascii=False)}")

rep_actions = [
    f"added_header={'yes' if added_header else 'no'}",
    f"added_hash_cols={'yes' if added_hash_cols else 'no'}",
    f"appended_bvsp_row={'yes' if appended_bvsp else 'no'}",
]
print(f"repair_actions: {json.dumps(rep_actions, ensure_ascii=False)}")

# Tail do manifesto
try:
    with open(MANIFEST_PATH, "r", encoding="utf-8") as f:
        lines = [ln.rstrip("\n") for ln in f.readlines()]
    tail = lines[-1] if lines else ""
    if not tail:
        print_normative_error("CHECKLIST_FAILURE: manifesto vazio após normalização.")
    print(f"manifest_after_tail: {tail}")
except Exception:
    print_normative_error("CHECKLIST_FAILURE: falha ao ler tail do manifesto.")

manifest_before: {"exists": true, "cols": ["timestamp", "ticker", "rows_total", "date_min", "date_max", "columns_json", "partitions_json", "target_path"], "rows": 7, "had_ticker_col": true, "had_row_for_^BVSP": false}
dataset_probe: {"path_exists": true, "is_dir": true, "has_year_subdirs": true, "rows_total": 3400, "date_min": "2012-01-03", "date_max": "2025-09-19"}
repair_actions: ["added_header=no", "added_hash_cols=yes", "appended_bvsp_row=yes"]
manifest_after_tail: " ""year=2019"""," ""year=2020"""," ""year=2021"""," ""year=2022"""," ""year=2023"""," ""year=2024"""," ""year=2025""]",/home/wrm/BOLSA_2026/bronze/IBOV.parquet,,


  ts = pd.to_datetime(dfm["timestamp"], errors="coerce")
  order = ts.argsort(kind="mergesort")  # estável


# Instrução 1C-STRICT — Reabrir Bronze pelo SSOT e Atualizar Manifesto (hashes)

In [14]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Instrução 1C-STRICT — Reabrir Bronze pelo SSOT e Atualizar Manifesto (hashes)
# Regras:
# - Bloco único, auto-contido.
# - dry_run=False (atualiza manifesto).
# - Usar APENAS os caminhos do SSOT (manifesto -> target_path).
# - Dataset Parquet particionado por year=YYYY, abrir preferindo pyarrow.dataset.
# - Mensagens normativas: VALIDATION_ERROR / CHECKLIST_FAILURE.
# - Em dois erros consecutivos, parar e emitir dúvidas objetivas.

import os
import sys
import json
import re
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np

# =========================
# Parâmetros
# =========================
ROOT_DIR = Path("/home/wrm/BOLSA_2026").resolve()
MANIFEST_PATH = ROOT_DIR / "manifestos" / "bronze_ibov_manifesto.csv"
TICKER = "^BVSP"
DRY_RUN = False  # autorizado a atualizar manifesto

EXPECTED_COLUMNS = ["date", "open", "high", "low", "close", "volume", "ticker"]

# =========================
# Utils
# =========================
def print_section(title: str):
    print("\n" + "=" * 8 + f" {title} " + "=" * 8)

def has_year_subdirs(path: Path) -> bool:
    try:
        if not path.is_dir():
            return False
        for child in path.iterdir():
            if child.is_dir() and re.fullmatch(r"year=20\d{2}", child.name):
                return True
        return False
    except Exception:
        return False

def read_manifest_latest_row(manifest_path: Path, ticker: str) -> Tuple[Optional[pd.DataFrame], Optional[int], List[str]]:
    errs: List[str] = []
    if not manifest_path.exists():
        errs.append("VALIDATION_ERROR: MANIFEST_NOT_FOUND")
        return None, None, errs
    try:
        dfm = pd.read_csv(manifest_path)
    except Exception as e:
        errs.append(f"VALIDATION_ERROR: MANIFEST_READ_ERROR — {e}")
        return None, None, errs
    if "ticker" not in dfm.columns:
        errs.append("VALIDATION_ERROR: MANIFEST_MISSING_TICKER_COLUMN")
        return dfm, None, errs
    dfm_tk = dfm[dfm["ticker"] == ticker]
    if dfm_tk.empty:
        errs.append(f"VALIDATION_ERROR: MANIFEST_NO_ROW_FOR_TICKER — {ticker}")
        return dfm, None, errs
    idx_latest: Optional[int] = None
    if "timestamp" in dfm.columns:
        try:
            ts = pd.to_datetime(dfm["timestamp"], errors="coerce")
            mask = dfm["ticker"] == ticker
            if ts.notna().any() and mask.any():
                idx_latest = ts[mask].idxmax()
        except Exception:
            idx_latest = None
    if idx_latest is None:
        idxs = dfm.index[dfm["ticker"] == ticker].tolist()
        idx_latest = idxs[-1] if idxs else None
    if idx_latest is None:
        errs.append("VALIDATION_ERROR: MANIFEST_CANNOT_LOCATE_LATEST_ROW")
    return dfm, idx_latest, errs

def open_dataset_with_pyarrow(path: Path) -> pd.DataFrame:
    import pyarrow.dataset as ds  # type: ignore
    dataset = ds.dataset(str(path), format="parquet", partitioning="hive")
    table = dataset.to_table()
    return table.to_pandas()

def open_dataset_with_pandas(path: Path) -> pd.DataFrame:
    # pandas + pyarrow engine will generally discover hive partitions automatically
    try:
        return pd.read_parquet(str(path), engine="pyarrow")  # type: ignore
    except Exception:
        return pd.read_parquet(str(path))  # engine auto

def open_dataset_strict(path: Path) -> Tuple[Optional[pd.DataFrame], List[str], str]:
    errs: List[str] = []
    # 1) pyarrow.dataset
    try:
        df = open_dataset_with_pyarrow(path)
        return df, errs, "pyarrow.dataset"
    except Exception as e1:
        errs.append(f"OPEN_ERROR_PA_DS: {e1}")
    # 2) pandas.read_parquet
    try:
        df = open_dataset_with_pandas(path)
        return df, errs, "pandas.read_parquet"
    except Exception as e2:
        errs.append(f"OPEN_ERROR_PD_RP: {e2}")
    return None, errs, "none"

def normalize_bronze_schema(df: pd.DataFrame) -> pd.DataFrame:
    for c in EXPECTED_COLUMNS:
        if c not in df.columns:
            raise RuntimeError(f"DATASET_SCHEMA_MISSING_COLUMN: {c}")
    out = df.copy()
    out["date"] = pd.to_datetime(out["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    for c in ["open", "high", "low", "close"]:
        out[c] = pd.to_numeric(out[c], errors="coerce").astype("float64")
    out["volume"] = pd.to_numeric(out["volume"], errors="coerce").fillna(0).astype("int64")
    out["ticker"] = out["ticker"].astype("string")
    out = out[EXPECTED_COLUMNS].sort_values("date").drop_duplicates(subset=["date"], keep="last").reset_index(drop=True)
    return out

def dataset_summary(df: pd.DataFrame) -> Dict[str, Any]:
    if df is None or df.empty:
        return {"min_date": None, "max_date": None, "rows_total": 0}
    d = df.copy()
    d["date"] = pd.to_datetime(d["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    return {
        "min_date": str(d["date"].min()),
        "max_date": str(d["date"].max()),
        "rows_total": int(len(d))
    }

def extremes_by_year(df: pd.DataFrame) -> Tuple[Optional[int], Optional[int], Dict[str, Any], Dict[str, Any]]:
    if df is None or df.empty or "date" not in df.columns:
        return None, None, {"min_date": None, "max_date": None, "rows": 0}, {"min_date": None, "max_date": None, "rows": 0}
    d = df.copy()
    d["date"] = pd.to_datetime(d["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    yrs = d["date"].dt.year.dropna().astype(int)
    if yrs.empty:
        return None, None, {"min_date": None, "max_date": None, "rows": 0}, {"min_date": None, "max_date": None, "rows": 0}
    y_min, y_max = int(yrs.min()), int(yrs.max())
    g_min = d[yrs == y_min]
    g_max = d[yrs == y_max]
    s_min = {"min_date": str(g_min["date"].min()) if not g_min.empty else None,
             "max_date": str(g_min["date"].max()) if not g_min.empty else None,
             "rows": int(len(g_min))}
    s_max = {"min_date": str(g_max["date"].min()) if not g_max.empty else None,
             "max_date": str(g_max["date"].max()) if not g_max.empty else None,
             "rows": int(len(g_max))}
    return y_min, y_max, s_min, s_max

def sha256_of_csv(df: pd.DataFrame) -> str:
    csv_str = df.to_csv(index=False)
    return hashlib.sha256(csv_str.encode("utf-8")).hexdigest()

def compute_hashes(df: pd.DataFrame) -> Tuple[str, str]:
    for c in EXPECTED_COLUMNS:
        if c not in df.columns:
            raise RuntimeError(f"HASH_SCHEMA_MISSING: {c}")
    d = df[EXPECTED_COLUMNS].copy()
    d["date"] = pd.to_datetime(d["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    head20 = d.head(20)
    tail20 = d.tail(20)
    return sha256_of_csv(head20), sha256_of_csv(tail20)

def ensure_manifest_hash_columns(dfm: pd.DataFrame) -> pd.DataFrame:
    for c in ["hash_head20", "hash_tail20"]:
        if c not in dfm.columns:
            dfm[c] = np.nan
    return dfm

def update_manifest_hashes(dfm: pd.DataFrame, idx: int, final_path: Path, hash_head: str, hash_tail: str) -> Tuple[bool, Optional[pd.DataFrame], List[str]]:
    errs: List[str] = []
    if dfm is None or dfm.empty:
        errs.append("VALIDATION_ERROR: MANIFEST_EMPTY_OR_NONE")
        return False, None, errs
    dfm = ensure_manifest_hash_columns(dfm.copy())
    if "target_path" not in dfm.columns:
        dfm["target_path"] = np.nan
    try:
        dfm.at[idx, "hash_head20"] = hash_head
        dfm.at[idx, "hash_tail20"] = hash_tail
        dfm.at[idx, "target_path"] = str(final_path)
        if not DRY_RUN:
            dfm.to_csv(MANIFEST_PATH, index=False)
        return True, dfm, errs
    except Exception as e:
        errs.append(f"VALIDATION_ERROR: MANIFEST_WRITE_ERROR — {e}")
        return False, dfm, errs

# =========================
# Execução Principal
# =========================
def main():
    normative_errors: List[str] = []
    consecutive_errors = 0

    # 1) Ler manifesto e obter linha mais recente do ^BVSP
    print_section("MANIFESTO — LINHA MAIS RECENTE (^BVSP)")
    df_manifest, idx_latest, mf_errs = read_manifest_latest_row(MANIFEST_PATH, TICKER)
    if mf_errs:
        for e in mf_errs:
            print(e)
        consecutive_errors += 1
    else:
        consecutive_errors = 0

    manifest_row_loaded = {
        "target_path_manifest": None,
        "ticker": TICKER,
        "rows_total": None,
        "date_min": None,
        "date_max": None
    }

    target_path: Optional[Path] = None
    if df_manifest is not None and idx_latest is not None and idx_latest in df_manifest.index:
        row = df_manifest.loc[idx_latest]
        # preencher resumo conforme disponível no manifesto
        manifest_row_loaded["target_path_manifest"] = str(row["target_path"]) if "target_path" in df_manifest.columns else None
        manifest_row_loaded["rows_total"] = int(row["rows_total"]) if "rows_total" in df_manifest.columns and pd.notna(row["rows_total"]) else None
        manifest_row_loaded["date_min"] = str(row["date_min"]) if "date_min" in df_manifest.columns and pd.notna(row["date_min"]) else None
        manifest_row_loaded["date_max"] = str(row["date_max"]) if "date_max" in df_manifest.columns and pd.notna(row["date_max"]) else None

        tp = row["target_path"] if "target_path" in df_manifest.columns else None
        if isinstance(tp, str) and tp.strip():
            target_path = Path(tp).resolve()
        else:
            print("VALIDATION_ERROR: MANIFEST_TARGET_PATH_MISSING_OR_EMPTY")
            consecutive_errors += 1
    else:
        print("VALIDATION_ERROR: MANIFEST_LATEST_ROW_NOT_AVAILABLE")
        consecutive_errors += 1

    print(json.dumps({"manifesto_row_loaded": manifest_row_loaded}, ensure_ascii=False, indent=2))

    if consecutive_errors >= 2:
        print_section("CHECKLIST")
        checklist = {
            "manifesto_row_loaded": "falha",
            "target_path_check": "falha",
            "dataset_summary": "falha",
            "extreme_partitions_summary": "falha",
            "hashes_computed": "falha",
            "manifesto_update_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- O manifesto possui a coluna target_path preenchida para ^BVSP?")
        print("- Deseja corrigir/atualizar o manifesto com o caminho correto do dataset Bronze?")
        return

    # 2) Validar target_path (existe, é dir, tem subpastas year=YYYY)
    print_section("TARGET_PATH — VERIFICAÇÕES")
    target_check = {
        "path": str(target_path) if target_path else None,
        "exists": False,
        "is_dir": False,
        "has_year_subdirs": False
    }
    if target_path is None:
        print("VALIDATION_ERROR: TARGET_PATH_NONE")
        consecutive_errors += 1
    else:
        target_check["exists"] = target_path.exists()
        target_check["is_dir"] = target_path.is_dir()
        target_check["has_year_subdirs"] = has_year_subdirs(target_path) if target_path.exists() and target_path.is_dir() else False
        if not (target_check["exists"] and target_check["is_dir"] and target_check["has_year_subdirs"]):
            print(f"VALIDATION_ERROR: TARGET_PATH_INVALID — {json.dumps(target_check, ensure_ascii=False)}")
            consecutive_errors += 1
        else:
            consecutive_errors = 0
    print(json.dumps({"target_path_check": target_check}, ensure_ascii=False, indent=2))

    if consecutive_errors >= 2:
        print_section("CHECKLIST")
        checklist = {
            "manifesto_row_loaded": "ok" if manifest_row_loaded["target_path_manifest"] else "falha",
            "target_path_check": "falha",
            "dataset_summary": "falha",
            "extreme_partitions_summary": "falha",
            "hashes_computed": "falha",
            "manifesto_update_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- O target_path do manifesto aponta para um diretório particionado com subpastas year=YYYY?")
        print("- Deseja corrigir o target_path no manifesto para o caminho real do dataset?")
        return

    # 3) Abrir dataset (pyarrow.dataset preferido; fallback pandas+pyarrow)
    print_section("DATASET — ABERTURA")
    df_opened: Optional[pd.DataFrame] = None
    engine_used = None
    open_errs: List[str] = []
    if target_path is not None:
        df_opened, open_errs, engine_used = open_dataset_strict(target_path)
        if df_opened is None or df_opened.empty:
            print(json.dumps({"open_attempts_errors": open_errs, "engine_used": engine_used}, ensure_ascii=False, indent=2))
            print("VALIDATION_ERROR: DATASET_OPEN_FAILED")
            consecutive_errors += 1
        else:
            try:
                df_opened = normalize_bronze_schema(df_opened)
                consecutive_errors = 0
            except Exception as e:
                print(f"VALIDATION_ERROR: DATASET_SCHEMA_NORMALIZE_ERROR — {e}")
                consecutive_errors += 1

    if consecutive_errors >= 2 or df_opened is None or df_opened.empty:
        print_section("CHECKLIST")
        checklist = {
            "manifesto_row_loaded": "ok" if manifest_row_loaded["target_path_manifest"] else "falha",
            "target_path_check": "ok" if target_check["exists"] and target_check["is_dir"] else "falha",
            "dataset_summary": "falha",
            "extreme_partitions_summary": "falha",
            "hashes_computed": "falha",
            "manifesto_update_ok": "falha"
        }
        print(json.dumps(checklist, ensure_ascii=False, indent=2))
        for k, v in checklist.items():
            if v != "ok":
                print(f"CHECKLIST_FAILURE: {k} não atendido.")
        print_section("DÚVIDAS OBJETIVAS")
        print("- Podemos instalar/usar pyarrow para leitura do dataset particionado?")
        print("- Confirme se o caminho possui arquivos Parquet válidos sob as partições year=YYYY.")
        return

    # 4) Summaries do dataset completo e partições extremas
    print_section("DATASET — SUMÁRIOS")
    ds_summary = dataset_summary(df_opened)
    y_min, y_max, min_year_summary, max_year_summary = extremes_by_year(df_opened)
    extremes = {
        "min_year": y_min,
        "min_year_summary": min_year_summary,
        "max_year": y_max,
        "max_year_summary": max_year_summary
    }
    print(json.dumps({"dataset_summary": ds_summary, "extreme_partitions_summary": extremes}, ensure_ascii=False, indent=2))

    # 5) Hashes head20/tail20
    print_section("HASHES — HEAD20/TAIL20")
    hashes_ok = False
    hash_head20 = None
    hash_tail20 = None
    try:
        hash_head20, hash_tail20 = compute_hashes(df_opened)
        hashes_ok = True
    except Exception as e:
        print(f"VALIDATION_ERROR: HASH_COMPUTE_ERROR — {e}")
    print(json.dumps({"hash_head20": hash_head20, "hash_tail20": hash_tail20}, ensure_ascii=False, indent=2))

    # 6) Atualizar manifesto (mesma linha mais recente do ^BVSP)
    print_section("MANIFESTO — ATUALIZAÇÃO")
    manifesto_ok = False
    final_manifest_line = None
    if df_manifest is None or idx_latest is None or idx_latest not in df_manifest.index:
        print("VALIDATION_ERROR: MANIFEST_ROW_NOT_UPDATABLE")
    elif not hashes_ok or hash_head20 is None or hash_tail20 is None:
        print("VALIDATION_ERROR: SKIP_MANIFEST_UPDATE — hashes indisponíveis.")
    else:
        ok, dfm_updated, errs = update_manifest_hashes(df_manifest, idx_latest, target_path, hash_head20, hash_tail20)  # type: ignore
        for e in errs:
            print(e)
        manifesto_ok = ok and (dfm_updated is not None)
        if dfm_updated is not None:
            # Exibir a linha final (mesma posição idx_latest)
            try:
                final_manifest_line = dfm_updated.loc[[idx_latest]]
            except Exception as e:
                print(f"VALIDATION_ERROR: MANIFEST_PREVIEW_ERROR — {e}")

    if final_manifest_line is not None:
        try:
            print(final_manifest_line.to_csv(index=False).strip())
        except Exception:
            print(json.dumps(final_manifest_line.to_dict(orient="records"), ensure_ascii=False, indent=2))
    else:
        print("MANIFESTO_PREVIEW: indisponível.")

    # 7) Checklist Obrigatório
    print_section("CHECKLIST")
    checklist = {
        "manifesto_row_loaded": "ok" if manifest_row_loaded["target_path_manifest"] else "falha",
        "target_path_check": "ok" if (target_check["exists"] and target_check["is_dir"] and target_check["has_year_subdirs"]) else "falha",
        "dataset_summary": "ok" if (ds_summary["rows_total"] > 0 and ds_summary["min_date"] is not None and ds_summary["max_date"] is not None) else "falha",
        "extreme_partitions_summary": "ok" if (y_min is not None and y_max is not None and min_year_summary["rows"] > 0 and max_year_summary["rows"] > 0) else "falha",
        "hashes_computed": "ok" if hashes_ok else "falha",
        "manifesto_update_ok": "ok" if manifesto_ok else "falha"
    }
    print(json.dumps(checklist, ensure_ascii=False, indent=2))
    for k, v in checklist.items():
        if v != "ok":
            print(f"CHECKLIST_FAILURE: {k} não atendido.")

if __name__ == "__main__":
    # Contrato:
    # - Lê SSOT (manifesto), reabre Bronze no target_path informado,
    # - Calcula hashes head/tail 20 e atualiza a linha mais recente do ^BVSP no manifesto,
    # - Imprime checklist e mensagens normativas.
    main()


{
  "manifesto_row_loaded": {
    "target_path_manifest": "/home/wrm/BOLSA_2026/bronze/IBOV.parquet",
    "ticker": "^BVSP",
    "rows_total": 3400,
    "date_min": "2012-01-03",
    "date_max": "2025-09-19"
  }
}

{
  "target_path_check": {
    "path": "/home/wrm/BOLSA_2026/bronze/IBOV.parquet",
    "exists": true,
    "is_dir": true,
    "has_year_subdirs": true
  }
}


{
  "dataset_summary": {
    "min_date": "2012-01-03 00:00:00",
    "max_date": "2025-09-19 00:00:00",
    "rows_total": 3400
  },
  "extreme_partitions_summary": {
    "min_year": 2012,
    "min_year_summary": {
      "min_date": "2012-01-03 00:00:00",
      "max_date": "2012-12-28 00:00:00",
      "rows": 244
    },
    "max_year": 2025,
    "max_year_summary": {
      "min_date": "2025-01-02 00:00:00",
      "max_date": "2025-09-19 00:00:00",
      "rows": 181
    }
  }
}

{
  "hash_head20": "a236d590f9ddb0ddc9123c7e4d05909936d9f08a7db2fa93304db9beef2bb337",
  "hash_tail20": "d7c9f771a3fa160cd023cd418f73e36cfe85e8

  dfm.at[idx, "hash_head20"] = hash_head
  dfm.at[idx, "hash_tail20"] = hash_tail


# ESTÁ CORRETO?

Sim — está correto e completo.

Por que está ok

Manifesto: última linha de ^BVSP presente, com target_path=/home/wrm/BOLSA_2026/bronze/IBOV.parquet.

Dataset: caminho existe, é diretório com partições year=YYYY.

Sumários: min=2012-01-03, max=2025-09-19, rows_total=3400 — coerente com a reabertura.

Hashes: hash_head20 e hash_tail20 calculados e gravados na mesma linha do manifesto.

Checklist: todos os itens “ok”.

Única observação (não bloqueia)

Esses dois FutureWarning indicam que, no pandas, as colunas hash_head20 e hash_tail20 estavam com dtype numérico (provavelmente float64). Para evitar o aviso nas próximas escritas, basta garantir string antes de setar os valores, por exemplo:

dfm["hash_head20"] = dfm["hash_head20"].astype("string")
dfm["hash_tail20"] = dfm["hash_tail20"].astype("string")


(Se já estiver assim na próxima execução, o aviso some.)

Endereços confirmados (SSOT)

Bronze (Parquet particionado): /home/wrm/BOLSA_2026/bronze/IBOV.parquet

Manifesto Bronze: /home/wrm/BOLSA_2026/manifestos/bronze_ibov_manifesto.csv

---
# **TÉRMINO DO BRONZE**
---