# (0)  Path & logging utilities (works everywhere)

In [1]:
# -----------------------------------------------
# (0)  Path & logging utilities (works everywhere)
# -----------------------------------------------

from pathlib import Path
import logging
import time
from datetime import datetime as dt
import requests
from bs4 import BeautifulSoup



In [2]:
# ===========================================================
#  Path resolution & folder setup (robust & portable)
#  -----------------------------------------------------------
#  Works seamlessly in:
#   1.  Plain Python scripts  ‚Üí where __file__ is defined
#   2.  Jupyter / IPython notebooks ‚Üí uses current working dir
#   3.  Docker containers with mounted volumes
#
#  Defines the four canonical paths:
#       REPO_ROOT   ‚Üí top-level project folder
#       BASE_DIR    ‚Üí <repo>/airflow/datasets
#       DATA_DIR    ‚Üí BASE_DIR/public-data
#       ZIP_DIR     ‚Üí BASE_DIR/public-zips
# ===========================================================

from pathlib import Path
import logging

# -----------------------------------------------------------------
# Locate the real project root by walking upward until we find
# BOTH "airflow/dags" and "streamlit_app" ‚Äî prevents false matches
# under subfolders like /notebooks.
# -----------------------------------------------------------------
REPO_ROOT = Path.cwd().resolve()
while REPO_ROOT != REPO_ROOT.parent:
    has_airflow = (REPO_ROOT / "airflow" / "dags").is_dir()
    has_streamlit = (REPO_ROOT / "streamlit_app").is_dir()
    if has_airflow and has_streamlit:
        break
    REPO_ROOT = REPO_ROOT.parent

# Fail loudly if the expected structure isn't found
if not ((REPO_ROOT / "airflow" / "dags").is_dir() and (REPO_ROOT / "streamlit_app").is_dir()):
    raise RuntimeError(
        "‚ùå Could not locate project root (expected both 'airflow/dags' and 'streamlit_app'). "
        "Run this script from inside the repository tree."
    )

# -----------------------------------------------------------------
# Define canonical dataset directories under Airflow
# -----------------------------------------------------------------
BASE_DIR = REPO_ROOT / "airflow" / "datasets"
DATA_DIR = BASE_DIR / "public-data"
ZIP_DIR  = BASE_DIR / "public-zips"

# -----------------------------------------------------------------
# Ensure the folders exist (both locally and inside Docker)
# -----------------------------------------------------------------
for folder in (DATA_DIR, ZIP_DIR):
    folder.mkdir(parents=True, exist_ok=True)

# -----------------------------------------------------------------
# Basic logging for visibility
# -----------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(name)s ‚Üí %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("setup")
log.info("üìÅ REPO_ROOT ‚ûú %s", REPO_ROOT)
log.info("üìÅ DATA_DIR  ‚ûú %s", DATA_DIR)
log.info("üìÅ ZIP_DIR   ‚ûú %s", ZIP_DIR)


20:35:06  INFO      setup ‚Üí üìÅ REPO_ROOT ‚ûú /Users/viniciuslamb/Documents/portifolio_cnpj
20:35:06  INFO      setup ‚Üí üìÅ DATA_DIR  ‚ûú /Users/viniciuslamb/Documents/portifolio_cnpj/airflow/datasets/public-data
20:35:06  INFO      setup ‚Üí üìÅ ZIP_DIR   ‚ûú /Users/viniciuslamb/Documents/portifolio_cnpj/airflow/datasets/public-zips


# 1)  Folder maintenance helpers

In [3]:
# ===========================================================
#  Folder-maintenance helper
#  -----------------------------------------------------------
#  *One* reusable function that wipes the contents of both
#  `DATA_DIR`  (processed CSV / parquet) **and**
#  `ZIP_DIR`   (raw .zip archives).
#
#  ‚ö†Ô∏è  IMPORTANT
#  ---------------
#  ‚Ä¢ This is destructive.  We therefore:
#      ‚îÄ print a trash-can emoji üóëÔ∏è for every file removed
#      ‚îÄ catch and report *permission* errors instead of crashing
#  ‚Ä¢ No sub-directories are touched; only *direct* children that
#    match the Unix glob pattern ‚Äú*‚Äù.
# ===========================================================

from typing import NoReturn

def clean_folders() -> NoReturn:
    """
    Purge previously-downloaded artefacts from *both* ``DATA_DIR`` and
    ``ZIP_DIR``.

    Behaviour
    ---------
    ‚Ä¢ Iterates over the two folders defined earlier.
    ‚Ä¢ For *every* regular file encountered:
        ‚Äì attempts ``Path.unlink()`` (i.e. delete)
        ‚Äì prints a friendly message on success
    ‚Ä¢ If a given file cannot be removed due to filesystem
      permissions it prints a warning instead of raising.

    Returns
    -------
    NoReturn
        The function exists solely for its side-effects
        (deleted files + console output).
    """
    for target_dir in (DATA_DIR, ZIP_DIR):
        for candidate in target_dir.glob("*"):
            try:
                candidate.unlink()
                print(f"üóëÔ∏è  Deleted {candidate.name}")
            except PermissionError:
                print(f"‚ö†Ô∏è  Permission denied ‚Üí {candidate}")


In [4]:
# ===========================================================
#  Inventory helper ‚Äì what ZIPs do we already have?
#  -----------------------------------------------------------
#  Lists every file currently in `ZIP_DIR`, prints a neat table
#  on screen **and** returns a dictionary for programmatic use.
# ===========================================================

from datetime import datetime as dt
from typing import Dict

def inventory_existing_zips() -> Dict[str, dt]:
    """
    Scan ``ZIP_DIR`` and build an inventory of already-downloaded
    archives.

    Console output
    --------------
    ‚Ä¢ If the directory is empty ‚Üí prints a single informative line.
    ‚Ä¢ Otherwise prints **one line per file** with:
        ‚îÄ filename (left-aligned)
        ‚îÄ file size in MB (1 decimal)
        ‚îÄ last-modified timestamp (local time)

    Return value
    ------------
    dict[str, datetime]
        Keys   ‚Üí *filename* (no path)
        Values ‚Üí *mtime* (`datetime` object)

    This is handy for downstream logic (e.g. ‚Äúskip files we already own‚Äù).
    """
    # Quick exit: nothing to report
    if not any(ZIP_DIR.iterdir()):
        print("‚ÑπÔ∏è  No ZIP files found in", ZIP_DIR)
        return {}

    print("üì¶  ZIP files currently present:")
    inventory: Dict[str, dt] = {}

    # Iterate over direct children only (no recursion)
    for file_path in ZIP_DIR.iterdir():
        # Skip directories just in case
        if file_path.is_dir():
            continue

        modified     = dt.fromtimestamp(file_path.stat().st_mtime)
        size_in_mb   = file_path.stat().st_size / 1_048_576
        inventory[file_path.name] = modified

        # Nicely formatted console row
        print(
            f"  ‚Ä¢ {file_path.name:<50} "
            f"{size_in_mb:6.1f} MB  ‚Äì  {modified:%Y-%m-%d %H:%M}"
        )

    return inventory


# 2)  Scrape the IRS (Receita) website

In [5]:
# ===========================================================
#  Download helper ‚Äì discover the *latest* monthly folder and
#  collect its .zip URLs
# ===========================================================

import requests
from bs4 import BeautifulSoup
from typing import List

# -----------------------------------------------------------------
# Constants
# -----------------------------------------------------------------
RECEITA_ROOT: str = (
    "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/"
)
HEADERS: dict[str, str] = {"User-Agent": "Mozilla/5.0"}

# -----------------------------------------------------------------
# Main helper
# -----------------------------------------------------------------
def newest_month_links(max_links: int = 3) -> List[str]:
    """
    Crawl the Receita Federal ‚ÄúDados Abertos CNPJ‚Äù index and return
    up to **`max_links`** direct `.zip` URLs belonging to the most
    recent monthly folder (named `YYYYMM/`).

    Parameters
    ----------
    max_links : int, default=3
        Hard cap on how many archive links should be returned.
        Handy for quick tests or CI jobs where bandwidth matters.

    Returns
    -------
    list[str]
        A list of fully-qualified URLs (https://‚Ä¶) ready for `requests`
        or any download tool.

    Notes
    -----
    ‚Ä¢ Uses a *very* lightweight scraping strategy ‚Äî BeautifulSoup +
      simple CSS selectors.  
    ‚Ä¢ Raises `RuntimeError` if no folders or no `.zip` files are found
      (so callers can decide what to do).  
    ‚Ä¢ Prints a concise progress report so notebook readers see exactly
      what‚Äôs happening.
    """
    # ----------------------------------------------------------
    # Step 1 ‚Üí download the *main* index page
    # ----------------------------------------------------------
    print("üîé  Requesting main index ‚Ä¶")
    try:
        index_resp = requests.get(RECEITA_ROOT, timeout=15, headers=HEADERS)
        index_resp.raise_for_status()
    except requests.RequestException as err:
        raise RuntimeError(f"Failed to fetch main index: {err}") from err

    soup_index = BeautifulSoup(index_resp.text, "lxml")

    # ----------------------------------------------------------
    # Step 2 ‚Üí find every anchor that *starts* with ‚Äú20‚Äù (YYYYMM)
    # ----------------------------------------------------------
    month_folders = sorted(a["href"] for a in soup_index.select('a[href^="20"]'))
    if not month_folders:
        raise RuntimeError("Could not locate any monthly folders on the page.")

    latest_folder = month_folders[-1]           # e.g. "202503/"
    month_url     = RECEITA_ROOT + latest_folder
    print(f"  ‚Ä¢ Latest folder detected: {latest_folder}")

    # ----------------------------------------------------------
    # Step 3 ‚Üí open that folder and parse all `.zip` links
    # ----------------------------------------------------------
    try:
        month_resp = requests.get(month_url, timeout=15, headers=HEADERS)
        month_resp.raise_for_status()
    except requests.RequestException as err:
        raise RuntimeError(f"Failed to open folder {latest_folder}: {err}") from err

    soup_month = BeautifulSoup(month_resp.text, "lxml")

    # Build absolute URLs; keep only *.zip
    zip_urls = [
        href if href.startswith("http") else month_url + href
        for href in (a.get("href") for a in soup_month.find_all("a"))
        if href and href.lower().endswith(".zip")
    ]

    if not zip_urls:
        raise RuntimeError(f"No .zip archives found under {latest_folder}")

    # ----------------------------------------------------------
    # Step 4 ‚Üí trim to the requested limit & print preview
    # ----------------------------------------------------------
    capped_urls = zip_urls[:max_links]
    print(f"  ‚Ä¢ {len(zip_urls)} ZIPs available; returning the first {max_links}.")
    for idx, url in enumerate(capped_urls, 1):
        print(f"    - [{idx}] {Path(url).name}")

    return capped_urls


# 3 Download selected ZIP files

In [6]:
# ===========================================================
#  Download helper ‚Äì save each ZIP to disk (streaming + verify)
# ===========================================================

import time
import zipfile
from typing import List

def download_zips(
    urls: List[str],
    destination: Path = ZIP_DIR,
    chunk_size: int = 1 << 20,   # 1 MiB
    max_retries: int = 3,        # validate + retry on corruption/network errors
    retry_backoff_s: float = 2.0 # simple exponential backoff base
) -> None:
    """
    Stream‚Äêdownload every archive in *urls* into *destination* and **validate**
    the file as a real ZIP after each download. Corrupted or partial files are
    deleted and re-downloaded automatically (up to `max_retries` times).

    Behavior
    --------
    ‚Ä¢ Skips files that already exist **and** pass ZIP validation.  
    ‚Ä¢ If a file exists but is invalid, it is removed and fetched again.  
    ‚Ä¢ Streams in `chunk_size` blocks (memory-efficient).  
    ‚Ä¢ Prints progress roughly every 5% when the server sends Content-Length.  
    ‚Ä¢ Retries on network errors and on ZIP validation failures.

    Parameters
    ----------
    urls : list[str]
        Fully-qualified HTTPS URLs pointing to `.zip` files.
    destination : pathlib.Path
        Folder where the archives will be saved.
    chunk_size : int
        Bytes per `iter_content` read.
    max_retries : int
        Maximum attempts per file (including the first try).
    retry_backoff_s : float
        Base seconds for exponential backoff between retries (2, 4, 8, ...).

    Returns
    -------
    None
    """
    if not urls:
        print("‚ö†Ô∏è  No URLs supplied ‚Äì nothing to download.")
        return

    destination.mkdir(parents=True, exist_ok=True)

    with requests.Session() as session:
        session.headers.update(HEADERS)

        total_files = len(urls)
        for file_idx, url in enumerate(urls, start=1):
            filename  = Path(url).name
            local_zip = destination / filename

            # If file already exists, validate it; skip only if valid.
            if local_zip.exists():
                if zipfile.is_zipfile(local_zip):
                    print(f"‚úÖ [{file_idx}/{total_files}] {filename} already present and valid ‚Äì skipping.")
                    continue
                else:
                    print(f"‚ôªÔ∏è  [{file_idx}/{total_files}] {filename} exists but is invalid ‚Äì re-downloading ‚Ä¶")
                    local_zip.unlink(missing_ok=True)

            # Retry loop (covers both network errors and bad ZIPs)
            attempt = 0
            while attempt < max_retries:
                attempt += 1
                try:
                    print(f"‚¨áÔ∏è  [{file_idx}/{total_files}] Downloading {filename} (attempt {attempt}/{max_retries}) ‚Ä¶")
                    t0 = time.time()

                    with session.get(url, stream=True, timeout=90) as resp:
                        resp.raise_for_status()

                        total_bytes   = int(resp.headers.get("content-length", 0))
                        next_progress = 5  # 5%, 10%, ‚Ä¶
                        bytes_written = 0

                        with local_zip.open("wb") as fp:
                            for chunk in resp.iter_content(chunk_size=chunk_size):
                                if not chunk:
                                    continue
                                fp.write(chunk)
                                bytes_written += len(chunk)
                                if total_bytes:
                                    pct = bytes_written * 100 / total_bytes
                                    if pct >= next_progress:
                                        mb = bytes_written / 1_048_576
                                        print(f"   ‚Ä¢ {pct:5.1f}% ({mb:,.1f} MB)")
                                        next_progress += 5

                    elapsed = time.time() - t0
                    print(f"üéâ  Finished {filename} in {elapsed:,.1f}s\n")

                    # ZIP integrity check
                    if not zipfile.is_zipfile(local_zip):
                        print(f"‚ùå  Invalid ZIP detected for {filename}")
                        local_zip.unlink(missing_ok=True)
                        if attempt < max_retries:
                            sleep_s = retry_backoff_s ** attempt
                            print(f"   ‚Üª Retrying in {sleep_s:,.1f}s ‚Ä¶")
                            time.sleep(sleep_s)
                            continue
                        else:
                            print(f"üö´  Giving up on {filename} after {max_retries} attempts.")
                    # Valid file ‚Üí break retry loop
                    else:
                        break

                except requests.RequestException as err:
                    print(f"‚ùå  Network error while downloading {filename}: {err}")
                    local_zip.unlink(missing_ok=True)
                    if attempt < max_retries:
                        sleep_s = retry_backoff_s ** attempt
                        print(f"   ‚Üª Retrying in {sleep_s:,.1f}s ‚Ä¶")
                        time.sleep(sleep_s)
                        continue
                    else:
                        print(f"üö´  Giving up on {filename} after {max_retries} attempts.")

    print("üèÅ  All downloads completed and validated.")


In [7]:
# ===========================================================
#  ‚ÄúOne-click‚Äù runner ‚Äì tie everything together
# ===========================================================

def run_cnpj_download(limit: int = 37, clean_first: bool = False) -> None:
    """
    End-to-end helper that:

    1. *Optionally* wipes the target folders (`clean_first=True`);
    2. Prints an inventory of ZIPs already on disk;
    3. Scrapes the Receita Federal site for the **latest** monthly folder
       and grabs up to `limit` `.zip` URLs;
    4. Downloads any missing archives into `ZIP_DIR`; progress is streamed
       to the notebook console;
    5. Shows a final inventory so you can verify what was added.

    Parameters
    ----------
    limit : int, default=3
        Maximum number of archives to fetch.  Adjust to `None` or a
        larger value if you want *all* ZIPs for the month.
    clean_first : bool, default=False
        If *True* the function will call :pyfunc:`clean_folders` at the
        very start, guaranteeing a fresh state.

    Notes
    -----
    ‚Ä¢ The individual steps are printed with emoji bullets so readers
      can easily follow along.  
    ‚Ä¢ All underlying helpers (scrape, download, inventory) already
      include robust error handling; if something fails the exception
      message will bubble up and the notebook cell will stop ‚Äî making
      debugging straightforward.
    """
    print("\nüöÄ  CNPJ end-to-end download started\n" + "-" * 60)

    # ------------------------------------------------------
    # (1) Optional cleanup
    # ------------------------------------------------------
    if clean_first:
        print("üßπ  Cleaning existing files ‚Ä¶")
        clean_folders()
        print("   Done. ‚ú®\n")

    # ------------------------------------------------------
    # (2) Inventory before
    # ------------------------------------------------------
    print("üìë  Inventory *before* download:")
    pre_inventory = inventory_existing_zips()
    print()

    # ------------------------------------------------------
    # (3) Scrape latest folder + collect links
    # ------------------------------------------------------
    try:
        links = newest_month_links(max_links=limit)
    except RuntimeError as err:
        print(f"‚ùå  Aborting ‚Äì {err}")
        return
    print()

    # ------------------------------------------------------
    # (4) Download missing ZIPs
    # ------------------------------------------------------
    download_zips(links)
    print()

    # ------------------------------------------------------
    # (5) Inventory after
    # ------------------------------------------------------
    print("üìë  Inventory *after* download:")
    post_inventory = inventory_existing_zips()
    added = set(post_inventory) - set(pre_inventory)
    print(f"\n‚úÖ  Added {len(added)} new file(s): {', '.join(added) if added else '-'}")
    print("\nüèÅ  Workflow complete.")


# ‚ñ∂Ô∏è  Run with defaults (grab 3 ZIPs, keep existing files)
#    Uncomment the line below inside your notebook cell.
#run_cnpj_download()


# TESTE

In [13]:
# Notebook: pick-one-per-family (skip family if any local ZIP exists) + run pipeline
from pathlib import Path
from types import SimpleNamespace

# assumes you already have in the notebook:
# newest_month_links, download_zips,
# extract_zip_to_csv, create_temp_tables, load_csvs_into_postgres,
# promote_temp_tables, _harden_final_schemas

# ---------------------------------------------------------------------
# 0) Resolve repo paths like in the DAG (no imports from your .py file)
# ---------------------------------------------------------------------
REPO_ROOT = Path.cwd().resolve()
while REPO_ROOT != REPO_ROOT.parent:
    if (REPO_ROOT / "airflow" / "dags").is_dir():
        break
    REPO_ROOT = REPO_ROOT.parent

BASE_DIR       = REPO_ROOT / "airflow" / "datasets"
SOURCE_ZIP_DIR = BASE_DIR / "public-zips"
CSV_DIR        = BASE_DIR / "public-data"
SOURCE_ZIP_DIR.mkdir(parents=True, exist_ok=True)
CSV_DIR.mkdir(parents=True, exist_ok=True)

# ---------------------------------------------------------------------
# 1) Fetch all links for the newest month (adjust the limit if needed)
# ---------------------------------------------------------------------
all_links = newest_month_links(max_links=5000)

# family detection via filename substring (lower-cased)
FAMILIES = {
    "header": "header",
    "cnae": "cnae",
    "empresas": "empresas",
    "estabelec": "estabelecimento",
    "motivo": "motivo",
    "municipio": "municipio",
    "natureza": "natureza_juridica",
    "pais": "pais",
    "qualific": "qualificacao_socio",
    "simples": "simples",
    "socios": "socios_original",
    "secundaria": "cnaes_secundarias",
    "trailler": "trailler",
}

# ---------------------------------------------------------------------
# 2) Detect which families ALREADY have at least one local ZIP
#    If a family is present, we won't download anything else for it.
# ---------------------------------------------------------------------
present_files = [p.name.lower() for p in SOURCE_ZIP_DIR.glob("*.zip")]
present_families = set()
for fname in present_files:
    for sub, fam in FAMILIES.items():
        if sub in fname:
            present_families.add(fam)
            break

print(f"üì¶ Local ZIPs: {len(present_files)}")
if present_families:
    print("‚úÖ Families already present locally:", ", ".join(sorted(present_families)))
else:
    print("‚úÖ No local families yet.")

# ---------------------------------------------------------------------
# 3) For each family NOT present, pick the first monthly link candidate
# ---------------------------------------------------------------------
chosen = {}  # family -> url
for url in all_links:
    name_l = Path(url).name.lower()
    for sub, fam in FAMILIES.items():
        if sub in name_l:
            if fam in present_families or fam in chosen:
                # already have a local ZIP for this family, or we already picked one to download
                break
            chosen[fam] = url
            break

print(f"\nüîé Monthly links found: {len(all_links)}")
print(f"‚¨áÔ∏è Will download (one per family, only if missing locally): {len(chosen)}\n")
for fam, url in sorted(chosen.items()):
    print(f" ‚Ä¢ {fam:<22} -> {Path(url).name}")

# ---------------------------------------------------------------------
# 4) Download only what‚Äôs missing (no-op if everything is present)
# ---------------------------------------------------------------------
if chosen:
    print("\n‚¨áÔ∏è Downloading selected ZIP(s)‚Ä¶\n")
    download_zips(list(chosen.values()))
else:
    print("\n‚ÑπÔ∏è Nothing to download (each family already has a local ZIP).")

# ---------------------------------------------------------------------
# 5) Run the end-to-end pipeline using the functions already in the notebook
# ---------------------------------------------------------------------
print("\nüß© Running: extract_zip_to_csv ‚Üí create_temp_tables ‚Üí load_csvs ‚Üí promote ‚Üí harden\n")

# 5.1) Extract whatever is in SOURCE_ZIP_DIR to CSV_DIR (idempotent; bad zips are skipped)
extract_zip_to_csv(limit=None)

# 5.2) (Re)create temp tables
create_temp_tables()

# 5.3) Load all CSVs (uses your layout_key + robust COPY)
status = load_csvs_into_postgres(limit_files=None)
print(f"\nüìä load_csvs_into_postgres status: {'SUCCESS' if status else 'PARTIAL FAILURE'}")

# 5.4) Promote temp_* ‚Üí final, honoring the status (emulates XCom in notebook)
fake_ctx = {"ti": SimpleNamespace(xcom_pull=lambda **kw: status)}
promote_temp_tables(**fake_ctx)

# 5.5) Harden final schemas (TEXT ‚Üí DATE with cleanup)
_harden_final_schemas()

print("\n‚úÖ Notebook pipeline finished.")
print(f"   ZIPs dir: {SOURCE_ZIP_DIR}")
print(f"   CSVs dir: {CSV_DIR}")


üîé  Requesting main index ‚Ä¶
  ‚Ä¢ Latest folder detected: 2025-10/
  ‚Ä¢ 37 ZIPs available; returning the first 5000.
    - [1] Cnaes.zip
    - [2] Empresas0.zip
    - [3] Empresas1.zip
    - [4] Empresas2.zip
    - [5] Empresas3.zip
    - [6] Empresas4.zip
    - [7] Empresas5.zip
    - [8] Empresas6.zip
    - [9] Empresas7.zip
    - [10] Empresas8.zip
    - [11] Empresas9.zip
    - [12] Estabelecimentos0.zip
    - [13] Estabelecimentos1.zip
    - [14] Estabelecimentos2.zip
    - [15] Estabelecimentos3.zip
    - [16] Estabelecimentos4.zip
    - [17] Estabelecimentos5.zip
    - [18] Estabelecimentos6.zip
    - [19] Estabelecimentos7.zip
    - [20] Estabelecimentos8.zip
    - [21] Estabelecimentos9.zip
    - [22] Motivos.zip
    - [23] Municipios.zip
    - [24] Naturezas.zip
    - [25] Paises.zip
    - [26] Qualificacoes.zip
    - [27] Simples.zip
    - [28] Socios0.zip
    - [29] Socios1.zip
    - [30] Socios2.zip
    - [31] Socios3.zip
    - [32] Socios4.zip
    - [33] Socios5.zip


NameError: name 'extract_zip_to_csv' is not defined