In [1]:
# Libs
import os
import pandas as pd
from download_transformacao_CNPJ import EXTRATOR_CNPJ
from pyspark.sql.functions import concat_ws, lpad, coalesce, when, lit
from time import localtime, strftime
current_dir = os.getcwd()
dir_dados = os.path.join(current_dir, r"dados")

In [2]:
# Se passar baixar_e_extrair como false, precisa do nome do arquivo.
print(f'Começando a buscar os dados: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
ESTABELECIMENTOS, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Estabelecimentos").run()
print(f'Termino da coleta dos ESTABELECIMENTOS: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
EMPRESAS, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Empresas").run()
print(f'Termino da coleta dos EMPRESAS: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
MUNICIPIOS, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Municipios").run()
print(f'Final da coleta dos dados: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')


Começando a buscar os dados: 10/05/2023 09:01:28
Termino da coleta dos ESTABELECIMENTOS: 10/05/2023 09:03:42
Termino da coleta dos EMPRESAS: 10/05/2023 09:04:36
Final da coleta dos dados: 10/05/2023 09:04:36


In [3]:
CNAES = {
        5612100:'Serviços ambulantes de alimentação',
        5611201:'Restaurantes e similares',
        5611203:'Lanchonetes casas de chá de sucos e similares',
        5611204:'Bares e outros estabelecimentos especializados em servir bebidas sem entretenimento',
        5611205:'Bares e outros estabelecimentos especializados em servir bebidas com entretenimento',
        4721102: 'Padaria e confeitaria com predominância de revenda'
        }

# SQL QUERY SPARK
``` PYTHON
# Cria uma view com o mesmo nome do DataFrame
ESTABELECIMENTOS.createOrReplaceTempView("ESTABELECIMENTOS")
# Contagem ativos, inativos e geral
spark.sql("""SELECT t1.CNAE_PRINCIPAL, t1.QUANTIDADE_FECHADA, t2.QUANTIDADE_GERAL, t3.QUANTIDADE_ABERTA
            FROM (
            SELECT CNAE_PRINCIPAL, COUNT(*) AS QUANTIDADE_FECHADA
            FROM ESTABELECIMENTOS
            WHERE CNAE_PRINCIPAL IN ('4721102','5612100', '5611201', '5611203', '5611204', '5611205') AND SITUACAO_CADASTRAL NOT IN (2, 3, 4)
            GROUP BY CNAE_PRINCIPAL
            ) AS t1
            FULL OUTER JOIN (
            SELECT CNAE_PRINCIPAL, COUNT(*) AS QUANTIDADE_GERAL
            FROM ESTABELECIMENTOS 
            WHERE CNAE_PRINCIPAL IN ('4721102','5612100', '5611201', '5611203', '5611204', '5611205')
            GROUP BY CNAE_PRINCIPAL 
            ) AS t2 
            ON t1.CNAE_PRINCIPAL = t2.CNAE_PRINCIPAL
            FULL OUTER JOIN (
            SELECT CNAE_PRINCIPAL, COUNT(*) AS QUANTIDADE_ABERTA
            FROM ESTABELECIMENTOS
            WHERE CNAE_PRINCIPAL IN ('4721102','5612100', '5611201', '5611203', '5611204', '5611205') AND SITUACAO_CADASTRAL IN (2, 3, 4)
            GROUP BY CNAE_PRINCIPAL
            ) AS t3 
            ON t1.CNAE_PRINCIPAL = t3.CNAE_PRINCIPAL
            ORDER BY t1.CNAE_PRINCIPAL
            LIMIT 100
            """).show()
```

In [4]:
def filtragem_cnae_sql(cod_cnae: int):
    # Cria uma view com o mesmo nome do DataFrame
    ESTABELECIMENTOS.createOrReplaceTempView("ESTABELECIMENTOS")
    EMPRESAS.createOrReplaceTempView("EMPRESAS")
    MUNICIPIOS.createOrReplaceTempView("MUNICIPIOS")
    # cria um dataframe com base na query
    dataframe = spark.sql(
        f"""
        SELECT CONCAT(LPAD(EST.CNPJ_BASE, 8, '0'), LPAD(EST.CNPJ_ORDEM, 4, '0'), LPAD(EST.CNPJ_DV, 2, '0')) AS CNPJ,
                EMP.RAZAO_SOCIAL,
                EST.NOME_FANTASIA,
                EST.SITUACAO_CADASTRAL,
                EST.DATA_SITUACAO_CADASTRAL,
                EST.DATA_INICIO_ATIVIDADE,
                EST.CNAE_PRINCIPAL,
                CONCAT(
                    COALESCE(EST.TIPO_LOGRADOURO, ''),
                    ' ',
                    COALESCE(EST.LOGRADOURO, ''),
                    ' ',
                    COALESCE(EST.NUMERO, ''),
                    ' ',
                    COALESCE(EST.COMPLEMENTO, '')
                ) AS ENDERECO
                EST.BAIRRO,
                MUN.NOME_MUNICIPIO AS CIDADE,
                EST.UF,
                EST.CEP,
                CONCAT(
                    COALESCE(EST.DDD_CONTATO, ''), 
                    ' ',
                    COALESCE(EST.TELEFONE_CONTATO, '')
                    ) AS TELEFONE,
                EST.EMAIL
        FROM ESTABELECIMENTOS AS EST, EMPRESAS AS EMP, MUNICIPIOS AS MUN
        WHERE EST.CNPJ_BASE = EMP.CNPJ_BASE
            AND EST.CODIGO_MUNICIPIO = MUN.CODIGO_MUNICIPIO
            AND EST.CNAE_PRINCIPAL = {cod_cnae}
            AND EST.SITUACAO_CADASTRAL IN (2, 3, 4)
        """
    )
    #display(dataframe.show(5))
    return dataframe

In [5]:
def filtragem_cnae_df(cod_cnae:int):
    # cria um dataframe com base nos filtros aplicados
    from pyspark.sql.functions import concat_ws, lpad, coalesce, when
    dataframe = (
        ESTABELECIMENTOS
        .join(EMPRESAS, "CNPJ_BASE", "right")
        .join(MUNICIPIOS, "CODIGO_MUNICIPIO", "right")
        .where(
            (ESTABELECIMENTOS["CNAE_PRINCIPAL"] == f"{cod_cnae}") &
            (ESTABELECIMENTOS["SITUACAO_CADASTRAL"].isin([2, 3, 4]))
        )
        .select(
            concat_ws("", 
                lpad(ESTABELECIMENTOS["CNPJ_BASE"].cast("bigint"), 8, "0"), 
                lpad(ESTABELECIMENTOS["CNPJ_ORDEM"], 4, "0"), 
                lpad(ESTABELECIMENTOS["CNPJ_DV"], 2, "0")
            ).alias("CNPJ"),
            when(EMPRESAS.RAZAO_SOCIAL.isNull(), None).otherwise(EMPRESAS.RAZAO_SOCIAL).alias("RAZAO_SOCIAL"),
            when(ESTABELECIMENTOS.NOME_FANTASIA.isNull(), None).otherwise(ESTABELECIMENTOS.NOME_FANTASIA).alias("NOME_FANTASIA"),
            ESTABELECIMENTOS.SITUACAO_CADASTRAL,
            ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL,
            ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE,
            ESTABELECIMENTOS.CNAE_PRINCIPAL,
            concat_ws(" ",
                when(ESTABELECIMENTOS.TIPO_LOGRADOURO.isNull(), None).otherwise(ESTABELECIMENTOS.TIPO_LOGRADOURO),
                when(ESTABELECIMENTOS.LOGRADOURO.isNull(), None).otherwise(ESTABELECIMENTOS.LOGRADOURO),
                when(ESTABELECIMENTOS.NUMERO.isNull(), None).otherwise(ESTABELECIMENTOS.NUMERO),
                when(ESTABELECIMENTOS.COMPLEMENTO.isNull(), None).otherwise(ESTABELECIMENTOS.COMPLEMENTO)
            ).alias("ENDERECO"),
            ESTABELECIMENTOS.BAIRRO,
            MUNICIPIOS.NOME_MUNICIPIO.alias("CIDADE"),
            ESTABELECIMENTOS.UF,
            ESTABELECIMENTOS.CEP,
            concat_ws("-", 
                when(ESTABELECIMENTOS.DDD_CONTATO.isNull(), None).otherwise(ESTABELECIMENTOS.DDD_CONTATO),
                when(ESTABELECIMENTOS.TELEFONE_CONTATO.isNull(), None).otherwise(ESTABELECIMENTOS.TELEFONE_CONTATO)
            ).alias("TELEFONE"),
            ESTABELECIMENTOS.EMAIL
        )
    )

    #display(dataframe.show(5))
    return dataframe

### Salva no disco e lê após isso
``` PYTHON
# Libs
import requests # SE FOR USAR REQUESTS
import urllib.request # SE FOR USAR URLLIB
from pySmartDL import SmartDL # SE FOR USAR PYSMARTDL
import zipfile
import os
from pyspark.sql import SparkSession
current_dir = os.getcwd()

# Define ou busca uma sessão do Spark
spark = SparkSession.builder.master("local[2]") \
    .appName("OnlineReader") \
    .getOrCreate()

# Define a url de download dos dados
url = 'https://dadosabertos.rfb.gov.br/CNPJ/Simples.zip'
# Pega o nome do arquivo pela url
arquivo = url.split('/')[-1]
# Define o caminho absoluto do diretório.
salvar_onde = f"{current_dir}/RAW/{arquivo.split('.')[0]}/"

# cria a pasta para armazenar o arquivo, se ela não existir
if not os.path.exists(salvar_onde):
    os.makedirs(salvar_onde)

### Com requests
"""
    with requests.get(url, stream=True) as response:
        with open(os.path.join(salvar_onde, arquivo), 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

"""
### Com urllib

# faz o download do arquivo e salva em salvar_onde/arquivo
urllib.request.urlretrieve(url, os.path.join(salvar_onde, arquivo))

### Com SmartDL
"""
    dest = os.path.join(salvar_onde, arquivo)
    obj = SmartDL(url, dest, threads=4)
    obj.start()
"""

"""
# Imprime o caminho do diretório de download
print(salvar_onde)
"""

# Descompactação do arquivo
with zipfile.ZipFile(os.path.join(salvar_onde, arquivo), 'r') as zip_ref:

    # obtem o nome do primeiro arquivo dentro do zip
    nome_original_arquivo_zip = zip_ref.namelist()[0]

    # define um novo nome para o arquivo
    novo_nome_arquivo = f"CNPJ_{arquivo.split('.')[0]}.csv"

    # cria um dicionário com as informações de origem e destino
    arquivos_para_extrair = {nome_original_arquivo_zip : novo_nome_arquivo}

    # realiza a extração do arquivo zip
    zip_ref.extractall(path = f"{salvar_onde}/", members=arquivos_para_extrair)
    
    # renomeia o arquivo extraído com o novo nome
    os.rename(os.path.join(salvar_onde, nome_original_arquivo_zip), os.path.join(salvar_onde, novo_nome_arquivo))

"""
# imprime o nome do arquivo dentro do zip e o novo nome
print(f"Arquivo dentro do zip: {nome_original_arquivo_zip}")
print(f"Novo nome do arquivo: {novo_nome_arquivo}")
"""

# Define o caminho absoluto para o arquivo
csv_file = os.path.join(salvar_onde, novo_nome_arquivo)

# Lê o arquivo em um dataframe Spark
dados = spark.read.options(delimiter=";", header=False, inferSchema=True).csv(csv_file)

# Plota primeira linha do dataframe
dados.show(1, vertical=True)
```

In [6]:
def salvar_df_cnae(CNAES:dict[int,str] = CNAES):
    from backup_limpeza import backup_limpeza_simples
    """
    Args:
        CNAES (dict[int,str], optional): informa um dicionário com os códigos e descrição cnae.
    Return:
        dados_pandas : salva o dataframe gerador pela função em um arquivo único csv e arquivos parquets
    """
    arquivo_csv = os.path.join(dir_dados, r"csv\BASE_RFB.csv")
    if os.path.exists(arquivo_csv):
        backup_limpeza_simples(pasta=arquivo_csv.replace(r"BASE_RFB.csv", ""), nome_zipado=f"BASE_RFB_{strftime('%d-%m-%Y %H_%M_%S', localtime())}.zip")
    dados = None
    for cod_cnae, descricao_cnae in CNAES.items():
        print(cod_cnae)
        dados = filtragem_cnae_df(cod_cnae)
        dados = dados.withColumn(
                        "CNAE_DESCRICAO", lit(descricao_cnae.upper())
                    )
        dados_pandas = dados.toPandas()
        dados_pandas.to_csv(arquivo_csv, sep=";",mode="a", encoding="utf-8", index=False)

In [7]:
salvar_df_cnae(CNAES)

5612100
5611201
5611203
5611204
5611205
4721102


In [2]:
arquivo_csv = os.path.join(dir_dados, r"csv\BASE_RFB.csv")
print(os.path.exists(arquivo_csv))

True


# Define função que manipula a list ade arquivos
```PYTHON
def manipula_lista_arquivos(lista_de_arquivos:list):
    # Lê cada arquivo parquet em um dataframe Spark 
    dados = None
    for arquivo_no_diretorio in lista_de_arquivos:
        if dados is None:
            dados = (
                spark.read.format("parquet")
                .option("inferSchema", "true")
                .load(arquivo_no_diretorio)
            )
        else:
            # print(arquivo_no_diretorio)
            dados_incrementados = (
                spark.read.format("parquet")
                .option("inferSchema", "true")
                .load(arquivo_no_diretorio)
            )
            dados = dados.union(dados_incrementados)
    
    pd_dados = dados.toPandas()
    #pd_dados = dados.toPandas()
    return pd_dados```

``` PYTHON
dir_parquet = dir_dados + r"\parquet"
lista_arquivos_no_diretorio = [
            os.path.join(dir_parquet, nome)
            for nome in os.listdir(dir_parquet)
            if nome.endswith(".parquet")
        ]
df_to_save = manipula_lista_arquivos(lista_arquivos_no_diretorio)```

```PYTHON
arquivo_csv = os.path.join(dir_dados, r"csv\BASE_RFB.csv")
df_to_save.to_csv(arquivo_csv, sep=";", encoding="utf-8", index=False)```