In [57]:
import findspark

findspark.init()

In [58]:
import zipfile

In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql import functions as F

In [60]:
spark = SparkSession.builder.master("local[*]").appName("Introduction").config('spark.ui.port','4050').getOrCreate()

In [61]:
spark

In [62]:
data = [(1, 'a'), (2, 'b'), (3, 'c')]	
colNames = ['id', 'name']

In [63]:
df = spark.createDataFrame(data, colNames)
df

DataFrame[id: bigint, name: string]

In [64]:
df.show()

+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+



In [65]:
df.toPandas()

Unnamed: 0,id,name
0,1,a
1,2,b
2,3,c


# Processing data

In [66]:
PATH_DATA = "C:/developer/data/"

In [67]:
zipfile.ZipFile(f'{PATH_DATA}estabelecimentos.zip','r').extractall(PATH_DATA)
zipfile.ZipFile(f'{PATH_DATA}socios.zip', 'r').extractall(PATH_DATA)
zipfile.ZipFile(f'{PATH_DATA}empresas.zip', 'r').extractall(PATH_DATA)


In [68]:
enterprises = spark.read.csv(
    f'{PATH_DATA}empresas', sep=';', inferSchema=True, header=False)
enterprises.count()


4585679

In [69]:
establishments = spark.read.csv(
    f'{PATH_DATA}estabelecimentos', sep=';', inferSchema=True, header=False)
establishments.count()


4836219

In [70]:
partners = spark.read.csv(
    f'{PATH_DATA}socios', sep=';', inferSchema=True, header=False)
partners.count()

2046430

In [71]:
establishmentsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal',
                  'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
partnersColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio',
                  'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']


In [72]:
for idx,item_value in enumerate(establishmentsColNames):
    establishments = establishments.withColumnRenamed(f"_c{idx}", item_value)

establishments.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [73]:
for idx, item_value in enumerate(partnersColNames):
    partners = partners.withColumnRenamed(f"_c{idx}", item_value)

partners.columns


['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [74]:
establishments.limit(10).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,
5,13623,1,7,1,MARISTELA INDUSTRIA E COMERCIO DE SORVETES,8,20081231,71,,,...,9701,,,,,,,,,
6,17389,1,88,1,,4,20181004,63,,,...,7107,11.0,5352348.0,,,11.0,5352348.0,,,
7,18944,1,96,1,MONTANHES,8,19960506,1,,,...,7071,,,,,,,,,
8,19204,1,74,1,,8,20121004,1,,,...,6291,,,,,,,,,
9,22223,1,50,1,,8,20161010,1,,,...,6469,,,,,,,,,


In [75]:
partners.limit(10).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7
5,19204,2,EDMIR CARLOS CAPELLINI,***633158**,49,19980908,,***000000**,,0,7
6,22223,2,SILVIA REGINA FARIAS,***203598**,49,19940627,,***000000**,,0,6
7,22223,2,RUBENS BATISTA DA SILVA,***464638**,22,19990111,,***000000**,,0,7
8,28664,2,CLARICE YURIKO MAEDA FERREIRA,***979398**,49,19940802,,***000000**,,0,6
9,28664,2,MARIA CLARA ROCHA FERREIRA,***387348**,22,20210203,,***000000**,,0,3


In [76]:
establishments.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [81]:
establishments = establishments.withColumn('data_situacao_cadastral', F.to_date(
    establishments.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd'))
