In [59]:
import os
import re
import zipfile
from datetime import datetime

import requests
from bs4 import BeautifulSoup

In [14]:
# URL da página de downloads
url = "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/"


def get_latest_url():
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Regex para extrair ano e mês do texto do link
    date_pattern = re.compile(r"(\d{4})-(\d{2})")

    current_latest_date = None
    latest_url = None

    # Itera sobre todos os links encontrados na página
    for link in soup.find_all('a', href=True):
        href = link['href']

        match = date_pattern.search(href)
        if match:
            year, month = match.groups()
            date = datetime(int(year), int(month), 1)

            # Atualiza para o link mais recente
            if current_latest_date is None or date > current_latest_date:
                current_latest_date = date
                latest_url = url + href

    return latest_url


if __name__ == "__main__":
    latest_data_url = get_latest_url()
    if latest_data_url:
        print("URL mais atual:", latest_data_url)
    else:
        print("Nenhum dado correspondente encontrado.")

URL mais atual: https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2024-11/


In [31]:
dados_baixar = ["cnaes", "motivos"]

response = requests.get(latest_data_url)
soup = BeautifulSoup(response.content, 'html.parser')

links = [link['href'] for link in soup.find_all('a', href=True)]

# Filtrar links
filtered_links = [f"{latest_data_url}{link}" for link in links if link.endswith('.zip') and
                  any(entity in link.lower() for entity in dados_baixar)]

# Exibir os links filtrados
print("Links filtrados:", filtered_links)

Links filtrados: ['https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2024-11/Cnaes.zip', 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2024-11/Motivos.zip']


In [76]:
# Função para exibir o progresso do download
def download_with_progress(file_url, file_path):
    with requests.get(file_url, stream=True) as file_response:
        file_response.raise_for_status()
        total_size = int(file_response.headers.get('content-length', 0))  # Tamanho total do arquivo
        chunk_size = 8192
        downloaded_size = 0

        # Abre o arquivo local para gravar o conteúdo baixado
        with open(file_path, 'wb') as file:
            # Baixa em blocos de chunk_size e escreve no arquivo
            for chunk in file_response.iter_content(chunk_size=chunk_size):
                if chunk:  # Filtro para ignorar chunks vazios
                    file.write(chunk)
                    downloaded_size += len(chunk)

                    # Calcula o percentual baixado
                    percent_downloaded = (downloaded_size / total_size) * 100
                    print(f"\rBaixando {file_path}: {percent_downloaded:.2f}% completo", end='')

    print(f"\n{file_path} baixado com sucesso!")


# Diretório onde os arquivos serão salvos
output_dir = "../dados"
os.makedirs(output_dir, exist_ok=True)

for link in filtered_links:
    file_name = link.split("/")[-1]
    file_path = os.path.join(output_dir, file_name)

    print(f"Preparando para baixar {file_name}...")
    download_with_progress(link, file_path)

print("Todos os arquivos foram baixados!")


Preparando para baixar Cnaes.zip...
Baixando ../dados/Cnaes.zip: 100.00% completo
../dados/Cnaes.zip baixado com sucesso!
Preparando para baixar Motivos.zip...
Baixando ../dados/Motivos.zip: 100.00% completo
../dados/Motivos.zip baixado com sucesso!
Todos os arquivos foram baixados!


In [68]:
# Função para descompactar os arquivos .zip
def extract_zip_files(zip_dir):
    # Percorre todos os arquivos no diretório
    for file_name in os.listdir(zip_dir):
        if file_name.endswith(".zip"):
            # Caminho completo do arquivo .zip
            zip_file_path = os.path.join(zip_dir, file_name)

            # Usa regex para remover números e extensão do nome do arquivo
            base_name = re.sub(r'\d*\.zip$', '', file_name)
            output_folder = os.path.join(zip_dir, base_name)

            # Cria o diretório de saída se ele não existir
            os.makedirs(output_folder, exist_ok=True)

            # Descompacta o arquivo .zip para o diretório de saída
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                print(f"Descompactando {file_name} para {output_folder}...")
                zip_ref.extractall(output_folder)
                print(f"{file_name} descompactado com sucesso!")


# Diretório onde os arquivos .zip estão localizados
zip_dir = "../dados"

# Chama a função para descompactar os arquivos .zip
extract_zip_files(zip_dir)

print("Todos os arquivos .zip foram descompactados!")


Descompactando Cnaes.zip para ../dados/Cnaes...
Cnaes.zip descompactado com sucesso!
Descompactando Motivos.zip para ../dados/Motivos...
Motivos.zip descompactado com sucesso!
Todos os arquivos .zip foram descompactados!
