In [None]:
import zipfile

# 01. Começando o trabalho

In [None]:
# Para o reconhecimento do spark pelo ambiente, no caso de não iniciar pelo spark shell
import os
os.environ['JAVA_HOME'] = r'D:\Java\jdk-11.0.15'
os.environ['SPARK_HOME'] = 'D:\spark'
os.environ['PYSPARK_PYTHON'] = r'C:\Users\Adolfo\anaconda3\python.exe'
os.environ['PYSPARK_PYTHON_DRIVER'] = r'C:\Users\Adolfo\anaconda3\python.exe'

import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
spark = SparkSession.builder\
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .getOrCreate()

### Ajuste para conseguir escrever o parquet na seção 6:

# Mudar uma configuração do spark context e reiniciá-lo; sobrescrevi a sessão do spark
# Resgata o spark context da sessão atual
sc = SparkContext.getOrCreate()
# Captura as configurações do spark context e muda a necessária
conf = sc.getConf()
conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'LEGACY')
# Reiniciar o spark context
sc.stop()

spark = SparkSession.builder\
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .config(conf=conf)\
    .getOrCreate()

# 02. Carregamento de dados

In [None]:
# Extraindo as bases de dados zipadas
zipfile.ZipFile('empresas.zip', 'r').extractall('./')
zipfile.ZipFile('estabelecimentos.zip', 'r').extractall('./')
zipfile.ZipFile('socios.zip', 'r').extractall('./')

In [5]:
# Carregando no spark
empresas = spark.read.csv('empresas/', sep=';', inferSchema=True)
estabelecimentos = spark.read.csv('estabelecimentos/', sep=';', inferSchema=True)
socios = spark.read.csv('socios/', sep=';', inferSchema=True)

In [6]:
print(f'''Num. de linhas no dataframe empresas: {empresas.count()};
Num. de linhas no dataframe estabelecimentos: {estabelecimentos.count()};
Num. de linhas no dataframe socios: {socios.count()}.''')

Num. de linhas no dataframe empresas: 4585679;
Num. de linhas no dataframe estabelecimentos: 4836219;
Num. de linhas no dataframe socios: 2046430.


# 03. Manipulando os dados

In [7]:
# Novos nomes para as colunas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica',
                    'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa',
                    'ente_federativo_responsavel']
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia',
                  'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral',
                  'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade',
                  'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro',
                  'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1',
                  'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial',
                  'data_da_situacao_especial']
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social',
                  'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade',
                  'pais', 'representante_legal', 'nome_do_representante',
                  'qualificacao_do_representante_legal', 'faixa_etaria']

# Para facilitar a renomeação
def renomear(dataframe, rename_list):
    
    for num in range(len(rename_list)):
        dataframe = dataframe.withColumnRenamed(f'_c{num}', rename_list[num])
        
    return dataframe

# Renomeando
empresas = renomear(empresas, empresasColNames)
estabelecimentos = renomear(estabelecimentos, estabsColNames)
socios = renomear(socios, sociosColNames)

In [8]:
display(empresas.limit(5).toPandas())
empresas.printSchema()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [9]:
# Modificando o separador decimal e passando para tipo Double o campo capital_social_da_empresa
empresas = empresas\
    .withColumn(
        'capital_social_da_empresa',
        f.regexp_replace('capital_social_da_empresa', ',', '.').cast(DoubleType())
    )\

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [10]:
# Transformando os campos da 'estabelecimentos' em data
estabelecimentos = estabelecimentos\
    .withColumn(
        "data_situacao_cadastral", 
        f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_de_inicio_atividade", 
        f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_da_situacao_especial", 
        f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

In [11]:
# O mesmo para 'socios'
socios = socios.withColumn(
    'data_de_entrada_sociedade',
    f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
)

In [12]:
# Visualizar se a mudança foi feita
socios\
    .groupby(f.year(socios.data_de_entrada_sociedade))\
    .count()\
    .orderBy('count', ascending=False)\
    .limit(5)\
    .toPandas()

Unnamed: 0,year(data_de_entrada_sociedade),count
0,2020,125927
1,2019,118248
2,2005,111517
3,2018,99935
4,2017,90221


# 04. Seleções e consultas

### Seleções

In [13]:

empresas.select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa').show(5)

estabelecimentos.select('nome_fantasia', 
                        'municipio', 
                        f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),
                        f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade')).show(5)

socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', 
              f.year('data_de_entrada_sociedade').alias('ano_de_entrada')).show(5, False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|             2240|               1|                      0.0|
|             2062|               5|                      0.0|
|             3034|               5|                      0.0|
|             2135|               5|                      0.0|
|             2062|               1|                   4000.0|
+-----------------+----------------+-------------------------+
only showing top 5 rows

+-----------------+---------+-----------------------+-----------------------+
|    nome_fantasia|municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|   PIRAMIDE M. C.|     7107|                   1994|                      5|
|             null|     7107|                   1994|                      5|
|             null

### Valores nulos

In [14]:
# Checando valores nulos:
socios.select(
    [f.count(f.when(f.isnull(col), 1)).alias(col) for col in socios.columns]
).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,0,0,208,1234,0,1,2038255,0,1995432,0,0


In [15]:
# Preenchendo os valores nulos: 
socios = socios.na.fill(0)  # Preenche os valores None
socios = socios.na.fill('-')  # Preenche os valores NaN

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,-,0,8


### Ordenação

In [16]:
# Ordenando os dados:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, False)

+--------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social   |faixa_etaria|ano_de_entrada|
+--------------------------------+------------+--------------+
|FERNANDO DO NASCIMENTO FERNANDES|6           |2021          |
|ALICE VASCONCELOS DE FARIA      |4           |2021          |
|IBRAHIM OZER                    |4           |2021          |
|LUCIANA DE SOUZA FERREIRA       |5           |2021          |
|YOSHIKUNI MURAKAMI              |8           |2021          |
+--------------------------------+------------+--------------+
only showing top 5 rows



In [17]:
# Pode ser feito com mais colunas: (e a ordem importa!)
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
    .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA|9           |2021          |
|MARIA JOSE DOMINGUES BONATO    |9           |2021          |
|DORIS PEREIRA GOMES JAZRA      |9           |2021          |
|NADIR BICHARA CHUAHY           |9           |2021          |
|RAIMUNDA TORRES MARTINS        |9           |2021          |
+-------------------------------+------------+--------------+
only showing top 5 rows



### Filtragem

In [18]:
# Passando o argumento na string, o operador de igualdade pode ser '=' ou '=='; passando como expressão do
# python, deve ser apenas o '==', como usual. 
empresas\
    .where('capital_social_da_empresa=50')\
    .limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,17350147,ERIK MARCELO DOS SANTOS 42107848858,2135,50,50.0,1,
1,17833214,ALEXANDRE MACHADO LIMA 73750123772,2135,50,50.0,1,
2,20860830,YASMIN MOURA DA FONSECA 13457709793,2135,50,50.0,1,
3,22242856,JOAO CESAR MESSIAS 08707149883,2135,50,50.0,1,
4,23238540,EVERTON ROBERTO DA SILVA 42101963809,2135,50,50.0,1,


In [19]:
# A função 'where' (homônima do SQL) é igual à filter do spark:
estabelecimentos\
    .filter(estabelecimentos.municipio == 3556909)\
    .toPandas()

# Nenhum estabelecimento do meu município ):
# https://www.ibge.gov.br/explica/codigos-dos-municipios.php

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial


##### Comando *like*

In [21]:
# Outro maneira de filtrar é com o comando 'like':
df = spark.createDataFrame(
    [('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)],
    ['data']
)
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


# 05. Agregações e junções

### Sumarização

In [22]:
# Sumarização com contagem de sócios a cada ano de entrada, para 2010 ou depois
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupby('ano_de_entrada').count()\
    .sort('ano_de_entrada')\
    .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [23]:
# Sumarização da quantidade de empresas e capital médio, por porte
empresas\
    .select('porte_da_empresa', 'capital_social_da_empresa', 'cnpj_basico')\
    .groupby('porte_da_empresa')\
    .agg(
        f.mean('capital_social_da_empresa').alias('capital_social_medio'),
        f.count('cnpj_basico').alias('frequencia')
    )\
    .orderBy('porte_da_empresa')\
    .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313507047|   3129043|
|               3|  2601001.7677092687|    115151|
|               5|   708660.4208249793|   1335500|
+----------------+--------------------+----------+



In [24]:
# Uso do summary
empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .show()

# Summary exibe como padrão essas estatísticas, mas é possível simplificar a visáo passando um argumento
# summary('count', 'mean', 'stddev')

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542674|
| stddev|     2.1118691490537727E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



### Junções

In [25]:
# Inner join
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')
print(f'Num. de colunas no dataframe à esquerda: {len(empresas.columns)}')
print(f'Num. de colunas no dataframe à direita: {len(estabelecimentos.columns)}')
print(f'Num. de colunas após o inner join: {len(empresas_join.columns)}')

Num. de colunas no dataframe à esquerda: 7
Num. de colunas no dataframe à direita: 30
Num. de colunas após o inner join: 36


In [26]:
# Contagem de estabelecimentos/empresas por data de início da atividade
freq = empresas_join\
    .select('cnpj_basico', f.year('data_de_inicio_atividade').alias('ano_de_inicio'))\
    .where('ano_de_inicio >= 2010')\
    .groupBy('ano_de_inicio')\
    .agg(f.count('cnpj_basico').alias('frequencia'))\
    .sort('ano_de_inicio')

freq.show()

+-------------+----------+
|ano_de_inicio|frequencia|
+-------------+----------+
|         2010|    154159|
|         2011|    172677|
|         2012|    232480|
|         2013|    198424|
|         2014|    202276|
|         2015|    212523|
|         2016|    265417|
|         2017|    237292|
|         2018|    275435|
|         2019|    325922|
|         2020|    400654|
|         2021|    153275|
+-------------+----------+



In [27]:
# Adicionando uma última linha à query anterior no formato | Total: | ##### |, por meio do método union
freq.union(
    freq.select(
        f.lit('Total').alias('ano_de_inicio'),
        f.sum('frequencia').alias('frequencia')
    )
).toPandas()

Unnamed: 0,ano_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


### Spark SQL Query

In [28]:
# É possível escrever código em SQL puro por meio de um método do módulo spark.sql:

# Criando uma view para ser usada no código
empresas.createOrReplaceTempView('empresasView')

# Uma seleção de todas as colunas:
spark.sql('SELECT * FROM empresasView').limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [29]:
# A pesquisa por capital social da empresa = 50 em sql fica:
spark.sql(
    '''
    SELECT *
    FROM empresasView
    WHERE capital_social_da_empresa = 50
    '''
).limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,17350147,ERIK MARCELO DOS SANTOS 42107848858,2135,50,50.0,1,
1,17833214,ALEXANDRE MACHADO LIMA 73750123772,2135,50,50.0,1,
2,20860830,YASMIN MOURA DA FONSECA 13457709793,2135,50,50.0,1,
3,22242856,JOAO CESAR MESSIAS 08707149883,2135,50,50.0,1,
4,23238540,EVERTON ROBERTO DA SILVA 42101963809,2135,50,50.0,1,


In [30]:
# A vista de capital médio por porte:
spark.sql('''
    SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS capital_social_medio
    FROM empresasView
    GROUP BY porte_da_empresa
    ORDER BY porte_da_empresa
''').show()

+----------------+--------------------+
|porte_da_empresa|capital_social_medio|
+----------------+--------------------+
|            null|    8.35421888053467|
|               1|  339994.53313507047|
|               3|  2601001.7677092687|
|               5|   708660.4208249793|
+----------------+--------------------+



In [31]:
# E as linhas finais da contagem de estabelecimentos por ano de início
freq.createOrReplaceTempView('freqView')

spark.sql('''
    SELECT * FROM freqView
    UNION ALL
    SELECT 'Total' AS data_de_inicio, SUM(frequencia) AS frequencia
    FROM freqView
''').show()


+-------------+----------+
|ano_de_inicio|frequencia|
+-------------+----------+
|         2010|    154159|
|         2011|    172677|
|         2012|    232480|
|         2013|    198424|
|         2014|    202276|
|         2015|    212523|
|         2016|    265417|
|         2017|    237292|
|         2018|    275435|
|         2019|    325922|
|         2020|    400654|
|         2021|    153275|
|        Total|   2830534|
+-------------+----------+



# 06. Formas de armazenamento

In [32]:
# Estrutura de output pra csv: DataFrame --> DataFrameWriter --> csv
empresas.write.csv(
    path='./csv/',
    mode='overwrite',
    sep=';',
    header=True
)

In [33]:
# No caso do formato parquet, a estrutura é parecida:
estabelecimentos.write.parquet(
    path='./parquet/',
    mode='overwrite'
)
# E para leitura, seria:
estabelecimentos_parquet = spark.read.parquet(
    './parquet/'
)

In [34]:
# O spark particiona o arquivo exportado por padrão, com a melhor performance, é possível mudar isso,
# ao custo de performance, com o coalesce (diminuir) ou o repartition (aumentar ou diminuir)
socios.coalesce(1).write.csv(
    path='./csv_unico/',
    mode='overwrite',
    sep=';',
    header=True
)

In [35]:
# A partição também pode ser feita levando em conta alguma coluna do dataframe usando partitionBy
empresas.write.parquet(
    path='.parquet-partitionBy/',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

In [36]:
# Para terminar a spark session:
spark.stop()