# Iniciando com Spark

In [None]:
import os 

os.environ["SPARK_HOME"] = "C:\spark\spark-3.1.3-bin-hadoop2.7"

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [4]:
spark

In [5]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string]

In [6]:
df.show()


[Stage 0:>                                                          (0 + 1) / 1]

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+




                                                                                

In [7]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


# Extraindo arquivos .zip

In [8]:
import zipfile

In [9]:
zipfile.ZipFile('arquivos/empresas.zip', 'r').extractall('arquivos')
zipfile.ZipFile('arquivos/estabelecimentos.zip', 'r').extractall('arquivos')
zipfile.ZipFile('arquivos/socios.zip', 'r').extractall('arquivos')

In [10]:
path = 'arquivos/empresas/part-0000*.csv'
empresas = spark.read.csv(path, sep = ';', inferSchema=True)

                                                                                

In [11]:
empresas.count()

                                                                                

4585679

In [12]:
path = 'arquivos/estabelecimentos/part-0000*.csv'
estabelecimentos = spark.read.csv(path, sep = ';', inferSchema=True)

                                                                                

In [13]:
estabelecimentos.count()

                                                                                

4836219

In [14]:
path = 'arquivos/socios/part-0000*.csv'
socios = spark.read.csv(path, sep = ';', inferSchema=True)

                                                                                

In [15]:
socios.count()

2046430

# Manipulando os dados

In [16]:
empresas.limit(10).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,
5,8416,ELETRICA RUBI LTDA,2062,49,0,5,
6,8992,SHIROMA VEICULOS LTDA.,2062,49,0,5,
7,9091,CONTATOS BAR E LANCHONETE LTDA,2062,49,0,5,
8,9614,ANTONIA APARECIDA DE SOUZA ULIANA,2135,50,0,5,
9,9896,DORACY CORAT DA COSTA,2135,50,0,5,


In [17]:
#renomeando as colunas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [18]:
for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", colName)
    
empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [19]:
estabelecimentos.limit(10).toPandas()

22/03/14 22:22:05 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,
5,4628,4,8,2,,8,20040719,1,,,...,7075,,,,,,,,,
6,4628,2,46,2,EMBROIDERY & GIFT,8,19960802,1,,,...,7075,,,,,,,,,
7,4628,1,65,1,,2,20051103,0,,,...,7075,11.0,43625402.0,11.0,43622689.0,11.0,43625402.0,SHIRLEY_LUPOS@HOTMAIL.COM,,
8,10804,1,71,1,KOLVOX,8,20170613,1,,,...,7107,11.0,20580186.0,,,,,NCCONTABILIDADE@TERRA.COM.BR,,
9,11448,1,0,1,PROLAV,8,20081231,71,,,...,7107,,,,,,,,,


In [20]:
#renomeando as colunas
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [21]:
for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)
    
estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [22]:
socios.limit(10).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8
5,14798,2,CLOD ASSAD FADEL,***205668**,22,19940609,,***000000**,,0,6
6,17826,2,WALKYRIA ALGARVES,***689078**,49,19970227,,***000000**,,0,7
7,17826,2,SEBASTIAO JADIR TEIXEIRA NUNES,***904728**,49,20090813,,***000000**,,0,5
8,19491,2,JOSE JOAO ADAMO,***235698**,49,19940615,,***000000**,,0,7
9,19491,2,ROSEMARY CANTUARIA AFONSO ADAMO,***959818**,49,19940615,,***000000**,,0,6


In [23]:
#renomeando as colunas 
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [24]:
for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)
    
socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

# Analisando os dados

In [25]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [26]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [27]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [28]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [29]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [30]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

# Convertendo String > Double 

In [31]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [32]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [33]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


# Substituindo vírgula por ponto

In [34]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


# Convertendo string por double

In [35]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [36]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



# Convertendo int para date

In [37]:
estabelecimentos = estabelecimentos\
    .withColumn("data_situacao_cadastral", f.to_date(estabelecimentos.data_situacao_cadastral\
                                                     .cast(StringType()), "yyyy-MM-DD"))\
    .withColumn("data_de_inicio_atividade", f.to_date(estabelecimentos.data_de_inicio_atividade\
                                                     .cast(StringType()), "yyyy-MM-DD"))\
    .withColumn("data_da_situacao_especial", f.to_date(estabelecimentos.data_da_situacao_especial\
                                                     .cast(StringType()), "yyyy-MM-DD"))


estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [38]:
estabelecimentos.limit(10).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,,1,,,...,7075,,,,,,,,,
5,4628,4,8,2,,8,,1,,,...,7075,,,,,,,,,
6,4628,2,46,2,EMBROIDERY & GIFT,8,,1,,,...,7075,,,,,,,,,
7,4628,1,65,1,,2,,0,,,...,7075,11.0,43625402.0,11.0,43622689.0,11.0,43625402.0,SHIRLEY_LUPOS@HOTMAIL.COM,,
8,10804,1,71,1,KOLVOX,8,,1,,,...,7107,11.0,20580186.0,,,,,NCCONTABILIDADE@TERRA.COM.BR,,
9,11448,1,0,1,PROLAV,8,,71,,,...,7107,,,,,,,,,


In [39]:
socios = socios.withColumn("data_de_entrada_sociedade", f.to_date(f.col('data_de_entrada_sociedade').cast('string'), 'yyyyMMdd'))


socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [40]:
socios.limit(10).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8
5,14798,2,CLOD ASSAD FADEL,***205668**,22,1994-06-09,,***000000**,,0,6
6,17826,2,WALKYRIA ALGARVES,***689078**,49,1997-02-27,,***000000**,,0,7
7,17826,2,SEBASTIAO JADIR TEIXEIRA NUNES,***904728**,49,2009-08-13,,***000000**,,0,5
8,19491,2,JOSE JOAO ADAMO,***235698**,49,1994-06-15,,***000000**,,0,7
9,19491,2,ROSEMARY CANTUARIA AFONSO ADAMO,***959818**,49,1994-06-15,,***000000**,,0,6


# Selecionando informações

In [41]:
empresas.select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa').show(5, False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2240             |1               |0.0                      |
|2062             |5               |0.0                      |
|3034             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |4000.0                   |
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [42]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade')\
              .alias('ano_entrada_sociedade')).show(5, False)

+-------------------------------+------------+---------------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_entrada_sociedade|
+-------------------------------+------------+---------------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994                 |
|CRISTINA HUNDERTMARK           |7           |1994                 |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994                 |
|EDUARDO BERRINGER STEPHAN      |5           |1994                 |
|HANNE MAHFOUD FADEL            |8           |1994                 |
+-------------------------------+------------+---------------------+
only showing top 5 rows



# Identificando valores nulos

In [43]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [44]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).toPandas()

                                                                                

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,0,0,208,1234,0,1,2038255,0,1995432,0,0


In [45]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [46]:
socios.na.fill(0).limit(5).toPandas()
socios.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


# Ordenando os dados

In [47]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade')\
            .alias('ano_de_entrada')).orderBy('faixa_etaria', ascending = False).show(5, False)



+--------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social         |faixa_etaria|ano_de_entrada|
+--------------------------------------+------------+--------------+
|NOSKE DO REGO CASTELO BRANCO          |9           |2000          |
|JOSE BATISTA SOARES LEITE             |9           |2002          |
|RICARDO DE MELLO OLIVEIRA GASPARIAN   |9           |1995          |
|EUZA THEREZINHA DE PAULA E SILVA COSTA|9           |1995          |
|MARIA LUIZA DA SILVA                  |9           |2002          |
+--------------------------------------+------------+--------------+
only showing top 5 rows




                                                                                

# Ordenando com dois parâmetros

In [48]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade')\
            .alias('ano_de_entrada')).orderBy(['faixa_etaria', 'ano_de_entrada'], ascending = [False, True]).show(5, False)



+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|ANTERO DA SILVA RAMALHO CRUZ |9           |1918          |
|NELSON FALDINI               |9           |1937          |
|ALFREDO MARINHO DE QUEIROZ   |9           |1939          |
|PAULO BRASIL FERREIRA VELLOSO|9           |1941          |
|EDGAR ERTHAL                 |9           |1941          |
+-----------------------------+------------+--------------+
only showing top 5 rows




                                                                                

# Filtrando os dados

In [49]:
empresas\
    .filter("capital_social_da_empresa > 50").toPandas()

                                                                                

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1.0,
1,24205,SUELY LEME MARI ANI 25086572800,2135,50,1000.0,1.0,
2,58970,TOTAL CAR VEICULOS LTDA,2062,49,20000.0,1.0,
3,70273,VANDA DA SILVA LANCHONETE,2135,50,15000.0,1.0,
4,74218,V G SILVA TECIDOS,2135,50,16000.0,1.0,
...,...,...,...,...,...,...,...
2517165,97546747,JAQUELINE P.M.DA SILVA & CIA LTDA,2062,49,20000.0,1.0,
2517166,97549420,FRANCISCO ALCIMAR DA SILVA 78984920487,2135,50,10000.0,1.0,
2517167,97551200,MARTINS & MARTINS REPRESENTACOES LTDA,2062,49,10000.0,1.0,
2517168,97552959,EDNALDO GOMES SOARES,2135,50,30000.0,1.0,


In [50]:
empresas\
    .where("capital_social_da_empresa = 50").toPandas()

                                                                                

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,17350147,ERIK MARCELO DOS SANTOS 42107848858,2135,50,50.0,1,
1,17833214,ALEXANDRE MACHADO LIMA 73750123772,2135,50,50.0,1,
2,20860830,YASMIN MOURA DA FONSECA 13457709793,2135,50,50.0,1,
3,22242856,JOAO CESAR MESSIAS 08707149883,2135,50,50.0,1,
4,23238540,EVERTON ROBERTO DA SILVA 42101963809,2135,50,50.0,1,
...,...,...,...,...,...,...,...
6546,39717910,JANAINA CRISTINA GUEDES DE NOVAIS 40789379856,2135,50,50.0,1,
6547,40134726,JOSE VENITH 44292147949,2135,50,50.0,1,
6548,40983141,VALERIO ANTONIO CRISPIM DA CRUZ 53025776668,2135,50,50.0,1,
6549,41238271,MARIANGELA RAMOS PIMENTA 73179787600,2135,50,50.0,1,


In [51]:
empresas\
    .where("capital_social_da_empresa < 50").toPandas()

                                                                                

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1.0,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5.0,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5.0,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5.0,
4,8416,ELETRICA RUBI LTDA,2062,49,0.0,5.0,
...,...,...,...,...,...,...,...
2061953,97536261,EDER NUNES FILGUEIRA 06221103657,2135,50,1.0,1.0,
2061954,97541408,RAPHAEL FARIAS DE SOUZA 11334313725,2135,50,1.0,1.0,
2061955,97836258,COOPERATIVA AGRICOLA MISTA DE TAQUARI LTDA,2143,16,0.0,5.0,
2061956,97881551,IRMAOS MEGIER LTDA,2062,49,0.0,5.0,


In [52]:
socios\
    .select('nome_do_socio_ou_razao_social')\
    .filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
    .filter(socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\
    .limit(10)\
    .toPandas()


[Stage 37:>                                                         (0 + 1) / 1]

                                                                                

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BENASSI DIAS
1,RODRIGO RUDIBERTO DIAS
2,RODRIGO AURELIANO DIAS
3,RODRIGO SIMOES LEMOS DIAS
4,RODRIGO GEORGE DIAS
5,RODRIGO AUGUSTO FELICIO DIAS
6,RODRIGO FERNANDES DIAS
7,RODRIGO GARRIDO DIAS
8,RODRIGO OLIVEIRA DIAS
9,RODRIGO GONCALVES DIAS


# O comando LIKE

In [53]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [54]:
df\
    .filter(f.upper(df.data)\
    .like('%RESTAURANTE%'))\
    .limit(10)\
    .toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [55]:
df\
    .filter(f.upper(df.data)\
    .like('RESTAURANTE%'))\
    .limit(10)\
    .toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI


In [56]:
df\
    .filter(f.upper(df.data)\
    .like('%RESTAURANTE'))\
    .limit(10)\
    .toPandas()

Unnamed: 0,data
0,Joca Restaurante


In [57]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial'])\
    .like('%RESTAURANTE'))\
    .limit(20)\
    .toPandas()


Unnamed: 0,razao_social_nome_empresarial,natureza_juridica,porte_da_empresa,capital_social_da_empresa
0,MARIA ROZA DOS SANTOS- BAR E RESTAURANTE,2135,5,0.0
1,ELIANIA A. CUSTODIO RESTAURANTE,2135,1,0.0
2,R. A. D. ABRIL RESTAURANTE,2135,1,0.0
3,R. DA S. ARAUJO - RESTAURANTE,2135,5,0.0
4,A C M COUTINHO-LANCHONETE E RESTAURANTE,2135,1,0.0
5,REGINA DA S SANTOS RESTAURANTE,2135,1,8000.0
6,MARCIANO REZES DA SILVA RESTAURANTE,2135,1,20000.0
7,S. L. DE OLIVEIRA RESTAURANTE,2135,1,50000.0
8,DIELE LIMA DE ARAUJO RESTAURANTE,2135,1,20000.0
9,G.DA ROSA - RESTAURANTE,2135,1,5000.0


In [58]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial'])\
    .like('RESTAURANTE%'))\
    .limit(20)\
    .toPandas()


Unnamed: 0,razao_social_nome_empresarial,natureza_juridica,porte_da_empresa,capital_social_da_empresa
0,RESTAURANTE IMIGRANTE PORTUGUES LTDA.,2062,5,0.0
1,RESTAURANTE E PETISCARIA THE-DELICIA LTDA,2062,5,0.0
2,RESTAURANTE PONTO NOBRE EIRELI,2305,1,105000.0
3,RESTAURANTE ALEGRIA ALEGRIA LTDA,2062,1,5000.0
4,RESTAURANTE D GABI EIRELI,2305,1,95400.0
5,RESTAURANTE DONA CHICA LTDA,2062,1,5000.0
6,RESTAURANTE SABOR REGIONAL LTDA,2062,1,20000.0
7,RESTAURANTE E LANCHONETE MEU CANTINHO LTDA,2062,5,0.0
8,RESTAURANTE BEM NUTRIDO LTDA,2062,5,0.0
9,RESTAURANTE TOQUIO SUSHI EIRELI,2305,1,68000.0


# Sumarizando os dados

In [59]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada', ascending = True)\
    .toPandas()

                                                                                

Unnamed: 0,ano_de_entrada,count
0,2010,79337
1,2011,83906
2,2012,80101
3,2013,83919
4,2014,80590
5,2015,80906
6,2016,81587
7,2017,90221
8,2018,99935
9,2019,118248


In [60]:
empresas.select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg('capital_social_da_empresa').alias('capital_social_medio'),
        f.count('cnpj_basico').alias('frequencia')
     )\
     .orderBy('porte_da_empresa', ascending = True).show()



+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+





                                                                                

In [61]:
empresas.select('capital_social_da_empresa').summary().show()
# poderia também utilizar a parte de baixo
# .summary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max' )



+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+




                                                                                

# Juntando DataFrames

In [62]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água Mineral'),
        ('2', 'Limpeza', 'Sabão em Pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para Cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [63]:
produtos.toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água Mineral
1,2,Limpeza,Sabão em Pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para Cães


In [64]:
impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [65]:
produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .toPandas()

                                                                                

Unnamed: 0,cat,id,prod,tax
0,Bebidas,1,Água Mineral,0.15
1,Limpeza,2,Sabão em Pó,0.05
2,Frios,3,Queijo,0.065
3,Bebidas,4,Refrigerante,0.15


In [66]:
produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .toPandas()

                                                                                

Unnamed: 0,cat,id,prod,tax
0,Bebidas,1,Água Mineral,0.15
1,Limpeza,2,Sabão em Pó,0.05
2,Frios,3,Queijo,0.065
3,Bebidas,4,Refrigerante,0.15
4,Pet,5,Ração para Cães,


In [67]:
produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .toPandas()

                                                                                

Unnamed: 0,cat,id,prod,tax
0,Carnes,,,0.08
1,Bebidas,1.0,Água Mineral,0.15
2,Limpeza,2.0,Sabão em Pó,0.05
3,Frios,3.0,Queijo,0.065
4,Bebidas,4.0,Refrigerante,0.15


In [68]:
produtos.join(impostos, 'cat', how='outer')\
    .sort('id')\
    .toPandas()

                                                                                

Unnamed: 0,cat,id,prod,tax
0,Carnes,,,0.08
1,Bebidas,1.0,Água Mineral,0.15
2,Limpeza,2.0,Sabão em Pó,0.05
3,Frios,3.0,Queijo,0.065
4,Bebidas,4.0,Refrigerante,0.15
5,Pet,5.0,Ração para Cães,


In [69]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

In [70]:
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [71]:
empresas_join.limit(10).toPandas()

                                                                                

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,fax,correio_eletronico,situacao_especial,data_da_situacao_especial,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,1,48,1,GIRAFFAS,8,,1,,,...,,,,,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,,73,,,...,,,,,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,1,90,1,,4,,63,,,...,,,,,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,1,2,1,,8,,71,,,...,,,,,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,,0,,,...,,CONTATO@LEONECONTABIL.COM.BR,,,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,
5,13623,1,7,1,MARISTELA INDUSTRIA E COMERCIO DE SORVETES,8,,71,,,...,,,,,MARISTELA INDUSTRIA E COMERCIO DE SORVETES LTDA,2062,49,0.0,5,
6,17389,1,88,1,,4,,63,,,...,5352348.0,,,,DAICICOM MARKETING DIGITAL S/C LTDA,2240,49,0.0,1,
7,18944,1,96,1,MONTANHES,8,,1,,,...,,,,,SAO GOTARDO-DISTRIBUICAO E COMERCIO LTDA,2062,49,0.0,5,
8,19204,1,74,1,,8,,1,,,...,,,,,TORTARIA CAMPINAS COMERCIO DE ALIMENTOS LTDA,2062,49,0.0,1,
9,22223,1,50,1,,8,,1,,,...,,,,,S R FARIAS DA SILVA E CIA LTDA,2062,49,0.0,5,


In [72]:
freq = empresas_join.select('cnpj_basico',f.year('data_de_inicio_atividade').alias('data_de_inicio'))\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count('cnpj_basico').alias('frequencia'))\
    .orderBy('data_de_inicio', ascending = True)
    

In [73]:
freq.limit(10).toPandas()

                                                                                

Unnamed: 0,data_de_inicio,frequencia


# Criando um total

In [74]:
freq.union(
    freq.select(
    f.lit('Total').alias('data_de_inicio'),
    f.sum(freq.frequencia).alias('frequencia')
    )
).toPandas()

                                                                                

Unnamed: 0,data_de_inicio,frequencia
0,Total,


# SparkSQL

In [75]:
empresas.createOrReplaceTempView('empresasView')

In [76]:
spark.sql('SELECT * FROM empresasView').show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [77]:
spark.sql('SELECT * FROM empresasView').limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [78]:
spark.sql(""" SELECT * FROM empresasView WHERE capital_social_da_empresa = 50 """).limit(10).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,17350147,ERIK MARCELO DOS SANTOS 42107848858,2135,50,50.0,1,
1,17833214,ALEXANDRE MACHADO LIMA 73750123772,2135,50,50.0,1,
2,20860830,YASMIN MOURA DA FONSECA 13457709793,2135,50,50.0,1,
3,22242856,JOAO CESAR MESSIAS 08707149883,2135,50,50.0,1,
4,23238540,EVERTON ROBERTO DA SILVA 42101963809,2135,50,50.0,1,
5,24310504,LIGIA WERNECK DAMASCENO MARINS 07108267705,2135,50,50.0,1,
6,26171412,CLEUZA NERES DA CONCEICAO 14580656857,2135,50,50.0,1,
7,26366885,KAROLINA DE FREITAS RIBEIRO 42913650805,2135,50,50.0,1,
8,27907321,ANDRE LUIS DOS SANTOS RAMOS 93069960553,2135,50,50.0,1,
9,28598904,ANDRE LUIS PEREIRA PEDROSO 14468087773,2135,50,50.0,1,


In [79]:
spark.sql(""" SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media 
               FROM empresasView 
               GROUP BY porte_da_empresa """).limit(10).toPandas()

                                                                                

Unnamed: 0,porte_da_empresa,Media
0,,8.354219
1,1.0,339994.5
2,3.0,2601002.0
3,5.0,708660.4


In [80]:
empresas_join.createOrReplaceTempView('empresasJoinView')

In [81]:
freq = spark\
        .sql("""
            SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, 
            COUNT(cnpj_basico) as count
            FROM empresasJoinView
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio""")

freq.toPandas()

                                                                                

Unnamed: 0,data_de_inicio,count


In [82]:
freq.createOrReplaceTempView('freqView')

In [83]:
spark.sql(""" SELECT * FROM freqView UNION ALL 
        SELECT 'Total' AS data_de_inicio, SUM(count) AS count
        FROM freqView""")\
        .toPandas()
        

                                                                                

Unnamed: 0,data_de_inicio,count
0,Total,


# Formas de armazenamento

# Salvando em .csv

In [84]:
empresas.write.csv(
    'arquivos/empresas/csv', mode='overwrite',
    sep=';',
    header=True
)

                                                                                

In [85]:
empresas2 = spark.read.csv(
       path= "arquivos/empresas/csv",
       sep=";",
       inferSchema=True,
       header=True
)

                                                                                

In [86]:
empresas2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [87]:
estabelecimentos.write.csv(
    path= 'arquivos/estabelecimentos/csv', mode="overwrite",
    sep=";",
    header=True
)

                                                                                

In [88]:
estabelecimentos2 = spark.read.csv(
       path= "arquivos/estabelecimentos/csv",
       sep=";",
       inferSchema=True,
       header=True
)

                                                                                

In [89]:
estabelecimentos2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: strin

In [90]:
socios.write.csv(
    path='arquivos/socios/csv', mode="overwrite",
    sep=";",
    header=True
)

                                                                                

In [None]:
socios2 = spark.read.csv(
       path= "arquivos/socios/csv",
       sep=";".
       inferSchema=True,
       header=True
)

In [None]:
socios2.printSchema()

# Salvando em Parquet

In [None]:
empresas.write.parquet(
    path='arquivos/empresas/parquet', mode="overwrite"
)

In [None]:
empresas_parquet = spark.read.parquet(
        'arquivos/empresas/parquet')

In [None]:
estabelecimentos.write.parquet(
    path='arquivos/estabelecimentos/parquet', mode="overwrite"
)

In [None]:
estabelecimentos_parquet = spark.read.parquet(
        'arquivos/estabelecimentos/parquet')

In [None]:
socios.write.parquet(
    path='arquivos/socios/parquet', mode="overwrite"
)

In [None]:
socios_parquet = spark.read.parquet(
        'arquivos/socios/parquet')

 # Particionamento dos dados

In [None]:
empresas.coalesce(1).write.csv(
    path='arquivos/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
empresas.write.parquet(
    path='arquivos/empresas/parquet-partitionBy',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

# Finalizando o spark

In [None]:
spark.stop()