## Libraries

In [121]:
import os
import zipfile
from utils.constants.main import header_companies, header_establishments, header_partners, path_raw_data

# Locate the Apache Spark installation (uses the $SPARK_HOME environment variable
# to find the directory). This must run before the pyspark imports below.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, to_date, date_format
from pyspark.sql.types import DoubleType, StringType, DateType

In [7]:
!pip install pandas

Collecting pandas
  Using cached pandas-1.4.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.7 MB)
Collecting pytz>=2020.1
  Using cached pytz-2022.1-py2.py3-none-any.whl (503 kB)
Installing collected packages: pytz, pandas
Successfully installed pandas-1.4.2 pytz-2022.1


## Functions

In [214]:
def createWorker(master='local[*]', app_name='Iniciando com Spark', bind_address='localhost'):
    """
    Start (or reuse) a SparkSession and configure the driver.

    Args:
        master: Spark master URL passed to ``builder.master`` (default
            ``'local[*]'``, i.e. local mode).
        app_name: application name passed to ``builder.appName``.
        bind_address: value used for the ``spark.driver.bindAddress`` setting.

    Returns:
        The SparkSession produced by ``getOrCreate()``. If a session already
        exists in this process, that session is returned instead of a new one.
    """
    return (
        SparkSession.builder
        .master(master)
        .appName(app_name)
        .config("spark.driver.bindAddress", bind_address)
        .getOrCreate()
    )

def downloadAndUnzipData(url, dest):
    """
    Download a zip archive with ``wget`` and extract its contents into ``dest``.

    Args:
        url: address of the archive. Its last ``/``-separated segment is used
            as the local file name.
        dest: directory that receives both the archive and its extracted
            contents. It is created if it does not exist.

    Raises:
        subprocess.CalledProcessError: if ``wget`` exits with a non-zero status.
        zipfile.BadZipFile: if the downloaded file is not a valid zip archive.
    """
    import subprocess

    filename = url.split('/')[-1]
    archive_path = os.path.join(dest, filename)
    os.makedirs(dest, exist_ok=True)

    # Pass an argument list (no shell) so that url/dest are never shell-interpreted.
    # '-O' pins the output path: on a re-run wget overwrites the archive instead of
    # saving 'name.zip.1' next to it and leaving the stale copy to be extracted.
    subprocess.run(
        ['wget', url, '-O', archive_path, '-q', '--show-progress'],
        check=True,
    )

    # The context manager closes the archive handle once extraction is done.
    with zipfile.ZipFile(archive_path, 'r') as archive:
        archive.extractall(path=dest)

def sparkDataFrame(spark, files, header, sep=';', infer_schema=True, **options):
    """
    Read delimited text files into a Spark DataFrame and name its columns.

    The files are read without a header row, so Spark assigns positional names
    ``_c0``, ``_c1``, ... . Each of these is renamed to the entry of ``header``
    at the same position.

    Args:
        spark: an active SparkSession.
        files: path or glob (e.g. ``'../raw-data/empresas/part-*'``) passed to
            ``spark.read.csv``.
        header: sequence of column names, in file order.
        sep: field delimiter (default ``';'``).
        infer_schema: forwarded as the ``inferSchema`` reader option.
            Inference costs an extra pass over the data and turns digit-only
            columns into numbers, which drops any leading zeros (for example,
            ``cnpj_basico`` is read as an integer). Pass ``False`` to keep
            every column as a string.
        **options: extra reader options forwarded to ``spark.read.csv``
            (e.g. ``encoding='ISO-8859-1'``).

    Returns:
        The DataFrame with renamed columns. ``withColumnRenamed`` is a no-op
        for a column that does not exist. As a result, names in ``header``
        beyond the number of columns read are ignored, and surplus columns
        keep their ``_cN`` name.
    """
    df = spark.read.csv(files, sep=sep, inferSchema=infer_schema, **options)

    for position, name in enumerate(header):
        df = df.withColumnRenamed(f'_c{position}', name)

    return df

def totalRecords(df, name):
    """Print ``df``'s row count, labelled with ``name`` (e.g. 'Total Companies: 42')."""
    row_count = df.count()
    print(f'Total {name}: {row_count}')

def replaceValues(df, column, char, to_char, literal=False):
    """
    Replace every match of ``char`` in ``column`` with ``to_char``.

    By default ``char`` is handed to Spark's ``regexp_replace`` unchanged, so it
    is interpreted as a Java regular expression. For example, ``','`` is safe,
    but ``'.'`` would match every character. Pass ``literal=True`` to escape
    both arguments so they are treated as plain text.

    Args:
        df: input Spark DataFrame.
        column: name of the column to rewrite. The result is written back
            under the same name.
        char: text or pattern to search for (a regex unless ``literal``).
        to_char: replacement text.
        literal: when True, escape ``char`` and ``to_char`` so that no regex or
            group-reference syntax applies. The default (False) keeps regex
            semantics.

    Returns:
        A new DataFrame with the rewritten column.
    """
    pattern, replacement = char, to_char
    if literal:
        import re
        # re.escape only emits backslash-escapes of non-alphanumeric characters,
        # which Java's regex engine also accepts as literal characters.
        pattern = re.escape(char)
        # In Java replacement strings, '\' and '$' (group references) are special.
        replacement = to_char.replace('\\', '\\\\').replace('$', '\\$')
    return df.withColumn(column, regexp_replace(column, pattern, replacement))

def convertType(df, column, to_type):
    """
    Return ``df`` with ``column`` cast to ``to_type``, kept under the same name.

    ``to_type`` is passed to ``Column.cast`` as-is (e.g. ``DoubleType()`` or
    ``StringType()``).
    """
    casted = col(column).cast(to_type)
    return df.withColumn(column, casted)
    
def formatStringToDate(df, column, string_format='yyyyMMdd', to_format='dd/MM/yyyy'):
    """
    Re-render a textual date in ``column`` from ``string_format`` to ``to_format``.

    The value is parsed with ``to_date`` using ``string_format`` and then written
    back with ``date_format``. The resulting column is therefore a formatted
    string, not a DateType column. Unparsable values presumably become null;
    this depends on Spark's parser settings (TODO confirm).
    """
    parsed = to_date(col(column), string_format)
    rendered = date_format(parsed, to_format)
    return df.withColumn(column, rendered)
    

## Working with Spark DataFrames

In [3]:
# Start, or reuse via getOrCreate, the SparkSession shared by every cell below.
spark = createWorker()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [210]:
# Load the companies ("empresas") part files and report the row count.
# NOTE(review): the path is hard-coded even though path_raw_data is imported.
# TODO confirm whether it should be built from path_raw_data.
df_companies = sparkDataFrame(
    spark=spark,
    files='../raw-data/empresas/part-*',
    header=header_companies,
)
totalRecords(df=df_companies, name='Companies')

                                                                                

Total Companies: 4585679


In [195]:
# Load the establishments ("estabelecimentos") part files and report the row count.
# NOTE(review): hard-coded path; TODO confirm whether it should use path_raw_data.
df_establishments = sparkDataFrame(
    spark=spark,
    files='../raw-data/estabelecimentos/part-*',
    header=header_establishments,
)
totalRecords(df=df_establishments, name='Establishments')

[Stage 179:>                                                      (0 + 12) / 12]

Total Establishments: 4836219


                                                                                

In [203]:
# Load the partners ("socios") part files and report the row count.
# NOTE(review): hard-coded path; TODO confirm whether it should use path_raw_data.
df_partners = sparkDataFrame(
    spark=spark,
    files='../raw-data/socios/part-*',
    header=header_partners,
)
totalRecords(df=df_partners, name='Partners')

                                                                                

Total Partners: 2046430


### Cleaning the Companies DataFrame

In [219]:
# Preview the first rows. NOTE(review): the saved output (In [219]) was produced after
# the capital_social cleaning cell (In [217]), so it already shows the converted values.
df_companies.limit(num=5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,


In [218]:
# Show the column types (inferred because sparkDataFrame reads with inferSchema=True).
# NOTE(review): this ran as In [218], after the cleaning cell In [217], so the printed
# schema already reflects the capital_social_da_empresa conversion.
df_companies.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [217]:
# The amounts presumably use ',' as the decimal separator. Swap it for '.' so that
# the cast to double can parse them.
df_companies = convertType(
    replaceValues(df_companies, 'capital_social_da_empresa', ',', '.'),
    'capital_social_da_empresa',
    DoubleType(),
)

### Cleaning the Establishments DataFrame

In [197]:
# Preview the first rows. NOTE(review): the saved output (In [197]) came after the date
# cell In [196], so data_situacao_cadastral already appears as dd/MM/yyyy.
df_establishments.limit(num=5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,31/03/1995,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,09/02/2015,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,19/12/2018,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,31/12/2008,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,23/01/2004,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [200]:
# Show the column types. NOTE(review): this ran as In [200], after the date cells
# In [196] and In [199], so those two date columns are already shown as strings.
df_establishments.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: strin

In [196]:
# Cast to string first, since the column was presumably inferred as a number, so that
# to_date can parse the yyyyMMdd digits. Then re-render the value as dd/MM/yyyy.
# NOTE(review): this cell is not idempotent. Re-running it re-parses the already
# formatted dd/MM/yyyy strings with 'yyyyMMdd', so run it once per fresh load.
df_establishments = formatStringToDate(
    convertType(df_establishments, 'data_situacao_cadastral', StringType()),
    'data_situacao_cadastral',
)
df_establishments.select('data_situacao_cadastral').limit(num=5).toPandas()

Unnamed: 0,data_situacao_cadastral
0,31/03/1995
1,09/02/2015
2,19/12/2018
3,31/12/2008
4,23/01/2004


In [199]:
# Same yyyyMMdd -> dd/MM/yyyy conversion for the activity start date.
# NOTE(review): not idempotent; run this cell once per fresh load.
df_establishments = formatStringToDate(
    convertType(df_establishments, 'data_de_inicio_atividade', StringType()),
    'data_de_inicio_atividade',
)
df_establishments.select('data_de_inicio_atividade').limit(num=5).toPandas()

Unnamed: 0,data_de_inicio_atividade
0,16/05/1994
1,24/05/1994
2,31/05/1994
3,08/06/1994
4,01/06/1994


In [201]:
# Same yyyyMMdd -> dd/MM/yyyy conversion for the special-status date.
# NOTE(review): not idempotent; run this cell once per fresh load.
df_establishments = formatStringToDate(
    convertType(df_establishments, 'data_da_situacao_especial', StringType()),
    'data_da_situacao_especial',
)
# Show only a bounded sample of rows that have a special-status date. toPandas() on an
# unbounded filter would pull every matching row into driver memory.
df_establishments.filter(col('data_da_situacao_especial').isNotNull()).limit(10).toPandas()

                                                                                

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,3302548,1,84,1,LAURA MATERIAIS DE LIMPEZA E HIDRAULICOS,8,15/09/2020,1,,,...,7107,11.0,37443180.0,,,,,contabil-camargo@uol.com.br,ESPOLIO EV 407,02/07/2020
1,2191115,1,36,1,,2,25/02/2001,0,,,...,7535,41.0,3226355.0,,,41.0,2331695.0,,RECUPERACAO JUDICIAL,22/12/2020
2,496012,1,59,1,,4,23/03/2021,63,,,...,8511,51.0,3183075.0,,,,,,ESPOLIO EV 407,26/04/2021
3,5043843,1,99,1,,2,03/11/2005,0,,,...,2921,22.0,27643069.0,22.0,27646906.0,,,CONTDP@VELOXMAIL.COM.BR,ESPOLIO EV 407,23/02/2021
4,13157,1,51,1,,2,03/11/2005,0,,,...,6619,,,,,,,,FALIDO,03/11/2016
5,6514,1,54,1,FEIRINHA POPULAR,2,03/11/2005,0,,,...,6575,16.0,31461384.0,,,,,,ESPOLIO EV 407,21/07/2020
6,94760634,1,96,1,CIA DAS FECHADURAS,8,19/02/2021,1,,,...,8877,51.0,35903362.0,,,51.0,35903362.0,regismetz@sinos.net,ESPOLIO EV 407,04/01/2021
7,3309171,1,95,1,CYCLO BYKE,8,16/03/2018,1,,,...,6389,,,,,,,,ESPOLIO EV 407,11/12/2017
8,5882398,1,50,1,CAFE & BOMBONIERE ALMEIDA,2,07/10/2005,0,,,...,7107,11.0,64010155.0,,,,,solucao@solucaoc.com.br,ESPOLIO EV 407,04/05/2021
9,74551532,1,53,1,,2,27/11/1999,0,,,...,6477,11.0,64088588.0,,,,,,LIQUIDACAO EXTRA-JUDICIAL,26/04/2021


### Cleaning the Partners DataFrame

In [209]:
# Preview the first rows. NOTE(review): the saved output (In [209]) came after the date
# cell In [208], so data_de_entrada_sociedade already appears as dd/MM/yyyy.
df_partners.limit(num=5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,25/07/1994,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,25/07/1994,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,16/05/1994,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,16/05/1994,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,09/06/1994,,***000000**,,0,8


In [206]:
# Show the inferred column types. This ran (In [206]) before the date cell (In [208]),
# so data_de_entrada_sociedade still appears as an integer here.
df_partners.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [208]:
# The printed schema shows this column as an integer. Cast it to string so that to_date
# can parse the yyyyMMdd digits, then re-render the value as dd/MM/yyyy.
# NOTE(review): not idempotent; run this cell once per fresh load.
df_partners = formatStringToDate(
    convertType(df_partners, 'data_de_entrada_sociedade', StringType()),
    'data_de_entrada_sociedade',
)
df_partners.select('data_de_entrada_sociedade').limit(num=5).toPandas()

Unnamed: 0,data_de_entrada_sociedade
0,25/07/1994
1,25/07/1994
2,16/05/1994
3,16/05/1994
4,09/06/1994
