In [1]:
import asyncio
import aiohttp
from aiohttp import ClientSession
from datetime import datetime
from pathlib import Path
from tqdm.notebook import tqdm  # Jupyter-specific progress bar
import nest_asyncio
import os

nest_asyncio.apply()


In [2]:
def get_zip_urls(year_month: str, name:str) -> list[str]:
    """Generate URLs for CNPJ zip files for a given year-month."""
    base_url = f"https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/{year_month}/"
    return [f"{base_url}{name}{i}.zip" for i in range(10)]

In [3]:
year_month = datetime.now().strftime("%Y-%m")
get_zip_urls(year_month, "Estabelecimentos")

['https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos0.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos1.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos2.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos3.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos4.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos5.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos6.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos7.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-08/Estabelecimentos8.zip',
 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/20

In [None]:
async def download_file(session: ClientSession, url: str, pbar: tqdm, download_path: str, retries=3):
    filename = url.split('/')[-1]
    file_path = os.path.join(download_path, filename)  # Add this line
    
    for attempt in range(retries):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                with open(file_path, 'wb') as f:  # Modified this line
                    async for chunk in response.content.iter_chunked(1024*1024):
                        f.write(chunk)
                pbar.update(1)
                pbar.set_postfix_str(filename)
                return True
        except Exception as e:
            if attempt < retries - 1:
                await asyncio.sleep(2 ** attempt)
            else:
                pbar.write(f"Failed to download {filename}: {str(e)}")
                return False

async def download_urls(urls, download_path,):  # Modified this line
    os.makedirs(download_path, exist_ok=True)  # Add this line
    connector = aiohttp.TCPConnector(limit=5)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }

    with tqdm(total=len(urls), desc="Downloading files") as pbar:
        async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
            tasks = [download_file(session, url, pbar, download_path) for url in urls]  # Modified this line
            results = await asyncio.gather(*tasks)
            
            success_count = sum(results)
            pbar.write(f"\nSuccessfully downloaded {success_count}/{len(urls)} files to {download_path}")





def unzip_files(download_path):
    # Get list of zip files
    zip_files = [f for f in os.listdir(download_path) if f.endswith('.zip')]
    
    if not zip_files:
        print("No zip files found in the directory")
        return

    with tqdm(total=len(zip_files), desc="Unzipping files") as pbar:
        for zip_file in zip_files:
            file_path = os.path.join(download_path, zip_file)
            try:
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(download_path)
                
                pbar.update(1)
                pbar.write(f"Successfully extracted {zip_file} to {download_path}")
                os.remove(zip_file)
                
            except Exception as e:
                pbar.write(f"Error extracting {zip_file}: {str(e)}")


In [None]:
year_month = datetime.now().strftime("%Y-%m")
base_dir = Path("../data/inputs/rfb_cnpj")
download_path = base_dir / year_month
download_path

PosixPath('../data/inputs/rfb_cnpj/2025-02')

In [None]:
urls = get_zip_urls(year_month, "Estabelecimentos")
await download_urls(urls, download_path)

Downloading files:   0%|          | 0/10 [00:00<?, ?it/s]


Successfully downloaded 10/10 files to ../data/inputs/rfb_cnpj/2025-02


In [None]:
unzip_files(download_path)

Unzipping files:   0%|          | 0/10 [00:00<?, ?it/s]

Error extracting Estabelecimentos4.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos5.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos7.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos6.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos2.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos3.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos1.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos0.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos8.zip: name 'zipfile' is not defined
Error extracting Estabelecimentos9.zip: name 'zipfile' is not defined


In [None]:
import os
import zipfile


def unzip_files(download_path):
    # Get list of zip files
    zip_files = [f for f in os.listdir(download_path) if f.endswith('.zip')]
    
    if not zip_files:
        print("No zip files found in the directory")
        return

    with tqdm(total=len(zip_files), desc="Unzipping files") as pbar:
        for zip_file in zip_files:
            file_path = os.path.join(download_path, zip_file)
            try:
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(download_path)
                
                pbar.update(1)
                pbar.write(f"Successfully extracted {zip_file} to {download_path}")
                os.remove(file_path)  # Use the full path, not just the filename
                
            except zipfile.BadZipFile as e:
                pbar.write(f"Error extracting {zip_file}: Not a valid zip file")
            except PermissionError:
                pbar.write(f"Error extracting {zip_file}: Permission denied")
            except Exception as e:
                pbar.write(f"Error extracting {zip_file}: {str(e)}")

In [None]:
unzip_files(download_path)

Unzipping files:   0%|          | 0/10 [00:00<?, ?it/s]

Successfully extracted Estabelecimentos4.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos5.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos7.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos6.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos2.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos3.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos1.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos0.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos8.zip to ../data/inputs/rfb_cnpj/2025-02
Successfully extracted Estabelecimentos9.zip to ../data/inputs/rfb_cnpj/2025-02
