In [1]:
# Libs
import os
import pandas as pd
from processos import  EXTRATOR_CNPJ
from pyspark.sql.functions import concat_ws, lpad, coalesce, when, lit, col, date_format
from functools import reduce
from pyspark.sql import DataFrame
from typing import List
from time import localtime, strftime
current_dir = os.getcwd()
dir_dados = r"\dados"

In [2]:
# Se passar baixar_e_extrair como false, precisa do nome do arquivo.
print(f'Começando a buscar os dados: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
ESTABELECIMENTOS, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Estabelecimentos").run()

print(f'Termino da coleta dos ESTABELECIMENTOS: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
EMPRESAS, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Empresas").run()

print(f'Termino da coleta dos EMPRESAS: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
MUNICIPIOS, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Municipios").run()

print(f'Final da coleta dos MUNICIPIOS: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')
CNAES, spark = EXTRATOR_CNPJ(baixar_e_extrair=False, nome_arquivo="Cnaes").run()

print(f'Final da coleta dos dados: {strftime("%d/%m/%Y %H:%M:%S", localtime())}')

Começando a buscar os dados: 30/08/2023 15:49:06
Termino da coleta dos ESTABELECIMENTOS: 30/08/2023 15:50:54
Termino da coleta dos EMPRESAS: 30/08/2023 15:51:43
Final da coleta dos MUNICIPIOS: 30/08/2023 15:51:43
Final da coleta dos dados: 30/08/2023 15:51:43


In [3]:
from pyspark.sql.functions import split, explode
# Cria uma nova coluna com os valores da coluna CNAE_SECUNDARIO divididos em uma lista
ESTABELECIMENTOS = ESTABELECIMENTOS.withColumn("CNAE_SECUNDARIO_LIST", split(ESTABELECIMENTOS["CNAE_SECUNDARIO"], ","))
# Explode a coluna CNAE_SECUNDARIO_LIST para criar uma linha para cada valor
ESTABELECIMENTOS = ESTABELECIMENTOS.select("*", explode("CNAE_SECUNDARIO_LIST").alias("CNAE_SECUNDARIO_VALUE"))

# Cria uma view com o mesmo nome do DataFrame
ESTABELECIMENTOS.createOrReplaceTempView("ESTABELECIMENTOS")
EMPRESAS.createOrReplaceTempView("EMPRESAS")
MUNICIPIOS.createOrReplaceTempView("MUNICIPIOS")
CNAES.createOrReplaceTempView("CNAES")

In [4]:
CNAES_FILTROS = {
        5620104:'Fornecimento de alimentos preparados preponderantemente para consumo domiciliar',
        5611201:'Restaurantes e similares',
        5611203:'Lanchonetes casas de chá de sucos e similares',
        5611204:'Bares e outros estabelecimentos especializados em servir bebidas sem entretenimento',
        5611205:'Bares e outros estabelecimentos especializados em servir bebidas com entretenimento',
        4721102:'Padaria e confeitaria com predominância de revenda'
        }

In [5]:
def filtra_cnpj_cnae_principal():
    dados = spark.sql(
        """SELECT
          CONCAT(LPAD(EMPRESAS.CNPJ_BASE, 8, '0'), LPAD(ESTABELECIMENTOS.CNPJ_ORDEM, 4, '0'), LPAD(ESTABELECIMENTOS.CNPJ_DV, 2, '0')) AS CNPJ,
          EMPRESAS.RAZAO_SOCIAL,
          ESTABELECIMENTOS.NOME_FANTASIA,
          ESTABELECIMENTOS.SITUACAO_CADASTRAL,
                    date(
            substr(
              ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL, 1, 4
            ) || '-' || 
            substr(
              ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL, 5, 2
            ) || '-' || 
            substr(
              ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL, 7, 2
            )
          ) AS DATA_SITUACAO_CADASTRAL,
          date(
            substr(
                ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE, 1, 4
              ) || '-' || 
            substr(
                ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE, 5, 2
              ) || '-' || 
            substr(
                ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE, 7, 2
              )
          ) AS DATA_INICIO_ATIVIDADE,
          ESTABELECIMENTOS.CNAE_PRINCIPAL,
          CNAES.DESCRICAO_CNAE,
          ESTABELECIMENTOS.CNAE_SECUNDARIO,
          CONCAT(
            COALESCE(ESTABELECIMENTOS.TIPO_LOGRADOURO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.LOGRADOURO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.NUMERO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.COMPLEMENTO, '')
          ) AS ENDERECO,
          ESTABELECIMENTOS.BAIRRO,
          MUNICIPIOS.NOME_MUNICIPIO AS CIDADE,
          ESTABELECIMENTOS.UF,
          ESTABELECIMENTOS.CEP,
          CONCAT(
            COALESCE(ESTABELECIMENTOS.DDD_CONTATO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.TELEFONE_CONTATO, '')
          ) AS TELEFONE,
          ESTABELECIMENTOS.EMAIL
        FROM EMPRESAS
        LEFT JOIN ESTABELECIMENTOS ON ESTABELECIMENTOS.CNPJ_BASE = EMPRESAS.CNPJ_BASE
        LEFT JOIN MUNICIPIOS ON ESTABELECIMENTOS.CODIGO_MUNICIPIO = MUNICIPIOS.CODIGO_MUNICIPIO
        LEFT JOIN CNAES ON ESTABELECIMENTOS.CNAE_PRINCIPAL = CNAES.CODIGO_CNAE
        WHERE ESTABELECIMENTOS.CNAE_PRINCIPAL IN ('4721102', '5611201', '5611203', '5611204', '5611205',  '5612100', '5620104') -- AND ESTABELECIMENTOS.SITUACAO_CADASTRAL IN (2, 3, 4)
        ORDER BY ESTABELECIMENTOS.CNPJ_BASE
    """)
    return dados

def filtra_cnpj_cnae_segundario():
    dados = spark.sql(
        """SELECT
          CONCAT(LPAD(EMPRESAS.CNPJ_BASE, 8, '0'), LPAD(ESTABELECIMENTOS.CNPJ_ORDEM, 4, '0'), LPAD(ESTABELECIMENTOS.CNPJ_DV, 2, '0')) AS CNPJ,
          EMPRESAS.RAZAO_SOCIAL,
          ESTABELECIMENTOS.NOME_FANTASIA,
          ESTABELECIMENTOS.SITUACAO_CADASTRAL,
          date(
            substr(
              ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL, 1, 4
            ) || '-' || 
            substr(
              ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL, 5, 2
            ) || '-' || 
            substr(
              ESTABELECIMENTOS.DATA_SITUACAO_CADASTRAL, 7, 2
            )
          ) AS DATA_SITUACAO_CADASTRAL,
          date(
            substr(
              ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE, 1, 4
              ) || '-' || 
            substr(
              ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE, 5, 2
              ) || '-' || 
            substr(
              ESTABELECIMENTOS.DATA_INICIO_ATIVIDADE, 7, 2
              )
          ) AS DATA_INICIO_ATIVIDADE,
          ESTABELECIMENTOS.CNAE_PRINCIPAL,
          CNAES.DESCRICAO_CNAE,
          ESTABELECIMENTOS.CNAE_SECUNDARIO,
          CONCAT(
            COALESCE(ESTABELECIMENTOS.TIPO_LOGRADOURO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.LOGRADOURO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.NUMERO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.COMPLEMENTO, '')
          ) AS ENDERECO,
          ESTABELECIMENTOS.BAIRRO,
          MUNICIPIOS.NOME_MUNICIPIO AS CIDADE,
          ESTABELECIMENTOS.UF,
          ESTABELECIMENTOS.CEP,
          CONCAT(
            COALESCE(ESTABELECIMENTOS.DDD_CONTATO, ''),
            ' ',
            COALESCE(ESTABELECIMENTOS.TELEFONE_CONTATO, '')
          ) AS TELEFONE,
          ESTABELECIMENTOS.EMAIL
        FROM EMPRESAS
        LEFT JOIN ESTABELECIMENTOS ON ESTABELECIMENTOS.CNPJ_BASE = EMPRESAS.CNPJ_BASE
        LEFT JOIN MUNICIPIOS ON ESTABELECIMENTOS.CODIGO_MUNICIPIO = MUNICIPIOS.CODIGO_MUNICIPIO
        LEFT JOIN CNAES ON ESTABELECIMENTOS.CNAE_PRINCIPAL = CNAES.CODIGO_CNAE
        WHERE ESTABELECIMENTOS.CNAE_SECUNDARIO_VALUE IN ('4721102', '5611201', '5611203', '5611204', '5611205',  '5612100', '5620104') -- AND ESTABELECIMENTOS.SITUACAO_CADASTRAL IN (2, 3, 4)
        ORDER BY ESTABELECIMENTOS.CNPJ_BASE
    """)
    return dados

In [6]:
# Função de unir vários dataframes
def unir_dataframes(lista_df: List) -> DataFrame:
    # Cria uma união por nome das colunas
    unir_dataframes_por_colunas_diferentes = lambda dfa, dfb: dfa.unionByName(dfb, allowMissingColumns=True)
    
    # use reduce to combine all the dataframes
    dataframe_final = reduce(unir_dataframes_por_colunas_diferentes, lista_df)
    
    return dataframe_final
  

In [7]:
# Importa a função de backup
from backup_limpeza import backup_limpeza_simples

# Mapeia o caminho para salvar o arquivo, verifica se já existe dados lá e faz o backup
arquivo_csv = os.path.join(dir_dados, r"csv\BASE_RFB.csv")
arquivo_parquet = os.path.join(dir_dados, r"parquet\BASE_RFB.parquet")

if os.path.exists(arquivo_csv):
    nome_backup = dir_dados + r"/backup/"
    if not os.path.exists(nome_backup):
        os.makedirs(nome_backup)
    backup_limpeza_simples(pasta=arquivo_csv.replace(r"BASE_RFB.csv", ""), nome_zipado=nome_backup + f"BASE_RFB_{strftime('%d-%m-%Y %H_%M_%S', localtime())}.zip")

# Chama a função de filtrar por cnae
dados_primario = filtra_cnpj_cnae_principal()

# Renomeia a coluna CNAE_DESCRICAO
dados_primario = dados_primario.withColumnRenamed("DESCRICAO_CNAE", "CNAE_DESCRICAO")

# Seleciona as colunas
dados_primario = dados_primario.select(col("CNPJ"), col("RAZAO_SOCIAL"), col("NOME_FANTASIA"),
                     col("SITUACAO_CADASTRAL"), col("DATA_SITUACAO_CADASTRAL"),
                     col("DATA_INICIO_ATIVIDADE"), col("CNAE_PRINCIPAL"), col("ENDERECO"),
                     col("BAIRRO"), col("CIDADE"), col("UF"), col("CEP"), col("TELEFONE"),
                     col("CNAE_DESCRICAO"), col("EMAIL"))
# Salva o dataframe inicial
dados_pandas_primario = dados_primario.toPandas()
dados_pandas_primario.to_csv(arquivo_csv, sep=';', mode='a', index=False, encoding='utf-8')
dados_pandas_primario.to_parquet(arquivo_parquet, sep=';', index=False, encoding='utf-8')

# Chama a função de filtrar por cnae
dados_secundario = filtra_cnpj_cnae_segundario()

# Renomeia a coluna CNAE_DESCRICAO
dados_secundario = dados_secundario.withColumnRenamed("DESCRICAO_CNAE", "CNAE_DESCRICAO")

# Seleciona as colunas
dados_secundario = dados_secundario.select(col("CNPJ"), col("RAZAO_SOCIAL"), col("NOME_FANTASIA"),
                     col("SITUACAO_CADASTRAL"), col("DATA_SITUACAO_CADASTRAL"),
                     col("DATA_INICIO_ATIVIDADE"), col("CNAE_PRINCIPAL"), col("ENDERECO"),
                     col("BAIRRO"), col("CIDADE"), col("UF"), col("CEP"), col("TELEFONE"),
                     col("CNAE_DESCRICAO"), col("EMAIL"))

dados_pandas_segundario = dados_secundario.toPandas()
dados_pandas_segundario.to_csv(arquivo_csv, sep=';', mode='a', index=False, encoding='utf-8')
dados_pandas_segundario.to_parquet(arquivo_parquet, sep=';', index=False, encoding='utf-8')

Py4JJavaError: An error occurred while calling o302.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 46.0 failed 1 times, most recent failure: Lost task 4.0 in stage 46.0 (TID 174) (ntb-46 executor driver): java.io.IOException: Espaço insuficiente no disco
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(Unknown Source)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:543)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:310)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.IOException: Espaço insuficiente no disco
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(Unknown Source)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:543)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:310)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
