In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import zipfile
import findspark
findspark.init()

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StartingWithSpark") \
    .config("spark.ui.port", "4050") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()
spark

In [3]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
data = [{'Nome': 'Zeca', 'Idade': '35'}, {'Nome': 'Eva', 'Idade': '29'}]
df = spark.createDataFrame(data)
type(df)

pyspark.sql.dataframe.DataFrame

In [5]:
data = [{'Nome': 'Zeca', 'Idade': '35'}, {'Nome': 'Eva', 'Idade': '29'}]
pandas_df = pd.DataFrame(data)
df = spark.createDataFrame(pandas_df)
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
zipfile.ZipFile('empresas.zip', 'r').extractall('empresas')
zipfile.ZipFile('estabelecimentos.zip', 'r').extractall('estabelecimentos')
zipfile.ZipFile('socios.zip', 'r').extractall('socios')

# Company Data

In [6]:
import os
path = os.listdir('empresas\empresas')
files = []
for f in path:
    files.append(f'empresas\empresas\{f}')
df_company = spark.read.csv(files[0], sep=';', inferSchema=True)

  path = os.listdir('empresas\empresas')
  files.append(f'empresas\empresas\{f}')
  files.append(f'empresas\empresas\{f}')


In [7]:
df_company.show()

+-----+--------------------+----+---+---------+---+----+
|  _c0|                 _c1| _c2|_c3|      _c4|_c5| _c6|
+-----+--------------------+----+---+---------+---+----+
| 4519|DANIELA DA SILVA ...|2135| 50|     0,00|  5|NULL|
| 8638|JOAO DOS SANTOS F...|2135| 50|     0,00|  5|NULL|
|11748|PANIFICADORA E CO...|2062| 49|     0,00|  1|NULL|
|12027| L G SORVETERIA LTDA|2062| 49|     0,00|  5|NULL|
|13289|ANDREIA CRISTINA ...|2305| 65|100000,00|  1|NULL|
|13623|MARISTELA INDUSTR...|2062| 49|     0,00|  5|NULL|
|17389|DAICICOM MARKETIN...|2240| 49|     0,00|  1|NULL|
|18944|SAO GOTARDO-DISTR...|2062| 49|     0,00|  5|NULL|
|19204|TORTARIA CAMPINAS...|2062| 49|     0,00|  1|NULL|
|22223|S R FARIAS DA SIL...|2062| 49|     0,00|  5|NULL|
|23015|EVANGELINA P DE J...|2135| 50|     0,00|  1|NULL|
|24354|JUCELIA PEREIRA D...|2135| 50|     0,00|  1|NULL|
|26708|DONEIR RODRIGUES ...|2135| 50|     0,00|  1|NULL|
|28664|M ROCHA COML IMPO...|2062| 49|100000,00|  5|NULL|
|28759|LULETEF CONFECCAO...|206

In [8]:
df_company.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [9]:
campanyColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
for index, colName in enumerate(campanyColNames):
    df_company = df_company.withColumnRenamed(f'_c{index}', colName)
df_company.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [10]:
df_company.show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                     0,00|               5|                       NULL|
|       8638|         JOAO DOS SANTOS F...|             2135|                         50|                     0,00|               5|                       NULL|
|      11748|         PANIFICADORA E CO...|             2062|                         49|                     0,00|               1|                       NULL|
|      12027|          L G SORVETE

In [11]:
df_company.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


# Establishment Data

In [12]:
import os
path = os.listdir('estabelecimentos\estabelecimentos')
files = []
for f in path:
    files.append(f'estabelecimentos\estabelecimentos\{f}')
df_establishment = spark.read.csv(files[0], sep=';', inferSchema=True)

  path = os.listdir('estabelecimentos\estabelecimentos')
  files.append(f'estabelecimentos\estabelecimentos\{f}')
  files.append(f'estabelecimentos\estabelecimentos\{f}')


In [13]:
df_establishment.show()

+-----+---+---+---+--------------------+---+--------+---+----+----+--------+-------+---------------+-------+------------------+-----+-------------------+-----------------+--------+----+----+----+--------+----+----+----+-------+--------------------+----+----+
|  _c0|_c1|_c2|_c3|                 _c4|_c5|     _c6|_c7| _c8| _c9|    _c10|   _c11|           _c12|   _c13|              _c14| _c15|               _c16|             _c17|    _c18|_c19|_c20|_c21|    _c22|_c23|_c24|_c25|   _c26|                _c27|_c28|_c29|
+-----+---+---+---+--------------------+---+--------+---+----+----+--------+-------+---------------+-------+------------------+-----+-------------------+-----------------+--------+----+----+----+--------+----+----+----+-------+--------------------+----+----+
| 4519|  1| 48|  1|            GIRAFFAS|  8|19950331|  1|NULL|NULL|19940516|5611203|           NULL|    RUA|     HENRIQUE SAVI|15 55|LOJA UN2A 01 TERREO| JARDIM AEROPORTO|17012205|  SP|6219|NULL|    NULL|NULL|NULL|NULL|   N

In [14]:
df_establishment.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [15]:
establishmentColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
for index, colName in enumerate(establishmentColNames):
    df_establishment = df_establishment.withColumnRenamed(f'_c{index}', colName)
df_establishment.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [16]:
df_establishment.show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+------------------+------+-------------------+-----------------+--------+---+---------+-----+----------+-----+----------+----------+-------+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|        logradouro|numero|        complemento|           bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|    fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+--------------

In [17]:
df_establishment.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


# Partner Data

In [24]:
import os
path = os.listdir('socios\socios')
files = []
for f in path:
    files.append(f'socios\socios\{f}')
df_partner = spark.read.csv(files[0], sep=';', inferSchema=True)

  path = os.listdir('socios\socios')
  files.append(f'socios\socios\{f}')
  files.append(f'socios\socios\{f}')


In [25]:
df_partner.show()

+-----+---+--------------------+--------------+---+--------+----+-----------+----+---+----+
|  _c0|_c1|                 _c2|           _c3|_c4|     _c5| _c6|        _c7| _c8|_c9|_c10|
+-----+---+--------------------+--------------+---+--------+----+-----------+----+---+----+
|11748|  2|   MARIO KATUMI HOSI|   ***504158**| 49|19940530|NULL|***000000**|NULL|  0|   7|
|11748|  2|  ROBERTO YUKIO HOSI|   ***241578**| 22|19940530|NULL|***000000**|NULL|  0|   7|
|13289|  2|ANDREIA CRISTINA ...|   ***787278**| 65|20180615|NULL|***000000**|NULL|  0|   3|
|17389|  2|MARCIA DO CANTO A...|   ***920408**| 49|19940613|NULL|***000000**|NULL|  0|   7|
|19204|  2|ALMIR CARLOS CAPE...|   ***299028**| 49|19980908|NULL|***000000**|NULL|  0|   7|
|19204|  2|EDMIR CARLOS CAPE...|   ***633158**| 49|19980908|NULL|***000000**|NULL|  0|   7|
|22223|  2|SILVIA REGINA FARIAS|   ***203598**| 49|19940627|NULL|***000000**|NULL|  0|   6|
|22223|  2|RUBENS BATISTA DA...|   ***464638**| 22|19990111|NULL|***000000**|NUL

In [26]:
df_partner.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


In [28]:
partnerColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']
for index, colName in enumerate(partnerColNames):
    df_partner = df_partner.withColumnRenamed(f'_c{index}', colName)
df_partner.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [29]:
df_partner.show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|      11748|                     2|            MARIO KATUMI HOSI|         ***504158**|                   49|                 19940530|NULL|        ***000000**|                 NULL|                                  0|           7|
|      11748|                     2|           ROBERTO YUKIO HOSI|      

In [30]:
df_partner.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


# Data Analysis