
---

### **üì¶ cvm_downloader** ‚Äî Download & Unzip autom√°tico (CVM: DFP / ITR)

**1. Descri√ß√£o curta**  
Script robusto para baixar (streaming com progress bar) e descompactar os arquivos p√∫blicos da CVM (`DFP` e `ITR`) por ano. Inclui retries, backoff, grava√ß√£o segura (.part ‚Üí rename) e verifica√ß√£o para pular arquivos j√° existentes. Tem suporte a execu√ß√£o sequencial ou com --workers (thread pool).

**2. Principais responsabilidades**
- Construir URLs padr√£o: **https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/{doc_type}/DADOS/{doc_type}_cia_aberta_{year}.zip**
- Baixar zips com **requests.Session** + retries
- Exibir barra de progresso (**tqdm**)
- Descompactar zips (**zipfile.ZipFile**)
- Pular arquivos j√° baixados / extra√≠dos
- Opcional: paralelizar downloads via **ThreadPoolExecutor**

**3. Entradas / Par√¢metros**
- --doc-types (ex.: DFP,ITR)
- --start-year / --end-year (ex.: 2011 / 2025)
- --workers (n¬∫ threads; default 1)

**4. Sa√≠das**
- data/cvm/zip/{DOC}_cia_aberta_{YEAR}.zip
- data/cvm/unzipped/{DOC}_{YEAR}/... (conte√∫do extra√≠do)
- Retorna resumo (ok / skipped / failed) e c√≥digo de sa√≠da CLI

---


In [None]:
import datetime
import logging
import os
import time
from zipfile import ZipFile

import pandas as pd

from typing import Dict, Iterable, List, Optional, Set

import requests
import tqdm


# --- Configura√ß√£o ---
# Onde os dados ser√£o salvos (pasta data no n√≠vel do aurum, n√£o data_sources)
BASE_DIR = os.path.join("..", "data", "cvm")
ZIP_DIR = os.path.join(BASE_DIR, "zip")
UNZIPPED_DIR = os.path.join(BASE_DIR, "unzipped")

# Cria as pastas se n√£o existirem
os.makedirs(ZIP_DIR, exist_ok=True)
os.makedirs(UNZIPPED_DIR, exist_ok=True)

# URL base para os arquivos da CVM
URL_BASE = "https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/{doc_type}/DADOS/"

# Tipos de documentos que queremos
# DFP -> Anual, ITR -> Trimestral
DOC_TYPES = ['DFP', 'ITR']

YEARS = range(2011, 2026) 

def download_and_unzip(url, zip_path, unzipped_path):
    """Baixa e descompacta um arquivo ZIP se ele n√£o existir localmente."""
    if os.path.exists(zip_path):
        print(f"Arquivo j√° existe, pulando download: {os.path.basename(zip_path)}")
    else:
        print(f"Baixando: {os.path.basename(zip_path)}")
        try:
            response = requests.get(url, stream=True)
            response.raise_for_status() # Lan√ßa erro se a requisi√ß√£o falhar
            
            total_size = int(response.headers.get('content-length', 0))
            
            with open(zip_path, 'wb') as f, tqdm.tqdm(
                desc=os.path.basename(zip_path),
                total=total_size,
                unit='iB',
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for data in response.iter_content(chunk_size=1024):
                    size = f.write(data)
                    bar.update(size)
            print("Download completo.")
        except requests.exceptions.RequestException as e:
            print(f"Erro no download de {url}: {e}")
            return # Sai da fun√ß√£o se o download falhar

    print(f"Descompactando: {os.path.basename(zip_path)}")
    try:
        with ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(unzipped_path)
        print("Descompactado com sucesso.")
    except Exception as e:
        print(f"Erro ao descompactar {zip_path}: {e}")

if __name__ == "__main__":
    for doc_type in DOC_TYPES:
        for year in YEARS:
            filename = f"{doc_type}_cia_aberta_{year}.zip"
            url = URL_BASE.format(doc_type=doc_type) + filename
            
            zip_path = os.path.join(ZIP_DIR, filename)
            unzipped_path = os.path.join(UNZIPPED_DIR, f"{doc_type}_{year}")
            
            download_and_unzip(url, zip_path, unzipped_path)
            print("-" * 50)
            
    print("\nProcesso de download e extra√ß√£o conclu√≠do!")



---
### üß© cvm_parser ‚Äî Parser / Busca consolidada (_con_ files) e prepara√ß√£o

**1. Descri√ß√£o curta**  
Pipeline para localizar arquivos CSV consolidados _con_*.csv extra√≠dos da CVM, aplicar filtros (√öLTIMO/PEN√öLTIMO, escala MIL/UNIDADE), normalizar VL_CONTA, diagnosticar CNPJ(s) de interesse (ex.: Sanepar), concatenar, deduplicar e salvar resultado processado.

**2. Principais responsabilidades**
- Encontrar arquivos consolidados recursivamente (`UNZIPPED_DIR/**/*_con_*.csv`)
- Ler CSVs com pd.read_csv(..., encoding='latin-1', sep=';')
- Filtrar por ORDEM_EXERC (`√öLTIMO` / `PEN√öLTIMO`) e ESCALA_MOEDA (`MIL` / `UNIDADE`)
- Converter VL_CONTA para num√©rico e aplicar ajuste (multiplica por 1000 quando `ESCALA_MOEDA == 'MIL'`)
- Diagn√≥stico: detectar CNPJ_SAPR e imprimir valores de `ORDEM_EXERC` / `ESCALA_MOEDA`
- Concatenar chunks, ordenar por CNPJ_CIA, DT_FIM_EXERC, VERSAO
- Drop duplicates por ['CNPJ_CIA','DT_FIM_EXERC','CD_CONTA']
- Salvar processed/raw_{doc}.parquet e processed/raw_{doc}.csv

**3. Entradas**
- data/cvm/unzipped/... (arquivos _con_*.csv)

**4. Sa√≠das**
- data/cvm/processed/raw_{dre|bpa|bpp}.parquet
- data/cvm/processed/raw_{dre|bpa|bpp}.csv

---

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import logging
import gc

# -----------------------
# Configura√ß√£o (edite aqui)
# -----------------------
BASE_DIR = Path("..") / "data" / "cvm"
UNZIPPED_DIR = BASE_DIR / "unzipped"
PROCESSED_DIR = BASE_DIR / "processed"
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

DOC_PATTERNS_BROAD = {
    "dre": "*_DRE_con_*.csv",
    "bpa": "*_BPA_con_*.csv",
    "bpp": "*_BPP_con_*.csv",
}

CNPJ_SAPR = "76.484.013/0001-45"
CNPJs_DEBUG = [CNPJ_SAPR, "33.839.910/0001-11"]  # VIVA3

CATEGORY_COLS = [
    "CNPJ_CIA", "DENOM_CIA", "GRUPO_DFP", "MOEDA",
    "ESCALA_MOEDA", "ORDEM_EXERC", "CD_CONTA",
    "DS_CONTA", "ST_CONTA_FIXA"
]

# -----------------------
# Logging
# -----------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("cvm_parser")

# -----------------------
# Helpers
# -----------------------
def find_consolidated_files(unzipped_dir: Path, pattern: str) -> List[Path]:
    """Retorna lista de arquivos que batem com o padr√£o (recursivo)."""
    return list(unzipped_dir.rglob(pattern))


def _ensure_category(df: pd.DataFrame, col: str) -> None:
    """Converte coluna para category quando apropriado (silencioso)."""
    try:
        if col in df.columns and not pd.api.types.is_categorical_dtype(df[col]):
            df[col] = df[col].astype("category")
    except Exception:
        # n√£o quebrar a execu√ß√£o por problemas de convers√£o
        pass


def _read_csv_safe(path: Path) -> Optional[pd.DataFrame]:
    """L√™ um CSV com par√¢metros padr√£o usados no pipeline; captura exce√ß√µes e retorna None se falhar."""
    try:
        df = pd.read_csv(path, encoding="latin-1", sep=";", low_memory=False, dtype={"CNPJ_CIA": str})
        return df
    except Exception as exc:
        logger.warning("Falha lendo %s: %s", path, exc)
        return None


def _process_df_chunk(df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Aplicar filtros e tratamentos ao chunk (j√° lido).
    Retorna DataFrame filtrado pronto para concatenar ou None se vazio.
    """
    # garantir colunas chave
    if "DT_FIM_EXERC" not in df.columns:
        return None

    # normalizar e otimizar
    for col in CATEGORY_COLS:
        if col in df.columns:
            _ensure_category(df, col)

    # converter data e remover linhas sem data v√°lida
    df["DT_FIM_EXERC"] = pd.to_datetime(df["DT_FIM_EXERC"], errors="coerce")
    df = df.dropna(subset=["DT_FIM_EXERC"])
    if df.empty:
        return None

    # aplicar filtro ORDEM_EXERC & ESCALA_MOEDA quando existirem
    if "ORDEM_EXERC" in df.columns and "ESCALA_MOEDA" in df.columns:
        mask = (
            df["ORDEM_EXERC"].isin(["√öLTIMO", "PEN√öLTIMO"]) &
            df["ESCALA_MOEDA"].isin(["MIL", "UNIDADE"])
        )
        df = df.loc[mask].copy()

    if df.empty:
        return None

    # garantir VL_CONTA num√©rico; remover nulos
    if "VL_CONTA" in df.columns:
        df["VL_CONTA"] = pd.to_numeric(df["VL_CONTA"], errors="coerce")
        df = df.dropna(subset=["VL_CONTA"])
    else:
        return None

    # ajustar escala MIL ‚Üí multiplicar por 1000
    if "ESCALA_MOEDA" in df.columns:
        if pd.api.types.is_categorical_dtype(df["ESCALA_MOEDA"]):
            cats = df["ESCALA_MOEDA"].cat.categories
            is_mil = df["ESCALA_MOEDA"].cat.codes == int(np.where(cats == "MIL")[0][0]) if "MIL" in cats else False
        else:
            is_mil = df["ESCALA_MOEDA"].astype(str) == "MIL"
        # aplicar ajuste com np.where (vetorizado)
        df["VL_CONTA"] = np.where(is_mil, df["VL_CONTA"] * 1000.0, df["VL_CONTA"])

    return df


# -----------------------
# Pipeline principal
# -----------------------
def parse_and_consolidate_final(
    doc_name: str,
    broad_pattern: str,
    unzipped_dir: Path = UNZIPPED_DIR,
    processed_dir: Path = PROCESSED_DIR,
) -> Dict[str, object]:
    """
    Encontra arquivos consolidados (_con_), processa, concatena, deduplica e salva parquet/csv.
    Retorna dicion√°rio com estat√≠sticas do processamento.
    """
    logger.info("Iniciando processamento CONSOLIDADO para: %s (padr√£o: %s)", doc_name.upper(), broad_pattern)
    files = find_consolidated_files(unzipped_dir, broad_pattern)
    if not files:
        logger.info("Nenhum arquivo encontrado para o padr√£o: %s", broad_pattern)
        return {"status": "no_files", "files_count": 0}

    df_chunks = []
    total_rows_read = 0
    total_rows_after_filter = 0
    sapr_found = False

    for path in tqdm.tqdm(files, desc=f"Processando {doc_name.upper()}"):
        df = _read_csv_safe(path)
        if df is None:
            continue

        total_rows_read += len(df)

        # diagnostico SAPR (apenas relat√≥rio, sem interromper)
        if "CNPJ_CIA" in df.columns:
            df["CNPJ_CIA"] = df["CNPJ_CIA"].astype(str).str.strip()
            df_sapr = df[df["CNPJ_CIA"] == CNPJ_SAPR]
            if not df_sapr.empty and not sapr_found:
                sapr_found = True
                logger.info("[DIAGN√ìSTICO SAPR11] Encontrado em: %s", path.name)
                if "ORDEM_EXERC" in df_sapr.columns:
                    logger.info("  ORDEM_EXERC values: %s", df_sapr["ORDEM_EXERC"].unique())
                if "ESCALA_MOEDA" in df_sapr.columns:
                    logger.info("  ESCALA_MOEDA values: %s", df_sapr["ESCALA_MOEDA"].unique())

        # processa e filtra o chunk
        try:
            processed = _process_df_chunk(df)
            if processed is not None and not processed.empty:
                total_rows_after_filter += len(processed)
                df_chunks.append(processed)
        except Exception as exc:
            logger.warning("Erro processando arquivo %s: %s", path, exc)
        finally:
            # liberar mem√≥ria
            del df
            gc.collect()

    logger.info("Totais: linhas lidas=%d, linhas ap√≥s filtros=%d", total_rows_read, total_rows_after_filter)
    if not sapr_found:
        logger.warning("[DIAGN√ìSTICO SAPR11] CNPJ Sanepar (%s) N√ÉO encontrado na busca consolidada.", CNPJ_SAPR)

    if not df_chunks:
        logger.info("Nenhum chunk com dados v√°lidos ap√≥s filtros. Abortando concatena√ß√£o.")
        return {"status": "no_data_after_filter", "files_count": len(files)}

    # concat + sort + dedupe
    logger.info("Concatenando %d chunks...", len(df_chunks))
    consolidated_df = pd.concat(df_chunks, ignore_index=True)
    consolidated_df.sort_values(by=["CNPJ_CIA", "DT_FIM_EXERC", "VERSAO"], ascending=[True, True, False], inplace=True)

    dedup_subset = ["CNPJ_CIA", "DT_FIM_EXERC", "CD_CONTA"]
    final_df = consolidated_df.drop_duplicates(subset=dedup_subset, keep="first").copy()

    # diagn√≥stico r√°pido para CNPJs de debug
    if "CNPJ_CIA" in final_df.columns:
        debug_mask = final_df["CNPJ_CIA"].astype(str).isin([str(x) for x in CNPJs_DEBUG])
        debug_data = final_df.loc[debug_mask]
        if not debug_data.empty:
            grouped = debug_data.groupby(["CNPJ_CIA", "DT_FIM_EXERC"]).size().reset_index(name="contagem_contas")
            logger.info("Dados SAPR11/VIVA3 no DF final:\n%s", grouped.head(20).to_string(index=False))
        else:
            logger.info("Nenhum dado SAPR11/VIVA3 no DF final.")

    # salvar resultados
    out_parquet = processed_dir / f"raw_{doc_name}.parquet"
    out_csv = processed_dir / f"raw_{doc_name}.csv"

    try:
        logger.info("Salvando Parquet: %s", out_parquet)
        final_df.to_parquet(out_parquet, index=False)
        logger.info("Salvando CSV: %s", out_csv)
        final_df.to_csv(out_csv, index=False, sep=";", encoding="utf-8-sig")
        logger.info("Shape final salvo: %s", final_df.shape)
    except Exception as exc:
        logger.exception("Falha ao salvar arquivos: %s", exc)
        return {"status": "save_error", "error": str(exc)}

    # limpeza final
    del df_chunks, consolidated_df, final_df
    gc.collect()

    return {
        "status": "ok",
        "files_count": len(files),
        "rows_read": total_rows_read,
        "rows_after_filter": total_rows_after_filter,
        "saved_parquet": str(out_parquet),
        "saved_csv": str(out_csv),
    }


if __name__ == "__main__":
    results = {}
    for name, pattern in DOC_PATTERNS_BROAD.items():
        results[name] = parse_and_consolidate_final(name, pattern)
        gc.collect()

    logger.info("Processo de parsing (v9.0 - Busca Consolidada) conclu√≠do!")
    logger.info("Consulte logs acima para mensagens '---> [DIAGN√ìSTICO SAPR11 v9.0]'.")
    logger.info("Resumo por documento: %s", results)



---
### **üîß cv_processor ‚Äî** Transforma√ß√£o para formato WIDE (fundamentals_wide)

**1. Descri√ß√£o curta**  
Transforma os arquivos processados (`processed/raw_dre.parquet`, `raw_bpa.parquet`, `raw_bpp.parquet`) em tabelas *wide* prontos para an√°lise fundamentalista. Mapeia c√≥digos de conta para nomes leg√≠veis (plano de contas), filtra pelo √öLTIMO exerc√≠cio, pivota (long ‚Üí wide) e faz o merge final entre DRE, BPA e BPP.

**2. Principais responsabilidades**
- Ler `processed/raw_{doc}.parquet` para cada doc type (dre, bpa, bpp)
- Filtrar linhas por CD_CONTA usando`MAPA_CONTAS_*
- Manter apenas ORDEM_EXERC == '√öLTIMO' quando dispon√≠vel
- Converter VL_CONTA para num√©rico e remover nulos
- Pivot (index: CNPJ_CIA, DENOM_CIA, DT_FIM_EXERC; columns: nome leg√≠vel da CONTA)
- Merge outer entre DRE / BPA / BPP em ['CNPJ_CIA','DENOM_CIA','DT_FIM_EXERC']
- Salvar data/cvm/final/fundamentals_wide.parquet (+ CSV ;)

**3. Entradas**
- data/cvm/processed/raw_dre.parquet
- data/cvm/processed/raw_bpa.parquet
- data/cvm/processed/raw_bpp.parquet

**4. Sa√≠das**
- data/cvm/final/fundamentals_wide.parquet
- data/cvm/final/fundamentals_wide.csv


In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import logging
from typing import Dict, List, Optional
import gc # Para garbage collection

# -----------------------
# Configura√ß√£o (ATUALIZADA)
# -----------------------
BASE_DIR = Path("..") / "data" / "cvm"
PROCESSED_DIR = BASE_DIR / "processed"
FINAL_DIR = BASE_DIR / "final"
FINAL_DIR.mkdir(parents=True, exist_ok=True)

# Mapa de contas PRINCIPAIS (agregadas)
MAPA_CONTAS_DRE_MAIN = {
    "3.01": "Receita L√≠quida",
    "3.02": "Custo dos Bens e/ou Servi√ßos Vendidos",
    "3.03": "Lucro Bruto",
    "3.05": "EBIT",
    "3.07": "EBT",
    "3.11": "Lucro L√≠quido Consolidado",
}

MAPA_CONTAS_BPA_MAIN = {
    "1": "Ativo Total",
    "1.01": "Ativo Circulante",
    "1.02": "Ativo N√£o Circulante",
}

MAPA_CONTAS_BPP_MAIN = {
    "2": "Passivo Total",
    "2.01": "Passivo Circulante",
    "2.02": "Passivo N√£o Circulante",
    "2.03": "Patrim√¥nio L√≠quido Consolidado",
}

# --- NOVO: Mapa de contas DETALHADAS (para colunas espec√≠ficas) ---
# Estes c√≥digos s√£o comuns, mas podem precisar de ajustes finos
# Verifique o plano de contas da CVM para mais detalhes se necess√°rio
MAPA_CONTAS_DETALHADAS_BPA = {
    "1.01.01": "Caixa e Equivalentes", # Caixa e Equivalentes de Caixa
}

MAPA_CONTAS_DETALHADAS_BPP = {
    "2.01.04": "D√≠vida Curto Prazo", # Empr√©stimos e Financiamentos CP
    "2.02.01": "D√≠vida Longo Prazo", # Empr√©stimos e Financiamentos LP
}
# --- FIM NOVO ---

# Estrutura unificada dos mapas
MAPA_CONTAS_GERAL = {
    "dre": {"main": MAPA_CONTAS_DRE_MAIN, "detailed": None}, # DRE n√£o tem detalhado por enquanto
    "bpa": {"main": MAPA_CONTAS_BPA_MAIN, "detailed": MAPA_CONTAS_DETALHADAS_BPA},
    "bpp": {"main": MAPA_CONTAS_BPP_MAIN, "detailed": MAPA_CONTAS_DETALHADAS_BPP},
}

# Colunas chave esperadas
INDEX_COLS = ["CNPJ_CIA", "DENOM_CIA", "DT_FIM_EXERC"]

# -----------------------
# Logging
# -----------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("cvm_transform")

# -----------------------
# Fun√ß√µes utilit√°rias
# -----------------------
def _read_processed_parquet(path: Path) -> Optional[pd.DataFrame]:
    """L√™ um parquet com tratamento de erro."""
    try:
        df = pd.read_parquet(path)
        logger.info("Lido parquet: %s (shape=%s)", path.name, df.shape)
        return df
    except Exception as exc:
        logger.warning("Falha ao ler parquet %s: %s", path.name, exc)
        return None

# --- ATUALIZADO: Fun√ß√£o process_and_pivot_file ---
def process_and_pivot_file(
    doc_name: str,
    main_account_map: Dict[str, str],
    detailed_account_map: Optional[Dict[str, str]] = None, # Aceita mapa detalhado opcional
    processed_dir: Path = PROCESSED_DIR,
) -> Optional[pd.DataFrame]:
    """
    Carrega raw parquet, filtra contas (principais E detalhadas),
    mant√©m '√öLTIMO' exerc√≠cio e pivota para wide.
    """
    input_file = processed_dir / f"raw_{doc_name}.parquet"
    logger.info("Processando %s -> %s", doc_name.upper(), input_file.name)

    if not input_file.exists():
        logger.warning("Arquivo n√£o encontrado: %s. Pulando %s.", input_file.name, doc_name)
        return None

    df = _read_processed_parquet(input_file)
    if df is None or df.empty: return None

    # Validar colunas essenciais
    essential_cols = ["CD_CONTA", "VL_CONTA", "DT_FIM_EXERC", "ORDEM_EXERC"] + INDEX_COLS
    missing_essentials = [c for c in essential_cols if c not in df.columns]
    if missing_essentials:
        logger.error("Colunas essenciais ausentes em %s: %s. Pulando.", input_file.name, missing_essentials)
        return None

    # --- L√ìGICA ATUALIZADA: Combinar contas principais e detalhadas ---
    all_accounts_to_keep = list(main_account_map.keys())
    if detailed_account_map:
        all_accounts_to_keep.extend(list(detailed_account_map.keys()))
        logger.info(f"Contas detalhadas a serem extra√≠das: {list(detailed_account_map.keys())}")

    logger.info(f"Total de c√≥digos de conta a serem filtrados: {len(all_accounts_to_keep)}")
    df = df[df["CD_CONTA"].isin(all_accounts_to_keep)].copy()
    logger.info("Ap√≥s filtrar contas (principais + detalhadas): shape=%s", df.shape)
    if df.empty:
        logger.info("Nenhuma conta de interesse encontrada. Pulando.")
        return None
    # --- FIM L√ìGICA ATUALIZADA ---

    # Converter data e filtrar por ORDEM_EXERC == '√öLTIMO'
    df["DT_FIM_EXERC"] = pd.to_datetime(df["DT_FIM_EXERC"], errors="coerce")
    df = df.dropna(subset=["DT_FIM_EXERC"])
    df = df[df["ORDEM_EXERC"] == "√öLTIMO"].copy()
    logger.info("Ap√≥s filtrar ORDEM_EXERC == '√öLTIMO': shape=%s", df.shape)
    if df.empty:
        logger.info("Nenhuma linha '√öLTIMO' encontrada. Pulando.")
        return None

    # --- L√ìGICA ATUALIZADA: Mapear nome da conta (prioriza detalhado) ---
    # Primeiro tenta mapear com o mapa detalhado, depois com o principal
    map_detalhado = detailed_account_map if detailed_account_map else {}
    df["CONTA"] = df["CD_CONTA"].map(map_detalhado).fillna(df["CD_CONTA"].map(main_account_map))
    # Remove contas que n√£o foram mapeadas por nenhum dos mapas (se houver)
    df = df.dropna(subset=["CONTA"])
    logger.info("Contas mapeadas. Exemplo de nomes: %s", df["CONTA"].unique()[:5])
    if df.empty:
        logger.info("Nenhuma conta mapeada resultou em nome v√°lido. Pulando.")
        return None
    # --- FIM L√ìGICA ATUALIZADA ---

    # Garantir VL_CONTA num√©rico e remover ausentes
    df["VL_CONTA"] = pd.to_numeric(df["VL_CONTA"], errors="coerce")
    df = df.dropna(subset=["VL_CONTA"])
    if df.empty:
        logger.info("Sem valores num√©ricos para VL_CONTA. Pulando.")
        return None

    # Pivot (long -> wide)
    try:
        logger.info("Pivotando (long -> wide)...")
        # Usar fill_value=0 pode ser √∫til se algumas contas n√£o aparecem em todos os trimestres
        df_wide = df.pivot_table(
            index=INDEX_COLS,
            columns="CONTA", # Usar√° os nomes mapeados (incluindo os detalhados)
            values="VL_CONTA",
            aggfunc="sum",
            fill_value=0 # Preenche com 0 contas ausentes naquele per√≠odo/empresa
        )
        df_wide = df_wide.reset_index()
        df_wide.columns.name = None # Limpa o nome do √≠ndice das colunas
        logger.info("Pivot conclu√≠do: shape=%s", df_wide.shape)
        logger.info("Colunas geradas pelo pivot: %s", df_wide.columns.tolist())
    except Exception as exc:
        logger.exception("Erro ao pivotar %s: %s", input_file.name, exc)
        return None

    del df
    gc.collect()
    return df_wide

# -----------------------
# Fun√ß√£o para juntar todos os DF wide (dre,bpa,bpp) - SEM ALTERA√á√ÉO
# -----------------------
def merge_fundamentals(dfs_wide: Dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]:
    """ Junta os DataFrames wide (DRE, BPA, BPP) """
    if not dfs_wide:
        logger.warning("Nenhum DataFrame wide fornecido para merge.")
        return None
    valid_dfs = {k: v for k, v in dfs_wide.items() if v is not None and not v.empty}
    if not valid_dfs:
        logger.warning("Nenhum DataFrame wide V√ÅLIDO fornecido para merge.")
        return None

    keys = list(valid_dfs.keys())
    base = valid_dfs[keys[0]].copy()
    logger.info("Usando %s como base para merge (shape=%s)", keys[0], base.shape)

    for k in keys[1:]:
        logger.info("Mesclando com %s (shape=%s)", k, valid_dfs[k].shape)
        # Verifica colunas duplicadas (exceto as de √≠ndice) antes do merge
        cols_to_merge = valid_dfs[k].columns.difference(base.columns).tolist() + INDEX_COLS
        base = pd.merge(base, valid_dfs[k][cols_to_merge], on=INDEX_COLS, how="outer")
        logger.info("Shape ap√≥s merge com %s: %s", k, base.shape)

    base = base.sort_values(by=["CNPJ_CIA", "DT_FIM_EXERC"]).reset_index(drop=True)
    logger.info("Merge finalizado: shape=%s", base.shape)
    logger.info("Colunas finais: %s", base.columns.tolist())
    return base


def save_final(df: pd.DataFrame, final_dir: Path = FINAL_DIR, fname: str = "fundamentals_wide.parquet") -> Dict[str, str]:
    """ Salva o DataFrame final em parquet e CSV """
    final_dir.mkdir(parents=True, exist_ok=True)
    out_parquet = final_dir / fname
    out_csv = final_dir / str(fname).replace(".parquet", ".csv")

    try:
        logger.info("Salvando parquet final em: %s", out_parquet)
        df.to_parquet(out_parquet, index=False)
        logger.info("Salvando CSV final em: %s", out_csv)
        df.to_csv(out_csv, index=False, sep=";", encoding="utf-8-sig")
        return {"parquet": str(out_parquet), "csv": str(out_csv)}
    except Exception as exc:
        logger.exception("Erro ao salvar arquivo final: %s", exc)
        raise

# -----------------------
# Execu√ß√£o principal (ATUALIZADA)
# -----------------------
if __name__ == "__main__":
    logger.info("Iniciando transforma√ß√£o para formato WIDE (com contas detalhadas)...")

    dfs_wide = {}
    # --- L√ìGICA ATUALIZADA: Iterar sobre a nova estrutura de mapas ---
    for doc_type, maps_dict in MAPA_CONTAS_GERAL.items():
        main_map = maps_dict.get("main")
        detailed_map = maps_dict.get("detailed") # Pode ser None

        if main_map: # S√≥ processa se houver um mapa principal
            logger.info("-" * 20)
            df_wide = process_and_pivot_file(doc_type, main_map, detailed_map) # Passa ambos os mapas
            if df_wide is not None and not df_wide.empty:
                dfs_wide[doc_type] = df_wide
            else:
                logger.warning("Processamento de %s n√£o gerou DataFrame v√°lido.", doc_type)
        else:
             logger.warning("Mapa principal n√£o definido para %s. Pulando.", doc_type)
    # --- FIM L√ìGICA ATUALIZADA ---

    if not dfs_wide:
        logger.error("Nenhum dataframe produzido. Encerrando sem salvar.")
    else:
        logger.info("-" * 20)
        logger.info("Iniciando merge dos DataFrames DRE, BPA, BPP...")
        final_df = merge_fundamentals(dfs_wide)
        if final_df is None or final_df.empty:
            logger.error("DataFrame final vazio ap√≥s merge. Encerrando.")
        else:
            save_paths = save_final(final_df)
            logger.info("-" * 20)
            logger.info("Processamento conclu√≠do com sucesso!")
            logger.info("Arquivos salvos: %s", save_paths)
            logger.info("Shape final mestre: %s", final_df.shape)
            logger.info("Colunas finais geradas: %s", final_df.columns.tolist())
            logger.info("Amostra do resultado final:\n%s",
                        final_df.head()[INDEX_COLS + list(final_df.columns.difference(INDEX_COLS))].to_string(index=False)) # Reordena colunas para amostra
