In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, IntegerType, ArrayType, DataType
import pandas as pd
import os 
import logging
import time
spark = SparkSession.builder.master("local[4]") \
    .appName("ETL_CNPJ") \
    .getOrCreate()

In [2]:
current_dir = os.getcwd()
file_log = current_dir + r"/logs/src.log"

# gerando log
logging.basicConfig(level=logging.DEBUG, filename=file_log, format="%(asctime)s - %(levelname)s - %(message)s")

# Warnings: Possui uma série de funções e comandos para tratamento de mensagens de avisos e alertas do Python
import warnings
warnings.filterwarnings("ignore")

In [3]:
# DEFINE O NOVO NOME DE CADA COLUNA
NOVO_NOME_COLUNAS = {
    "_c0" : "CNPJ_BASE", 
    "_c1" : "CNPJ_ORDEM", 
    "_c2" : "CNPJ_DV", 
    "_c3" : "MATRIZ_FILIAL", 
    "_c4" : "NOME_FANTASIA", 
    "_c5" : "SITUACAO_CADASTRAL", 
    "_c6" : "DATA_SITUACAO_CADASTRAL", 
    "_c7" : "MOTIVO_SITUACAO_CADASTRAL", 
    "_c8" : "CIDADE_EXTERIOR", 
    "_c9" : "PAIS", 
    "_c10" : "DATA_INICIO_ATIVIDADE", 
    "_c11" : "CNAE_PRINCIPAL", 
    "_c12" : "CNAE_SECUNDARIO", 
    "_c13" : "TIPO_LOGRADOURO", 
    "_c14" : "LOGRADOURO", 
    "_c15" : "NUMERO", 
    "_c16" : "COMPLEMENTO", 
    "_c17" : "BAIRRO", 
    "_c18" : "CEP", 
    "_c19" : "UF", 
    "_c20" : "MUNICIPIO", 
    "_c21" : "DDD1", 
    "_c22" : "TELEFONE1", 
    "_c23" : "DDD2", 
    "_c24" : "TELEFONE2", 
    "_c25" : "DDD_FAX", 
    "_c26" : "FAX", 
    "_c27" : "EMAIL", 
    "_c28" : "SITUACAO_ESPECIAL", 
    "_c29" : "DATA_SITUACAO_ESPECIAL"
    }
# DEFINE O ESQUEMA
ESQUEMA = StructType([
    StructField("CNPJ_BASE", IntegerType(), True),
    StructField("CNPJ_ORDEM", IntegerType(), True),
    StructField("CNPJ_DV", IntegerType(), True),
    StructField("MATRIZ_FILIAL", IntegerType(), True),
    StructField("NOME_FANTASIA", StringType(), True),    
    StructField("SITUACAO_CADASTRAL", IntegerType(), True),
    StructField("DATA_SITUACAO_CADASTRAL", IntegerType(), True),    
    StructField("MOTIVO_SITUACAO_CADASTRAL", IntegerType(), True),
    StructField("CIDADE_EXTERIOR", IntegerType(), True),
    StructField("PAIS", StringType(), True),
    StructField("DATA_INICIO_ATIVIDADE", IntegerType(), True),
    StructField("CNAE_PRINCIPAL", IntegerType(), True),
    StructField("CNAE_SECUNDARIO", ArrayType(IntegerType())),
    StructField("TIPO_LOGRADOURO", StringType(), True),
    StructField("LOGRADOURO", StringType(), True),
    StructField("NUMERO", IntegerType(), True),
    StructField("COMPLEMENTO", StringType(), True),
    StructField("BAIRRO", StringType(), True),
    StructField("CEP", StringType(), True),
    StructField("UF", StringType(), True),
    StructField("DDD1", IntegerType(), True),
    StructField("TELEFONE1", IntegerType(), True),
    StructField("DDD2", IntegerType(), True),
    StructField("TELEFONE2", IntegerType(), True),
    StructField("DDD_FAX", IntegerType(), True),
    StructField("FAX", IntegerType(), True),
    StructField("EMAIL", StringType(), True),
    StructField("SITUACAO_ESPECIAL", IntegerType(), True),
    StructField("DATA_SITUACAO_ESPECIAL", IntegerType(), True)
])
# DEFINE QUAIS COLUNAS QUEREMOS MANTER E QUAIS QUEREMOS DESCARTAR
COLUNAS_A_MANTER = ['CNPJ_BASE', 'CNPJ_ORDEM' , 'CNPJ_DV' , 'MATRIZ_FILIAL'  , 'NOME_FANTASIA' , 'SITUACAO_CADASTRAL' ,
              'DATA_SITUACAO_CADASTRAL'  , 'MOTIVO_SITUACAO_CADASTRAL'  , 'DATA_INICIO_ATIVIDADE'  ,
              'CNAE_PRINCIPAL'  , 'CNAE_SECUNDARIO' , 'TIPO_LOGRADOURO'  , 'LOGRADOURO'  , 'NUMERO'  , 'COMPLEMENTO' ,
              'BAIRRO'  , 'CEP'  , 'UF'  , 'MUNICIPIO'  , 'DDD1'  , 'TELEFONE1'  , 'DDD2'  , 'TELEFONE2'  ,
              'DDD_FAX'  , 'FAX'  , 'EMAIL']
# DEFINE QUAIS CNAES VAMOS TRABALHAR
CNAES = {
        5612100:'Serviços ambulantes de alimentação',
        5611201:'Restaurantes e similares',
        5611203:'Lanchonetes casas de chá de sucos e similares',
        5611204:'Bares e outros estabelecimentos especializados em servir bebidas sem entretenimento',
        5611205:'Bares e outros estabelecimentos especializados em servir bebidas com entretenimento',
        4721102: 'Padaria e confeitaria com predominância de revenda'
        }
# DEFINE O TAMANHO DO CHUNK
chunk_size = 1000000

In [4]:
# LENDO O DATAFRAME COM O ESQUEMA DEFINIDO
INPUT_BASE = current_dir.replace(r"GitHub\ETL_CNPJ\utilitarios", r"\CNPJ_PROGRAMATICA\ESTABELECIMENTOSCSV/")
ESTABELECIMENTOS = list(filter(lambda x: ".csv" in x, os.listdir(INPUT_BASE)))
dados = spark.read.options(delimiter = ";", header=False, inferSchema=True).csv(f"{INPUT_BASE}{ESTABELECIMENTOS[1]}")
# dados.show(1, vertical=True)

# USA O MÉTODO WITHCOLUMNRENAMED() PARA RENOMEAR AS COLUNAS
for NOME_ANTIGO, NOVO_NOME in NOVO_NOME_COLUNAS.items():
    dados = dados.withColumnRenamed(NOME_ANTIGO, NOVO_NOME)

# USA OS MÉTODOS SELECT() E DROP() PARA DESCARTAR AS COLUNAS QUE NÃO PRECISAMOS
dados = dados.select(COLUNAS_A_MANTER)

# MOSTRA O RESULTADO FINAL COM AS COLUNAS RENOMEADAS E DESCARTADAS
dados.show(1, vertical=True)


-RECORD 0------------------------------
 CNPJ_BASE                 | 7396865   
 CNPJ_ORDEM                | 1         
 CNPJ_DV                   | 68        
 MATRIZ_FILIAL             | 1         
 NOME_FANTASIA             | null      
 SITUACAO_CADASTRAL        | 8         
 DATA_SITUACAO_CADASTRAL   | 20170210  
 MOTIVO_SITUACAO_CADASTRAL | 1         
 DATA_INICIO_ATIVIDADE     | 20050518  
 CNAE_PRINCIPAL            | 1412602   
 CNAE_SECUNDARIO           | 1411801   
 TIPO_LOGRADOURO           | RUA       
 LOGRADOURO                | TUCANEIRA 
 NUMERO                    | 30        
 COMPLEMENTO               | null      
 BAIRRO                    | DOS LAGOS 
 CEP                       | 89136000  
 UF                        | SC        
 MUNICIPIO                 | 8297      
 DDD1                      | 47        
 TELEFONE1                 | 33851125  
 DDD2                      | 47        
 TELEFONE2                 | 33851125  
 DDD_FAX                   | 47.0      


In [5]:
# DEFINE O COD CNAE A SER BUSCADO
codigo_cnae = 4721102
def filtra_cnae(data, codigo_cnae):
    """Resumo filtra_cnae

    Args:
        data ([type]: dataframe): passa o dataframe com os dados a serem filtrados desde que tenha a coluna solicitada
        codigo_cnae ([type] int): código cnae usado para filtrar os registros da categoria dos estabelecimentos buscados

    Returns:
        df_cnae ([type] dataframe): Retorna um dataframe com os dados filtrados;
    """
    # QUERY TIPO SQL PARA FILTRAR COM BASE NO PARÂMETRO PASSADO
    df_cnae =  data.where((f"CNAE_PRINCIPAL == '{codigo_cnae}'"))
    # RETORNA A QUANTIDADE DE DADOS DESTA CONSULTA
    print(f"Nesta consulta temos :{df_cnae.count()} observações do cnae informado!")
    # SUBSTITUI OS VALORES NULOS POR STRINGS VAZIAS (SÓ FUNCIONA ATÉ AQUI PARA StructField StringTypes)
    df_cnae = df_cnae.na.fill('')
    # MOSTRA A NAMORADINHA DO DATA MAN
    return df_cnae.show()
# filtra_cnae(data=dados,codigo_cnae=codigo_cnae)

``` PYTHON
for chunk in dados.limit(chunk_size).rdd.toLocalIterator():
    print(chunk)
```

In [6]:
# APLICANDO A FUNÇÃO "foreachPartition" 
# ESTA FUNÇÃO PERMITE EXECUTAR UMA FUNÇÃO EM CADA PARTIÇÃO DO DATAFRAME DE FORMA PARALELA
def process_partition(iterator):
    # IMPORTANDO FUNÇÕES PARA CONCATENAR OS DADOS DE COLUNAS
    from pyspark.sql.functions import concat, col 
    # CRIA O LOOP QUE VAI PARTICIONAR O DATAFRAME ORIGINAL EM PARTES
    for dados in iterator:
        logging.info(f'Leitura inicial: {dados.count()}')
        # SUBSTITUI OS VALORES NULOS POR STRINGS VAZIAS (SÓ FUNCIONA ATÉ AQUI PARA StructField StringTypes)
        dados = dados.na.fill('')
        # QUERY TIPO SQL PARA FILTRAR COM BASE NA SITUAÇÃO CADASTRAL
        dados = dados.where((dados["SITUACAO_CADASTRAL"] == 2) | (dados["SITUACAO_CADASTRAL"] == 3) | (dados["SITUACAO_CADASTRAL"] == 4))
        logging.info(f'Somente os ativos: {dados.count()}')
        
        # DEFINE O COD CNAE A SER BUSCADO COM BASE NO DICIONÁRIO CNAES CRIADO ANTERIORMENTE
        for codigo_cnae, descricao_cnae in CNAES.items():
            # QUERY TIPO SQL PARA FILTRAR COM BASE NO PARÂMETRO PASSADO
            df_cnae =  dados.where((f"CNAE_PRINCIPAL == '{codigo_cnae}'"))
            # PROCESSO DE MODELAGEM DOS DADOS 
            df_cnae = df_cnae \
                .withColumn("TELEFONE1", concat(col("DDD1").cast("string"), col("TELEFONE1").cast("string"))) \
                .withColumn("TELEFONE2", concat(col("DDD2").cast("string"), col("TELEFONE2").cast("string"))) \
                .withColumn("FAX", concat(col("DDD_FAX").cast("string"), col("FAX").cast("string"))) \
                .drop('DDD1','DDD2','DDD_FAX', inplace=True)
            logging.info(f"Colunas Geradas: {df_cnae.columns}")
            logging.info(f"Itens capiturados: {df_cnae.count()} Categoria dos dados: {descricao_cnae}")

            if codigo_cnae == 5612100:
                contagem_5612100 = contagem_5612100 + df_cnae.count()
                
            elif codigo_cnae == 5611201:
                contagem_5611201 = contagem_5611201 + df_cnae.count()

            elif codigo_cnae == 5611203:
                contagem_5611203 = contagem_5611203 + df_cnae.count()
                
            elif codigo_cnae == 5611204:
                contagem_5611204 = contagem_5611204 + df_cnae.count()
                
            elif codigo_cnae == 5611205:
                contagem_5611205 = contagem_5611205 + df_cnae.count()
                
            else:
                pass
            # EXPORTA OS DADOS COM FORMATO PARQUET (POR QUESTÕES DE PROCESSAMENTO)
            OUTPUT_BASE = current_dir.replace("utilitarios", "TESTE_BASE")
            df_cnae.write.format("parque").mode("append").option("compression", "snappy").save(f'{OUTPUT_BASE}/{descricao_cnae}.parquet')

# APLICA A FUNÇÃO "process_partition" EM CADA PARTIÇÃO DO DATAFRAME
dados.repartition(1).foreachPartition(process_partition)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 20) (ntb-46 executor driver): java.io.IOException: Cannot run program "C:\Users\ABRASEL NACIONAL\AppData\Local\Programs\Python\Python39\Lib\site-packages\pyspark\bin": CreateProcess error=5, Acesso negado
	at java.lang.ProcessBuilder.start(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:167)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: CreateProcess error=5, Acesso negado
	at java.lang.ProcessImpl.create(Native Method)
	at java.lang.ProcessImpl.<init>(Unknown Source)
	at java.lang.ProcessImpl.start(Unknown Source)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: Cannot run program "C:\Users\ABRASEL NACIONAL\AppData\Local\Programs\Python\Python39\Lib\site-packages\pyspark\bin": CreateProcess error=5, Acesso negado
	at java.lang.ProcessBuilder.start(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:167)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.io.IOException: CreateProcess error=5, Acesso negado
	at java.lang.ProcessImpl.create(Native Method)
	at java.lang.ProcessImpl.<init>(Unknown Source)
	at java.lang.ProcessImpl.start(Unknown Source)
	... 16 more
