In [None]:
# -*- coding: utf-8 -*-
r"""
Pipeline local dos Dados Abertos CNPJ (Receita Federal)
Windows + Python 3.11

Uso:
  python cnae.py           # processa somente o mês mais recente
  python cnae.py --all     # (opcional) carga histórica: todos os meses

Saída (padrão):
  .\data\parquet\<dataset>\{YYYY-MM}\part-*.parquet
  .\data\_metadata\ingestion_log.parquet
"""
from __future__ import annotations

import os
import re
import io
import sys
import json
import time
import hashlib
import zipfile
import shutil
import tempfile
import argparse
from pathlib import Path
from typing import List, Dict, Optional
from urllib.parse import urljoin, urlparse
from datetime import datetime, timezone

import requests
import pandas as pd
from requests.adapters import HTTPAdapter, Retry
import unicodedata, difflib

# Known schemas (column names and target dtypes), one entry per dataset.
# Column ORDER matters: the source CSVs are read without a header and these
# names are applied positionally.
# DOC: https://www.gov.br/receitafederal/dados/cnpj-metadados.pdf
schemas = {
    # Company-level records.
    "empresas": {
        "columns": {
            "cnpj_basico": "string",
            "razao_social": "string",
            "natureza_juridica": "string",
            "qualificacao_responsavel": "string",
            "capital_social": "float64",
            "porte": "string",
            "ente_federativo_responsavel": "string",
        }
    },
    # Establishment-level records.
    "estabelecimentos": {
        "columns": {
            "cnpj_basico": "string",
            "cnpj_ordem": "string",
            "cnpj_dv": "string",
            "identificador_matriz_filial": "int8",
            "nome_fantasia": "string",
            "situacao_cadastral": "string",
            "data_situacao_cadastral": "string",
            "motivo_situacao_cadastral": "string",
            "nome_cidade_exterior": "string",
            "pais": "string",
            "data_inicio_atividade": "string",
            "cnae_fiscal_principal": "string",
            "cnae_fiscal_secundaria": "string",
            "tipo_logradouro": "string",
            "logradouro": "string",
            "numero": "string",
            "complemento": "string",
            "bairro": "string",
            "cep": "string",
            "uf": "string",
            "municipio": "string",
            "ddd_telefone_1": "string",
            "telefone_1": "string",
            "ddd_telefone_2": "string",
            "telefone_2": "string",
            "ddd_fax": "string",
            "fax": "string",
            "email": "string",
            "situacao_especial": "string",
            "data_situacao_especial": "string",
        }
    },
    # Simples / MEI option records.
    "dados_do_simples": {
        "columns": {
            "cnpj_basico": "string",
            "opcao_pelo_simples": "string",
            "data_opcao_pelo_simples": "string",
            "data_exclusao_do_simples": "string",
            "opcao_pelo_mei": "string",
            "data_opcao_pelo_mei": "string",
            "data_exclusao_do_mei": "string",
        }
    },
    # Partner records.
    "socios": {
        "columns": {
            "cnpj_basico": "string",
            "identificador_socio": "string",
            "nome_socio": "string",
            "cnpj_cpf_socio": "string",
            "qualificacao_socio": "string",
            "data_entrada_sociedade": "string",
            "pais": "string",
            "representante_legal": "string",
            "nome_representante_legal": "string",
            "qualificacao_representante_legal": "string",
            "faixa_etaria": "string",
        }
    },
    # The remaining entries are two-column (code, description) lookup tables.
    "paises": {
        "columns": {
            "codigo": "string",
            "descricao": "string",
        }
    },
    "municipios": {
        "columns": {
            "codigo": "string",
            "descricao": "string",
        }
    },
    "qualificacoes_socios": {
        "columns": {
            "codigo": "string",
            "descricao": "string",
        }
    },
    "naturezas_juridicas": {
        "columns": {
            "codigo": "string",
            "descricao": "string",
        }
    },
    "cnaes": {
        "columns": {
            "codigo": "string",
            "descricao": "string",
        }
    },
}

# Common synonyms/variations, keyed by slug, mapped to the canonical schema key.
# "naturezas"/"natureza" are listed explicitly because the fuzzy fallback
# scores "naturezas" vs "naturezas_juridicas" at ~0.64, just under the 0.65
# cutoff, so an archive named e.g. "Naturezas.zip" would resolve to no schema
# (NOTE(review): archive name taken from the public listing — confirm).
SCHEMA_SYNONYMS = {
    "simples": "dados_do_simples",
    "dados_simples": "dados_do_simples",
    "qualificacoes": "qualificacoes_socios",
    "qualificacao_socio": "qualificacoes_socios",
    "natureza_juridica": "naturezas_juridicas",
    "naturezas": "naturezas_juridicas",
    "natureza": "naturezas_juridicas",
    "municipio": "municipios",
    "pais": "paises",
    "cnae": "cnaes",
    "empresa": "empresas",
    "estabelecimento": "estabelecimentos",
    "socio": "socios",
}

# Hints based on file-name patterns (useful for members named like
# F.K03200$Z.D50913.CNAECSV). Patterns are tried in order against the slugged
# file name and the first match wins, so more specific patterns come first:
#   - the qualification pattern precedes "soci", otherwise a name such as
#     "qualificacoes_socios" would be classified as "socios";
#   - "qual" and "nat.?ju" also cover abbreviated member names such as
#     QUALSCSV / NATJUCSV (NOTE(review): abbreviations taken from the public
#     archives — confirm against the current listing).
FILENAME_HINTS = [
    (r"cnae", "cnaes"),
    (r"simple|mei", "dados_do_simples"),
    (r"qual", "qualificacoes_socios"),
    (r"soci", "socios"),
    (r"estab", "estabelecimentos"),
    (r"empre", "empresas"),
    (r"nat.?ju|natureza", "naturezas_juridicas"),
    (r"munic", "municipios"),
    (r"pais|pa[ií]s", "paises"),
]

def _slug(s: str) -> str:
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = s.lower()
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = s.strip("_")
    return s

def _apply_synonyms(key: str) -> str:
    """Slug *key* and translate it through ``SCHEMA_SYNONYMS``.

    Returns the mapped canonical name when the slug is a known synonym,
    otherwise the slug itself.
    """
    slug = _slug(key)
    if slug in SCHEMA_SYNONYMS:
        return SCHEMA_SYNONYMS[slug]
    return slug

def choose_schema_key(preferred_hint: str | None, filename_hint: str | None) -> str | None:
    """Pick the key of ``schemas`` that best matches the given hints.

    Resolution order:
      1) ``preferred_hint`` (e.g. the inferred dataset) after slugging and
         synonym translation, matched exactly against the schema keys;
      2) the first ``FILENAME_HINTS`` regex that matches the slugged
         ``filename_hint``;
      3) a fuzzy match (difflib, cutoff 0.65) of the slugged hint against the
         schema keys.

    Returns the schema key, or ``None`` when nothing matches.
    """
    by_slug = {_slug(name): name for name in schemas}

    # 1) exact / synonym match on the preferred hint
    if preferred_hint:
        resolved = _apply_synonyms(preferred_hint)
        if resolved in by_slug:
            return by_slug[resolved]

    # 2) regex hints on the file name
    if filename_hint:
        slugged_name = _slug(filename_hint)
        for pattern, target in FILENAME_HINTS:
            if not re.search(pattern, slugged_name):
                continue
            resolved = _apply_synonyms(target)
            if resolved in by_slug:
                return by_slug[resolved]

    # 3) fuzzy match against the schema keys
    query = _slug(preferred_hint or filename_hint or "")
    closest = difflib.get_close_matches(query, list(by_slug), n=1, cutoff=0.65)
    return by_slug[closest[0]] if closest else None

def schema_columns_and_types(schema_key: str) -> tuple[list[str], dict]:
    """Return ``(column_names, name -> dtype string mapping)`` for a schema.

    The names keep the order in which they appear in the schema dictionary;
    the mapping is the schema's own ``"columns"`` dict (not a copy).
    """
    dtype_by_column = schemas[schema_key]["columns"]
    return [*dtype_by_column], dtype_by_column

def cast_df_by_types(df: pd.DataFrame, types: dict) -> pd.DataFrame:
    """Cast the columns of *df* to the dtypes named in *types*.

    The frame is modified in place and also returned. Columns listed in
    *types* but absent from *df* are created filled with ``pd.NA``.

    Conversion rules, by dtype string:
      - ``float*``: parsed to ``float64``; unparseable values become NaN.
        Values containing a comma are treated as using a decimal comma
        (``"1.234,56"`` -> ``1234.56``): dots are dropped as thousands
        separators and the comma becomes the decimal point. Without this,
        ``to_numeric`` turns every ``"1000,00"``-style value into NaN.
      - ``int*``: parsed with ``to_numeric`` and stored in the pandas nullable
        integer dtype of the same width (``Int8`` ... ``Int64``).
      - anything else (including ``"string"``): pandas ``string`` dtype.

    Args:
        df: frame whose columns were read as text.
        types: mapping of column name -> dtype string.

    Returns:
        The same ``df`` object, with converted columns.
    """
    nullable_ints = {8: "Int8", 16: "Int16", 32: "Int32", 64: "Int64"}

    for col, dt in types.items():
        if col not in df.columns:
            # Row had fewer fields than the schema: guarantee an empty column.
            df[col] = pd.NA

        if dt.startswith("float"):
            text = df[col].astype("string").str.strip()
            has_comma = text.str.contains(",", regex=False, na=False).astype(bool)
            if has_comma.any():
                localized = (
                    text.str.replace(".", "", regex=False)
                    .str.replace(",", ".", regex=False)
                )
                # Only rewrite values that actually use a comma; plain
                # "3.25"-style values keep their dot as the decimal point.
                text = text.where(~has_comma, localized)
            df[col] = pd.to_numeric(text, errors="coerce").astype("float64")
        elif dt.startswith("int"):
            digits = re.findall(r"\d+", dt)
            width = int(digits[0]) if digits else 64
            target = nullable_ints.get(width, "Int64")
            df[col] = pd.to_numeric(df[col], errors="coerce").astype(target)
        else:
            # "string" and any unknown dtype name fall back to string.
            df[col] = df[col].astype("string")
    return df


# =========================
# CONFIG
# =========================
# Root of the remote directory listing that holds the monthly folders.
BASE_URL = "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/"

# Local folders (may be relative; resolved against the current working
# directory when this module is loaded).
DATA_ROOT = Path("./data").resolve()
PARQUET_ROOT = DATA_ROOT / "parquet"
META_ROOT = DATA_ROOT / "_metadata"
INGEST_LOG_PATH = META_ROOT / "ingestion_log.parquet"

# Performance / robustness
CHUNK_ROWS = 1_000_000           # rows per CSV chunk; tune to your RAM
TIMEOUT = (20, 120)            # (connect, read) in seconds
USER_AGENT = "CNPJ-Pipeline-Local/1.0 (+Windows Python 3.11)"
ENCODINGS_TRY = ("utf-8", "latin-1")  # encodings attempted, in order, when reading

# Maps fragments of archive file names to "canonical" dataset names
# (matched by substring, first hit in dictionary order wins).
DATASET_ALIASES = {
    "empresas": "empresas",
    "estabelecimentos": "estabelecimentos",
    "socios": "socios",
    "simples": "simples",
    "cnaes": "cnaes",
    "municipios": "municipios",
    "pais": "paises",
    "paises": "paises",
    "naturezas_juridicas": "naturezas_juridicas",
    "qualificacoes": "qualificacoes",
    "motivos": "motivos",
    "membros": "membros",
}

# Create the output folders (side effect at import time).
PARQUET_ROOT.mkdir(parents=True, exist_ok=True)
META_ROOT.mkdir(parents=True, exist_ok=True)

# =========================
# HTTP com retry
# =========================
def http_session() -> requests.Session:
    """Build a ``requests`` session with automatic retries.

    GET and HEAD requests are retried up to 5 times (backoff factor 0.8) on
    status 429/500/502/503/504, without raising once retries are exhausted.
    The adapter is mounted for both ``https://`` and ``http://`` and the
    session sends ``USER_AGENT`` as its User-Agent header.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.8,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET", "HEAD"]),
        raise_on_status=False,
    )
    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
    session.headers.update({"User-Agent": USER_AGENT})
    return session

# Module-wide HTTP session (created at import time) shared by all requests below.
SESSION = http_session()

# =========================
# Utilitários
# =========================
def list_month_dirs() -> List[str]:
    """Scan the HTML index at ``BASE_URL`` and return names like ``'YYYY-MM/'``.

    The names are de-duplicated and sorted. Links (``href="YYYY-MM/"``) are
    tried first; if none are found, the visible link text is used instead.

    Raises:
        requests.HTTPError: if the index request returns an error status.
        RuntimeError: if no monthly directory can be found in the page.
    """
    response = SESSION.get(BASE_URL, timeout=TIMEOUT)
    response.raise_for_status()
    page = response.text

    found = sorted({*re.findall(r'href="(\d{4}-\d{2}/)"', page)})
    if not found:
        # Fallback: take the directory names from the anchor text.
        labels = set(re.findall(r">\s*(\d{4}-\d{2})/\s*<", page))
        found = [f"{label}/" for label in sorted(labels)]

    if not found:
        raise RuntimeError("Não consegui localizar diretórios mensais no índice.")
    return found

def latest_month_dir() -> str:
    """Return the most recent ``'YYYY-MM/'`` directory listed in the index.

    Directories are ordered by name with the trailing slash ignored.
    """
    ordered = sorted(list_month_dirs(), key=lambda name: name.rstrip("/"))
    return ordered.pop()

def list_zip_urls(month_dir: str) -> List[str]:
    """Return absolute URLs of every ``.zip`` linked from a monthly directory.

    Links are matched case-insensitively, de-duplicated, sorted and resolved
    against the directory URL.

    Raises:
        requests.HTTPError: if the directory request returns an error status.
    """
    month_url = urljoin(BASE_URL, month_dir)
    response = SESSION.get(month_url, timeout=TIMEOUT)
    response.raise_for_status()
    hrefs = set(
        re.findall(r'href="([^"]+\.zip)"', response.text, flags=re.IGNORECASE)
    )
    return [urljoin(month_url, href) for href in sorted(hrefs)]

def head_info(url: str) -> Dict[str, Optional[str]]:
    """Fetch the ``ETag``, ``Last-Modified`` and ``Content-Length`` of *url*.

    A HEAD request is tried first; if it answers with status >= 400, a
    streamed GET is issued instead and only its headers are read. This is
    best-effort: any failure yields a dict with all three values ``None``.

    Returns:
        Dict with keys ``etag``, ``last_modified`` and ``content_length``.
    """
    try:
        resp = SESSION.head(url, timeout=TIMEOUT, allow_redirects=True)
        try:
            if resp.status_code >= 400:
                resp.close()
                resp = SESSION.get(url, stream=True, timeout=TIMEOUT)
            headers = resp.headers
            return {
                "etag": headers.get("ETag"),
                "last_modified": headers.get("Last-Modified"),
                "content_length": headers.get("Content-Length"),
            }
        finally:
            # A streamed response keeps its connection checked out until it is
            # consumed or closed; the body is never read here, so close it.
            resp.close()
    except Exception:
        # Deliberate best-effort: header metadata is optional for callers.
        return {"etag": None, "last_modified": None, "content_length": None}

def md5_bytes(b: bytes) -> str:
    """Return the hexadecimal MD5 digest of *b*."""
    return hashlib.md5(b).hexdigest()

def infer_dataset_name(zip_name: str) -> str:
    """Derive a friendly dataset name from an archive file name.

    The lower-cased base name (without ``.zip``) is checked against the keys
    of ``DATASET_ALIASES`` by substring, in dictionary order; the first hit
    returns its alias. Otherwise the base name is slugged to ``[a-z0-9_]``,
    falling back to ``"dataset"`` when nothing is left.
    """
    stem = re.sub(r"\.zip$", "", os.path.basename(zip_name).lower())
    alias = next(
        (canonical for fragment, canonical in DATASET_ALIASES.items() if fragment in stem),
        None,
    )
    if alias is not None:
        return alias
    return re.sub(r"[^a-z0-9]+", "_", stem).strip("_") or "dataset"

def ensure_ingestion_log() -> pd.DataFrame:
    """Load the ingestion log, or return an empty log with the expected columns.

    The log is read from ``INGEST_LOG_PATH`` when that file exists.
    """
    if not INGEST_LOG_PATH.exists():
        return pd.DataFrame(
            columns=[
                "file_url",
                "zip_name",
                "dataset",
                "ref_month",
                "etag",
                "last_modified",
                "content_length",
                "content_md5",
                "processed_at",
            ]
        )
    return pd.read_parquet(INGEST_LOG_PATH)

def save_ingestion_log(df: pd.DataFrame) -> None:
    """Persist the ingestion log to ``INGEST_LOG_PATH`` atomically.

    The frame is written to a sibling temporary file and then moved over the
    real log, so an interruption mid-write cannot leave a truncated Parquet
    file that would make the next run fail while loading the log.
    """
    INGEST_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = INGEST_LOG_PATH.with_name(INGEST_LOG_PATH.name + ".tmp")
    try:
        df.to_parquet(tmp_path, index=False)
        tmp_path.replace(INGEST_LOG_PATH)
    finally:
        # No-op after a successful replace; removes leftovers after a failure.
        tmp_path.unlink(missing_ok=True)

def already_processed(
    logdf: pd.DataFrame,
    file_url: str,
    ref_month: str,
    md5: Optional[str],
    etag: Optional[str],
    last_modified: Optional[str],
    content_length: Optional[str],
) -> bool:
    """Tell whether this exact content was already processed (MD5 preferred).

    Only log rows with the same ``file_url`` and ``ref_month`` are considered.
    When *md5* is given it alone decides the answer. Otherwise every supplied
    header value (*etag*, *last_modified*, *content_length*) must match the
    same log row. If no identifier at all is supplied, the existence of a log
    row for the URL/month counts as processed.

    Returns:
        A plain ``bool``.
    """
    if logdf.empty:
        return False
    subset = logdf[(logdf["file_url"] == file_url) & (logdf["ref_month"] == ref_month)]
    if subset.empty:
        return False
    if md5:
        return bool((subset["content_md5"] == md5).any())

    # Start from "every row matches" as a real boolean Series; a bare ``True``
    # cannot be used to index the frame when no header value was supplied.
    mask = pd.Series(True, index=subset.index)
    supplied = (
        ("etag", etag),
        ("last_modified", last_modified),
        ("content_length", content_length),
    )
    for column, value in supplied:
        if value:
            mask &= subset[column] == value
    return bool(mask.any())

def write_chunk_parquet(df_chunk: pd.DataFrame, dataset: str, ref_month: str) -> None:
    """Write one chunk as a new ``part-*.parquet`` file under the partition.

    The file goes to ``PARQUET_ROOT/<dataset>/<ref_month>/`` (created when
    missing). Its name combines a microsecond timestamp with 8 random hex
    characters, so two chunks of identical shape written within the same
    clock tick cannot overwrite each other.
    """
    out_dir = PARQUET_ROOT / dataset / f"{ref_month}"
    out_dir.mkdir(parents=True, exist_ok=True)
    fname = f"part-{time.time_ns() // 1000}-{os.urandom(4).hex()}.parquet"
    df_chunk.to_parquet(out_dir / fname, index=False)

def _ensure_csv_extension(extracted_path: Path) -> Path:
    """Rename an extension-less member that looks like CSV so it ends in ``.csv``.

    Files already ending in ``.csv``/``.txt`` and files that do not look like
    CSV (e.g. ``...CNAECSV`` does) are left alone. Returns the path of the
    file on disk after the optional rename.
    """
    original_name = extracted_path.name
    if re.search(r"\.(csv|txt)$", original_name.lower()):
        return extracted_path

    name_upper = original_name.upper()
    looks_like_csv = (
        name_upper.endswith("CSV")
        or "CNAECSV" in name_upper
        or re.search(r"(?:^|[\W_])CSV(?:[\W_]|$)", name_upper) is not None
    )
    if not looks_like_csv:
        return extracted_path

    target = extracted_path.with_name(original_name + ".csv")
    try:
        extracted_path.rename(target)
    except FileExistsError:
        # Target name already taken: fall back to a timestamped name.
        target = extracted_path.with_name(original_name + f".{int(time.time())}.csv")
        extracted_path.rename(target)
    return target


def _discard_new_parts(out_dir: Path, preexisting: set) -> None:
    """Delete Parquet parts in *out_dir* that are not listed in *preexisting*."""
    if not out_dir.is_dir():
        return
    for part in out_dir.glob("part-*.parquet"):
        if part not in preexisting:
            part.unlink(missing_ok=True)


def _load_member_as_parquet(csv_path: Path, original_name: str, schema_key: str, ref_month: str) -> bool:
    """Read one header-less CSV in chunks and write it as Parquet parts.

    Each encoding in ``ENCODINGS_TRY`` is attempted with the C engine and then
    the Python engine. Parts written by an attempt that later fails are removed
    before the next attempt, so a retry never duplicates rows.

    Returns:
        ``True`` if some attempt read the whole file, ``False`` otherwise.
    """
    colnames, types = schema_columns_and_types(schema_key)
    read_kwargs_base = dict(
        sep=";",
        header=None,                    # the files carry no header row
        names=colnames,                 # apply the schema's column order
        usecols=range(len(colnames)),   # ignore extra columns
        dtype=str,                      # read everything as text; cast afterwards
        on_bad_lines="skip",
    )
    # Must match the partition directory and "part-*.parquet" naming used by
    # write_chunk_parquet. Assumes no other writer targets it concurrently.
    out_dir = PARQUET_ROOT / schema_key / f"{ref_month}"

    for enc in ENCODINGS_TRY:
        for eng in ("c", "python"):
            read_kwargs = {**read_kwargs_base, "encoding": enc, "engine": eng}
            if eng == "c":
                read_kwargs["low_memory"] = False  # option is specific to the C engine
            preexisting = set(out_dir.glob("part-*.parquet")) if out_dir.is_dir() else set()
            try:
                # The context manager closes the file handle even on failure,
                # which also lets the temporary directory be removed afterwards.
                with pd.read_csv(csv_path, chunksize=CHUNK_ROWS, **read_kwargs) as reader:
                    for chunk in reader:
                        chunk = cast_df_by_types(chunk, types)
                        # bookkeeping columns
                        chunk["ref_month"] = ref_month
                        chunk["ingestion_ts"] = pd.Timestamp(datetime.now(timezone.utc))
                        write_chunk_parquet(chunk, schema_key, ref_month)
                return True
            except UnicodeDecodeError:
                # Wrong encoding: drop partial output and move to the next one.
                _discard_new_parts(out_dir, preexisting)
                break
            except Exception as e:
                _discard_new_parts(out_dir, preexisting)
                if eng == "c":
                    # Give the more tolerant Python engine a chance.
                    continue
                if isinstance(e, pd.errors.ParserError):
                    print(f"[WARN] ParserError em '{original_name}' com engine={eng}: {e}")
                else:
                    print(f"[WARN] Falha lendo '{original_name}' enc='{enc}' engine='{eng}': {e}")
                break
    return False


def process_zip_to_parquet(zip_bytes: bytes, dataset: str, ref_month: str) -> None:
    """Convert every CSV member of a ZIP archive into partitioned Parquet.

    For each member of the archive:
      - extract it into a temporary directory;
      - if its name has no extension but suggests CSV (e.g. ``...CNAECSV``),
        rename it to end in ``.csv``;
      - choose the schema by approximation (dataset hint + member name);
        members without a matching schema are skipped with a warning;
      - read it WITHOUT a header, applying the schema's names in order, cast
        the columns to the schema dtypes and write Parquet parts under the
        schema's partition for *ref_month*.

    Args:
        zip_bytes: raw content of the ZIP archive.
        dataset: dataset hint inferred from the archive name.
        ref_month: reference month, used as partition name and column value.
    """
    tmp_dir = Path(tempfile.mkdtemp(prefix="cnpj_zip_"))
    try:
        # Open the archive straight from memory: avoids a second on-disk copy
        # and any clash between the archive file and an extracted member.
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
            for info in z.infolist():
                if info.is_dir():
                    continue

                extracted_path = Path(z.extract(info, tmp_dir))
                original_name = extracted_path.name
                normalized_path = _ensure_csv_extension(extracted_path)

                schema_key = choose_schema_key(preferred_hint=dataset, filename_hint=original_name)
                if not schema_key:
                    print(f"[WARN] Sem schema para '{original_name}' (dataset hint='{dataset}'). Ignorando.")
                    continue

                if not _load_member_as_parquet(normalized_path, original_name, schema_key, ref_month):
                    print(f"[INFO] Ignorando '{original_name}' — não consegui ler como CSV com o schema '{schema_key}'.")
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)

def run_pipeline(process_only_latest: bool = True, max_files: Optional[int] = None) -> None:
    """Download and ingest the CNPJ archives, skipping unchanged content.

    For each target month, every listed ``.zip`` is checked against the
    ingestion log, processed into Parquet when new or changed, and recorded in
    the log (which is saved after each archive).

    Args:
        process_only_latest: when true, only the most recent month is
            processed; otherwise every month in the index.
        max_files: optional cap on the number of archives handled per month
            (useful for quick tests). ``None`` processes all of them.
    """
    logdf = ensure_ingestion_log()

    months = list_month_dirs()
    if process_only_latest:
        months = [max(months)]

    print(f"Meses alvo: {months}")
    for month_dir in months:
        ref_month = month_dir.rstrip("/")
        zip_urls = list_zip_urls(month_dir)
        if not zip_urls:
            print(f"[{ref_month}] Nenhum .zip encontrado.")
            continue
        if max_files is not None:
            zip_urls = zip_urls[:max_files]

        print(f"[{ref_month}] {len(zip_urls)} arquivos .zip encontrados.")
        for url in zip_urls:
            headers = head_info(url)
            etag = headers.get("etag")
            last_modified = headers.get("last_modified")
            content_length = headers.get("content_length")

            zip_name = os.path.basename(urlparse(url).path)
            dataset = infer_dataset_name(zip_name)

            # Cheap pre-check: when the server-side validators match a logged
            # run, skip before downloading the (potentially very large) archive.
            if (etag or last_modified) and already_processed(
                logdf, url, ref_month, None, etag, last_modified, content_length
            ):
                print(f"    > sem mudanças — pulando {zip_name}")
                continue

            print(f"  - baixando {zip_name} ...")
            resp = SESSION.get(url, timeout=TIMEOUT)
            resp.raise_for_status()
            b = resp.content
            content_md5 = md5_bytes(b)

            if already_processed(logdf, url, ref_month, content_md5, etag, last_modified, content_length):
                print(f"    > sem mudanças — pulando {zip_name}")
                continue

            print(f"    > processando dataset='{dataset}' ...")
            process_zip_to_parquet(b, dataset, ref_month)

            # update the log
            newrow = {
                "file_url": url,
                "zip_name": zip_name,
                "dataset": dataset,
                "ref_month": ref_month,
                "etag": etag,
                "last_modified": last_modified,
                "content_length": content_length,
                "content_md5": content_md5,
                "processed_at": pd.Timestamp(datetime.now(timezone.utc)),
            }
            new_entry = pd.DataFrame([newrow])
            # Concatenating onto an empty frame is deprecated in recent pandas.
            logdf = new_entry if logdf.empty else pd.concat([logdf, new_entry], ignore_index=True)
            save_ingestion_log(logdf)
            print(f"    > ok: {zip_name} (dataset={dataset})")

    print("Pipeline concluído.")

# =========================
# CLI/Launcher compatível com Jupyter e terminal
# =========================
def main(argv=None):
    """Parse command-line options and run the pipeline.

    ``--all`` processes every month (historical load); without it only the
    most recent month is processed. Unknown arguments (e.g. the ``--f=...``
    passed by Jupyter) are ignored.
    """
    cli = argparse.ArgumentParser(description="Pipeline local CNPJ (dados_abertos_cnpj)")
    cli.add_argument("--all", action="store_true", help="Processa todos os meses (carga histórica)")
    known, _ignored = cli.parse_known_args(argv)
    only_latest = not known.all
    run_pipeline(process_only_latest=only_latest)

if __name__ == "__main__":
    # Warn early when the Parquet engine is missing.
    try:
        import pyarrow  # noqa: F401
    except Exception:
        print("Aviso: 'pyarrow' não encontrado. Instale com: pip install pyarrow", file=sys.stderr)

    running_in_notebook = "ipykernel" in sys.modules
    if not running_in_notebook:
        # Terminal/PowerShell: honour the arguments that were passed.
        main(sys.argv[1:])
    else:
        # Inside a Jupyter kernel, do NOT use the kernel's argv:
        # 1) run the default (most recent month), OR
        # 2) for the historical load inside the notebook, change this to False.
        run_pipeline(process_only_latest=True)

In [None]:
# Ad-hoc notebook check: load one generated Parquet part for inspection.
# NOTE(review): the absolute path is machine-specific and names a single part file.
pd.read_parquet(r"C:\Users\edmar\MeusProjetos\white-cube\data-ingestion-mvp\data\parquet\cnaes\2025-09\part-1758205684616270-9d983dbc.parquet")

In [None]:
# Ad-hoc notebook check: load the ingestion log for inspection.
# NOTE(review): the absolute path is machine-specific.
pd.read_parquet(r"C:\Users\edmar\MeusProjetos\white-cube\data-ingestion-mvp\data\_metadata\ingestion_log.parquet")