# Learning PySpark

## Apache Spark - Introdução

### [Apache Spark](https://spark.apache.org/)

Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

### [PySpark](https://spark.apache.org/docs/3.1.2/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

tar: spark-3.1.2-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [3]:
!pip install -q findspark
!pip install pyspark
!pip install py4j



In [4]:
import sys
import pyspark

print(f"Python version: {sys.version}")
print(f"PySpark version: {pyspark.__version__}")

Python version: 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
PySpark version: 4.0.2


In [5]:
# import os

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [6]:
import findspark

findspark.init()
findspark.find()

'/usr/local/lib/python3.12/dist-packages/pyspark'

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

## Acessando o [Spark UI](https://spark.apache.org/docs/3.1.2/web-ui.html) (Google Colab)

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Learning Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

[Site ngrok](https://ngrok.com)

In [8]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [9]:
!pip install pyngrok

from pyngrok import ngrok

Collecting pyngrok
  Downloading pyngrok-7.5.0-py3-none-any.whl.metadata (8.1 kB)
Downloading pyngrok-7.5.0-py3-none-any.whl (24 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.5.0


In [10]:
!ngrok authtoken "3A9xbgrSGc2PLPBhDJchhku1utm_6acuySpp5HGF7UHzp2CLb"

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [11]:
ngrok.connect('4050')

<NgrokTunnel: "https://stanton-unlimp-addisyn.ngrok-free.dev" -> "http://localhost:4050">

In [12]:
!curl n-s http://localhost:4040/api/tunnels

curl: (6) Could not resolve host: n-s
{"tunnels":[{"name":"http-4050-7ff00b45-d6ec-4bfd-b150-95ef85171af2","ID":"375dac76fb3a80a868affd168a65415a","uri":"/api/tunnels/http-4050-7ff00b45-d6ec-4bfd-b150-95ef85171af2","public_url":"https://stanton-unlimp-addisyn.ngrok-free.dev","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


In [13]:
spark

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [14]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']

In [15]:
df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string]

In [16]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [17]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
>
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
>
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Montando nosso drive

In [18]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Carregando os dados das empresas

In [19]:
import zipfile

In [20]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/empresas.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [21]:
path = "/content/drive/MyDrive/curso-spark/empresas"
empresas = spark.read.csv(path, sep=';', inferSchema=True)
empresas

DataFrame[_c0: int, _c1: string, _c2: int, _c3: int, _c4: string, _c5: int, _c6: string]

In [22]:
empresas.count()

4585679

## Faça como eu fiz: Estabelecimentos e Sócios

### Carregando os dados dos estabelecimentos

In [23]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/estabelecimentos.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [145]:
places_path = "/content/drive/MyDrive/curso-spark/estabelecimentos"
estabelecimentos = spark.read.csv(places_path, sep=';', inferSchema=True)
estabelecimentos

DataFrame[_c0: int, _c1: int, _c2: int, _c3: int, _c4: string, _c5: int, _c6: int, _c7: int, _c8: string, _c9: int, _c10: int, _c11: int, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: int, _c19: string, _c20: int, _c21: string, _c22: string, _c23: string, _c24: string, _c25: int, _c26: string, _c27: string, _c28: string, _c29: int]

In [25]:
estabelecimentos.count()

4836219

### Carregando os dados dos sócios

In [26]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/socios.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [87]:
partners_path = "/content/drive/MyDrive/curso-spark/socios"
socios = spark.read.csv(partners_path, sep=';', inferSchema=True)
socios

DataFrame[_c0: int, _c1: int, _c2: string, _c3: string, _c4: int, _c5: int, _c6: int, _c7: string, _c8: string, _c9: int, _c10: int]

In [28]:
socios.count()

2046430

# Manipulando os Dados
---

## Operações básicas

In [29]:
empresas.limit(5).show()

+----+--------------------+----+---+-------+---+----+
| _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+----+--------------------+----+---+-------+---+----+
| 306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|NULL|
|1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|NULL|
|4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|NULL|
|5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|NULL|
|6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|NULL|
+----+--------------------+----+---+-------+---+----+



### Renomeando as colunas do DataFrame

In [30]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [146]:
for index, column in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", column)

In [32]:
empresas.limit(5).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       NULL|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       NULL|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       NULL|
|       5347|         ROSELY APARE

In [33]:
estabelecimentos.limit(5).show()

+----+---+---+---+-----------------+---+--------+---+----+----+--------+-------+----+----+--------------------+----+-------+------------------+-------+----+----+----+----+----+----+----+----+----+----+----+
| _c0|_c1|_c2|_c3|              _c4|_c5|     _c6|_c7| _c8| _c9|    _c10|   _c11|_c12|_c13|                _c14|_c15|   _c16|              _c17|   _c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|
+----+---+---+---+-----------------+---+--------+---+----+----+--------+-------+----+----+--------------------+----+-------+------------------+-------+----+----+----+----+----+----+----+----+----+----+----+
|1879|  1| 96|  1|   PIRAMIDE M. C.|  8|20011029|  1|NULL|NULL|19940509|1412602|NULL| RUA|     JOSE FIGLIOLINI| 608|   NULL|         VILA NILO|2278020|  SP|7107|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|2818|  1| 43|  1|             NULL|  8|20081231| 71|NULL|NULL|19940512|4671100|NULL| RUA|              BAQUIA| 416|   NULL|VL NOVA MANCHESTER|3443000|  SP|7107|NULL|NULL|N

In [34]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [147]:
for index, column in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", column)

In [36]:
estabelecimentos.limit(5).show()

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------

In [37]:
socios.limit(5).show()

+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+
|  _c0|_c1|                 _c2|        _c3|_c4|     _c5| _c6|        _c7| _c8|_c9|_c10|
+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+
|  411|  2|LILIANA PATRICIA ...|***678188**| 22|19940725|NULL|***000000**|NULL|  0|   7|
|  411|  2|CRISTINA HUNDERTMARK|***637848**| 28|19940725|NULL|***000000**|NULL|  0|   7|
| 5813|  2|CELSO EDUARDO DE ...|***786068**| 49|19940516|NULL|***000000**|NULL|  0|   8|
| 5813|  2|EDUARDO BERRINGER...|***442348**| 49|19940516|NULL|***000000**|NULL|  0|   5|
|14798|  2| HANNE MAHFOUD FADEL|***760388**| 49|19940609|NULL|***000000**|NULL|  0|   8|
+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+



In [38]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [88]:
for index, column in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", column)

In [89]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|                 19940725|NULL|        ***000000**|                 NULL|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

## Analisando os dados

[Data Types](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#data-types)

In [41]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [42]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [43]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [44]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [45]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [46]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [47]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [48]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [49]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [148]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [149]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [52]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [53]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [54]:
df.printSchema()

root
 |-- data: long (nullable = true)



In [55]:
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))
df

DataFrame[data: date]

In [56]:
df.toPandas()

Unnamed: 0,data
0,2020-09-24
1,2020-10-22
2,2021-02-15


In [57]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [195]:
estabelecimentos = estabelecimentos\
  .withColumn(
      'data_situacao_cadastral',
      f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMDD')
  )\
  .withColumn(
      'data_de_inicio_atividade',
      f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
  )\
  .withColumn(
      'data_da_situacao_especial',
      f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
  )

In [175]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [59]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [90]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [92]:
socios = socios.withColumn('data_de_entrada_sociedade', f.try_to_timestamp(socios.data_de_entrada_sociedade.cast(StringType()), f.lit('yyyyMMdd')))
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: timestamp (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [93]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


# Seleções e consultas
---

## Selecionando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [63]:
empresas\
  .select('*')\
  .show(5, False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |NULL                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [64]:
empresas\
  .select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
  .show(5, False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2240             |1               |0.0                      |
|2062             |5               |0.0                      |
|3034             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |4000.0                   |
+-----------------+----------------+-------------------------+
only showing top 5 rows


In [65]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows


## Faça como eu fiz

In [166]:
estabelecimentos\
  .select(\
      'nome_fantasia',\
      'municipio',\
      f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),\
      f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade')\
  )\
  .show(5, False)


+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |NULL                   |NULL                   |
|NULL             |7107     |NULL                   |NULL                   |
|NULL             |7107     |NULL                   |NULL                   |
|NULL             |7107     |NULL                   |NULL                   |
|EMBROIDERY & GIFT|7075     |NULL                   |NULL                   |
+-----------------+---------+-----------------------+-----------------------+
only showing top 5 rows


## Identificando valores nulos

In [74]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [75]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [76]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [77]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [78]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [79]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [82]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [83]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|NULL|        ***000000**|                 NULL|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [94]:
socios.select([f.count(f.when(f.isnull(col), 1)).alias(col) for col in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [95]:
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [96]:
socios.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


In [196]:
empresas = empresas.na.fill(0)
empresas.select('porte_da_empresa').distinct().show()

+----------------+
|porte_da_empresa|
+----------------+
|               1|
|               3|
|               5|
|               0|
+----------------+



## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [97]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows


In [99]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy('ano_de_entrada', ascending=False)\
  .show(5, False)

+----------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social           |faixa_etaria|ano_de_entrada|
+----------------------------------------+------------+--------------+
|JOSE HUMBERTO PAIVA                     |6           |2021          |
|KASSIANO RODRIGO KICHILESKI             |4           |2021          |
|BENILDES BARBOSA RODRIGUES              |8           |2021          |
|LEONARDO MENNA BARRETO LARANJA GONCALVES|5           |2021          |
|MARCELO MOCELIN                         |5           |2021          |
+----------------------------------------+------------+--------------+
only showing top 5 rows


In [100]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
  .show(10, False)

+-------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social        |faixa_etaria|ano_de_entrada|
+-------------------------------------+------------+--------------+
|ANTONIO TAVARES DE ANDRADE           |9           |2021          |
|MARIA RAIMUNDA DOS SANTOS LANZA      |9           |2021          |
|ANNA MARIA TELLES FERREIRA SANTOS    |9           |2021          |
|RENILDE DAS GRACAS MAIA              |9           |2021          |
|ANTONIA DE SOUSA VIEIRA              |9           |2021          |
|DORIS PEREIRA GOMES JAZRA            |9           |2021          |
|AURA MARIA DE ANDRADE                |9           |2021          |
|MARIA JOSE DOMINGUES BONATO          |9           |2021          |
|SONIA MARQUES SAMAJA                 |9           |2021          |
|ZELIA MARIA CAMARA RODRIGUES DA SILVA|9           |2021          |
+-------------------------------------+------------+--------------+
only showing top 10 rows


In [101]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]

colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



O DataFrame df contém os nomes de dez alunos com os respectivos meses e anos de nascimento. Ordenar este DataFrame dos alunos mais novos para os mais velhos

In [105]:
df\
  .select('*')\
  .orderBy(['ano', 'mes'], ascending=[False, False])\
  .show()

+--------------------+---+----+
|                nome|mes| ano|
+--------------------+---+----+
|    IRANI DOS SANTOS| 12|2010|
|          ELIO SILVA|  7|2010|
|       DENIS FONSECA|  6|2010|
|      CARMINA RABELO|  4|2010|
|       CARLITO SOUZA|  1|2010|
|         WALTER DIAS|  9|2009|
|   HERONDINA PEREIRA|  6|2009|
|    ADELINA TEIXEIRA|  5|2009|
|JOAO BOSCO DA FON...|  3|2009|
|     BRENO VENTUROSO|  1|2009|
+--------------------+---+----+



## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [153]:
empresas\
  .filter('capital_social_da_empresa==50')\
  .show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |NULL                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |NULL                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |NULL                 

In [154]:
empresas\
  .where('capital_social_da_empresa==50')\
  .show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |NULL                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |NULL                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |NULL                 

In [109]:
socios\
  .select('nome_do_socio_ou_razao_social')\
  .filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
  .filter(socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\
  .show(10, False)

+-----------------------------+
|nome_do_socio_ou_razao_social|
+-----------------------------+
|RODRIGO BENASSI DIAS         |
|RODRIGO RUDIBERTO DIAS       |
|RODRIGO AURELIANO DIAS       |
|RODRIGO SIMOES LEMOS DIAS    |
|RODRIGO GEORGE DIAS          |
|RODRIGO AUGUSTO FELICIO DIAS |
|RODRIGO FERNANDES DIAS       |
|RODRIGO GARRIDO DIAS         |
|RODRIGO OLIVEIRA DIAS        |
|RODRIGO GONCALVES DIAS       |
+-----------------------------+
only showing top 10 rows


In [111]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]

colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



Seleção de apenas alunos nascidos no primeiro semestre de 2009

In [112]:
df\
  .select('*')\
  .where('ano == 2009 AND mes <= 6')\
  .show(10, truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [113]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [114]:
df\
  .where(f.upper(df.data).like('%RESTAURANTE%'))\
  .show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [155]:
empresas\
  .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
  .where(f.upper(empresas.razao_social_nome_empresarial).like('%RESTAURANTE%'))\
  .show(15, False)

+-------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                          |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------+-----------------+----------------+-------------------------+
|RESTAURANTE IMIGRANTE PORTUGUES LTDA.                  |2062             |5               |0.0                      |
|MORAIS & CARVALHO RESTAURANTE E PIZZARIA LTDA          |2062             |1               |0.0                      |
|BAR E RESTAURANTE PAGANOTTO LTDA                       |2062             |5               |0.0                      |
|RODRIGUES & RODRIGUES RESTAURANTE LTDA                 |2062             |5               |0.0                      |
|TEXAS RANCH BAR RESTAURANTE PRODUCOES ARTISTICAS E CULT|2062             |1               |0.0                      |
|V V SANTOS RESTAURANTE BAR E ATIV DESPORTIVAS L

In [117]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]

colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



Selecionar apenas os alunos que têm seus nomes iniciados com a letra “C”

In [118]:
df\
  .where(df.nome.like('C%'))\
  .show(truncate=False)

+--------------+---+----+
|nome          |mes|ano |
+--------------+---+----+
|CARMINA RABELO|4  |2010|
|CARLITO SOUZA |1  |2010|
+--------------+---+----+



# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) |
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) |
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) |
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) |
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) |
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) |
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) |
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) |
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) |
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) |
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) |
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) |
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) |
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) |
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) |
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) |
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) |
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) |
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) |
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando os dados

In [121]:
socios\
  .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .where('ano_de_entrada >= 2010')\
  .groupBy('ano_de_entrada')\
  .count()\
  .orderBy('ano_de_entrada')\
  .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [156]:
empresas\
  .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
  .groupBy('porte_da_empresa')\
  .agg(
      f.avg('capital_social_da_empresa').alias('capital_social_medio'),
      f.count('cnpj_basico').alias('frequencia')
  )\
  .orderBy('porte_da_empresa', ascending=True)\
  .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            NULL|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+



In [157]:
empresas\
  .select('capital_social_da_empresa')\
  .summary()\
  .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



In [124]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]

colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [125]:
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [128]:
df\
  .select('nota')\
  .summary('min', '25%', '50%', '75%', 'max')\
  .show()

+-------+----+
|summary|nota|
+-------+----+
|    min|   1|
|    25%|   7|
|    50%|   8|
|    75%|   9|
|    max|  10|
+-------+----+



In [131]:
df\
  .groupBy('status')\
  .count()\
  .orderBy('status')\
  .show()

+---------+-----+
|   status|count|
+---------+-----+
| APROVADO|   14|
|REPROVADO|    4|
+---------+-----+



## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [134]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães'),
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08),
    ],
    ['cat', 'tax']
)

In [135]:
produtos.toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [136]:
impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [138]:
produtos\
  .join(impostos, 'cat', how='inner')\
  .sort('id')\
  .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [140]:
produtos\
  .join(impostos, 'cat', how='left')\
  .sort('id')\
  .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| NULL|
+-------+---+---------------+-----+



In [139]:
produtos\
  .join(impostos, 'cat', how='right')\
  .sort('id')\
  .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|NULL|        NULL| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [141]:
produtos\
  .join(impostos, 'cat', how='outer')\
  .sort('id')\
  .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|NULL|           NULL| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| NULL|
+-------+----+---------------+-----+



In [176]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

In [177]:
freq = empresas_join\
    .select(
        'cnpj_basico',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count('cnpj_basico').alias('frequencia'))\
    .orderBy('data_de_inicio', ascending=True)

## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [200]:
empresas.createOrReplaceTempView('empresasView')

In [202]:
spark.sql('SELECT * FROM empresasView').show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       NULL|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       NULL|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       NULL|
|       5347|         ROSELY APARE

In [203]:
spark\
  .sql("""
    SELECT *
      FROM empresasView
      WHERE capital_social_da_empresa = 50
  """)\
  .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17350147|         ERIK MARCELO DOS ...|             2135|                         50|                     50.0|               1|                       NULL|
|   17833214|         ALEXANDRE MACHADO...|             2135|                         50|                     50.0|               1|                       NULL|
|   20860830|         YASMIN MOURA DA F...|             2135|                         50|                     50.0|               1|                       NULL|
|   22242856|         JOAO CESAR M

In [204]:
spark\
  .sql("""
    SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS capital_media
      FROM empresasView
      GROUP BY porte_da_empresa
  """)\
  .show(5)

+----------------+------------------+
|porte_da_empresa|     capital_media|
+----------------+------------------+
|               1|339994.53313506936|
|               3|2601001.7677092673|
|               5| 708660.4208249798|
|               0|  8.35421888053467|
+----------------+------------------+



In [208]:
empresas_join.createOrReplaceTempView('empresasJoinView')

In [73]:
spark\
  .sql("""
    SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
      FROM empresasJoinView
      WHERE YEAR(data_de_inicio_atividade) >= 2010
      GROUP BY data_de_inicio
      ORDER BY data_de_inicio
  """)\
  .show()

# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [210]:
empresas.write.csv(
    path='/content/drive/MyDrive/curso-spark/empresas/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [211]:
empresas2 = spark.read.csv(
    "/content/drive/MyDrive/curso-spark/empresas/csv",
    sep=';',
    header=True,
    inferSchema=True
)

empresas2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



## Faça como eu fiz

In [212]:
socios.write.csv(
    path='/content/drive/MyDrive/curso-spark/socios/csv',
    mode='overwrite',
    sep=';',
    header=True
)

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [214]:
empresas.write.parquet(
    path='/content/drive/MyDrive/curso-spark/empresas/parquet',
    mode='overwrite',
)

In [218]:
empresas_parquet = spark.read.schema(empresas.schema).parquet(
    path='/content/drive/MyDrive/curso-spark/empresas/parquet'
)

empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

In [219]:
empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso-spark/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [220]:
empresas.write.parquet(
    path='/content/drive/MyDrive/curso-spark/empresas/parquet-patitionBy',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

In [221]:
spark.stop()