<a href="https://colab.research.google.com/github/Maumau-3005/Hydra-fullstack/blob/main/webscrapping_hydra.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# -*- coding: utf-8 -*-
"""
empresa_header.py — monta JSON com nome da empresa e setor GICS a partir de um ticker e 2 datas.

Exemplo de retorno:
{
    "codigo_da_ação": "PETR3",
    "Nome_da_empresa": "Petrobras - Petróleo Brasileiro S.A.",
    "Setor_da_empresa": "Energy",
    "Final_do_teste": "2025-06-30",
    "hojé_em_dia": "2025-09-06"
}
"""

from __future__ import annotations
import warnings
from typing import Dict, Optional, List
from datetime import datetime

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

try:
    import yfinance as yf  # pip install yfinance
except Exception:  # pragma: no cover
    yf = None


# ---------------------- Normalização GICS ----------------------
_GICS_SET = {
    "Communication Services",
    "Consumer Discretionary",
    "Consumer Staples",
    "Energy",
    "Financials",
    "Health Care",
    "Industrials",
    "Information Technology",
    "Materials",
    "Real Estate",
    "Utilities",
}

# mapeia variações/comuns -> GICS oficial
_GICS_NORMALIZE = {
    "communication services": "Communication Services",
    "communications": "Communication Services",
    "consumer discretionary": "Consumer Discretionary",
    "consumer staples": "Consumer Staples",
    "energy": "Energy",
    "financials": "Financials",
    "financial services": "Financials",
    "health care": "Health Care",
    "healthcare": "Health Care",
    "industrials": "Industrials",
    "industrial": "Industrials",
    "information technology": "Information Technology",
    "technology": "Information Technology",
    "tech": "Information Technology",
    "materials": "Materials",
    "real estate": "Real Estate",
    "real-estate": "Real Estate",
    "utilities": "Utilities",
    # pt/pt-br ocasionais
    "tecnologia": "Information Technology",
    "saúde": "Health Care",
    "energia": "Energy",
    "financeiro": "Financials",
    "materiais": "Materials",
    "utilidades": "Utilities",
    "bens de consumo": "Consumer Staples",
    "discricionário": "Consumer Discretionary",
    "comunicação": "Communication Services",
    "imobiliário": "Real Estate",
}

def _normalize_gics(sector: Optional[str]) -> Optional[str]:
    if not sector or not isinstance(sector, str):
        return None
    s = sector.strip()
    if s in _GICS_SET:
        return s
    key = s.lower()
    return _GICS_NORMALIZE.get(key, None)


# ---------------------- Heurísticas de símbolo ----------------------
def _guess_yahoo_variants(code: str) -> List[str]:
    """
    Gera variantes prováveis para o Yahoo Finance.
    - B3: código alfanumérico terminado em dígito (ex.: PETR3, VALE3, ITUB4) -> '.SA'
    - Caso já contenha um sufixo (.SA, .NY, .US etc.), usa como está e também tenta sem.
    - Em geral: tenta [code, code+'.SA'] nessa ordem, sem duplicar.
    """
    code = (code or "").strip().upper()
    variants: List[str] = []
    has_dot = "." in code
    # como está
    variants.append(code)
    # heurística B3
    if not has_dot:
        # padrão B3 clássico: 4 letras + 1-2 dígitos (3,4,11 etc.)
        import re
        if re.fullmatch(r"[A-Z]{4}\d{1,2}", code):
            variants.append(f"{code}.SA")
    # se usuário já passou .SA, tentamos também sem
    if has_dot and code.endswith(".SA"):
        base = code.split(".")[0]
        variants.append(base)
    # remove duplicados mantendo a ordem
    out: List[str] = []
    for v in variants:
        if v not in out:
            out.append(v)
    return out


# ---------------------- Fallback local (nomes/sectores) ----------------------
# Pequeno dicionário para casos comuns se yfinance falhar.
_FALLBACK_PROFILE = {
    # B3
    "PETR3": ("Petrobras - Petróleo Brasileiro S.A.", "Energy"),
    "PETR4": ("Petrobras - Petróleo Brasileiro S.A.", "Energy"),
    "VALE3": ("Vale S.A.", "Materials"),
    "ITUB4": ("Itaú Unibanco Holding S.A.", "Financials"),
    "BBDC4": ("Banco Bradesco S.A.", "Financials"),
    "ABEV3": ("Ambev S.A.", "Consumer Staples"),
    "BBAS3": ("Banco do Brasil S.A.", "Financials"),
    "WEGE3": ("WEG S.A.", "Industrials"),
    "ELET3": ("Eletrobras", "Utilities"),
    "SUZB3": ("Suzano S.A.", "Materials"),
    "MGLU3": ("Magazine Luiza S.A.", "Consumer Discretionary"),
    # EUA/ADR exemplos
    "PBR": ("Petróleo Brasileiro S.A. - Petrobras", "Energy"),
    "AAPL": ("Apple Inc.", "Information Technology"),
    "MSFT": ("Microsoft Corporation", "Information Technology"),
    "GOOGL": ("Alphabet Inc.", "Communication Services"),
}


# ---------------------- yfinance: coleta de nome e setor ----------------------
def _get_profile_yfinance(symbol: str) -> Optional[Dict[str, str]]:
    if yf is None:
        return None
    try:
        t = yf.Ticker(symbol)
        # .get_info() evita alguns avisos das versões recentes
        try:
            info = t.get_info()
        except Exception:
            info = getattr(t, "info", {}) or {}
        if not info:
            return None

        # nome preferencial
        name = info.get("longName") or info.get("shortName") or info.get("displayName")
        sector_raw = info.get("sector") or info.get("industry")  # às vezes só vem 'industry'
        sector = _normalize_gics(sector_raw) or _normalize_gics(info.get("industry"))  # segunda chance

        if not name and not sector:
            return None
        return {"name": name, "sector": sector or sector_raw}
    except Exception:
        return None


# ---------------------- Normalização de datas ----------------------
def _to_iso_date_str(d: str) -> str:
    """
    Aceita 'YYYY-MM-DD', 'YYYY/MM/DD', 'DD/MM/YYYY', 'YYYY.MM.DD' etc. Retorna 'YYYY-MM-DD'.
    """
    ds = (d or "").strip()
    # tenta parsing direto
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y", "%Y.%m.%d", "%d-%m-%Y", "%m/%d/%Y"):
        try:
            return datetime.strptime(ds, fmt).date().isoformat()
        except Exception:
            pass
    # fallback: tenta parser genérico do datetime.fromisoformat
    try:
        return datetime.fromisoformat(ds).date().isoformat()
    except Exception:
        # se tudo falhar, devolve a string original (melhor que quebrar)
        return ds


# ---------------------- Função pública ----------------------
def montar_json_empresa(codigo_acao: str, final_do_teste: str, hoje_em_dia: str, *, debug: bool=False) -> Dict[str, str]:
    """
    Retorna um dicionário no formato desejado:
    {
        "codigo_da_ação": "<código original>",
        "Nome_da_empresa": "<nome>",
        "Setor_da_empresa": "<GICS>",
        "Final_do_teste": "<YYYY-MM-DD>",
        "hojé_em_dia": "<YYYY-MM-DD>"
    }
    """
    code_in = (codigo_acao or "").strip().upper()
    # 1) tenta yfinance com variantes
    name: Optional[str] = None
    sector: Optional[str] = None

    for sym in _guess_yahoo_variants(code_in):
        prof = _get_profile_yfinance(sym)
        if prof:
            if debug:
                print(f"[DEBUG] yfinance hit: {sym} -> {prof}")
            name = name or prof.get("name")
            sector = prof.get("sector") or sector
            # se já fechou nome e setor GICS normalizado, para
            if name and _normalize_gics(sector):
                sector = _normalize_gics(sector)
                break

    # 2) fallback local (se ainda faltou algo)
    if (not name or not sector) and code_in in _FALLBACK_PROFILE:
        fb_name, fb_sector = _FALLBACK_PROFILE[code_in]
        if debug:
            print(f"[DEBUG] fallback local usado para {code_in}")
        name = name or fb_name
        sector = sector or fb_sector

    # 3) normaliza setor para GICS (se possível)
    sector_norm = _normalize_gics(sector) or sector or "Unknown"

    # 4) normaliza datas
    d_final = _to_iso_date_str(final_do_teste)
    d_hoje = _to_iso_date_str(hoje_em_dia)

    # formatação simples do nome (opcional)
    def _format_name(n: Optional[str]) -> str:
        if not n:
            return code_in  # último recurso
        # exemplos: "Petróleo Brasileiro S.A. - Petrobras" -> "Petrobras - Petróleo Brasileiro S.A."
        lower = n.lower()
        if "petrobras" in lower and "petróleo brasileiro" in lower and " - " in n:
            parts = [p.strip() for p in n.split(" - ")]
            if len(parts) == 2:
                # inverte para priorizar "Petrobras - ..."
                if "petrobras" in parts[1].lower():
                    return f"{parts[1]} - {parts[0]}"
        return n

    nome_fmt = _format_name(name)

    out = {
        "codigo_da_ação": code_in,
        "Nome_da_empresa": nome_fmt,
        "Setor_da_empresa": sector_norm,
        "Final_do_teste": d_final,
        "hojé_em_dia": d_hoje,
    }
    if debug:
        print(f"[DEBUG] JSON final: {out}")
    return out


# ---------------------- Exemplo rápido ----------------------
if __name__ == "__main__":
    exemplo = montar_json_empresa("PETR3", "2025-06-30", "2025-09-06", debug=True)
    print(exemplo)


ERROR:yfinance:HTTP Error 404: 


[DEBUG] yfinance hit: MSFT34.SA -> {'name': 'Microsoft Corporation', 'sector': 'Information Technology'}
[DEBUG] JSON final: {'codigo_da_ação': 'MSFT34', 'Nome_da_empresa': 'Microsoft Corporation', 'Setor_da_empresa': 'Information Technology', 'Final_do_teste': '2025-06-30', 'hojé_em_dia': '2025-09-06'}
{'codigo_da_ação': 'MSFT34', 'Nome_da_empresa': 'Microsoft Corporation', 'Setor_da_empresa': 'Information Technology', 'Final_do_teste': '2025-06-30', 'hojé_em_dia': '2025-09-06'}


In [None]:
!pip install praw feedparser trafilatura instructor openai httpx
from google.colab import userdata

# Retrieve secrets from Colab's Secrets Manager
REDDIT_CLIENT_ID     = userdata.get("REDDIT_CLIENT_ID")
REDDIT_CLIENT_SECRET = userdata.get("REDDIT_CLIENT_SECRET")
REDDIT_USER_AGENT    = "script:pesquisa-acao:v1.0 (by u/HugoHickman)"
OPENROUTER_API_KEY   = userdata.get("OPENROUTER_API_KEY")

# You can optionally print the values to confirm they are loaded (for debugging)
# print(f"REDDIT_CLIENT_ID: {REDDIT_CLIENT_ID}")
# print(f"REDDIT_CLIENT_SECRET: {REDDIT_CLIENT_SECRET}")
# print(f"REDDIT_USER_AGENT: {REDDIT_USER_AGENT}")
# print(f"OPENROUTER_API_KEY: {OPENROUTER_API_KEY}")



In [None]:
# pesquisa_acao.py
# -*- coding: utf-8 -*-
"""
Pipeline robusto (degraus/fallbacks) para:
- Relatórios (CVM + SEC/EDGAR + fallback ADR com XBRL KPIs)
- Notícias (Google News RSS multilíngue) com aliases e expansões via LLM
- Reddit (PRAW -> Reddit RSS) **qualitativo e universal**, com expansão de aliases/subreddits via LLM
- Sumarização via Google AI Studio (Gemini 2.5 Flash) com heurística de fallback
- Multilíngue: pt-BR, en-US, zh-CN (ou o que vier no config)
- Dedup, retries/backoff, cache opcional
- Logs detalhados e _debug para depuração

GRÁTIS — sem dependência de APIs pagas.

Instalação mínima:
  pip install google-genai praw feedparser trafilatura httpx pydantic

Opcionais (recomendado p/ robustez e PDFs):
  pip install tenacity requests-cache url-normalize pymupdf pdfplumber

Credenciais (Colab ou variáveis de ambiente):
  GOOGLE_API_KEY
  REDDIT_CLIENT_ID / REDDIT_CLIENT_SECRET / REDDIT_USER_AGENT   # PRAW (grátis)

Importante p/ SEC:
  export USER_AGENT="seu-app/1.0 (contato: voce@dominio.com; site: https://seusite.com)"

Config externo (opcional):
- Variável PESQ_CONFIG aponta para um JSON. Se não existir, defaults internos serão usados.
Exemplo de config.json:
{
  "langs_news": [
    {"hl":"pt-BR","gl":"BR","ceid":"BR:pt-BR"},
    {"hl":"en-US","gl":"US","ceid":"US:en"},
    {"hl":"zh-CN","gl":"CN","ceid":"CN:zh-Hans"}
  ],
  "gemini_models": ["gemini-2.5-flash","gemini-2.5-flash-lite","gemini-2.5-pro"],
  "default_subreddits": ["stocks","investing","StockMarket","valueinvesting","wallstreetbets"],
  "sector_hints": {
    "Energy":["energy","oil","naturalgas","renewableenergy","utilities"]
  },
  "blocked_domains": ["facebook.com","instagram.com","x.com"],
  "max_workers": 32,
  "http_timeout": 25,
  "max_txt_chars": 2000,
  "news": {"max_per_lang": 18, "max_total_llm": 30},
  "reddit": {"max_items": 220, "min_per_sr": 2}
}
"""

import os, io, re, json, zipfile, urllib.parse, logging, csv, unicodedata, time, random, pathlib
import datetime as dt
from typing import List, Optional, Tuple, Dict, Any, Set
from concurrent.futures import ThreadPoolExecutor, as_completed

# --------- Base libs ---------
import httpx
import feedparser
from trafilatura import extract
from pydantic import BaseModel, Field, validator

# --------- LLM (Gemini) ---------
from google import genai

# --------- Colab secrets (opcional) ---------
try:
    from google.colab import userdata  # type: ignore
except Exception:
    userdata = None

# --------- Reddit PRAW (opcional) ---------
try:
    import praw  # type: ignore
except Exception:
    praw = None

# --------- Opcionais (robustez) ---------
try:
    from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
except Exception:
    retry = None

try:
    from url_normalize import url_normalize
except Exception:
    def url_normalize(u: str) -> str:
        return u

# PDFs
try:
    import fitz  # PyMuPDF
except Exception:
    fitz = None

try:
    import pdfplumber
except Exception:
    pdfplumber = None

# Cache (requests-cache ajuda se você também usa requests em outros trechos)
try:
    import requests_cache
except Exception:
    requests_cache = None

# ============= CONFIG / DEFAULTS =============
def _load_config() -> Dict[str, Any]:
    cfg_path = os.getenv("PESQ_CONFIG")
    if cfg_path and os.path.exists(cfg_path):
        try:
            with open(cfg_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            pass
    # fallback: procurar "config.json" no CWD
    if os.path.exists("config.json"):
        try:
            with open("config.json", "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            pass
    # default minimal
    return {
        "langs_news": [
            {"hl": "pt-BR", "gl": "BR", "ceid": "BR:pt-BR"},
            {"hl": "en-US", "gl": "US", "ceid": "US:en"},
            {"hl": "zh-CN", "gl": "CN", "ceid": "CN:zh-Hans"},
        ],
        "gemini_models": ["gemini-2.5-flash","gemini-2.5-flash-lite","gemini-2.5-pro"],
        "default_subreddits": [
            "stocks","StockMarket","wallstreetbets","valueinvesting","securityanalysis",
            "investing","financialindependence","options","economy","technology",
            "business","Entrepreneur","news","Futurology"
        ],
        "sector_hints": {
            "Technology": ["tech","hardware","software","programming","cybersecurity"],
            "Energy": ["energy","oil","naturalgas","renewableenergy","utilities"],
            "Financials": ["banking","finance","wallstreetbetsELITE"],
            "Healthcare": ["biotech","medicine","healthcare","pharmacy"],
            "Consumer": ["retail","ecommerce","marketing","advertising"],
            "Industrial": ["manufacturing","supplychain","aviation"],
            "Telecom": ["telecom","5Gtechnology"],
            "Materials": ["mining","chemistry"],
            "Utilities": ["utilities"]
        },
        "blocked_domains": ["facebook.com","instagram.com","x.com"],
        "max_workers": min(32, (os.cpu_count() or 4) * 5),
        "http_timeout": 25,
        "max_txt_chars": 2000,
        "news": {"max_per_lang": 18, "max_total_llm": 30},
        "reddit": {"max_items": 220, "min_per_sr": 2}
    }

CONFIG = _load_config()

# ============= LOGGING =============
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=LOG_LEVEL,
    format="%(levelname)s:%(name)s:%(message)s"
)
logger = logging.getLogger("pesquisa_acao")
for _name in ["trafilatura", "trafilatura.core", "trafilatura.downloads"]:
    logging.getLogger(_name).setLevel(logging.ERROR)

# ============= PERFORMANCE / LIMITES =============
MAX_WORKERS = int(os.getenv("THREADS", str(CONFIG.get("max_workers", 32))))
HTTP_TIMEOUT = int(CONFIG.get("http_timeout", 25))
MAX_TXT_CHARS = int(CONFIG.get("max_txt_chars", 2000))

MAX_NEWS_PER_LANG = int(CONFIG.get("news", {}).get("max_per_lang", 18))
MAX_NEWS_TOTAL_FOR_LLM = int(CONFIG.get("news", {}).get("max_total_llm", 30))

MAX_REDDIT_ITEMS = int(CONFIG.get("reddit", {}).get("max_items", 220))
REDDIT_FETCH_PER_SR_MIN = int(CONFIG.get("reddit", {}).get("min_per_sr", 2))

# ============= CONFIG GERAL =============
USER_AGENT = os.getenv("USER_AGENT", "pesquisa-acao/1.4 (contato: voce@example.com)")
LANGS_NEWS = CONFIG.get("langs_news", [])
GEMINI_MODEL_CANDIDATES = CONFIG.get("gemini_models", ["gemini-2.5-flash","gemini-2.5-flash-lite","gemini-2.5-pro"])
DEFAULT_SUBREDDITS = CONFIG.get("default_subreddits", [])
SECTOR_SUBREDDITS = CONFIG.get("sector_hints", {})
BLOCKED_DOMAINS = set(CONFIG.get("blocked_domains", []))

# ============= PEQUENOS SEEDS (mantidos mínimos) =============
# BR → ADR: usado apenas como "seed"; LLM também sugere tickers ADR e o código testa via SEC.
BR_TO_ADR_SEED = {
    "PETR": ["PBR", "PBR.A"],
    "VALE": ["VALE"],
    "BBDC": ["BBD", "BBDO"],
    "ITUB": ["ITUB"],
    "ABEV": ["ABEV"],
    "ELET": ["EBR", "EBR.B"],
    "SUZB": ["SUZ"],
    "BBAS": ["BDORY"],
    "WEGE": ["WEGEY"],
}

# Classes comuns de tickers US (seed mínima; LLM cobre o resto)
US_CLASS_TICKERS_SEED = {
    "GOOG": ["GOOG", "GOOGL"],
    "GOOGL": ["GOOG", "GOOGL"],
    "BRK.B": ["BRK.B", "BRK.A"],
    "BRK.A": ["BRK.A", "BRK.B"],
    "META": ["META", "FB"],
}

# ============= DISK CACHE p/ expansões LLM (evita custo/latência) =============
CACHE_DIR = pathlib.Path(os.getenv("PESQ_CACHE_DIR", pathlib.Path.home() / ".cache" / "pesquisa_acao"))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
EXPAND_CACHE_FILE = CACHE_DIR / "expand_cache.json"

def _load_expand_cache() -> Dict[str, Any]:
    if EXPAND_CACHE_FILE.exists():
        try:
            return json.load(open(EXPAND_CACHE_FILE, "r", encoding="utf-8"))
        except Exception:
            return {}
    return {}

def _save_expand_cache(cache: Dict[str, Any]) -> None:
    try:
        json.dump(cache, open(EXPAND_CACHE_FILE, "w", encoding="utf-8"), ensure_ascii=False, indent=2)
    except Exception:
        pass

EXPAND_CACHE = _load_expand_cache()

# ============= SCHEMAS =============
class Item(BaseModel):
    title: str
    url: str
    published_at: Optional[str] = None
    summary: Optional[str] = None
    sentiment: Optional[float] = Field(default=None, ge=-1, le=1)

class BatchLLM(BaseModel):
    items: List[Item]
    summary: str
    note_score: int = Field(ge=0, le=100)
    note_label: str
    rationale: str

class LLMExpand(BaseModel):
    """Resposta do LLM para aliases e subreddits."""
    aliases: List[str] = Field(default_factory=list, description="nomes alternativos, tickers, marcas, produtos")
    translations: Dict[str, List[str]] = Field(default_factory=dict, description="ex.: {'pt': [...], 'en':[...], 'zh':[...]} ")
    subreddits: List[str] = Field(default_factory=list, description="subreddits relevantes (sem prefixo r/)")
    focus_terms: List[str] = Field(default_factory=list, description="termos qualitativos (earnings, lawsuit, strike, hack, ... até 25)")

    @validator("subreddits", pre=True)
    def _clean_sr(cls, v):
        out = []
        for s in v or []:
            s = str(s).strip().lstrip("r/").split("/")[0]
            if s:
                out.append(s)
        return out

# ============= HELPERS =============
def _now_utc_date() -> dt.date:
    return dt.datetime.now(dt.timezone.utc).date()

def _cap_ref_date(ref_date: dt.date) -> dt.date:
    today = _now_utc_date()
    if ref_date > today:
        logger.info(f"[Datas] ref_date {ref_date} > hoje {today}. Usando ref_cap = hoje.")
    return min(ref_date, today)

def _to_date(s: str) -> dt.date:
    s = s.strip()
    m = re.match(r"^\s*(\d{4})[-/](\d{1,2})[-/](\d{1,2})\s*$", s)
    if m:
        y, mm, dd = map(int, m.groups())
        return dt.date(y, mm, dd)
    if s.startswith("date(") and s.endswith(")"):
        nums = [int(x) for x in re.findall(r"\d+", s)]
        return dt.date(nums[0], nums[1], nums[2])
    return dt.date.fromisoformat(s)

def _parse_dt_iso(s: Optional[str]) -> Optional[dt.datetime]:
    if not s: return None
    ss = s.strip().replace("Z", "+00:00")
    try:
        d = dt.datetime.fromisoformat(ss)
        return d if d.tzinfo else d.replace(tzinfo=dt.timezone.utc)
    except Exception:
        try:
            d = dt.date.fromisoformat(ss[:10])
            return dt.datetime.combine(d, dt.time(0,0), tzinfo=dt.timezone.utc)
        except Exception:
            return None

def _within_window(published_at: Optional[str], ref_date: dt.date, lookback_days: int) -> bool:
    if not published_at: return False
    t = _parse_dt_iso(published_at)
    if not t:
        try:
            d = dt.date.fromisoformat(published_at[:10])
            t = dt.datetime.combine(d, dt.time(0,0), tzinfo=dt.timezone.utc)
        except Exception:
            return False
    d = t.date()
    return (d <= ref_date) and (d >= ref_date - dt.timedelta(days=lookback_days))

def _nota_0a10(score_0a100: int) -> int:
    return max(0, min(10, round(score_0a100 / 10)))

def _normalize_name(s: Optional[str]) -> str:
    if not s:
        return ""
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = re.sub(r"[^A-Za-z0-9]+", " ", s).strip()
    return re.sub(r"\s+", " ", s).upper()

def _us_ticker_variants(ticker: str) -> List[str]:
    up = ticker.upper()
    if up in US_CLASS_TICKERS_SEED:
        return US_CLASS_TICKERS_SEED[up][:]
    m = re.match(r"^([A-Z]{1,4})[.\-]([A-Z])$", up)
    if m:
        base, cls = m.groups()
        other = "A" if cls == "B" else "B"
        return [up, f"{base}.{other}"]
    return [up]

def _name_matches(norm_row_name: str, aliases: List[str]) -> bool:
    for alias in aliases:
        if alias and (alias in norm_row_name or norm_row_name in alias):
            return True
    return False

def _get_secret(name: str) -> Optional[str]:
    val = None
    if userdata is not None:
        try:
            val = userdata.get(name)
        except Exception:
            val = None
    return val or os.getenv(name)

def _hostname(url: str) -> str:
    try:
        return urllib.parse.urlparse(url).hostname or ""
    except Exception:
        return ""

# ============= LEXICON / HEURÍSTICA =============
def _lexicon() -> Dict[str, set]:
    pos = {
        "alta","subida","otimista","positivo","crescimento","melhora","recorde","ganho","lucro",
        "aumenta","acima","supera","favorável","forte","fortes","expansão","upgrade","compra",
        "up","beat","growth","improves","record","profit","expansion","upgrade","买入","增长","改善"
    }
    neg = {
        "queda","cai","negativo","piora","prejuízo","risco","problema","baixa","reduz","abaixo",
        "perde","alerta","fraqueza","fraude","downgrade","venda","investigação",
        "down","miss","decline","loss","risk","问题","下跌","亏损","降级","卖出"
    }
    return {"pos": pos, "neg": neg}

def _heuristic_sentiment(items: List[Item]) -> Tuple[List[Item], str, int]:
    lex = _lexicon()
    pos, neg = lex["pos"], lex["neg"]
    scores = []
    for it in items:
        text = f"{it.title} {it.summary or ''}".lower()
        p = sum(1 for w in pos if w in text)
        n = sum(1 for w in neg if w in text)
        sc = 0.0
        if p or n:
            sc = (p - n) / max(1, (p + n))
        it.sentiment = sc
        if not it.summary:
            it.summary = it.title
        scores.append(sc)
    agg = sum(scores)/len(scores) if scores else 0.0
    note100 = int(round((agg + 1.0) * 50))
    note10  = _nota_0a10(note100)
    label = "neutral"
    if   agg >= 0.75: label = "strong_buy"
    elif agg >= 0.50: label = "buy"
    elif agg <= -0.75: label = "strong_sell"
    elif agg <= -0.50: label = "sell"
    resumo = f"Heuristic sentiment (pt/en/zh): mean={agg:.2f} → {note10}/10 ({label})."
    return items, resumo, note10

# ============= GEMINI (Google AI Studio) =============
def _genai_client(api_key: Optional[str] = None) -> genai.Client:
    api_key = api_key or _get_secret("GOOGLE_API_KEY") or _get_secret("GEMINI_API_KEY")
    if not api_key:
        raise RuntimeError("Defina GOOGLE_API_KEY (ou GEMINI_API_KEY). https://aistudio.google.com/app/apikey")
    return genai.Client(api_key=api_key)

def _normalize_items_for_llm(items: List[Item], max_items: int = 18) -> List[Item]:
    seen = set(); out = []
    for it in items:
        key = (url_normalize(it.url) if it.url else "").strip().lower() or it.title.strip().lower()
        if key in seen:
            continue
        seen.add(key)
        if it.summary and len(it.summary) > MAX_TXT_CHARS:
            it.summary = it.summary[:MAX_TXT_CHARS]
        out.append(it)
        if len(out) >= max_items:
            break
    return out

def _genai_resumir_e_notar(items: List[Item], empresa: str, ticker: str, setor: str, fonte: str,
                            debug: Dict[str, Any], kpis: Optional[Dict[str, Tuple[str,float,str]]] = None) -> Tuple[List[Item], str, int]:
    items = _normalize_items_for_llm(items, max_items=18)
    compact = "\n".join([f"- {it.title}\n{(it.summary or '')}" for it in items])

    kpi_txt = ""
    if kpis:
        parts = []
        for k,(date,val,unit) in kpis.items():
            parts.append(f"{k}={val} {unit} ({date})")
        kpi_txt = "KPIs XBRL até a data: " + "; ".join(parts) + ".\n"

    prompt = (
        "Você é analista de equity. Responda em **português do Brasil**.\n"
        "Fontes podem estar em PT/EN/ZH; normalize as conclusões.\n"
        f"Empresa: {empresa} ({ticker}) | Setor: {setor} | Fonte: {fonte}\n"
        + (kpi_txt if kpi_txt else "") +
        "Para cada item, gere UMA frase de síntese e um sentimento em [-1,1]. "
        "Depois gere: (1) resumo agregado (2–4 frases) (2) nota de 0 a 100 e "
        "label (strong_buy|buy|neutral|sell|strong_sell) apenas com base nestes itens e nos KPIs (se houver).\n"
        f"ITENS:\n{compact}"
    )

    last_error = None
    for model in GEMINI_MODEL_CANDIDATES:
        try:
            logger.info(f"[Gemini] Tentando modelo: {model}")
            client = _genai_client()
            response = client.models.generate_content(
                model=model,
                contents=prompt,
                config={
                    "temperature": 0,
                    "response_mime_type": "application/json",
                    "response_schema": BatchLLM,
                },
            )
            parsed = getattr(response, "parsed", None)
            if parsed is None:
                text = (response.text or "").strip()
                parsed = BatchLLM(**(json.loads(text) if text else {}))
            nota = _nota_0a10(parsed.note_score)
            out_items: List[Item] = []
            for i, it in enumerate(items):
                if i < len(parsed.items):
                    it.summary   = parsed.items[i].summary or it.summary
                    it.sentiment = parsed.items[i].sentiment
                out_items.append(it)
            debug["llm"] = {"provider": "gemini", "model": model, "fallback": False}
            return out_items, parsed.summary, nota
        except Exception as e:
            last_error = e
            logger.warning(f"[Gemini] Falhou com {model}: {type(e).__name__} — tentando próximo.")
    logger.warning(f"[Gemini] Todas as tentativas falharam: {type(last_error).__name__ if last_error else 'Erro desconhecido'}. Usando heurística.")
    debug["llm"] = {"provider": "gemini", "model": None, "fallback": True, "error": str(last_error)}
    return _heuristic_sentiment(items)

# ---------- LLM para expansão de aliases e subreddits ----------
def _expand_cache_key(empresa: str, ticker: str, setor: str) -> str:
    return f"{ticker.upper()}||{empresa.strip()}||{setor.strip()}"

def _llm_expand_aliases_and_subreddits(empresa: str, ticker: str, setor: str, debug: Dict[str,Any]) -> LLMExpand:
    """Usa Gemini para gerar aliases, traduções, subreddits e termos. Cacheia em disco (TTL simples 7 dias)."""
    key = _expand_cache_key(empresa, ticker, setor)
    rec = EXPAND_CACHE.get(key)
    if rec and isinstance(rec, dict):
        try:
            ts = float(rec.get("_ts", 0))
            if time.time() - ts < 7*24*3600:
                parsed = LLMExpand(**rec.get("payload", {}))
                debug["llm_expand"] = {"model": rec.get("model","cache"), "fallback": rec.get("fallback", False), "cache": True}
                return parsed
        except Exception:
            pass

    prompt = (
        "Atue como pesquisador de equity. Gere um JSON compacto com campos:\n"
        "aliases: [strings]\n"
        "translations: { 'pt': [..], 'en':[...], 'zh':[...]} (se aplicável)\n"
        "subreddits: [strings]  # sem prefixo r/\n"
        "focus_terms: [strings] # até 25 termos qualitativos úteis em buscas (earnings, guidance, outage, lawsuit, strike, hack, recall, boycott, layoff, churn, price hike, product issue, CEO, CFO, union, subsidy, ban, regulation, sanction, antitrust, class action, short seller, whistleblower, supply chain, shutdown, plant, outage, cybersecurity, data breach, downtime, margin, debt, downgrade, upgrade)\n\n"
        "Regras:\n- Limite aliases a 20, subreddits a 20.\n- Inclua tickers-irmãos (ex.: GOOG/GOOGL; BRK.A/BRK.B) se existir.\n- Para empresa estrangeira com ADR, inclua tickers ADR se conhecidos.\n- Só liste subreddits que existem publicamente e façam sentido.\n- Evite duplicatas; normalize subreddits sem 'r/'.\n"
        f"Empresa: {empresa}\nTicker: {ticker}\nSetor: {setor}\n"
    )
    last_error = None
    for model in GEMINI_MODEL_CANDIDATES:
        try:
            client = _genai_client()
            response = client.models.generate_content(
                model=model,
                contents=prompt,
                config={
                    "temperature": 0.2,
                    "response_mime_type": "application/json",
                    "response_schema": LLMExpand,
                },
            )
            parsed = getattr(response, "parsed", None)
            if parsed is None:
                text = (response.text or "").strip()
                parsed = LLMExpand(**(json.loads(text) if text else {}))
            debug["llm_expand"] = {"model": model, "fallback": False, "cache": False}
            # save cache
            EXPAND_CACHE[key] = {"_ts": time.time(), "model": model, "fallback": False, "payload": parsed.dict()}
            _save_expand_cache(EXPAND_CACHE)
            return parsed
        except Exception as e:
            last_error = e
            logger.debug(f"[LLM expand] falhou {model}: {e}")

    # Fallback heurístico
    aliases: Set[str] = set([empresa.strip(), ticker.strip()] + _us_ticker_variants(ticker))
    corp = [" Inc", " Inc.", " Corporation", " Corp", " Corp.", " Ltd", " Ltd.", " S.A.", " S A", " PLC", " N.V.", ", SA", ", Inc."]
    for c in corp:
        if empresa.endswith(c):
            aliases.add(empresa.replace(c, "").strip())
    base_sr = set(DEFAULT_SUBREDDITS)
    base_sr.update(SECTOR_SUBREDDITS.get(setor, []))
    focus_terms = [
        "earnings","guidance","outage","lawsuit","strike","hack","recall","boycott","layoff",
        "churn","price hike","product issue","CEO","CFO","union","subsidy","ban","regulation",
        "sanction","antitrust","class action","short seller","whistleblower","supply chain","shutdown",
        "plant","cybersecurity","data breach","downtime","margin","debt","downgrade","upgrade"
    ]
    parsed = LLMExpand(
        aliases=list({a for a in aliases if a})[:20],
        translations={},
        subreddits=[s for s in base_sr][:20],
        focus_terms=focus_terms[:25]
    )
    debug["llm_expand"] = {"model": None, "fallback": True, "error": str(last_error), "cache": False}
    return parsed

# ============= Cache/retry HTTP ============
if requests_cache and os.getenv("ENABLE_REQUESTS_CACHE", "0") == "1":
    try:
        requests_cache.install_cache("pesquisa_acao_cache", backend="sqlite", expire_after=6*3600)
        logger.info("[Cache] requests-cache habilitado (6h).")
    except Exception:
        pass

def _retryable():
    if retry:
        return retry(
            reraise=True,
            stop=stop_after_attempt(3),
            wait=wait_exponential_jitter(initial=1, max=8),
            retry=retry_if_exception_type(Exception),
        )
    def deco(fn): return fn
    return deco

@_retryable()
def _http_get(url: str, timeout: int = HTTP_TIMEOUT) -> Optional[str]:
    if any(d in (url.lower()) for d in BLOCKED_DOMAINS):
        return None
    headers = {"User-Agent": USER_AGENT, "Accept": "text/html,application/xhtml+xml"}
    with httpx.Client(headers=headers, timeout=timeout, follow_redirects=True) as cli:
        r = cli.get(url)
        if r.status_code != 200 or not r.text:
            raise RuntimeError(f"GET {url} -> {r.status_code}")
        time.sleep(0.07 + random.random()*0.08)  # rate-limit leve
        return r.text

def _fetch_clean_text(url: str) -> str:
    if not url:
        return ""
    try:
        html = _http_get(url)
    except Exception:
        html = None
    if not html:
        return ""
    try:
        txt = extract(html, include_links=False) or ""
    except Exception:
        txt = ""
    return txt.strip()

def _fetch_pdf_text(url: str, limit_chars: int = 10000) -> str:
    if not url:
        return ""
    text = ""
    try:
        if fitz:
            with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
                r = cli.get(url)
                if r.status_code == 200:
                    with fitz.open(stream=r.content, filetype="pdf") as doc:
                        for page in doc:
                            text += page.get_text()
                            if len(text) >= limit_chars:
                                break
                    return text[:limit_chars]
    except Exception:
        pass
    try:
        if pdfplumber:
            with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
                r = cli.get(url)
                if r.status_code == 200:
                    with io.BytesIO(r.content) as f:
                        with pdfplumber.open(f) as doc:
                            for page in doc.pages:
                                text += page.extract_text() or ""
                                if len(text) >= limit_chars:
                                    break
                    return text[:limit_chars]
    except Exception:
        pass
    return ""

def _extract_text_general(url: str) -> str:
    if not url:
        return ""
    ul = url.lower()
    if ul.endswith(".pdf") and (fitz or pdfplumber):
        return _fetch_pdf_text(url, limit_chars=MAX_TXT_CHARS)
    return _fetch_clean_text(url)[:MAX_TXT_CHARS]

# ============= NOTÍCIAS (Google News RSS) =============
def _google_news_rss(query: str, lang: Dict[str,str]) -> str:
    q = urllib.parse.quote_plus(query)
    return f"https://news.google.com/rss/search?q={q}&hl={lang['hl']}&gl={lang['gl']}&ceid={lang['ceid']}"

def _dedup(items: List[Item]) -> List[Item]:
    seen = set(); out = []
    for it in items:
        key = (url_normalize(it.url) if it.url else "").strip().lower() or it.title.strip().lower()
        if key in seen:
            continue
        seen.add(key)
        out.append(it)
    return out

def _enrich_bodies_parallel(items: List[Item]) -> None:
    def _worker(it: Item) -> str:
        return _extract_text_general(it.url)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futs = {ex.submit(_worker, it): it for it in items}
        for fut in as_completed(futs):
            it = futs[fut]
            try:
                body = fut.result() or ""
            except Exception:
                body = ""
            if body:
                it.summary = body

def _build_news_queries(empresa: str, ticker: str, setor: str, expand: LLMExpand) -> List[str]:
    aliases = set([empresa, ticker] + _us_ticker_variants(ticker))
    aliases.update(expand.aliases or [])
    for lang, arr in (expand.translations or {}).items():
        aliases.update(arr or [])
    aliases = {a.strip() for a in aliases if a and len(a.strip()) >= 2}
    focus = (expand.focus_terms or [])[:10]
    queries = []
    big_or = " OR ".join([f"\"{a}\"" if " " in a else a for a in sorted(aliases)])
    queries.append(big_or)
    for t in focus:
        queries.append(f"({big_or}) {t}")
    return queries[:8]

def _collect_news(company: str, ticker: str, setor: str, expand: LLMExpand,
                  ref_cap: dt.date, lookback_days: int) -> List[Item]:
    items: List[Item] = []
    queries = _build_news_queries(company, ticker, setor, expand)
    langs = LANGS_NEWS or [
        {"hl": "pt-BR", "gl": "BR", "ceid": "BR:pt-BR"},
        {"hl": "en-US", "gl": "US", "ceid": "US:en"},
        {"hl": "zh-CN", "gl": "CN", "ceid": "CN:zh-Hans"},
    ]

    for L in langs:
        lang_items: List[Item] = []
        for q in queries:
            feed_url = _google_news_rss(q, L)
            try:
                feed = feedparser.parse(feed_url)
            except Exception:
                continue
            for e in feed.entries[:MAX_NEWS_PER_LANG]:
                link = e.get("link") or e.get("id") or ""
                title = e.get("title") or ""
                pub = None
                if getattr(e, "published_parsed", None):
                    pub = dt.datetime(*e.published_parsed[:6], tzinfo=dt.timezone.utc).isoformat()
                it = Item(title=title, url=link, published_at=pub)
                if _within_window(it.published_at, ref_cap, lookback_days):
                    lang_items.append(it)
        logger.info(f"[Noticias] {L.get('hl','?')}: {len(lang_items)} itens na janela (pré-dedup).")
        items.extend(lang_items)

    items = _dedup(items)[:MAX_NEWS_TOTAL_FOR_LLM]
    return items

def agente_noticias(empresa: str, ticker: str, setor: str,
                    ref_cap: dt.date, lookback_days: int,
                    debug: Dict[str, Any]) -> Tuple[str, int]:
    t0 = time.time()
    expand = _llm_expand_aliases_and_subreddits(empresa, ticker, setor, debug)
    items = _collect_news(empresa, ticker, setor, expand, ref_cap, lookback_days)
    logger.info(f"[Noticias] Total agregados (dedup): {len(items)}")

    if not items:
        debug["noticias"] = {"n_items": 0, "elapsed_s": round(time.time()-t0, 2)}
        return ("Sem notícias relevantes no período.", 5)

    _enrich_bodies_parallel(items)
    out_items, resumo, nota = _genai_resumir_e_notar(items, empresa, ticker, setor, "Notícias", debug)
    debug["noticias"] = {
        "n_items": len(items),
        "elapsed_s": round(time.time()-t0, 2),
        "queries": _build_news_queries(empresa, ticker, setor, expand)
    }
    return resumo, nota

# ============= REDDIT (PRAW -> RSS) =============
def _reddit_client() -> Optional[Any]:
    if praw is None:
        logger.warning("[Reddit] PRAW não instalado. Use: pip install praw")
        return None
    cid = _get_secret("REDDIT_CLIENT_ID")
    csecret = _get_secret("REDDIT_CLIENT_SECRET")
    uagent = _get_secret("REDDIT_USER_AGENT") or USER_AGENT
    if not (cid and csecret and uagent):
        logger.warning("[Reddit] Credenciais ausentes. Set REDDIT_CLIENT_ID/SECRET/USER_AGENT.")
        return None
    try:
        reddit = praw.Reddit(
            client_id=cid, client_secret=csecret, user_agent=uagent, check_for_async=False
        )
        reddit.read_only = True
        return reddit
    except Exception as e:
        logger.warning(f"[Reddit] PRAW init falhou: {e}")
        return None

def _collect_from_reddit_praw(queries: List[str], subreddits_incl: List[str],
                              limit_total: int = MAX_REDDIT_ITEMS) -> List[dict]:
    reddit = _reddit_client()
    if reddit is None:
        return []
    rows = []
    per_sr = max(REDDIT_FETCH_PER_SR_MIN, limit_total // max(1, len(subreddits_incl)))
    for sr in subreddits_incl:
        try:
            sub = reddit.subreddit(sr)
        except Exception:
            continue
        for q in queries[:6]:
            try:
                subs = sub.search(
                    query=q, sort="new", time_filter="all", syntax="lucene", limit=per_sr
                )
                for s in subs:
                    rows.append({
                        "data": {
                            "title": s.title or "",
                            "permalink": getattr(s, "permalink", "") or "",
                            "url": getattr(s, "url", "") or "",
                            "created_utc": int(getattr(s, "created_utc", 0)) or None,
                            "selftext": (getattr(s, "selftext", "") or "")[:MAX_TXT_CHARS],
                            "subreddit": str(getattr(s, "subreddit", "")) or "",
                            "score": int(getattr(s, "score", 0)) or 0
                        }
                    })
            except Exception as e:
                logger.debug(f"[Reddit] Erro r/{sr} q='{q}': {e}")
                continue
    logger.info(f"[Reddit] PRAW coletou {len(rows)} itens.")
    return rows

def _collect_from_reddit_rss(queries: List[str], subreddits_incl: List[str], limit_per_q: int = 80) -> List[dict]:
    out = []
    # busca global
    for q in queries[:6]:
        rss_url = f"https://www.reddit.com/search.rss?q={urllib.parse.quote_plus(q)}&sort=new"
        try:
            feed = feedparser.parse(rss_url)
        except Exception:
            continue
        for e in feed.entries[:limit_per_q]:
            created_iso = None
            if getattr(e, "published_parsed", None):
                created_iso = dt.datetime(*e.published_parsed[:6], tzinfo=dt.timezone.utc).isoformat()
            out.append({
                "data": {
                    "title": e.get("title",""),
                    "permalink": "",
                    "url": e.get("link",""),
                    "created_utc": int(_parse_dt_iso(created_iso).timestamp()) if created_iso else None,
                    "selftext": (e.get("summary") or "")[:MAX_TXT_CHARS],
                    "subreddit": "",
                    "score": 0
                }
            })
    # busca por subreddit específico via RSS
    for sr in subreddits_incl[:12]:
        rss_url = f"https://www.reddit.com/r/{sr}/search.rss?q={urllib.parse.quote_plus(' OR '.join(queries[:3]))}&restrict_sr=1&sort=new"
        try:
            feed = feedparser.parse(rss_url)
        except Exception:
            continue
        for e in feed.entries[:limit_per_q//2]:
            created_iso = None
            if getattr(e, "published_parsed", None):
                created_iso = dt.datetime(*e.published_parsed[:6], tzinfo=dt.timezone.utc).isoformat()
            out.append({
                "data": {
                    "title": e.get("title",""),
                    "permalink": "",
                    "url": e.get("link",""),
                    "created_utc": int(_parse_dt_iso(created_iso).timestamp()) if created_iso else None,
                    "selftext": (e.get("summary") or "")[:MAX_TXT_CHARS],
                    "subreddit": sr,
                    "score": 0
                }
            })
    logger.info(f"[Reddit] RSS coletou {len(out)} itens.")
    return out

def _build_reddit_queries(empresa: str, ticker: str, setor: str, expand: LLMExpand) -> Tuple[List[str], List[str]]:
    aliases: Set[str] = set([empresa, ticker] + _us_ticker_variants(ticker))
    aliases.update([a for a in expand.aliases or [] if a])
    for arr in (expand.translations or {}).values():
        aliases.update(arr or [])
    aliases = {a.strip() for a in aliases if a and len(a.strip()) >= 2}

    terms = expand.focus_terms or []
    core = " OR ".join([f"\"{a}\"" if " " in a else a for a in sorted(aliases)])
    queries = [core]
    for t in terms[:10]:
        queries.append(f"({core}) {t}")

    base_sr = set(DEFAULT_SUBREDDITS)
    base_sr.update(SECTOR_SUBREDDITS.get(setor, []))
    for sr in (expand.subreddits or []):
        if sr: base_sr.add(sr.strip().lstrip("r/"))
    subreddits = [s for s in sorted(base_sr) if re.match(r"^[A-Za-z0-9_][A-Za-z0-9_]*$", s)]

    return queries[:8], subreddits[:30]

def _enrich_reddit_items(items: List[Item]) -> None:
    def _worker(it: Item) -> str:
        if (not it.summary) and it.url and ("reddit.com" not in it.url.lower()):
            return _extract_text_general(it.url)
        return it.summary or ""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futs = {ex.submit(_worker, it): it for it in items}
        for fut in as_completed(futs):
            it = futs[fut]
            try:
                body = fut.result() or ""
            except Exception:
                body = ""
            if body:
                it.summary = body[:MAX_TXT_CHARS]

def agente_reddit(empresa: str, ticker: str, setor: str,
                  ref_cap: dt.date, lookback_days: int,
                  debug: Dict[str, Any]) -> Tuple[str, int]:
    expand = _llm_expand_aliases_and_subreddits(empresa, ticker, setor, debug)
    queries, subreddits = _build_reddit_queries(empresa, ticker, setor, expand)
    debug["reddit_query_meta"] = {"queries": queries, "subreddits": subreddits}

    rows = _collect_from_reddit_praw(queries, subreddits, limit_total=MAX_REDDIT_ITEMS)
    if not rows:
        logger.warning("[Reddit] Sem PRAW (ou sem credenciais) ou sem resultados. Usando fallback RSS.")
        rows = _collect_from_reddit_rss(queries, subreddits, limit_per_q=90)

    items: List[Item] = []
    for ch in rows:
        d = ch.get("data", {})
        title = d.get("title") or ""
        permalink = d.get("permalink", "")
        link = ("https://www.reddit.com" + permalink) if permalink else (d.get("url", "") or "")
        ts = d.get("created_utc")
        published_at = dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).isoformat() if ts else None
        body = d.get("selftext") or ""
        it = Item(title=title, url=link, published_at=published_at, summary=body[:MAX_TXT_CHARS])
        if _within_window(it.published_at, ref_cap, lookback_days):
            items.append(it)

    before = len(items)
    items = _dedup(items)
    logger.info(f"[Reddit] Itens filtrados na janela: {len(items)} (antes dedup: {before})")

    if not items:
        debug["reddit"] = {"n_items": 0}
        return ("Sem menções relevantes no Reddit no período.", 5)

    _enrich_reddit_items(items)
    out_items, resumo, nota = _genai_resumir_e_notar(items, empresa, ticker, setor, "Reddit", debug)
    debug["reddit"] = {"n_items": len(items)}
    return (resumo, nota)

# ============= RELATÓRIOS (SEC/CVM + ADR + XBRL) =============
def _ticker_eh_br(ticker: str) -> bool:
    return bool(re.search(r"\d$", ticker.strip().upper()))

def _sec_get_cik(ticker: str) -> Optional[str]:
    """Obtém CIK via company_tickers.json. Se falhar, tenta variantes de classe."""
    try:
        with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
            r = cli.get("https://www.sec.gov/files/company_tickers.json")
            if r.status_code != 200:
                return None
            data = r.json()
    except Exception:
        return None
    up = ticker.upper()
    for _, rec in data.items():
        if rec.get("ticker", "").upper() == up:
            return f"{int(rec['cik_str']):010d}"
    # tenta variantes de classe (GOOG/GOOGL etc.)
    for var in _us_ticker_variants(up):
        if var == up:
            continue
        for _, rec in data.items():
            if rec.get("ticker", "").upper() == var:
                return f"{int(rec['cik_str']):010d}"
    return None

def _sec_search_filings_efts(ticker: str, form_types: List[str], size: int = 60) -> List[dict]:
    body = {
        "query": { "query_string": { "query": f'ticker:{ticker.upper()} AND ( ' + " OR ".join([f'formType:\"{f}\"' for f in form_types]) + " )" } },
        "from": 0, "size": size,
        "sort": [{ "filedAt": { "order": "desc" }}]
    }
    headers = {"User-Agent": USER_AGENT, "Content-Type": "application/json"}
    try:
        with httpx.Client(headers=headers, timeout=HTTP_TIMEOUT) as cli:
            r = cli.post("https://efts.sec.gov/LATEST/search-index", json=body)
            if r.status_code != 200:
                logger.info(f"[SEC search-index] Falha {r.status_code}")
                return []
            hits = r.json().get("hits", {}).get("hits", [])
            out = []
            for h in hits:
                src = h.get("_source", {})
                out.append({
                    "adsh": src.get("adsh") or src.get("accNo") or "",
                    "filedAt": src.get("filedAt"),
                    "cik": str(src.get("cik") or ""),
                    "formType": src.get("formType"),
                })
            return out
    except Exception:
        return []

def _sec_filing_index(cik: str, accession: str) -> dict:
    acc = accession.replace("-", "")
    url = f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/{acc}/index.json"
    try:
        with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
            r = cli.get(url)
            if r.status_code != 200:
                return {}
            return r.json()
    except Exception:
        return {}

def _sec_companyfacts(cik: str) -> dict:
    url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{int(cik):010d}.json"
    try:
        with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
            r = cli.get(url)
            if r.status_code != 200:
                return {}
            return r.json()
    except Exception:
        return {}

def _xbrl_latest_value(series: List[dict], ref_cap: dt.date) -> Optional[Tuple[str,float,str]]:
    best = None; best_dt = None
    for row in series or []:
        d = row.get("end") or row.get("fy")
        if not d:
            continue
        try:
            dd = dt.date.fromisoformat(str(d)[:10])
        except Exception:
            continue
        if dd <= ref_cap and (best is None or dd > best_dt):
            val = row.get("val")
            best = (str(dd), float(val) if val is not None else 0.0); best_dt = dd
    if not best:
        return None
    return (best[0], best[1], "")

def _xbrl_pick_unit(units_dict: Dict[str, List[dict]]) -> Tuple[str,List[dict]]:
    if "USD" in units_dict:
        return "USD", units_dict["USD"]
    for k, v in units_dict.items():
        if isinstance(v, list) and v:
            return k, v
    return "", []

def _sec_kpis(cik: str, ref_cap: dt.date) -> Dict[str, Tuple[str,float,str]]:
    cf = _sec_companyfacts(cik)
    out: Dict[str, Tuple[str,float,str]] = {}
    facts = cf.get("facts", {}).get("us-gaap", {})
    concepts = {
        "Revenue": "Revenues",
        "NetIncome": "NetIncomeLoss",
        "EPS_Diluted": "EarningsPerShareDiluted",
        "OCF": "NetCashProvidedByUsedInOperatingActivities",
    }
    for label, concept in concepts.items():
        try:
            units = facts[concept]["units"]
        except Exception:
            continue
        unit_name, series = _xbrl_pick_unit(units)
        if not series:
            continue
        val = _xbrl_latest_value(series, ref_cap)
        if val:
            out[label] = (val[0], val[1], unit_name)
    return out

def _choose_best_attachment(index_json: dict) -> Optional[str]:
    items = (index_json.get("directory", {}) or {}).get("item", [])
    if not items:
        return None
    def _build_url(name: str) -> str:
        dirurl = index_json.get("directory", {}).get("url", "")
        if dirurl and not dirurl.endswith("/"):
            dirurl += "/"
        return dirurl + name if dirurl else name

    ex99 = [f for f in items if str(f.get("name","")).upper().startswith("EX-99")]
    if ex99:
        return _build_url(ex99[0]["name"])
    htmls = [f for f in items if str(f.get("name","")).lower().endswith((".htm",".html"))]
    if htmls:
        return _build_url(htmls[0]["name"])
    pdfs = [f for f in items if str(f.get("name","")).lower().endswith(".pdf")]
    if pdfs:
        return _build_url(pdfs[0]["name"])
    primaries = [f for f in items if str(f.get("name","")).lower().startswith("primary")]
    if primaries:
        return _build_url(primaries[0]["name"])
    return _build_url(items[0]["name"])

def _sec_collect_best_item(ticker: str, empresa: str, ref_cap: dt.date) -> Optional[Item]:
    form_priority = ["10-Q","10-K","6-K","20-F","8-K"]
    # 1) efts search-index
    hits = _sec_search_filings_efts(ticker, form_priority, size=60)
    if not hits:
        # 1b) tentar variantes de classe
        for var in _us_ticker_variants(ticker):
            if var == ticker:
                continue
            hits = _sec_search_filings_efts(var, form_priority, size=60)
            if hits:
                break
    if not hits:
        # 2) submissions fallback
        cik = _sec_get_cik(ticker)
        if not cik:
            return None
        sub_url = f"https://data.sec.gov/submissions/CIK{cik}.json"
        try:
            with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
                r = cli.get(sub_url)
                if r.status_code != 200:
                    return None
                sub = r.json()
        except Exception:
            return None
        forms = sub.get("filings", {}).get("recent", {})
        items: List[Item] = []
        for i, ftype in enumerate(forms.get("form", [])):
            if ftype not in set(form_priority):
                continue
            fdate = forms["filingDate"][i]
            acc = forms["accessionNumber"][i].replace("-", "")
            prim = forms["primaryDocument"][i]
            doc_url = f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/{acc}/{prim}"
            items.append(Item(
                title=f"{ftype} {empresa} ({fdate})",
                url=doc_url,
                published_at=fdate,
                summary=""
            ))
        latest = _pick_latest_leq(items, ref_cap)
        return latest

    def _d(h):
        return _parse_dt_iso(h.get("filedAt") or "") or dt.datetime.min.replace(tzinfo=dt.timezone.utc)
    hits = [h for h in hits if _d(h).date() <= ref_cap]
    if not hits:
        return None
    pri = {f:i for i,f in enumerate(form_priority)}
    hits.sort(key=lambda h: (pri.get(h.get("formType","ZZZ"), 999), _d(h)), reverse=False)
    best_form = hits[0]["formType"]
    cand = [h for h in hits if h["formType"] == best_form]
    cand.sort(key=lambda h: _d(h), reverse=True)
    chosen = cand[0]
    cik = chosen.get("cik") or _sec_get_cik(ticker)
    adsh = chosen.get("adsh","")
    if not cik or not adsh:
        return None

    idx = _sec_filing_index(cik, adsh)
    best_url = _choose_best_attachment(idx)
    filed_date = (chosen.get("filedAt") or "")[:10] or None
    title = f"{best_form} {empresa} ({filed_date or 'N/D'})"
    summary = _extract_text_general(best_url) if best_url else ""
    return Item(title=title, url=best_url or "", published_at=filed_date, summary=summary)

def _cvm_rows_for_year(year: int, kind: str) -> Optional[List[List[str]]]:
    kind = kind.lower()
    url = f"https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/{kind.upper()}/DADOS/{kind}_cia_aberta_{year}.zip"
    try:
        with httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=HTTP_TIMEOUT) as cli:
            r = cli.get(url)
            if r.status_code != 200:
                return None
            z = zipfile.ZipFile(io.BytesIO(r.content))
            name = next((n for n in z.namelist() if n.lower().endswith(".csv") and kind in n.lower() and "cia_aberta" in n.lower()), None)
            if not name:
                return None
            raw = z.read(name).decode("utf-8", errors="ignore")
            rows = list(csv.reader(io.StringIO(raw), delimiter=';', quotechar='"'))
        return rows
    except Exception:
        return None

def _find_cols(header: List[str], *alts) -> Optional[int]:
    header = [h.strip().lower() for h in header]
    for a in alts:
        if a in header:
            return header.index(a)
    return None

def _guess_aliases_br(ticker: str, empresa: str) -> List[str]:
    base = re.sub(r"\d+$", "", (ticker or "").upper())
    norm_emp = _normalize_name(empresa)
    aliases = {norm_emp, empresa, ticker.upper()}
    common = {
        "PETR": "PETROLEO BRASILEIRO S A PETROBRAS",
        "VALE": "VALE S A",
        "BBDC": "BANCO BRADESCO S A",
        "ITUB": "ITAU UNIBANCO HOLDING S A",
        "BBAS": "BANCO DO BRASIL S A",
        "ABEV": "AMBEV S A",
        "WEGE": "WEG S A",
        "ELET": "CENTRAIS ELETRICAS BRASILEIRAS S A ELETROBRAS",
        "SUZB": "SUZANO S A",
    }
    if base in common:
        aliases.add(common[base])
    if norm_emp.endswith(" S A"):
        aliases.add(norm_emp.replace(" S A", ""))
    return list(aliases)

def _cvm_collect_items(empresa: str, ticker: str, ref_cap: dt.date) -> List[Item]:
    items: List[Item] = []
    years = {ref_cap.year, ref_cap.year - 1}
    aliases = _guess_aliases_br(ticker, empresa)

    def _collect_one(kind: str, y: int) -> List[Item]:
        rows = _cvm_rows_for_year(y, kind)
        out: List[Item] = []
        if not rows or len(rows) < 2:
            return out
        header = [c.strip().lower() for c in rows[0]]
        i_nome = _find_cols(header, "denom_cia","nome_cia","nome_empresarial")
        i_data = _find_cols(header, "dt_refer","data_refer","dt_referencias")
        i_link = _find_cols(header, "link_doc","link_documento","end_doc")
        if i_nome is None:
            return out
        for cols in rows[1:]:
            row_name = (cols[i_nome] if i_nome < len(cols) else "") or ""
            norm_row = _normalize_name(row_name)
            if not _name_matches(norm_row, aliases):
                continue
            dt_ref = None; display = str(y)
            if i_data is not None and i_data < len(cols) and (cols[i_data] or "").strip():
                s_val = cols[i_data].strip()
                display = s_val
                try:
                    dt_ref = dt.date.fromisoformat(s_val)
                except Exception:
                    try:
                        d,m,a = s_val.split("/")
                        dt_ref = dt.date(int(a), int(m), int(d))
                    except Exception:
                        dt_ref = None
            url_doc = cols[i_link] if (i_link is not None and i_link < len(cols)) else ""
            out.append(Item(
                title=f"{kind.upper()} {row_name} ({display})",
                url=url_doc,
                published_at=dt_ref.isoformat() if dt_ref else None,
                summary=""
            ))
        return out

    with ThreadPoolExecutor(max_workers=min(4, len(years)*2)) as ex:
        jobs = [ex.submit(_collect_one, kind, y) for kind in ("itr", "dfp") for y in sorted(years, reverse=True)]
        for fut in as_completed(jobs):
            items.extend(fut.result() or [])

    logger.info(f"[Relatorios][CVM] '{empresa}': coletados {len(items)} candidatos (ITR+DFP).")
    return items

def _pick_latest_leq(items: List[Item], ref_cap: dt.date) -> Optional[Item]:
    best = None; best_date = None
    for it in items:
        t = _parse_dt_iso(it.published_at)
        if not t:
            continue
        d = t.date()
        if d <= ref_cap and (best is None or d > best_date):
            best = it; best_date = d
    return best

def _extract_tickers_from_llm_aliases(expand: LLMExpand) -> List[str]:
    """Extrai candidatos a tickers US/ADR a partir de aliases do LLM (heurística)."""
    cands = set()
    for a in (expand.aliases or []):
        a = a.strip().upper()
        if re.match(r"^[A-Z]{1,5}([.\-][A-Z])?$", a):
            cands.add(a)
    return list(cands)

def _try_sec_with_adr_fallback(empresa: str, ticker_br: str, ref_cap: dt.date, debug: Dict[str, Any]) -> Optional[Item]:
    # 1) seeds mínimos por mapa
    tried = set()
    base = re.sub(r"\d+$", "", (ticker_br or "").upper())
    for adr in BR_TO_ADR_SEED.get(base, []):
        tried.add(adr)
        logger.info(f"[Relatorios][SEC] Tentando ADR seed: {adr}")
        it = _sec_collect_best_item(adr, empresa, ref_cap)
        if it:
            debug.setdefault("relatorios", {})["adr_fallback"] = {"via":"seed", "ticker": adr}
            return it
    # 2) usar LLM para sugerir tickers (aliases → regex de ticker)
    try:
        expand = _llm_expand_aliases_and_subreddits(empresa, ticker_br, "BR_ADR", debug)
    except Exception:
        expand = LLMExpand()
    llm_cands = [t for t in _extract_tickers_from_llm_aliases(expand) if t not in tried]
    for adr in llm_cands[:6]:
        logger.info(f"[Relatorios][SEC] Tentando ADR via LLM: {adr}")
        it = _sec_collect_best_item(adr, empresa, ref_cap)
        if it:
            debug.setdefault("relatorios", {})["adr_fallback"] = {"via":"llm", "ticker": adr}
            return it
    return None

def agente_relatorios(empresa: str, ticker: str, setor: str,
                      ref_cap: dt.date, debug: Dict[str, Any]) -> Tuple[str, int]:
    t0 = time.time()

    if _ticker_eh_br(ticker):
        candidatos = _cvm_collect_items(empresa, ticker, ref_cap)
        latest = _pick_latest_leq(candidatos, ref_cap)
        if not latest:
            logger.info(f"[Relatorios] CVM vazia para {ticker}. Tentando SEC via ADR...")
            latest = _try_sec_with_adr_fallback(empresa, ticker, ref_cap, debug)

        if not latest:
            debug["relatorios"] = {
                "selected": None,
                "n_candidates": len(candidatos),
                "elapsed_s": round(time.time()-t0, 2)
            }
            return ("Sem relatórios elegíveis no período.", 5)

        if latest.url and not latest.summary:
            latest.summary = _extract_text_general(latest.url)

        debug["relatorios"] = {
            "selected": latest.title,
            "date": latest.published_at,
            "n_candidates": len(candidatos),
            "elapsed_s": round(time.time()-t0, 2)
        }
        out_items, resumo, nota = _genai_resumir_e_notar([latest], empresa, ticker, setor, "Relatórios", debug)
        return resumo, nota

    else:
        best = _sec_collect_best_item(ticker, empresa, ref_cap)
        if not best:
            for var in _us_ticker_variants(ticker):
                if var != ticker:
                    best = _sec_collect_best_item(var, empresa, ref_cap)
                    if best:
                        break
        if not best:
            debug["relatorios"] = {
                "selected": None,
                "n_candidates": 0,
                "elapsed_s": round(time.time()-t0, 2)
            }
            return ("Sem relatórios elegíveis no período.", 5)

        cik = _sec_get_cik(ticker) or _sec_get_cik(_us_ticker_variants(ticker)[0])
        kpis = _sec_kpis(cik, ref_cap) if cik else {}

        debug["relatorios"] = {
            "selected": best.title,
            "date": best.published_at,
            "n_candidates": 1,
            "elapsed_s": round(time.time()-t0, 2),
            "xbrl_kpis": kpis
        }
        out_items, resumo, nota = _genai_resumir_e_notar([best], empresa, ticker, setor, "Relatórios", debug, kpis=kpis)
        return resumo, nota

# ============= FORMATAÇÃO / ORQUESTRAÇÃO =============
def _bloco_fonte(resumo: str, nota: int) -> dict:
    return {"Empresa": {"resumo": resumo, "nota": nota}}

def _analise_para_data(empresa: str, ticker: str, setor: str, data_ref_str: str, lookback_days: int) -> dict:
    ref_date = _to_date(data_ref_str)
    ref_cap  = _cap_ref_date(ref_date)
    debug: Dict[str, Any] = {"ref_date": str(ref_date), "ref_cap": str(ref_cap)}

    resumo_rel, nota_rel = agente_relatorios(empresa, ticker, setor, ref_cap, debug)
    resumo_not, nota_not = agente_noticias (empresa, ticker, setor, ref_cap, lookback_days, debug)
    resumo_red, nota_red = agente_reddit   (empresa, ticker, setor, ref_cap, lookback_days, debug)

    out = {
        "data_de_referencia": data_ref_str,
        "Relatorios": _bloco_fonte(resumo_rel, nota_rel),
        "Noticias":   _bloco_fonte(resumo_not, nota_not),
        "Reddit":     _bloco_fonte(resumo_red, nota_red),
        "_debug":     debug
    }
    return out

def pesquisa_ação(ação: dict, lookback_days: int = 30) -> dict:
    """
    ação = {
        "codigo_da_ação": "PETR3"  # BR com dígito -> CVM; US sem dígito -> SEC
        "Nome_da_empresa": "Petrobras - Petróleo Brasileiro S.A.",
        "Setor_da_empresa": "Energy",
        "Final_do_teste": "2025-06-30",
        "hojé_em_dia": "2025-12-31"
    }
    """
    ticker = ação.get("codigo_da_ação") or ação.get("codigo_da_acao") or ""
    empresa = ação.get("Nome_da_empresa") or ação.get("nome_da_empresa") or ""
    setor   = ação.get("Setor_da_empresa") or ação.get("setor_da_empresa") or ""
    data_final_teste = ação.get("Final_do_teste")
    data_hoje_str    = ação.get("hojé_em_dia") or ação.get("hoje_em_dia")

    if not (ticker and empresa and setor and data_final_teste and data_hoje_str):
        raise ValueError("Entradas faltando: requer 'codigo_da_ação', 'Nome_da_empresa', 'Setor_da_empresa', 'Final_do_teste', 'hojé_em_dia'.")

    print(f">>> Rodando pesquisa_ação com DEBUG ... (threads={MAX_WORKERS})")
    t0 = time.time()
    analise_hoje  = _analise_para_data(empresa, ticker, setor, data_hoje_str, lookback_days)
    analise_final = _analise_para_data(empresa, ticker, setor, data_final_teste, lookback_days)
    elapsed = round(time.time()-t0, 2)
    print(f">>> Concluído em {elapsed}s.")

    saida = {
        "informacoes_da_empresa": {
            "codigo_da_acao": ticker,
            "nome_da_empresa": empresa
        },
        "analise_de_sentimentos": {
            "hoje_em_dia": analise_hoje,
            "Final_do_teste": analise_final
        },
        "_meta": {"elapsed_s": elapsed, "threads": MAX_WORKERS}
    }
    return saida

# ============= EXEMPLO RÁPIDO =============
if __name__ == "__main__":
    exemplo = {
        "codigo_da_ação": "PETR3",
        "Nome_da_empresa": "Petrobras - Petróleo Brasileiro S.A.",
        "Setor_da_empresa": "Energy",
        "Final_do_teste": "2025-06-30",
        "hojé_em_dia": "2025-09-06"
    }
    print(json.dumps(pesquisa_ação(exemplo, lookback_days=30), ensure_ascii=False, indent=2))


/tmp/ipython-input-691633801.py:251: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator("subreddits", pre=True)


>>> Rodando pesquisa_ação com DEBUG ... (threads=10)


ERROR:trafilatura.utils:parsed tree length: 1, wrong data type or not valid HTML
ERROR:trafilatura.core:empty HTML tree: None
ERROR:trafilatura.utils:parsed tree length: 1, wrong data type or not valid HTML
ERROR:trafilatura.core:empty HTML tree: None


>>> Concluído em 276.88s.
{
  "informacoes_da_empresa": {
    "codigo_da_acao": "PETR3",
    "nome_da_empresa": "Petrobras - Petróleo Brasileiro S.A."
  },
  "analise_de_sentimentos": {
    "hoje_em_dia": {
      "data_de_referencia": "2025-09-06",
      "Relatorios": {
        "Empresa": {
          "resumo": "A análise do ITR da Petrobras para o segundo trimestre de 2025, embora sem dados específicos disponíveis no momento, representa um ponto de avaliação crucial para o desempenho da empresa. Este relatório é fundamental para compreender a saúde financeira e as perspectivas operacionais da Petrobras no período. A ausência de informações detalhadas impede uma avaliação aprofundada de seus resultados.",
          "nota": 5
        }
      },
      "Noticias": {
        "Empresa": {
          "resumo": "A Petrobras tem enfrentado um período de alta volatilidade, com suas ações frequentemente impactadas pela queda nos preços do petróleo e pela perspectiva de aumento da produção da Opep+

In [None]:
# -*- coding: utf-8 -*-
"""
pesquisa_acao_daily_kpis.py — diário, rápido, KPIs corretas (US-GAAP + IFRS), sem imputação,
sem AutoETS, com lock-to-publication e remoção de colunas vazias.

Principais pontos:
- PREÇO (yfinance; fallbacks: Stooq/Alpha Vantage se quiser).
- KPIs por SEC CompanyFacts (us-gaap + ifrs-full; forms: 10-Q/10-K/6-K/20-F).
- Alinhamento diário "lock-to-publication": a cada dia de pregão, vale o último 'filed' conhecido.
- NENHUMA imputação: não interpolamos nem preenchemos NaNs. Apenas propagação pós-filed.
- Colunas de KPIs 100% vazias são REMOVIDAS (e os respectivos flags também).
- Debug detalhado e sem “spam” de warnings. Cache leve em memória/disk opcional.

Variáveis de ambiente:
- SEC_USER_AGENT (recomendado; ex.: "SeuNome SeuEmail").
- ALPHAVANTAGE_API_KEY (opcional; só se ativar o uso).
- SEC_CACHE_DIR (opcional; caminho para cache JSON da SEC).
"""

from __future__ import annotations
import os, io, json, hashlib, math, warnings, pathlib
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Set
from datetime import datetime, timezone

import pandas as pd
import numpy as np

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# --------- Dependências externas (todas opcionais com fallback controlado)
try:
    import requests
except Exception:  # pragma: no cover
    requests = None

try:
    import yfinance as yf
except Exception:  # pragma: no cover
    yf = None


# ===================== Utilidades de Debug =====================
@dataclass
class DebugLogger:
    enabled: bool = True
    events: List[Dict[str, Any]] = None

    def __post_init__(self):
        if self.events is None:
            self.events = []

    def _now(self) -> str:
        return datetime.now(timezone.utc).isoformat(timespec='seconds')

    def log(self, msg: str, **kv) -> None:
        if self.enabled:
            print(f"[DEBUG] {msg}")
        self.events.append({"t": self._now(), "msg": msg, **kv})

    def section(self, title: str) -> None:
        if self.enabled:
            print("\n" + "=" * 80)
            print(title)
            print("=" * 80)
        self.events.append({"t": self._now(), "section": title})

    def df(self, name: str, df: pd.DataFrame, head: int = 5) -> None:
        if self.enabled:
            print(f"[DEBUG][DF] {name}: shape={df.shape}, cols={list(df.columns)}")
            with pd.option_context('display.max_columns', None, 'display.width', 180):
                print(df.head(head))
        self.events.append({"t": self._now(),
                            "df": name, "shape": tuple(df.shape), "cols": list(df.columns)})


# ===================== Fontes de PREÇO =====================
def _ensure_utc(dtser: pd.Series) -> pd.Series:
    s = pd.to_datetime(dtser, utc=True)
    # já tz-aware
    return s

def fetch_prices_yfinance(ticker: str, logger: DebugLogger,
                          period: str = "max", interval: str = "1d") -> Optional[pd.DataFrame]:
    if yf is None:
        logger.log("yfinance indisponível.")
        return None
    try:
        t = yf.Ticker(ticker)
        hist = t.history(period=period, interval=interval, auto_adjust=False, actions=True)
        if hist is None or hist.empty:
            logger.log("yfinance vazio.")
            return None
        # preço ajustado (Adj Close) quando existir
        if "Adj Close" in hist.columns:
            ser = hist["Adj Close"].rename("close_adjusted")
        else:
            ser = hist["Close"].rename("close_adjusted")
        out = ser.reset_index().rename(columns={"Date": "date"})
        out["date"] = _ensure_utc(out["date"])
        # Dividends / Stock Splits
        div = hist.get("Dividends", pd.Series(dtype=float)).reset_index().rename(columns={"Date":"date"})
        spl = hist.get("Stock Splits", pd.Series(dtype=float)).reset_index().rename(columns={"Date":"date"})
        if not div.empty:
            div["date"] = _ensure_utc(div["date"])
        if not spl.empty:
            spl["date"] = _ensure_utc(spl["date"])
        out = out.merge(div, on="date", how="left")
        if "Dividends" not in out.columns:
            out["Dividends"] = np.nan
        out = out.merge(spl, on="date", how="left")
        if "Stock Splits" not in out.columns:
            out["Stock Splits"] = np.nan
        out["source"] = "yfinance"
        logger.log(f"yfinance OK: {ticker}, linhas={len(out)}")
        return out[["date","close_adjusted","Dividends","Stock Splits","source"]]
    except Exception as e:  # pragma: no cover
        logger.log(f"Falha yfinance: {e}")
        return None

def fetch_prices_stooq(ticker: str, logger: DebugLogger) -> Optional[pd.DataFrame]:
    if requests is None:
        logger.log("requests indisponível p/ Stooq.")
        return None
    try:
        sym = ticker.lower()
        if not sym.endswith(".us"):
            sym = f"{sym}.us"
        url = f"https://stooq.com/q/d/l/?s={sym}&i=d"
        r = requests.get(url, timeout=30)
        if r.status_code != 200 or not r.text:
            logger.log(f"Stooq sem dados (HTTP {r.status_code}).")
            return None
        df = pd.read_csv(io.StringIO(r.text))
        if df.empty:
            logger.log("Stooq vazio.")
            return None
        df.rename(columns=str.lower, inplace=True)
        out = pd.DataFrame({
            "date": pd.to_datetime(df["date"], utc=True),
            "close_adjusted": df["close"].astype(float),
            "Dividends": np.nan,
            "Stock Splits": np.nan,
            "source": "stooq",
        })
        logger.log(f"Stooq OK: {ticker}, linhas={len(out)}")
        return out
    except Exception as e:
        logger.log(f"Falha Stooq: {e}")
        return None

ALPHAVANTAGE_URL = "https://www.alphavantage.co/query"

def fetch_prices_alphavantage(ticker: str, logger: DebugLogger) -> Optional[pd.DataFrame]:
    if requests is None:
        logger.log("requests indisponível p/ Alpha Vantage.")
        return None
    api_key = os.getenv("ALPHAVANTAGE_API_KEY")
    if not api_key:
        logger.log("Alpha Vantage ignorado (sem API key).")
        return None
    try:
        params = {"function":"TIME_SERIES_DAILY_ADJUSTED","symbol":ticker,"outputsize":"full","apikey":api_key}
        r = requests.get(ALPHAVANTAGE_URL, params=params, timeout=45)
        r.raise_for_status()
        js = r.json()
        key_ts = "Time Series (Daily)"
        if key_ts not in js:
            logger.log(f"Alpha Vantage estrutura inesperada: {list(js.keys())[:5]}")
            return None
        recs = []
        for d, vals in js[key_ts].items():
            recs.append({
                "date": pd.to_datetime(d, utc=True),
                "close_adjusted": float(vals.get("5. adjusted close", np.nan)),
                "Dividends": float(vals.get("7. dividend amount", 0.0) or 0.0),
                "Stock Splits": float(vals.get("8. split coefficient", 0.0) or 0.0),
                "source": "alphavantage"
            })
        df = pd.DataFrame(recs).sort_values("date").reset_index(drop=True)
        logger.log(f"Alpha Vantage OK: {ticker}, linhas={len(df)}")
        return df
    except Exception as e:
        logger.log(f"Falha Alpha Vantage: {e}")
        return None


def stitch_price_sources(dfs: List[pd.DataFrame], logger: DebugLogger,
                         compute_disparity: bool = True) -> pd.DataFrame:
    dfs = [d for d in dfs if d is not None and not d.empty]
    if not dfs:
        raise RuntimeError("Nenhuma fonte de preço retornou dados.")
    cat = pd.concat(dfs, ignore_index=True)
    cat["date"] = pd.to_datetime(cat["date"], utc=True)
    cat = cat.dropna(subset=["date"]).sort_values("date")
    grp = cat.groupby("date", as_index=True)

    med = grp["close_adjusted"].median().rename("close_adjusted")
    nsrc = grp["source"].nunique().rename("n_sources")

    if compute_disparity:
        lo = grp["close_adjusted"].min()
        hi = grp["close_adjusted"].max()
        disparity_bps = ((hi - lo) / med.replace(0, np.nan) * 10000).rename("disparity_bps")
        flag_disparity = (disparity_bps.fillna(0) > 50)  # limiar pedagógico
    else:
        disparity_bps = pd.Series(index=med.index, data=0.0, name="disparity_bps")
        flag_disparity = pd.Series(index=med.index, data=False, name="flag_disparity")

    out = pd.concat([med, nsrc, disparity_bps, flag_disparity], axis=1).reset_index()

    # Dividends / Splits: preferências por fonte
    def pick_pref(col: str) -> pd.Series:
        w = cat.pivot_table(index="date", columns="source", values=col, aggfunc="first")
        for pref in ["yfinance", "alphavantage", "stooq"]:
            if pref in w.columns:
                return w[pref]
        return pd.Series(index=out["date"], dtype=float)

    out = out.merge(pick_pref("Dividends").rename("Dividends"), on="date", how="left")
    out = out.merge(pick_pref("Stock Splits").rename("Stock Splits"), on="date", how="left")

    out = out.sort_values("date").reset_index(drop=True)
    logger.log("Stitching concluído",
               dias=len(out),
               datas=f"{out['date'].min().date()}..{out['date'].max().date()}")
    return out


# ===================== SEC (companyfacts) =====================
SEC_FACTS_URL = "https://data.sec.gov/api/xbrl/companyfacts/CIK{CIK}.json"
SEC_TICKERS_JSON = "https://www.sec.gov/files/company_tickers.json"

FORM_OK = {"10-Q","10-K","6-K","20-F"}
UNIT_PREF = ["USD","USD/shares","USD/share","USD per share"]

# Mapa canônico -> lista de candidatos por taxonomia (us-gaap / ifrs-full)
CANON_MAP: Dict[str, Dict[str, List[str]]] = {
    "Revenues": {
        "us-gaap": ["Revenues","RevenueFromContractWithCustomerExcludingAssessedTax","SalesRevenueNet"],
        "ifrs-full": ["Revenue","RevenueFromContractsWithCustomers"]
    },
    "NetIncomeLoss": {
        "us-gaap": ["NetIncomeLoss","ProfitLoss"],
        "ifrs-full": ["ProfitLoss"]
    },
    "OperatingIncomeLoss": {
        "us-gaap": ["OperatingIncomeLoss","OperatingIncomeLossAlternative"],
        "ifrs-full": ["OperatingProfitLoss"]
    },
    "GrossProfit": {
        "us-gaap": ["GrossProfit"],
        "ifrs-full": ["GrossProfit"]
    },
    "EarningsPerShareBasic": {
        "us-gaap": ["EarningsPerShareBasic"],
        "ifrs-full": ["BasicEarningsLossPerShare"]
    },
    "EarningsPerShareDiluted": {
        "us-gaap": ["EarningsPerShareDiluted"],
        "ifrs-full": ["DilutedEarningsLossPerShare"]
    },
    "Assets": {
        "us-gaap": ["Assets"],
        "ifrs-full": ["Assets"]
    },
    "Liabilities": {
        "us-gaap": ["Liabilities"],
        "ifrs-full": ["Liabilities"]
    },
    "StockholdersEquity": {
        "us-gaap": ["StockholdersEquity","StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest"],
        "ifrs-full": ["Equity","EquityAttributableToOwnersOfParent"]
    },
    "CashAndCashEquivalentsAtCarryingValue": {
        "us-gaap": ["CashAndCashEquivalentsAtCarryingValue"],
        "ifrs-full": ["CashAndCashEquivalents"]
    },
    "LongTermDebtNoncurrent": {
        "us-gaap": ["LongTermDebtNoncurrent","LongTermBorrowingsNoncurrent"],
        "ifrs-full": ["NoncurrentBorrowings","NoncurrentFinancialLiabilities"]
    },
    "CurrentAssets": {
        "us-gaap": ["AssetsCurrent"],
        "ifrs-full": ["CurrentAssets"]
    },
    "CurrentLiabilities": {
        "us-gaap": ["LiabilitiesCurrent"],
        "ifrs-full": ["CurrentLiabilities"]
    },
    "ResearchAndDevelopmentExpense": {
        "us-gaap": ["ResearchAndDevelopmentExpense"],
        "ifrs-full": ["ResearchAndDevelopmentExpense"]
    },
    "CapitalExpenditures": {
        "us-gaap": ["CapitalExpenditures","PaymentsToAcquirePropertyPlantAndEquipment"],
        "ifrs-full": ["PaymentsToAcquirePropertyPlantAndEquipment"]
    },
}

# simples cache em memória/arquivo para companyfacts
_SEC_CACHE: Dict[str, Dict[str, Any]] = {}

def _sec_headers() -> Dict[str,str]:
    ua = os.getenv("SEC_USER_AGENT", "YourName Contact@example.com")
    return {"User-Agent": ua}

def resolve_cik_from_ticker(ticker: str, logger: DebugLogger, session: Optional["requests.Session"]=None) -> Optional[str]:
    if requests is None:
        logger.log("requests indisponível p/ SEC.")
        return None
    try:
        s = session or requests.Session()
        r = s.get(SEC_TICKERS_JSON, headers=_sec_headers(), timeout=30)
        r.raise_for_status()
        data = r.json()
        t = ticker.upper()
        for _, entry in data.items():
            if entry.get("ticker","").upper() == t:
                cik = str(entry["cik_str"]).zfill(10)
                logger.log(f"CIK resolvido: {ticker}->{cik}")
                return cik
        logger.log(f"CIK não encontrado para {ticker}.")
        return None
    except Exception as e:
        logger.log(f"Falha resolve CIK: {e}")
        return None

def _sec_cache_path(cik: str) -> Optional[pathlib.Path]:
    cdir = os.getenv("SEC_CACHE_DIR")
    if not cdir:
        return None
    try:
        p = pathlib.Path(cdir).expanduser().resolve()
        p.mkdir(parents=True, exist_ok=True)
        return p / f"companyfacts_{cik}.json"
    except Exception:
        return None

def fetch_companyfacts_json(cik: str, logger: DebugLogger, session: Optional["requests.Session"]=None) -> Optional[Dict[str,Any]]:
    if cik in _SEC_CACHE:
        return _SEC_CACHE[cik]
    if requests is None:
        logger.log("requests indisponível p/ SEC.")
        return None
    try:
        # tentativa de cache em disco
        cache_fp = _sec_cache_path(cik)
        if cache_fp and cache_fp.exists():
            try:
                js = json.loads(cache_fp.read_text(encoding="utf-8"))
                _SEC_CACHE[cik] = js
                logger.log(f"SEC companyfacts (cache hit disco): CIK={cik}")
                return js
            except Exception:
                pass

        s = session or requests.Session()
        r = s.get(SEC_FACTS_URL.format(CIK=cik), headers=_sec_headers(), timeout=60)
        r.raise_for_status()
        js = r.json()
        _SEC_CACHE[cik] = js
        if cache_fp:
            try:
                cache_fp.write_text(json.dumps(js), encoding="utf-8")
            except Exception:
                pass
        logger.log(f"SEC companyfacts baixado: CIK={cik}")
        return js
    except Exception as e:
        logger.log(f"Falha SEC companyfacts: {e}")
        return None

def _pick_unit(units: Dict[str, List[Dict[str,Any]]]) -> Optional[str]:
    if not units:
        return None
    for k in UNIT_PREF:
        if k in units:
            return k
    # qualquer um
    return next(iter(units.keys()), None)

def extract_filings_from_facts(js: Dict[str,Any],
                               logger: DebugLogger,
                               canon_map: Dict[str, Dict[str, List[str]]] = CANON_MAP
                               ) -> pd.DataFrame:
    """
    Constrói um DF 'filings' com colunas:
      ['kpi','tax','tag','unit','end','filed','val','form']
    Escolhe, para cada KPI canônica, o MELHOR tag disponível (maior número de pontos).
    """
    if not js:
        return pd.DataFrame(columns=["kpi","tax","tag","unit","end","filed","val","form"])
    facts = js.get("facts", {})
    rows: List[Dict[str,Any]] = []

    def collect_records(tax_name: str, tag_name: str) -> List[Dict[str,Any]]:
        tax = facts.get(tax_name, {})
        node = tax.get(tag_name)
        if not node:
            return []
        units = node.get("units", {})
        unit_key = _pick_unit(units)
        if not unit_key:
            return []
        out = []
        for rec in units.get(unit_key, []):
            form = rec.get("form")
            if form not in FORM_OK:
                continue
            end = pd.to_datetime(rec.get("end"), errors="coerce", utc=True)
            filed = pd.to_datetime(rec.get("filed"), errors="coerce", utc=True)
            val = pd.to_numeric(rec.get("val"), errors="coerce")
            if pd.isna(end) or pd.isna(filed) or pd.isna(val):
                continue
            out.append({"tax": tax_name, "tag": tag_name, "unit": unit_key,
                        "end": end, "filed": filed, "val": float(val), "form": form})
        return out

    # buscar candidatos e escolher o melhor por KPI canônica
    for kpi, m in canon_map.items():
        candidates: List[Dict[str, Any]] = []

        for tax_name, tag_list in m.items():
            for tag in tag_list:
                recs = collect_records(tax_name, tag)
                if recs:
                    for r in recs:
                        r["kpi"] = kpi
                    candidates.extend(recs)

        if not candidates:
            # nenhum tag disponível para esta KPI -> não cria linha alguma
            continue

        # preferir aquele tag/tax com MAIS pontos
        df_cand = pd.DataFrame(candidates)
        # qual (tax,tag,unit) com mais registros?
        grp = df_cand.groupby(["tax","tag","unit"], as_index=False).size().sort_values("size", ascending=False)
        best_tax, best_tag, best_unit = grp.iloc[0][["tax","tag","unit"]]

        df_best = df_cand[(df_cand["tax"]==best_tax)&(df_cand["tag"]==best_tag)&(df_cand["unit"]==best_unit)] \
                    .sort_values(["filed","end"])
        rows.extend(df_best.to_dict("records"))

        logger.log(f"KPI '{kpi}': usando {best_tax}:{best_tag} [{best_unit}], pontos={len(df_best)}")

    df = pd.DataFrame(rows)
    if df.empty:
        return pd.DataFrame(columns=["kpi","tax","tag","unit","end","filed","val","form"])
    df = df.sort_values(["kpi","filed","end"]).reset_index(drop=True)
    return df


# ===================== Calendário de pregão & lock-to-publication =====================
def trading_days_from_price(price: pd.DataFrame) -> pd.DatetimeIndex:
    ix = pd.DatetimeIndex(pd.to_datetime(price["date"], utc=True).dropna().unique())
    return ix.sort_values()

def align_to_next_trading_day(dt: pd.Timestamp, trade_days: pd.DatetimeIndex) -> Optional[pd.Timestamp]:
    pos = trade_days.searchsorted(dt)
    if pos < len(trade_days):
        return trade_days[pos]
    return None

def kpis_step_to_trading_days(filings: pd.DataFrame,
                              trade_days: pd.DatetimeIndex,
                              logger: DebugLogger) -> Tuple[pd.DataFrame, Dict[str, Set[pd.Timestamp]]]:
    """
    Para cada KPI:
      - marca 'report days' = primeiro pregão >= filed
      - gera série diária com merge_asof (último filed <= date)
      - não preenche nada antes do primeiro filed (NaN), não interpola.
    Retorna:
      df_daily_step: ['date', <kpi...>]
      report_days_map: dict(kpi -> set(dates))
    """
    out = pd.DataFrame({"date": trade_days})
    report_days: Dict[str, Set[pd.Timestamp]] = {}
    if filings.empty:
        return out, report_days

    for kpi, grp in filings.groupby("kpi"):
        g = grp[["filed","val"]].dropna().sort_values("filed").copy()
        g["filed"] = pd.to_datetime(g["filed"], utc=True)

        # report days (alinhados ao próximo pregão)
        rset: Set[pd.Timestamp] = set()
        for d in g["filed"]:
            nd = align_to_next_trading_day(d, trade_days)
            if nd is not None:
                rset.add(nd)
        report_days[kpi] = rset

        # merge_asof (último filed <= date)
        merged = pd.merge_asof(
            out[["date"]].sort_values("date"),
            g.rename(columns={"filed":"key"}).sort_values("key"),
            left_on="date", right_on="key", direction="backward"
        )
        out[kpi] = merged["val"].values  # sem preenchimento antes do 1º filed -> NaN

    return out, report_days


# ===================== QA simples: saltos de preço =====================
def qc_jumps_flag(df: pd.DataFrame, logger: DebugLogger,
                  col_price: str = "close_adjusted",
                  min_thr_abs: float = 0.6, mad_mult: float = 6.0) -> pd.DataFrame:
    if df.empty or col_price not in df.columns:
        return df
    s = df[col_price].astype(float)
    logret = np.log(s) - np.log(s.shift(1))
    med = np.nanmedian(logret.values)
    mad = np.nanmedian(np.abs(logret.values - med)) * 1.4826
    thr = max(min_thr_abs, mad_mult * (mad if mad > 0 else 1e-6))
    flag = (np.abs(logret) > thr)
    df["flag_jump"] = flag.fillna(False).astype(bool)
    logger.log("QA preço (logret/MAD) aplicado", jumps=int(df["flag_jump"].sum()), thr=float(thr))
    return df


# ===================== JSON builder =====================
def _checksum_bytes(b: bytes) -> str:
    return hashlib.md5(b).hexdigest()

def build_json_daily(entrada: Dict[str,Any],
                     df_prices: pd.DataFrame,
                     df_final: pd.DataFrame,
                     kpis_kept: List[str],
                     logger: DebugLogger) -> Dict[str,Any]:
    ticker = (entrada.get("codigo_da_ação") or entrada.get("ticker") or entrada.get("codigo_da_acao") or "").upper()
    nome = entrada.get("Nome_da_empresa", "")
    dmin = pd.to_datetime(df_final["date"]).min()
    dmax = pd.to_datetime(df_final["date"]).max()

    # checksums
    try:
        b1 = df_prices.to_csv(index=False).encode()
        b2 = df_final.to_csv(index=False).encode()
        c1 = _checksum_bytes(b1); c2 = _checksum_bytes(b2)
    except Exception:
        c1 = c2 = None

    johnson = {
        "meta_input": {
            "timezone": "UTC",
            "trading_calendar": "NYSE",
            "index_type": "trading_days_NYSE",
            "ajuste_preco": "adjusted_close_dividends_splits",
        },
        "empresa": {
            "ticker": ticker,
            "nome": nome,
            "setor": entrada.get("Setor_da_empresa"),
        },
        "alvo": {
            "variavel": "preco_acao",
            "tipo_alvo": "preco",
            "unidade": "USD",
            "frequencia": "diaria",
            "coluna_valor": "close_adjusted",
        },
        "variaveis": {
            "kpis": [{"nome": t, "frequencia": "diaria (lock_to_publication)", "fonte": "SEC companyfacts"} for t in kpis_kept]
        },
        "operacao": {
            "coleta_em": datetime.now(timezone.utc).isoformat(timespec='seconds'),
            "datas": {"min": str(dmin.date()), "max": str(dmax.date())},
            "linhas": int(len(df_final)),
            "colunas": list(map(str, df_final.columns)),
        },
        "qa_flags": {
            "anomalias_jump": int(df_final.get("flag_jump", pd.Series(False)).sum())
        },
        "proveniencia_opcional": {
            "checksum_prices_csv_md5": c1,
            "checksum_dataset_csv_md5": c2,
        }
    }
    return johnson


# ===================== Função principal =====================
def pesquisa_acao_daily_kpis(entrada: Dict[str,Any], *,
                             debug: bool=True,
                             sources: str="yfinance_only",   # "yfinance_only" | "all"
                             use_alphavantage: bool=False,
                             compute_disparity: bool=False,
                             drop_empty_kpi_cols: bool=True,
                             years_back: Optional[int]=None   # None = 'max'
                             ) -> Tuple[Dict[str,Any], pd.DataFrame]:
    """
    Retorna (json, df_diario_completo) — diário, sem imputação, com KPIs corretas (US-GAAP/IFRS).

    Parâmetros chave:
      - sources="yfinance_only" (rápido) | "all" (tenta Stooq e AlphaVantage se ativado).
      - use_alphavantage=True só se tiver ALPHAVANTAGE_API_KEY.
      - compute_disparity=True calcula disparidade entre fontes (se 'all').
      - drop_empty_kpi_cols=True remove KPI 100% NaN (e flags correspondentes).
      - years_back: limite de anos para preço (ex.: 20). None = tudo (default yfinance 'max').
    """
    logger = DebugLogger(enabled=debug)
    logger.section("Início pesquisa_acao_daily_kpis")

    ticker = (entrada.get("codigo_da_ação") or entrada.get("ticker") or entrada.get("codigo_da_acao") or "").upper()
    if not ticker:
        raise ValueError("Entrada precisa de 'codigo_da_ação' ou 'ticker'.")
    logger.log(f"Ticker={ticker}, Empresa={entrada.get('Nome_da_empresa','')}")

    # 1) PREÇO — coleta multi-fonte (conforme configuração)
    logger.section("Coleta de preço — fontes grátis")
    dfs_price: List[pd.DataFrame] = []
    yf_period = "max" if years_back is None else f"{years_back}y"
    yfd = fetch_prices_yfinance(ticker, logger, period=yf_period)
    if yfd is not None:
        dfs_price.append(yfd)

    if sources == "all":
        stq = fetch_prices_stooq(ticker, logger)
        if stq is not None: dfs_price.append(stq)
        if use_alphavantage:
            av = fetch_prices_alphavantage(ticker, logger)
            if av is not None: dfs_price.append(av)

    if not dfs_price:
        raise RuntimeError("Falha ao obter preço de todas as fontes.")
    for i, d in enumerate(dfs_price):
        logger.df(f"preco_src_{i+1}", d, head=3)

    logger.section("Stitching de preço")
    price = stitch_price_sources(dfs_price, logger, compute_disparity=(compute_disparity and len(dfs_price)>1))
    price["ticker"] = ticker
    logger.df("df_prices_stitched", price, head=6)

    trade_days = trading_days_from_price(price)

    # 2) SEC — KPIs (US-GAAP + IFRS) com escolha de melhor tag por KPI canônica
    logger.section("Fundamentos (SEC) lock-to-publication (sem imputação)")
    session = requests.Session() if requests is not None else None
    cik = resolve_cik_from_ticker(ticker, logger, session=session)
    if cik:
        facts_js = fetch_companyfacts_json(cik, logger, session=session)
        filings = extract_filings_from_facts(facts_js, logger, CANON_MAP)
    else:
        filings = pd.DataFrame(columns=["kpi","tax","tag","unit","end","filed","val","form"])
    logger.df("sec_filings (selecionadas)", filings, head=8)

    # 3) KPI diário stepwise (lock-to-publication) + flags report
    df_kpis_daily, report_days = kpis_step_to_trading_days(filings, trade_days, logger)
    logger.df("kpis_daily_step (raw)", df_kpis_daily, head=8)

    # 4) Consolidar com PREÇO
    df = price.merge(df_kpis_daily, on="date", how="left")

    # 5) Flags 'is_report_*' (booleanas) sem NaN
    for kpi, rset in report_days.items():
        mask = df["date"].isin(list(rset))
        df[f"is_report_{kpi}"] = mask.fillna(False).astype(bool)

    # 6) QA simples de preço
    df = qc_jumps_flag(df, logger)

    # 7) Remover KPIs 100% vazias (e suas flags)
    if drop_empty_kpi_cols:
        kpi_cols = [c for c in df.columns if c not in {
            "date","ticker","close_adjusted","Dividends","Stock Splits","n_sources","disparity_bps","flag_disparity","flag_jump"
        } and not c.startswith("is_report_")]
        to_drop: List[str] = []
        kept: List[str] = []
        for c in kpi_cols:
            if df[c].notna().sum() == 0:
                to_drop.append(c)
                flag_col = f"is_report_{c}"
                if flag_col in df.columns:
                    to_drop.append(flag_col)
            else:
                kept.append(c)
        if to_drop:
            df = df.drop(columns=[c for c in to_drop if c in df.columns])
            logger.log(f"KPIs removidas (100% vazias): {sorted(set(to_drop))}")
        logger.log(f"KPIs mantidas: {kept}")
        kpis_kept = kept
    else:
        kpis_kept = [c for c in df.columns if c not in {
            "date","ticker","close_adjusted","Dividends","Stock Splits","n_sources","disparity_bps","flag_disparity","flag_jump"
        } and not c.startswith("is_report_")]

    # 8) Ordenar colunas
    base_cols = ["date","ticker","close_adjusted","Dividends","Stock Splits","n_sources","disparity_bps","flag_disparity","flag_jump"]
    ordered = [c for c in base_cols if c in df.columns] + sorted(kpis_kept) + [f"is_report_{k}" for k in sorted(kpis_kept)]
    df = df[ordered]
    logger.df("df_final (amostra)", df, head=12)

    # 9) JSON
    johnson = build_json_daily(entrada, price, df, kpis_kept, logger)

    # 10) Resumo
    logger.section("Resumo")
    logger.log(f"Período do dataset: {df['date'].min().date()} .. {df['date'].max().date()}")
    logger.log(f"Linhas: {len(df)}, Colunas: {len(df.columns)}")

    return johnson, df


# ---------------- Execução direta (exemplo) ----------------
if __name__ == "__main__":
    exemplo = {
        "codigo_da_ação": "PBR",  # Petrobras ADR
        "Nome_da_empresa": "Petroleo Brasileiro S.A. - Petrobras",
        "Setor_da_empresa": "Energy",
    }
    johnson, df = pesquisa_acao_daily_kpis(
        exemplo,
        debug=True,
        sources="yfinance_only",   # troque para "all" se quiser Stooq/AV
        use_alphavantage=False,
        compute_disparity=False,
        drop_empty_kpi_cols=True,
        years_back=None,           # None = todo histórico do yfinance
    )

    print("\n===== JSON (chaves principais) =====")
    print({k: johnson[k] for k in ["empresa","variaveis","operacao","qa_flags"]})

    print("\n===== PRIMEIRAS 20 LINHAS =====")
    with pd.option_context('display.max_columns', None, 'display.width', 240):
        print(df.head(20))


Início pesquisa_acao_daily_kpis
[DEBUG] Ticker=PBR, Empresa=Petroleo Brasileiro S.A. - Petrobras

Coleta de preço — fontes grátis
[DEBUG] yfinance OK: PBR, linhas=6309
[DEBUG][DF] preco_src_1: shape=(6309, 5), cols=['date', 'close_adjusted', 'Dividends', 'Stock Splits', 'source']
                       date  close_adjusted  Dividends  Stock Splits    source
0 2000-08-10 04:00:00+00:00        1.280023        0.0           0.0  yfinance
1 2000-08-11 04:00:00+00:00        1.277228        0.0           0.0  yfinance
2 2000-08-14 04:00:00+00:00        1.271638        0.0           0.0  yfinance

Stitching de preço
[DEBUG] Stitching concluído
[DEBUG][DF] df_prices_stitched: shape=(6309, 8), cols=['date', 'close_adjusted', 'n_sources', 'disparity_bps', 'flag_disparity', 'Dividends', 'Stock Splits', 'ticker']
                       date  close_adjusted  n_sources  disparity_bps  flag_disparity  Dividends  Stock Splits ticker
0 2000-08-10 04:00:00+00:00        1.280023          1            0.

In [None]:
df.describe()

Unnamed: 0,close_adjusted,Dividends,Stock Splits,n_sources,disparity_bps,Assets,CapitalExpenditures,CashAndCashEquivalentsAtCarryingValue,CurrentAssets,CurrentLiabilities,EarningsPerShareBasic,EarningsPerShareDiluted,GrossProfit,Liabilities,LongTermDebtNoncurrent,NetIncomeLoss,OperatingIncomeLoss,ResearchAndDevelopmentExpense,Revenues,StockholdersEquity
count,6307.0,6307.0,6307.0,6307.0,6307.0,1859.0,3850.0,3850.0,1859.0,1859.0,1374.0,353.0,1859.0,1859.0,3850.0,1859.0,3850.0,1859.0,1859.0,1859.0
mean,5.45953,0.003902,0.000634,1.0,0.0,205589000000.0,31179890000.0,17450760000.0,31716240000.0,27770340000.0,1.485058,1.49932,38706520000.0,134575300000.0,66410990000.0,13928330000.0,21226470000.0,438387300.0,84886650000.0,71995320000.0
std,3.914217,0.057248,0.035612,0.0,0.0,22232720000.0,4521345000.0,1096709000.0,5317609000.0,3721474000.0,0.784664,0.61532,15695300000.0,18213580000.0,4179657000.0,12262260000.0,2789686000.0,435476900.0,26851010000.0,7274943000.0
min,0.464958,0.0,0.0,1.0,0.0,174348000000.0,9783000000.0,12972000000.0,21836000000.0,21954000000.0,0.58,0.58,8771000000.0,104536000000.0,48149000000.0,169000000.0,6209000000.0,-563000000.0,23407000000.0,59350000000.0
25%,2.393611,0.0,0.0,1.0,0.0,187191000000.0,31785000000.0,17624000000.0,27812000000.0,24948000000.0,0.78,0.58,28680000000.0,117355000000.0,67528000000.0,2688000000.0,21874000000.0,355000000.0,83966000000.0,69812000000.0
50%,4.497064,0.0,0.0,1.0,0.0,217067000000.0,31785000000.0,17624000000.0,31250000000.0,26225000000.0,1.52,1.91,34067000000.0,138092000000.0,67528000000.0,7414000000.0,21874000000.0,641000000.0,84638000000.0,74215000000.0
75%,7.748088,0.0,0.0,1.0,0.0,222068000000.0,31785000000.0,17624000000.0,37062000000.0,31380000000.0,1.91,1.91,53974000000.0,148893000000.0,67528000000.0,24995000000.0,21874000000.0,726000000.0,102409000000.0,78975000000.0
max,18.641769,2.59,2.0,1.0,0.0,251366000000.0,45078000000.0,21689000000.0,47131000000.0,33860000000.0,2.81,1.91,64988000000.0,169864000000.0,70529000000.0,36755000000.0,24158000000.0,792000000.0,124474000000.0,81502000000.0


In [None]:
print(johnson)

{'meta_input': {'timezone': 'UTC', 'trading_calendar': 'NYSE', 'index_type': 'trading_days_NYSE', 'ajuste_preco': 'adjusted_close_dividends_splits'}, 'empresa': {'ticker': 'PBR', 'nome': 'Petroleo Brasileiro S.A. - Petrobras', 'setor': 'Energy'}, 'alvo': {'variavel': 'preco_acao', 'tipo_alvo': 'preco', 'unidade': 'USD', 'frequencia': 'diaria', 'coluna_valor': 'close_adjusted'}, 'variaveis': {'kpis': [{'nome': 'Assets', 'frequencia': 'diaria (lock_to_publication)', 'fonte': 'SEC companyfacts'}, {'nome': 'CapitalExpenditures', 'frequencia': 'diaria (lock_to_publication)', 'fonte': 'SEC companyfacts'}, {'nome': 'CashAndCashEquivalentsAtCarryingValue', 'frequencia': 'diaria (lock_to_publication)', 'fonte': 'SEC companyfacts'}, {'nome': 'CurrentAssets', 'frequencia': 'diaria (lock_to_publication)', 'fonte': 'SEC companyfacts'}, {'nome': 'CurrentLiabilities', 'frequencia': 'diaria (lock_to_publication)', 'fonte': 'SEC companyfacts'}, {'nome': 'EarningsPerShareBasic', 'frequencia': 'diaria (l

In [None]:

# Salve o DataFrame em um arquivo CSV
df.to_csv("pesquisa_daily_kpis_df.csv", index=False)

# Salve o JSON em um arquivo JSON
import json
with open("pesquisa_daily_kpis_johnson.json", "w", encoding="utf-8") as f:
    json.dump(johnson, f, ensure_ascii=False, indent=2)

print("Arquivos 'pesquisa_daily_kpis_df.csv' e 'pesquisa_daily_kpis_johnson.json' salvos com sucesso.")

Arquivos 'pesquisa_daily_kpis_df.csv' e 'pesquisa_daily_kpis_johnson.json' salvos com sucesso.
