In [0]:
# Instalando o lxml para processamento do HTML
%pip install lxml==6.0.1

In [0]:
# Importação das bibliotecas necessárias
from requests import get
from lxml import etree
from urllib.request import urlretrieve
from datetime import datetime, timedelta
from zipfile import ZipFile
import pytz

import sys
workspace = dbutils.widgets.get('workspace')
sys.path.insert(0, f"{workspace}scr")

from libs.create_log_table import create_table

myparser = etree.HTMLParser(encoding='utf-8')

In [0]:
def inserts_log(
    pasta: str,
    inicio: datetime,
    fim: datetime,
    descricao: str = None,
    observacao: str = None
):
    
    # Criando a tabela caso não exista
    create_table(
        spark=spark, 
        catalog='projeto_cnpj', 
        schema='admin'
    )

    sql = f'''
        INSERT INTO projeto_cnpj.admin.log_extracao(codigo, inicio, fim, descricao, observacao)
        VALUES ('{pasta}', '{inicio}', '{fim}', '{descricao}', '{observacao}')
        '''
    spark.sql(sql)


def processamento_html(html):
    # Processando o HTML e buscando os arquivos zip
    tree = etree.HTML(html, parser=myparser)
    for link in tree.xpath('//table'):
        folders = [f for f in link.xpath('./tr/td/a/@href') if f.endswith('.zip')]
    return folders

def download_folders(
    url_base: str, 
    mes: str, 
    path_source: str, 
    folders: str
):
    # Realizando o download dos arquivos
    for folder in folders:
        url = f"{url_base}{mes}/{folder}"
        print(url)
        urlretrieve(url, f"{path_source}/{folder}")

def unzip_files(path_source: str, path_destination: str):
    paths = dbutils.fs.ls(path_source)

    # Descompactando os arquivos
    for path in paths:
        with ZipFile(f"{path_source}/{path.name}", 'r') as zipObj:
            zipObj.extractall(f"{path_destination}/{path.name.split('.')[0].lower()}")

        # Excluindo os arquivos compactados logo após descompactação
        dbutils.fs.rm(f"{path_source}/{path.name}", True)

In [0]:

# Caminhos onde os arquivos serão baixados e descompactados
path_source = dbutils.widgets.get('path_source')  # arquivos compactados zip
path_destination = dbutils.widgets.get('path_destination') # Arquivos descompactados
url_base = 'https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/'
mes = (datetime.today() - timedelta(days=15)).strftime('%Y-%m')
brasil_tz = pytz.timezone("America/Sao_Paulo")
inicio = datetime.now(brasil_tz)

# Realizando a requisição
requisicao = get(f"{url_base}{mes}")

# Realizando o download e descompactação dos arquivos
if requisicao.status_code != 200:
    print('Arquivos ainda não disponíveis')
else:
    html = requisicao.content
    folders = processamento_html(html)
    download_folders(url_base, mes, path_source, folders)
    unzip_files(path_source, path_destination)

fim = datetime.now(brasil_tz)

# Inserindo o log
inserts_log(
    pasta=mes,
    inicio=inicio.strftime('%Y-%m-%d %H:%M:%S'),
    fim=fim.strftime('%Y-%m-%d %H:%M:%S'),
    descricao='Processo de extração dos dados do CNPJ no site da Receita Federal',
    observacao=f"{url_base}{mes}"
)