In [5]:
%pip install rich

import polars as pl
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn

from pathlib import Path
from itertools import islice


from core.constants import TABLES_INFO_DICT 

def csvs_to_parquet(audit_map: dict, unzip_dir: Path, output_dir: Path):
    """
    Convert the CSVs listed in ``audit_map`` into one Parquet file per table.

    For each table, every listed CSV that exists under ``unzip_dir`` is scanned
    lazily with an all-string schema whose columns (and their order) come from
    ``TABLES_INFO_DICT[table_name]["columns"]``. The scans are concatenated and
    the result is streamed to ``output_dir / f"{table_name}.parquet"`` with
    zstd compression.

    Args:
        audit_map: Mapping ``table_name -> {zip_name: [csv_file_name, ...]}``;
            only the inner file-name lists are used here.
        unzip_dir: Directory the CSV file names are resolved against.
        output_dir: Destination directory; created (with parents) if missing.

    Tables with no listed CSVs, no column mapping, or no CSV present on disk
    are skipped with a printed message. Any existing Parquet for a table is
    removed before that table is rebuilt, so a skipped or failed table leaves
    no stale output behind. Errors for one table are printed and processing
    continues with the next table (best-effort).
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        BarColumn(),
        TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(),
    ) as prog:
        for table_name, zip_map in audit_map.items():
            csv_paths = [unzip_dir / fname for files in zip_map.values() for fname in files]
            if not csv_paths:
                print(f"No CSVs for '{table_name}', skipping.")
                continue

            # Use .get() on both levels: indexing TABLES_INFO_DICT[table_name]
            # directly raised KeyError for unmapped tables before this check ran.
            table_info = TABLES_INFO_DICT.get(table_name, {})
            expected_columns = table_info.get("columns")
            if not expected_columns:
                print(f"No column mapping found for '{table_name}', skipping.")
                continue

            out_file = output_dir / f"{table_name}.parquet"
            # Stream into a temp file and rename it only on success, so a crash
            # mid-write never leaves a truncated file under the final name.
            tmp_file = out_file.with_name(out_file.name + ".tmp")
            out_file.unlink(missing_ok=True)

            existing_paths = [p for p in csv_paths if p.exists()]
            if not existing_paths:
                print(f"No valid CSV files for '{table_name}', skipping.")
                continue

            # Create the task only once there is real work, so skipped tables
            # don't leave a never-completing progress row.
            task = prog.add_task(f"[green]{table_name}", total=1)  # one unit per table

            try:
                # Every column is read as text, in the order given by the mapping.
                schema = {col: pl.Utf8 for col in expected_columns}
                # NOTE(review): scan_csv only accepts "utf8"/"utf8-lossy", so a
                # per-table "encoding" entry in TABLES_INFO_DICT is not applied
                # here; lossy mode substitutes invalid bytes instead of failing.
                lazy_frames = [
                    pl.scan_csv(str(p), schema=schema, encoding="utf8-lossy")
                    for p in existing_paths
                ]

                # pl.concat of LazyFrames yields a LazyFrame, which has no
                # write_parquet(); sink_parquet streams the plan to disk without
                # materializing every row in memory first.
                pl.concat(lazy_frames).sink_parquet(str(tmp_file), compression="zstd")
                tmp_file.replace(out_file)

                prog.update(task, advance=1)  # mark table as done

            except Exception as e:
                tmp_file.unlink(missing_ok=True)
                print(f"Error processing '{table_name}': {e}")



Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
%pip install psycopg2

from os import getcwd, path
from pathlib import Path

import polars as pl

from core.constants import TABLES_INFO_DICT
from setup.base import get_sink_folder, init_database
from core.etl import CNPJ_ETL

YEAR = 2025
MONTH = str(5).zfill(2)
FILES_URL = f"https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/{YEAR}-{MONTH}"
LAYOUT_URL = "https://www.gov.br/receitafederal/dados/cnpj-metadados.pdf"

download_folder=Path(path.join(getcwd(), "../data/DOWNLOAD_FILES"))
extract_folder=Path(path.join(getcwd(), "../data/EXTRACTED_FILES"))
parquet_folder=Path(path.join(getcwd(), "../data/PARQUET_FILES"))

database = init_database(f"dadosrfb_{YEAR}{MONTH}")

scraper = CNPJ_ETL(
    database, FILES_URL, LAYOUT_URL,
    download_folder, extract_folder,
    is_parallel=True, delete_zips=True
)
audits = scraper._prepare_audit_metadata(scraper.fetch_data()).tablename_to_zipfile_to_files

csvs_to_parquet(audits, download_folder, parquet_folder)

Collecting psycopg2
  Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl.metadata (5.0 kB)
Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   --------------------------- ------------ 0.8/1.2 MB 6.7 MB/s eta 0:00:01
   ---------------------------------------- 1.2/1.2 MB 5.7 MB/s eta 0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


ModuleNotFoundError: No module named 'yaml'

In [2]:
!pip install polars

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\bruno\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip
