<a href="https://colab.research.google.com/github/afonso-hub/PySpark/blob/main/Atividade_spark_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.


In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [None]:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#      .master('local[*]') \
#      .appName("Iniciando com Spark") \
#      .getOrCreate()

In [None]:
# spark


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
     .master('local[*]')\
     .appName("Iniciando com Spark")\
     .config('spark.ui.port', '4050')\
     .getOrCreate()
     

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
get_ipython().system_raw('./ngrok authtoken 2IK0sufmSmGgv3pybsmBHUMdyGC_3hG6FVrvHcxPci8weJGGK')
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://063c-35-245-27-103.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://063c-35-245-27-103.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


In [None]:
spark

In [None]:
data =[('Zeca','35'), ('Eva','29')]
colNames = ['Nome', 'Idade']

In [None]:
df = spark.createDataFrame(data,colNames)

In [None]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [None]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import zipfile

In [None]:
#zipfile.ZipFile('/content/drive/MyDrive/curso-spark/empresas.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

#EMPRESAS


In [None]:
path ='/content/drive/MyDrive/curso-spark/empresas'
empresas = spark.read.csv(path, sep=';',inferSchema=True)

In [None]:
empresas.limit(9).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,
5,8416,ELETRICA RUBI LTDA,2062,49,0,5,
6,8992,SHIROMA VEICULOS LTDA.,2062,49,0,5,
7,9091,CONTATOS BAR E LANCHONETE LTDA,2062,49,0,5,
8,9614,ANTONIA APARECIDA DE SOUZA ULIANA,2135,50,0,5,


In [None]:
empresas.limit(5).show()

+----+--------------------+----+---+-------+---+----+
| _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+----+--------------------+----+---+-------+---+----+
| 306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
|1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
|4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
|5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
|6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
+----+--------------------+----+---+-------+---+----+



In [None]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [None]:
for item in enumerate(empresasColNames):
  print(item)

(0, 'cnpj_basico')
(1, 'razao_social_nome_empresarial')
(2, 'natureza_juridica')
(3, 'qualificacao_do_responsavel')
(4, 'capital_social_da_empresa')
(5, 'porte_da_empresa')
(6, 'ente_federativo_responsavel')


In [None]:
for index, colName in enumerate(empresasColNames):
   empresas = empresas.withColumnRenamed(f"_c{index}", colName)
  
empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [None]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
empresas.count()

4585679

Convertendo String - Double

In [None]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
empresas =  empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa',',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
empresas.select('*').show(5, False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [None]:
empresas.select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa').show(5, False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2240             |1               |0.0                      |
|2062             |5               |0.0                      |
|3034             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |4000.0                   |
+-----------------+----------------+-------------------------+
only showing top 5 rows



Sumarizando os dados    #aggregate(agregacao)

In [None]:
empresas.select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg("capital_social_da_empresa").alias("capital_social_medio"),
        f.count("cnpj_basico").alias("frequencia")
    )\
    .orderBy("porte_da_empresa", ascending=True)\
    .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+



In [None]:
# .summary("count","mean","stddev","min","25%","50%","75%","max")
empresas\
    .select("capital_social_da_empresa")\
    .summary()\
    .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



Arquivo CSV

In [None]:
empresas.write.csv(
    path='/content/drive/MyDrive/curso-spark/empresas/CSV',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
empresas2 = spark.read.csv('/content/drive/MyDrive/curso-spark/empresas/CSV',
            sep=';',
            inferSchema=True,
            header=True)

In [None]:
empresas2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
empresas2.toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1.0,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5.0,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5.0,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5.0,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1.0,
...,...,...,...,...,...,...,...
4585674,97552959,EDNALDO GOMES SOARES,2135,50,30000.0,1.0,
4585675,97554296,ANA PAULA MIGUEL,2135,50,10000.0,1.0,
4585676,97836258,COOPERATIVA AGRICOLA MISTA DE TAQUARI LTDA,2143,16,0.0,5.0,
4585677,97881551,IRMAOS MEGIER LTDA,2062,49,0.0,5.0,


Arquivo Parquet


In [None]:
empresas.write.parquet(
    path='/content/drive/MyDrive/curso-spark/empresas/Parquet',
    mode='overwrite'
)


In [None]:

empresas_parquet = spark.read.parquet(
  '/content/drive/MyDrive/curso-spark/empresas/Parquet'
)

In [None]:
empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



Particionamento dos dados


In [None]:
empresas_parquet.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
empresas_parquet.write.parquet(
    path='/content/drive/MyDrive/curso-spark/empresas/parquet-partitionBy',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

#SOCIOS

In [None]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/socios.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [None]:
path ='/content/drive/MyDrive/curso-spark/socios'
socios = spark.read.csv(path, sep=';', inferSchema=True)


In [None]:
socios.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [None]:
socioColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [None]:
for index, colName in enumerate(socioColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [None]:
socios.limit(9).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8
5,14798,2,CLOD ASSAD FADEL,***205668**,22,19940609,,***000000**,,0,6
6,17826,2,WALKYRIA ALGARVES,***689078**,49,19970227,,***000000**,,0,7
7,17826,2,SEBASTIAO JADIR TEIXEIRA NUNES,***904728**,49,20090813,,***000000**,,0,5
8,19491,2,JOSE JOAO ADAMO,***235698**,49,19940615,,***000000**,,0,7


In [None]:
socios.count()

2046430


Convertendo String - Date

In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
socios = socios.withColumn("data_de_entrada_sociedade", f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd'))

In [None]:
 socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [None]:
# socios.drop('data_de_entrada_sociedade').collect()

In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



Sumarizando os dados

In [None]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada', ascending=True)\
    .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [None]:
# .summary("count","mean","stddev","min","25%","50%","75%","max")
socios.select('faixa_etaria')\
    .summary()\
    .show()

+-------+-----------------+
|summary|     faixa_etaria|
+-------+-----------------+
|  count|          2046430|
|   mean|5.396815918453111|
| stddev|1.687799526189411|
|    min|                0|
|    25%|                4|
|    50%|                5|
|    75%|                7|
|    max|                9|
+-------+-----------------+



Arquivo CSV


In [None]:
socios.write.csv(
    path='/content/drive/MyDrive/curso-spark/socios/CSV',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
socios2 = spark.read.csv('/content/drive/MyDrive/curso-spark/socios/CSV',
            sep=';',
            inferSchema=True,
            header=True)

In [None]:
socios2.toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8
...,...,...,...,...,...,...,...,...,...,...,...
2046425,97546747,2,JAQUELINE PASSOS MACHADO DA SILVA,***739790**,49,2011-07-12,,***000000**,,0,4
2046426,97546747,2,JOABE OLIVEIRA DA SILVA,***016170**,49,2011-07-12,,***000000**,,0,4
2046427,97551200,2,AGOSTINHO RABELO MARTINS,***871962**,49,2014-10-16,,***000000**,,0,5
2046428,97551200,2,SONIA LUCIA RABELO MARTINS,***010942**,22,2014-10-16,,***000000**,,0,8


Arquivo Parquet

In [None]:
socios.write.parquet(
    path='/content/drive/MyDrive/curso-spark/socios/Parquet',
    mode='overwrite',
   
)

In [None]:
socios_parquet = spark.read.parquet(
    
    '/content/drive/MyDrive/curso-spark/socios/Parquet' 
)

In [None]:
socios_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)




Particionamento dos dados

In [None]:
socios_parquet.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/socios/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
socios_parquet.write.parquet(
    path='/content/drive/MyDrive/curso-spark/socios/parquet-partitionBy',
    mode='overwrite',
    partitionBy='pais'
)



Ordenando os dados


In [None]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, False)

+----------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social           |faixa_etaria|ano_de_entrada|
+----------------------------------------+------------+--------------+
|KASSIANO RODRIGO KICHILESKI             |4           |2021          |
|LEONARDO MENNA BARRETO LARANJA GONCALVES|5           |2021          |
|MANOEL ADRIANO COSTA BARBOSA            |6           |2021          |
|ANTONOALDO GRANGEON TRANCOSO NEVES      |5           |2021          |
|MARIA SUELY DE MOURA                    |5           |2021          |
+----------------------------------------+------------+--------------+
only showing top 5 rows



In [None]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
            .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=False)\
            .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA|9           |2021          |
|MARIA JOSE DOMINGUES BONATO    |9           |2021          |
|DORIS PEREIRA GOMES JAZRA      |9           |2021          |
|NADIR BICHARA CHUAHY           |9           |2021          |
|RAIMUNDA TORRES MARTINS        |9           |2021          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [None]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
            .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
            .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA|9           |2021          |
|MARIA JOSE DOMINGUES BONATO    |9           |2021          |
|DORIS PEREIRA GOMES JAZRA      |9           |2021          |
|NADIR BICHARA CHUAHY           |9           |2021          |
|RAIMUNDA TORRES MARTINS        |9           |2021          |
+-------------------------------+------------+--------------+
only showing top 5 rows



Contagem de nulos


In [None]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

Filtrando os dados

In [None]:
socios.select("nome_do_socio_ou_razao_social")\
      .filter(socios.nome_do_socio_ou_razao_social.startswith("RODRIGO"))\
      .filter(socios.nome_do_socio_ou_razao_social.endswith("DIAS"))\
      .limit(10)\
      .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BENASSI DIAS
1,RODRIGO RUDIBERTO DIAS
2,RODRIGO AURELIANO DIAS
3,RODRIGO SIMOES LEMOS DIAS
4,RODRIGO GEORGE DIAS
5,RODRIGO AUGUSTO FELICIO DIAS
6,RODRIGO FERNANDES DIAS
7,RODRIGO GARRIDO DIAS
8,RODRIGO OLIVEIRA DIAS
9,RODRIGO GONCALVES DIAS


#ESTABELECIMENTOS

In [None]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/estabelecimentos.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [None]:
path ='/content/drive/MyDrive/curso-spark/estabelecimentos'
estabelecimentos = spark.read.csv(path,sep=';', inferSchema=True)

In [None]:
estaColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'sintuacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro',  'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [None]:
for index, colName in enumerate(estaColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'sintuacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [None]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,sintuacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: st


Convertendo String - Date

In [None]:
estabelecimentos = estabelecimentos\
      .withColumn("data_situacao_cadastral", f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()),'yyyyMMdd'))\
      .withColumn("data_de_inicio_atividade", f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd'))\
      .withColumn("data_da_situacao_especial", f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd'))

In [None]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,sintuacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (

In [None]:
estabelecimentos.select('nome_fantasia', 'municipio', 'data_de_inicio_atividade').show(5, False)

+-----------------+---------+------------------------+
|nome_fantasia    |municipio|data_de_inicio_atividade|
+-----------------+---------+------------------------+
|PIRAMIDE M. C.   |7107     |1994-05-09              |
|null             |7107     |1994-05-12              |
|null             |7107     |1994-05-12              |
|null             |7107     |1994-05-13              |
|EMBROIDERY & GIFT|7075     |1995-05-09              |
+-----------------+---------+------------------------+
only showing top 5 rows



In [None]:
estabelecimentos.select('nome_fantasia', 'municipio', f.year('data_de_inicio_atividade').alias("ano_de_inicio_atividade "),\
                        f.month('data_de_inicio_atividade').alias("mes_de_inicio_atividade")).show(5)

+-----------------+---------+------------------------+-----------------------+
|    nome_fantasia|municipio|ano_de_inicio_atividade |mes_de_inicio_atividade|
+-----------------+---------+------------------------+-----------------------+
|   PIRAMIDE M. C.|     7107|                    1994|                      5|
|             null|     7107|                    1994|                      5|
|             null|     7107|                    1994|                      5|
|             null|     7107|                    1994|                      5|
|EMBROIDERY & GIFT|     7075|                    1995|                      5|
+-----------------+---------+------------------------+-----------------------+
only showing top 5 rows



Sumarizando os dados

In [None]:
estabelecimentos\
    .select("numero")\
    .summary()\
    .show()

+-------+--------------------+
|summary|              numero|
+-------+--------------------+
|  count|             4836219|
|   mean|1.433247227024927...|
| stddev|                 NaN|
|    min|                 !*%|
|    25%|                82.0|
|    50%|               261.0|
|    75%|               698.0|
|    max|                �SNX|
+-------+--------------------+



Arquivos CSV

In [None]:
estabelecimentos.write.csv(
    path='/content/drive/MyDrive/curso-spark/estabelecimentos/CSV',
    mode='overwrite',
    sep=';',
    header=True
    )

In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (

In [None]:
estabelecimentos2 = spark.read.csv('/content/drive/MyDrive/curso-spark/estabelecimentos/CSV',
                                   sep=';',
                                   inferSchema=True,
                                   header=True)

In [None]:
estabelecimentos2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: stri

JOIN


In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (

In [None]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (

In [None]:
freq = empresas_join\
    .select(
        'cnpj_basico', 
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [None]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')   
    )
).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



Spark SQL

In [None]:
empresas.createOrReplaceTempView("empresasView")

In [None]:
spark.sql("SELECT * FROM empresasView").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
spark\
    .sql("""
        SELECT * 
            FROM empresasView 
            WHERE capital_social_da_empresa = 50
    """)\
    .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       null|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       null|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       null|
|   22242856|         JOAO CESAR M

In [None]:
spark\
    .sql("""
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media 
            FROM empresasView 
            GROUP BY porte_da_empresa
    """)\
    .show(5)

+----------------+------------------+
|porte_da_empresa|             Media|
+----------------+------------------+
|            null|  8.35421888053467|
|               1|339994.53313506936|
|               3|2601001.7677092673|
|               5| 708660.4208249798|
+----------------+------------------+



In [None]:
empresas_join.createOrReplaceTempView("empresasJoinView")

In [None]:
freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
            FROM empresasJoinView 
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
    """)

freq\
    .show()

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [None]:
empresas_join\
    .select(f.year(empresas_join.data_de_inicio_atividade).alias('data_de_inicio'))\
    .where("data_de_inicio >= 2010")\
    .groupBy('data_de_inicio')\
    .count()\
    .orderBy('data_de_inicio')\
    .show()

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [None]:
freq.createOrReplaceTempView("freqView")

In [None]:
spark\
    .sql("""
        SELECT *
            FROM freqView
        UNION ALL
        SELECT 'Total' AS data_de_inicio, SUM(count) AS count
            FROM freqView
    """)\
    .show()

+--------------+-------+
|data_de_inicio|  count|
+--------------+-------+
|          2010| 154159|
|          2011| 172677|
|          2012| 232480|
|          2013| 198424|
|          2014| 202276|
|          2015| 212523|
|          2016| 265417|
|          2017| 237292|
|          2018| 275435|
|          2019| 325922|
|          2020| 400654|
|          2021| 153275|
|         Total|2830534|
+--------------+-------+



Arquivo Parquet

In [None]:
estabelecimentos2.write.parquet(
    path='/content/drive/MyDrive/curso-spark/estabelecimentos/Parquet',
    mode='overwrite'
   
)

In [None]:
estabe_parquet=spark.read.parquet(
    '/content/drive/MyDrive/curso-spark/estabelecimentos/Parquet' 
)

In [None]:
estabe_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- sintuacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: stri

Particionamento dos dados

In [None]:
estabe_parquet.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/estabelecimentos/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
    
)

In [None]:
estabe_parquet.write.parquet(
    path='/content/drive/MyDrive/curso-spark/estabelecimentos/parquet-partitionBy',
    mode='overwrite',
    partitionBy='pais'
)




Identificando valores nulos

In [None]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


Filtrando os dados


In [None]:
empresas.where("capital_social_da_empresa==50")\
            .show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |null                 

In [None]:
empresas.select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
        .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%BAR%'))\
        .show(15, False)

+-----------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                        |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------------------------------------------+-----------------+----------------+-------------------------+
|CONTATOS BAR E LANCHONETE LTDA                       |2062             |5               |0.0                      |
|BAR E MERCEARIA KIT LTDA                             |2062             |5               |0.0                      |
|TRANSPORTADORA BARAO LTDA                            |2062             |5               |0.0                      |
|ANTONIO DA FONSECA GONCALVES BAR                     |2135             |5               |0.0                      |
|BARCLAYS BRASIL LTDA                                 |2062             |5               |0.0                      |
|CAMBARA INFORMATICA SOCIEDADE EMPRESARIA LTDA        |2062     

Utilizando as funções do Spark : separador

In [None]:
data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [None]:
df.select(f.concat_ws(',',f.substring_index('nome', ' ', -1),
                      f.substring_index('nome', ' ', 1)).alias('ident'),'idade').show(truncate=False)

+----------------+-----+
|ident           |idade|
+----------------+-----+
|CASTRO,GISELLE  |15   |
|OLIVEIRA,ELAINE |22   |
|LOURDES,JOAO    |43   |
|FERREIRA,MARTA  |24   |
|ROEDER,LAUDENETE|51   |
+----------------+-----+



In [None]:
df \
    .select(
        f.concat_ws(
            ', ', 
            f.substring_index('nome', ' ', -1), 
            f.substring_index('nome', ' ', 1)
        ).alias('ident'), 
        'idade') \
    .show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



Utilizando o método orderBy

In [None]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [None]:
df.select('*').orderBy(['ano', 'mes'], ascending=[False, False]).show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|IRANI DOS SANTOS     |12 |2010|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
|CARMINA RABELO       |4  |2010|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|HERONDINA PEREIRA    |6  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
+---------------------+---+----+



Definindo filtros

In [None]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [None]:
df.filter((df.mes <= 6) & (df.ano ==2009))\
  .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+




O comando LIKE

In [None]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [None]:
df.where(f.upper(df.data).like('%RESTAURANTE%'))\
    .show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [None]:
df.where(f.upper(df.data).like('RESTAURANTE%'))\
    .show(truncate=False)

+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+




Ferramenta de busca

In [None]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [None]:
df.filter(df.nome.like('C%')).show(truncate=False)

+--------------+---+----+
|nome          |mes|ano |
+--------------+---+----+
|CARMINA RABELO|4  |2010|
|CARLITO SOUZA |1  |2010|
+--------------+---+----+



Para saber mais: A função `when`

In [None]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7), 
    ('IVO', 'MATEMÁTICA', 9), 
    ('MÁRCIA', 'MATEMÁTICA', 8), 
    ('LEILA', 'MATEMÁTICA', 9), 
    ('BRENO', 'MATEMÁTICA', 7), 
    ('LETÍCIA', 'MATEMÁTICA', 8), 
    ('CARLOS', 'FÍSICA', 2), 
    ('IVO', 'FÍSICA', 8), 
    ('MÁRCIA', 'FÍSICA', 10), 
    ('LEILA', 'FÍSICA', 9), 
    ('BRENO', 'FÍSICA', 1), 
    ('LETÍCIA', 'FÍSICA', 6), 
    ('CARLOS', 'QUÍMICA', 10), 
    ('IVO', 'QUÍMICA', 8), 
    ('MÁRCIA', 'QUÍMICA', 1), 
    ('LEILA', 'QUÍMICA', 10), 
    ('BRENO', 'QUÍMICA', 7), 
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [None]:
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [None]:
df\
    .select('nota')\
    .summary('min','25%','50%','75%','max')\
    .show()

+-------+----+
|summary|nota|
+-------+----+
|    min|   1|
|    25%|   7|
|    50%|   8|
|    75%|   9|
|    max|  10|
+-------+----+



In [None]:
df.groupBy('status')\
  .count()\
  .orderBy('status', ascending=True)\
  .show()

+---------+-----+
|   status|count|
+---------+-----+
| APROVADO|   14|
|REPROVADO|    4|
+---------+-----+



Juntando DataFrames JOINS

In [None]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'), 
        ('2', 'Limpeza', 'Sabão em pó'), 
        ('3', 'Frios', 'Queijo'), 
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15), 
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [None]:
produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [None]:
produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+



In [None]:
produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [None]:
produtos.join(impostos, 'cat', how='outer')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+

