In [2]:
import os

os.environ["SPARK_HOME"] = "C:\\spark\\spark-3.5.1-bin-hadoop3"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [5]:
import zipfile
zipfile.ZipFile('..\\loading-data\\data-samples\\estabelecimentos.zip','r').extractall('..\\loading-data\\data-samples')
zipfile.ZipFile('..\\loading-data\\data-samples\\socios.zip','r').extractall('..\\loading-data\\data-samples')
zipfile.ZipFile('..\\loading-data\\data-samples\\empresas.zip','r').extractall('..\\loading-data\\data-samples')

In [5]:
path_empresas = '..\\loading-data\\data-samples\\empresas'
empresas = spark.read.csv(path_empresas, sep=';', inferSchema=True)

In [6]:
path_estabelecimentos = '..\\loading-data\\data-samples\\estabelecimentos'
estabelecimentos = spark.read.csv(path_estabelecimentos, sep=';', inferSchema=True)

In [7]:
path_socios = '..\\loading-data\\data-samples\\socios'
socios = spark.read.csv(path_socios, sep=';', inferSchema=True)

In [8]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", colName)

In [9]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

In [10]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)

In [11]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as pyspark_functions

#primeiro trocar , por . para que o cast entenda a conversao correta para double
empresas = empresas.withColumn('capital_social_da_empresa', pyspark_functions.regexp_replace('capital_social_da_empresa',',','.'))

In [12]:
#agora converto para double
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

In [13]:
estabelecimentos = estabelecimentos\
.withColumn(
    'data_situacao_cadastral', 
    pyspark_functions.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
)\
.withColumn(
    'data_de_inicio_atividade', 
    pyspark_functions.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
)\
.withColumn(
    'data_da_situacao_especial', 
    pyspark_functions.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
)

In [19]:
empresas.select('*')\
.select('*')\
.show(5, False) #False para nao truncar o dado

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |NULL                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [21]:
#exibir somente ano de data
estabelecimentos\
.select('data_situacao_cadastral', pyspark_functions.year('data_situacao_cadastral').alias('ano_situacao_cadastral'),pyspark_functions.month('data_situacao_cadastral').alias('mes_situacao_cadastral'))\
.show(5, False)

+-----------------------+----------------------+----------------------+
|data_situacao_cadastral|ano_situacao_cadastral|mes_situacao_cadastral|
+-----------------------+----------------------+----------------------+
|2001-10-29             |2001                  |10                    |
|2008-12-31             |2008                  |12                    |
|1997-12-31             |1997                  |12                    |
|2008-12-31             |2008                  |12                    |
|1998-04-29             |1998                  |4                     |
+-----------------------+----------------------+----------------------+
only showing top 5 rows



In [23]:
#contar quantos registros com valor nulo na coluna
socios.select([pyspark_functions.count(pyspark_functions.when(pyspark_functions.isnull(c),1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        0|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [24]:
#substituir o valor nulo por algum outro
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,0,***000000**,,0,8


In [27]:
#orderBy
estabelecimentos.limit(5).orderBy('data_situacao_cadastral', ascending=False).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
1,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
3,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,
4,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,


In [29]:
#filtrando
socios\
.select('nome_do_socio_ou_razao_social')\
.filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
.filter(socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\
.limit(10)\
.toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BENASSI DIAS
1,RODRIGO RUDIBERTO DIAS
2,RODRIGO AURELIANO DIAS
3,RODRIGO SIMOES LEMOS DIAS
4,RODRIGO GEORGE DIAS
5,RODRIGO AUGUSTO FELICIO DIAS
6,RODRIGO FERNANDES DIAS
7,RODRIGO GARRIDO DIAS
8,RODRIGO OLIVEIRA DIAS
9,RODRIGO GONCALVES DIAS


In [36]:
#LIKE
socios\
.select('nome_do_socio_ou_razao_social')\
.where(pyspark_functions.upper(socios.nome_do_socio_ou_razao_social).like('RODRIGO%'))\
.limit(10)\
.toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BOEHL PINHEIRO MACHADO
1,RODRIGO MARTINELLI LAPORT
2,RODRIGO VIEIRA FRANCO
3,RODRIGO BATISTA DA SILVA
4,RODRIGO PIMENTEL FADEL
5,RODRIGO DA SILVA OLIVEIRA GOMES
6,RODRIGO MORAES
7,RODRIGO CARDOZO CARARA
8,RODRIGO DANILO LEITE
9,RODRIGO EVANGELISTA MARQUES
