importar bibliotecas

In [94]:
import findspark
findspark.init()
import pyspark

In [95]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as Func 
from pyspark.sql.functions import *

iniciar a spark session

In [96]:
spark = SparkSession.builder.master("local").\
    appName("CNPJ_RAW").\
        config("spark.executer.memory","1gb").\
            getOrCreate()

### No site da RFB temos 10 arquivos CSV que totalizam mais de 10 Gb e mais de 51 milhões de linhas, nesse passo realizamos a carga de todos os arquivos que se encontram na pasta RAW num único DF

In [97]:
# Nesta pasta temos todos os arquivos brutos, da forma que vieram do site da RFB.
path = "/media/douglas/DATA/PRJ_CNPJ/new_data/ETL/01.RAW/*.csv"

In [98]:
csv_to_DF = spark.read.format("csv")\
    .option("delimiter",";")\
        .option("enconding","ISO-8859-1")\
            .option("header","False")\
                .option("inferSchema","True")\
                    .load(path)

                                                                                

In [99]:
# Contando a quantidade de registros que temos no DF
csv_to_DF.count()

                                                                                

51091268

In [100]:
# Deixamos que o Spark inferisse o Schema pois não vamos precisar de toda as colunas nos próximos passos.
# Após a seleção das colunas que iremos realmente usar, faremos essa etapa do tratamento, formatando as colunas de datas, por exemplo.
csv_to_DF.schema

StructType(List(StructField(_c0,IntegerType,true),StructField(_c1,IntegerType,true),StructField(_c2,IntegerType,true),StructField(_c3,IntegerType,true),StructField(_c4,StringType,true),StructField(_c5,StringType,true),StructField(_c6,IntegerType,true),StructField(_c7,IntegerType,true),StructField(_c8,StringType,true),StructField(_c9,IntegerType,true),StructField(_c10,IntegerType,true),StructField(_c11,StringType,true),StructField(_c12,StringType,true),StructField(_c13,StringType,true),StructField(_c14,StringType,true),StructField(_c15,StringType,true),StructField(_c16,StringType,true),StructField(_c17,StringType,true),StructField(_c18,StringType,true),StructField(_c19,StringType,true),StructField(_c20,StringType,true),StructField(_c21,StringType,true),StructField(_c22,StringType,true),StructField(_c23,StringType,true),StructField(_c24,StringType,true),StructField(_c25,StringType,true),StructField(_c26,StringType,true),StructField(_c27,StringType,true),StructField(_c28,StringType,true),

In [101]:
csv_to_DF.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27',
 '_c28',
 '_c29']

Agora vamos renomear as colunas, pois o CSV original não tinha qualquer tipo de informação, limpar algumas colunas que não vamos precisar (telefone e email, por exemplo) e concatenar outras, dessa forma deixaremos nosso DF mais clean para gerar o arquivo parquet

In [102]:
csv_to_DF = csv_to_DF\
    .withColumnRenamed('_c0','CNPJ_basico')\
    .withColumnRenamed('_c1','CNPJ_ordem')\
    .withColumnRenamed('_c2','CNPJ_dv')\
    .withColumnRenamed('_c3','id_matriz_filial')\
    .withColumnRenamed('_c4','nome_fantasia')\
    .withColumnRenamed('_c5','sit_cad')\
    .withColumnRenamed('_c6','dt_cad')\
    .withColumnRenamed('_c7','mt_sit_cad')\
    .withColumnRenamed('_c8','cid_ext')\
    .withColumnRenamed('_c9','pais')\
    .withColumnRenamed('_c10','dt_inicio_at')\
    .withColumnRenamed('_c11','cnae_principal')\
    .withColumnRenamed('_c12','cnae_secundario')\
    .withColumnRenamed('_c13','tp_log')\
    .withColumnRenamed('_c14','log')\
    .withColumnRenamed('_c15','nm')\
    .withColumnRenamed('_c16','comp')\
    .withColumnRenamed('_c17','bairro')\
    .withColumnRenamed('_c18','CEP')\
    .withColumnRenamed('_c19','UF')\
    .withColumnRenamed('_c20','municipio')\
    .withColumnRenamed('_c21','ddd1')\
    .withColumnRenamed('_c22','tel1')\
    .withColumnRenamed('_c23','ddd2')\
    .withColumnRenamed('_c24','tel2')\
    .withColumnRenamed('_c25','ddd_fax')\
    .withColumnRenamed('_c26','fax')\
    .withColumnRenamed('_c27','email')\
    .withColumnRenamed('_c28','sit_esp')\
    .withColumnRenamed('_c29','dt_sit_esp')


In [103]:
csv_to_DF.columns

['CNPJ_basico',
 'CNPJ_ordem',
 'CNPJ_dv',
 'id_matriz_filial',
 'nome_fantasia',
 'sit_cad',
 'dt_cad',
 'mt_sit_cad',
 'cid_ext',
 'pais',
 'dt_inicio_at',
 'cnae_principal',
 'cnae_secundario',
 'tp_log',
 'log',
 'nm',
 'comp',
 'bairro',
 'CEP',
 'UF',
 'municipio',
 'ddd1',
 'tel1',
 'ddd2',
 'tel2',
 'ddd_fax',
 'fax',
 'email',
 'sit_esp',
 'dt_sit_esp']

In [104]:
# Select no DF principal para deixar uma saída mais limpa, tirando dados sensíveis como telefone e email
df_select = csv_to_DF.select('CNPJ_basico',
                             'id_matriz_filial', 
                             'nome_fantasia',
                             'sit_cad', 
                             'dt_cad', 
                             'mt_sit_cad', 
                             'dt_inicio_at', 
                             'cnae_principal', 
                             'cnae_secundario', 
                             'tp_log', 
                             'log', 
                             'nm', 
                             'comp', 
                             'bairro', 
                             'CEP', 
                             'UF', 
                             'municipio') 

In [105]:
df_select.schema

StructType(List(StructField(CNPJ_basico,IntegerType,true),StructField(id_matriz_filial,IntegerType,true),StructField(nome_fantasia,StringType,true),StructField(sit_cad,StringType,true),StructField(dt_cad,IntegerType,true),StructField(mt_sit_cad,IntegerType,true),StructField(dt_inicio_at,IntegerType,true),StructField(cnae_principal,StringType,true),StructField(cnae_secundario,StringType,true),StructField(tp_log,StringType,true),StructField(log,StringType,true),StructField(nm,StringType,true),StructField(comp,StringType,true),StructField(bairro,StringType,true),StructField(CEP,StringType,true),StructField(UF,StringType,true),StructField(municipio,StringType,true)))

Para facilitar as próximas manipulações dos dados iremos persistir esses dados previamente limpos em um arquivo parquet e carregar em uma pasta com o nome Bronze, após esse passo, continuaremos efetuando alguns tratamentos

In [106]:
df_select.write.mode("overwrite").save("/media/douglas/DATA/PRJ_CNPJ/new_data/ETL/02.Bronze")

                                                                                

In [107]:
spark.stop()