# Apache Spark com Python

## 01. Começando o trabalho
---

### Instalando e inicializando Spark

```bash
sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

wget -q https://archive.apache.org/dist/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz

tar xf spark-3.5.4-bin-hadoop3.tgz
```

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "./content/spark-3.5.4-bin-hadoop3"

In [2]:
import findspark
findspark.init()

#### [SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL
em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Iniciando com Spark") \
    .getOrCreate()

25/08/29 08:46:47 WARN Utils: Your hostname, DSN-1003 resolves to a loopback address: 127.0.1.1; using 172.29.1.248 instead (on interface enp3s0)
25/08/29 08:46:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/29 08:46:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

### Para saber mais: Material de referência

Quando começamos a utilizar uma nova ferramenta é sempre uma boa ideia termos um material de referência para consultar.

Por ser uma ferramenta bastante utilizada no mercado, o Apache Spark tem bastante material de consulta. Vamos deixar uma lista de links interessantes que vão ajudar muito no seu desenvolvimento neste universo Big Data:

- [Apache Spark](https://spark.apache.org/)
- [Quem está usando Spark](https://spark.apache.org/powered-by.html)
- [Documentação PySpark](https://spark.apache.org/docs/latest/)

O [Databricks](https://databricks.com/) é uma plataforma de análise baseada no Apache Spark que permite que cientistas de dados, engenheiros(as) de dados e analistas de dados trabalhem juntos(as) em casos de uso como:

- Aplicação de análises avançadas para machine learning e processamento de grafos.
- Usar deep learning para solucionar problemas com dados não estruturados como interpretação de imagens, tradução automática, processamento de linguagem natural e muito mais.
- Tornar o armazenamento de dados rápido, simples e escalável.
- Detecção de ameaças de forma proativa com data science e IA.
- Análise de dados em tempo real.
- Nesse site temos uma lista de [livros](https://www.oreilly.com/search/?query=pyspark&extended_publisher_data=true&highlight=true&include_assessments=false&include_case_studies=true&include_courses=true&include_playlists=true&include_collections=true&include_notebooks=true&include_sandboxes=true&include_scenarios=true&is_academic_institution_account=false&source=user&formats=book&sort=relevance&facet_json=true&json_facets=true&page=0&include_facets=true&include_practice_exams=true).

Sugiro, principalmente, a leitura destes:

- [PySpark Cookbook](https://www.oreilly.com/library/view/pyspark-cookbook/9781788835367/)
- [Learning PySpark](https://www.oreilly.com/library/view/learning-pyspark/9781786463708/)
- [Applied Data Science Using PySpark](https://www.oreilly.com/library/view/applied-data-science/9781484265000/)

## 02. Carregamento de dados
---

### Para saber mais: Acessando o SparkUI no Colab

Esta atividade é uma atualização do próximo vídeo decorrente a uma modificação na forma de configurar o ngrok.

Antes de seguir os passos indicados pelo instrutor, no próximo vídeo, é preciso criar uma conta no site do [ngrok](https://ngrok.com/).

Caso você já tenha uma conta no ngrok, basta clicar no botão “Login”. Do contrário, clique no botão “Sign up” no canto superior direito da página.

Após efetuar o login, você será direcionado para o seguinte endereço: <a href="https://dashboard.ngrok.com/get-started/setup">https://dashboard.ngrok.com/get-started/setup</a>

No menu lateral, localizado no lado esquerdo da página, clique em “Your Authtoken” para ser direcionado para o seguinte endereço: <a href="https://dashboard.ngrok.com/get-started/your-authtoken">https://dashboard.ngrok.com/get-started/your-authtoken</a>. Nesta página, poderemos acessar o Authtoken e também configurar o token em nosso notebook.

Na seção “Command Line” copie a linha de comando para utilizar no ser notebook de aula. Esta linha de comando será a única modificação no código mostrado pelo instrutor no próximo vídeo.

Seguindo os passos do vídeo temos a seguinte sequência para configurar o Spark (instalar e criar uma sessão), configurar o ngrok e acessar o SparkUI no ser notebook do Colab.

Começando com as configurações iniciais:
```bash
sudo apt-get update -qq

sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

wget -q https://archive.apache.org/dist/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz

tar xf spark-3.5.4-bin-hadoop3.tgz

pip install -q findspark

pip install pyspark==3.4.0
```

Definição das variáveis de ambiente:
```python
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "./content/spark-3.5.4-bin-hadoop3"
```

Iniciando o `findspark`:
```python
import findspark
findspark.init()
```

Após estas primeiras configurações, podemos iniciar uma SparkSession com os comandos abaixo:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Iniciando com Spark") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()
```

Logo depois, fazemos o download e extração dos arquivos do ngrok:
```bash
wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip

unzip ngrok-stable-linux-amd64.zip
```

Ao executar o comando acima você poderá receber a pergunta:
```bash
Archive:    ngrok-stable-linux-amd64.zip replace ngrok [y]es, [n]o, [A]ll, [N]one, [r]ename:
```

Clique na caixa de seleção e insira y.

No próximo conjunto de comandos, devemos configurar o nosso Authtoken obtido no site do ngrok.
A primeira linha do código abaixo é a única modificação em relação ao código apresentado pelo instrutor no próximo vídeo.
```python
get_ipython().system_raw("./ngrok authtoken DIGITE AQUI O SEU AUTHTOKEN")
get_ipython().system_raw("./ngrok http 4050 &")
```

Aguarde alguns segundos após a execução do código anterior e execute a linha de comando abaixo:
```bash
curl -s http://localhost:4050/api/tunnels
```

A saída deste código é um JSON semelhante a este:
```json
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://8463-35-204-8-97.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://8463-35-204-8-97.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}
```

Procure em seu JSON a chave “public_url” e clique no endereço https informado em seu valor. Uma nova aba será aberta no seu navegador com o SparkUI funcionando corretamente.

### Para saber mais: A classe SparkSession

A classe SparkSession é o ponto de entrada para a programação do Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser usada para criar DataFrames, registrar DataFrames como tabelas, executar comandos SQL sobre tabelas, armazenar tabelas em cache e ler arquivos em parquet.

Esta classe possui alguns métodos e atributos que utilizaremos ao longo das aulas e outros que seria interessante você conhecer. A documentação da classe pode ser acessada [nesse site do spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html).

Para responder boa parte dos exercícios deste curso vamos sempre supor que a SparkSession spark está criada.

### DataFrames com Spark

#### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.
- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R. A API DataFrame está disponível nas linguagens Java, Python, R e Scala.
- **Dataset**: Uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrame e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.
- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.
- As visualizações na Spark UI fazem referência a RDDs.

In [5]:
data = [("Zeca", "35"), ("Eva", "29")]
col_names = ["Nome", "Idade"]
df = spark.createDataFrame(data, col_names)
df

DataFrame[Nome: string, Idade: string]

In [6]:
df.show()

                                                                                

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [7]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


### Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

### Carregamento de dados

#### Dados Públicos CNPJ
##### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
> 
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
> 
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)

In [8]:
import zipfile

In [9]:
path = "./content/drive/curso-spark"

In [None]:
# zipfile.ZipFile(f"{path}/empresas.zip", "r").extractall(path)

In [10]:
empresas = spark.read.csv(f"{path}/empresas", sep=";", inferSchema=True)

                                                                                

In [11]:
empresas.count()

4585679

In [18]:
# zipfile.ZipFile(f"{path}/estabelecimentos.zip", "r").extractall(path)

In [12]:
estabelecimentos = spark.read.csv(f"{path}/estabelecimentos", sep=";", inferSchema=True)

                                                                                

In [13]:
estabelecimentos.count()

4836219

In [21]:
# zipfile.ZipFile(f"{path}/socios.zip", "r").extractall(path)

In [14]:
socios = spark.read.csv(f"{path}/socios", sep=";", inferSchema=True)

                                                                                

In [15]:
socios.count()

2046430

## 03. Manipulando os dados
---

### Operações básicas

In [25]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,400000,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0,1,


### Renomeando as colunas do DataFrame

In [19]:
empresas_col_names = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [20]:
for index, col_name in enumerate(empresas_col_names):
    empresas = empresas.withColumnRenamed(f"_c{index}", col_name)

empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [21]:
estabs_col_names = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [22]:
for index, col_name in enumerate(estabs_col_names):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", col_name)

estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [23]:
socios_col_names = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [24]:
for index, col_name in enumerate(socios_col_names):
    socios = socios.withColumnRenamed(f"_c{index}", col_name)

socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

### Analisando os dados

In [25]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,400000,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0,1,


In [28]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [26]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1170,1,90,1,,8,20081231,71,,,...,6291,,,,,,,,,
1,2886,1,2,1,,8,20150209,73,,,...,6687,16.0,2822377.0,,,,,,,
2,4524,1,50,1,M E L REPRESENTACOES,8,20081231,71,,,...,7099,,,,,,,,,
3,6778,1,8,1,,8,20140903,1,,,...,7079,,,,,,,,,
4,10583,1,31,1,SUPERMECADO J J,8,20100723,54,,,...,6263,,,,,,,idelsilva@terra.com.br,,


In [29]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [27]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,403,2,ARCHIBALDE POUZA JUNIOR,***550688**,49,19941019,,***000000**,,0,8
1,403,2,MARIA LUCIA DE ARAUJO POUZA,***358948**,22,19941019,,***550688**,ARCHIBALDE POUZA JUNIOR,5,7
2,52803,2,PAULA PAVAN MAMED,***454918**,22,19980831,,***000000**,,0,6
3,52803,2,NASSIM MAMED JUNIOR,***040608**,49,19980831,,***000000**,,0,6
4,52803,2,CARLA PAVAN MAMED BONINI,***454898**,22,20130812,,***000000**,,0,6


In [30]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



### Para saber mais: Data Types

Identificar os tipos de dados que possuímos em nossos datasets é um trabalho bastante importante no procedimento inicial de análise.

Nesta etapa nós podemos identificar erros de definição de tipo que podem nos atrapalhar no uso de ferramentas específicas para tratamento e análise, por exemplo, quanto temos um dado numérico que esteja representado como uma string ou quando temos uma informação de data representada como uma string ou como um número.

Nestes dois casos de exemplo teríamos problemas para extrair e realizar operações específicas dos reais tipos, como realizar uma simples operação aritmética.

Nesse [link](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html) está a documentação e temos acesso aos tipos de dados que podemos utilizar com Spark. Nos próximos vídeos vamos ver como trabalhar com alguns destes tipos e como resolver problemas como estes que são apresentados nos parágrafos acima.

### Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [31]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [32]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [40]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0.0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0.0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0.0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,4000.0,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0.0,1,


In [34]:
empresas = empresas.withColumn("capital_social_da_empresa", f.regexp_replace("capital_social_da_empresa", ",", "."))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0.0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0.0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0.0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,4000.0,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0.0,1,


In [38]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [41]:
empresas = empresas.withColumn("capital_social_da_empresa", empresas["capital_social_da_empresa"].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0.0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0.0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0.0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,4000.0,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0.0,1,


In [42]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String → Date

#### `StringType → DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [48]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ["data"])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [49]:
df.printSchema()

root
 |-- data: long (nullable = true)



In [50]:
df = df.withColumn("data", f.to_date(df["data"].cast(StringType()), "yyyyMMdd"))
df.printSchema()

root
 |-- data: date (nullable = true)



In [51]:
df.toPandas()

Unnamed: 0,data
0,2020-09-24
1,2020-10-22
2,2021-02-15


In [52]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1170,1,90,1,,8,20081231,71,,,...,6291,,,,,,,,,
1,2886,1,2,1,,8,20150209,73,,,...,6687,16.0,2822377.0,,,,,,,
2,4524,1,50,1,M E L REPRESENTACOES,8,20081231,71,,,...,7099,,,,,,,,,
3,6778,1,8,1,,8,20140903,1,,,...,7079,,,,,,,,,
4,10583,1,31,1,SUPERMECADO J J,8,20100723,54,,,...,6263,,,,,,,idelsilva@terra.com.br,,


In [53]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [54]:
estabelecimentos = estabelecimentos\
    .withColumn(
        "data_situacao_cadastral",
        f.to_date(estabelecimentos["data_situacao_cadastral"].cast(StringType()), "yyyyMMdd")
    )\
    .withColumn(
        "data_de_inicio_atividade",
        f.to_date(estabelecimentos["data_de_inicio_atividade"].cast(StringType()), "yyyyMMdd")
    )\
    .withColumn(
        "data_da_situacao_especial",
        f.to_date(estabelecimentos["data_da_situacao_especial"].cast(StringType()), "yyyyMMdd")
    )

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [55]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1170,1,90,1,,8,2008-12-31,71,,,...,6291,,,,,,,,,
1,2886,1,2,1,,8,2015-02-09,73,,,...,6687,16.0,2822377.0,,,,,,,
2,4524,1,50,1,M E L REPRESENTACOES,8,2008-12-31,71,,,...,7099,,,,,,,,,
3,6778,1,8,1,,8,2014-09-03,1,,,...,7079,,,,,,,,,
4,10583,1,31,1,SUPERMECADO J J,8,2010-07-23,54,,,...,6263,,,,,,,idelsilva@terra.com.br,,


In [56]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,403,2,ARCHIBALDE POUZA JUNIOR,***550688**,49,19941019,,***000000**,,0,8
1,403,2,MARIA LUCIA DE ARAUJO POUZA,***358948**,22,19941019,,***550688**,ARCHIBALDE POUZA JUNIOR,5,7
2,52803,2,PAULA PAVAN MAMED,***454918**,22,19980831,,***000000**,,0,6
3,52803,2,NASSIM MAMED JUNIOR,***040608**,49,19980831,,***000000**,,0,6
4,52803,2,CARLA PAVAN MAMED BONINI,***454898**,22,20130812,,***000000**,,0,6


In [57]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [59]:
socios = socios.withColumn("data_de_entrada_sociedade", f.to_date(socios["data_de_entrada_sociedade"].cast(StringType()), "yyyyMMdd"))
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [60]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,403,2,ARCHIBALDE POUZA JUNIOR,***550688**,49,1994-10-19,,***000000**,,0,8
1,403,2,MARIA LUCIA DE ARAUJO POUZA,***358948**,22,1994-10-19,,***550688**,ARCHIBALDE POUZA JUNIOR,5,7
2,52803,2,PAULA PAVAN MAMED,***454918**,22,1998-08-31,,***000000**,,0,6
3,52803,2,NASSIM MAMED JUNIOR,***040608**,49,1998-08-31,,***000000**,,0,6
4,52803,2,CARLA PAVAN MAMED BONINI,***454898**,22,2013-08-12,,***000000**,,0,6


## 04. Seleções e consultas
---

### Selecionando informações
 
[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [62]:
empresas\
    .select("*")\
    .show(5, False)

+-----------+-----------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial            |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|612        |LAR DOS IDOSOS ASTROGILDO RIBEIRO        |3999             |16                         |0.0                      |5               |NULL                       |
|5951       |DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA   |2062             |49                         |0.0                      |5               |NULL                       |
|10428      |C.R.P. & MASER. COMERCIAL E DESENHOS LTDA|2062             |49                         |0.0                      |1       

In [63]:
empresas\
    .select("natureza_juridica", "porte_da_empresa", "capital_social_da_empresa")\
    .show(5)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|             3999|               5|                      0.0|
|             2062|               5|                      0.0|
|             2062|               1|                      0.0|
|             2062|               1|                   4000.0|
|             2135|               1|                      0.0|
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [64]:
socios\
    .select("nome_do_socio_ou_razao_social", "faixa_etaria", f.year("data_de_entrada_sociedade").alias("ano_de_entrada"))\
    .show(5, False)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|ARCHIBALDE POUZA JUNIOR      |8           |1994          |
|MARIA LUCIA DE ARAUJO POUZA  |7           |1994          |
|PAULA PAVAN MAMED            |6           |1998          |
|NASSIM MAMED JUNIOR          |6           |1998          |
|CARLA PAVAN MAMED BONINI     |6           |2013          |
+-----------------------------+------------+--------------+
only showing top 5 rows



In [66]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [67]:
estabelecimentos\
    .select("nome_fantasia", "municipio", f.year("data_de_inicio_atividade").alias("ano_de_inicio_atividade"), f.month("data_de_inicio_atividade").alias("mes_de_inicio_atividade"))\
    .show(5)

+--------------------+---------+-----------------------+-----------------------+
|       nome_fantasia|municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+--------------------+---------+-----------------------+-----------------------+
|                NULL|     6291|                   1994|                      5|
|                NULL|     6687|                   1994|                      5|
|M E L REPRESENTACOES|     7099|                   1994|                      5|
|                NULL|     7079|                   1994|                      5|
|     SUPERMECADO J J|     6263|                   1994|                      5|
+--------------------+---------+-----------------------+-----------------------+
only showing top 5 rows



### Utilizando as funções do Spark

In [68]:
data = [
    ("GISELLE PAULA GUIMARAES CASTRO", 15),
    ("ELAINE GARCIA DE OLIVEIRA", 22),
    ("JOAO CARLOS ABNER DE LOURDES", 43),
    ("MARTA ZELI FERREIRA", 24),
    ("LAUDENETE WIGGERS ROEDER", 51)
]
col_names = ["nome", "idade"]
df = spark.createDataFrame(data, col_names)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [75]:
df\
    .select(
        f.concat_ws(
            ", ",
            f.substring_index(df["nome"], " ", -1),
            f.substring_index(df["nome"], " ", 1)
        ).alias("ident"),
        "idade"
    )\
    .show(5, False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



### Identificando valores nulos

In [76]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ["data"])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [77]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [78]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float("nan"),)], ["data"])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [79]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [80]:
df = spark.createDataFrame([("1",), ("2",), ("3",), (None,)], ["data"])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [81]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [82]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,403,2,ARCHIBALDE POUZA JUNIOR,***550688**,49,1994-10-19,,***000000**,,0,8
1,403,2,MARIA LUCIA DE ARAUJO POUZA,***358948**,22,1994-10-19,,***550688**,ARCHIBALDE POUZA JUNIOR,5,7
2,52803,2,PAULA PAVAN MAMED,***454918**,22,1998-08-31,,***000000**,,0,6
3,52803,2,NASSIM MAMED JUNIOR,***040608**,49,1998-08-31,,***000000**,,0,6
4,52803,2,CARLA PAVAN MAMED BONINI,***454898**,22,2013-08-12,,***000000**,,0,6


In [85]:
socios.show(5, False)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+-----------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante  |qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+-----------------------+-----------------------------------+------------+
|403        |2                     |ARCHIBALDE POUZA JUNIOR      |***550688**         |49                   |1994-10-19               |NULL|***000000**        |NULL                   |0                                  |8           |
|403        |2                     |MARIA LUCIA DE ARAUJO POUZA 

In [86]:
socios.select([f.count(f.when(f.isnull(col), 1)).alias(col) for col in socios.columns]).show()



+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

                                                                                

In [87]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [88]:
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,403,2,ARCHIBALDE POUZA JUNIOR,***550688**,49,1994-10-19,0,***000000**,,0,8
1,403,2,MARIA LUCIA DE ARAUJO POUZA,***358948**,22,1994-10-19,0,***550688**,ARCHIBALDE POUZA JUNIOR,5,7
2,52803,2,PAULA PAVAN MAMED,***454918**,22,1998-08-31,0,***000000**,,0,6
3,52803,2,NASSIM MAMED JUNIOR,***040608**,49,1998-08-31,0,***000000**,,0,6
4,52803,2,CARLA PAVAN MAMED BONINI,***454898**,22,2013-08-12,0,***000000**,,0,6


In [89]:
socios.na.fill("-").limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,403,2,ARCHIBALDE POUZA JUNIOR,***550688**,49,1994-10-19,,***000000**,-,0,8
1,403,2,MARIA LUCIA DE ARAUJO POUZA,***358948**,22,1994-10-19,,***550688**,ARCHIBALDE POUZA JUNIOR,5,7
2,52803,2,PAULA PAVAN MAMED,***454918**,22,1998-08-31,,***000000**,-,0,6
3,52803,2,NASSIM MAMED JUNIOR,***040608**,49,1998-08-31,,***000000**,-,0,6
4,52803,2,CARLA PAVAN MAMED BONINI,***454898**,22,2013-08-12,,***000000**,-,0,6


### Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [92]:
socios\
    .select("nome_do_socio_ou_razao_social", "faixa_etaria", f.year("data_de_entrada_sociedade").alias("ano_de_entrada"))\
    .orderBy("ano_de_entrada", ascending=False)\
    .show(5, False)



+--------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social   |faixa_etaria|ano_de_entrada|
+--------------------------------+------------+--------------+
|MARA LUCIETI MICHEL             |5           |2021          |
|DOUGLAS CAPPELLETTI             |3           |2021          |
|FERNANDO DO NASCIMENTO FERNANDES|6           |2021          |
|KELLI ANE SILVA CUTRIM          |4           |2021          |
|JOSE HUMBERTO PAIVA             |6           |2021          |
+--------------------------------+------------+--------------+
only showing top 5 rows



                                                                                

In [93]:
socios\
    .select("nome_do_socio_ou_razao_social", "faixa_etaria", f.year("data_de_entrada_sociedade").alias("ano_de_entrada"))\
    .orderBy(["ano_de_entrada", "faixa_etaria"], ascending=[False, False])\
    .show(10, False)

+----------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social     |faixa_etaria|ano_de_entrada|
+----------------------------------+------------+--------------+
|AMERICO GONCALVES DA CRUZ         |9           |2021          |
|MARIA LUCIA GOMES LOMBA           |9           |2021          |
|ARLINDO PIMPINATO                 |9           |2021          |
|ANNA BERTAZO PITTON               |9           |2021          |
|APPARECIDA ALBANI DE LIMA         |9           |2021          |
|MARIA DE LOURDES DIAS DE LIMA DIAS|9           |2021          |
|CARLOS RISTORIS                   |9           |2021          |
|TAKIFE CUNACCIA ALMEIDA           |9           |2021          |
|FRANCISCO ALFREDO LOBO JUNGER     |9           |2021          |
|PAULO BORGES RODRIGUES DA CUNHA   |9           |2021          |
+----------------------------------+------------+--------------+
only showing top 10 rows



                                                                                

### Utilizando o método orderBy

In [94]:
data = [
    ("CARMINA RABELO", 4, 2010),
    ("HERONDINA PEREIRA", 6, 2009),
    ("IRANI DOS SANTOS", 12, 2010),
    ("JOAO BOSCO DA FONSECA", 3, 2009),
    ("CARLITO SOUZA", 1, 2010),
    ("WALTER DIAS", 9, 2009),
    ("BRENO VENTUROSO", 1, 2009),
    ("ADELINA TEIXEIRA", 5, 2009),
    ("ELIO SILVA", 7, 2010),
    ("DENIS FONSECA", 6, 2010),
]
col_names = ["nome", "mes", "ano"]
df = spark.createDataFrame(data, col_names)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [111]:
df\
    .select("*")\
    .orderBy(["ano", "mes"], ascending=[False, False])\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|IRANI DOS SANTOS     |12 |2010|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
|CARMINA RABELO       |4  |2010|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|HERONDINA PEREIRA    |6  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
+---------------------+---+----+



### Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [112]:
empresas\
    .where("capital_social_da_empresa==50")\
    .show(5, False)

+-----------+----------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial           |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+----------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17380228   |NOELSON MOREIRA DOS SANTOS 46305173591  |2135             |50                         |50.0                     |1               |NULL                       |
|19778517   |SUELY DOS SANTOS SILVA 11997882795      |2135             |50                         |50.0                     |1               |NULL                       |
|21459899   |MARIA ELIETE BARBOSA 69281211653        |2135             |50                         |50.0                     |1             

In [113]:
socios\
    .select("nome_do_socio_ou_razao_social")\
    .filter(socios["nome_do_socio_ou_razao_social"].startswith("RODRIGO"))\
    .filter(socios["nome_do_socio_ou_razao_social"].endswith("DIAS"))\
    .limit(10)\
    .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO RITTER MAIA DIAS
1,RODRIGO DE ALMEIDA DIAS
2,RODRIGO COELHO DIAS
3,RODRIGO CESAR SILVA DIAS
4,RODRIGO BERNARDINI DIAS
5,RODRIGO FIUZA NOGUEIRA DIAS
6,RODRIGO ROBERTO DIAS
7,RODRIGO SANCHEZ RUIZ DIAS
8,RODRIGO KUSTER DIAS
9,RODRIGO CHAVES DIAS


### Definindo filtros

In [114]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
col_names = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, col_names)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [121]:
df\
    .where("mes<=6")\
    .where("ano=2009")\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [118]:
df\
    .where("mes<=6 and ano==2009")\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [122]:
df\
    .filter((df["mes"] <= 6) & (df["ano"] == 2009))\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



### O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [123]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [128]:
df\
    .where(f.upper("data").like("%RESTAURANTE"))\
    .show(truncate=False)

+----------------+
|data            |
+----------------+
|Joca Restaurante|
+----------------+



In [129]:
empresas\
    .select("razao_social_nome_empresarial", "natureza_juridica", "porte_da_empresa", "capital_social_da_empresa")\
    .where(f.upper("razao_social_nome_empresarial").like("%RESTAURANTE%"))\
    .show(15, False)

+--------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                           |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+--------------------------------------------------------+-----------------+----------------+-------------------------+
|LAS PALMEIRAS RESTAURANTE E PIZZARIA LTDA               |2062             |5               |0.0                      |
|BARD RESTAURANTE COMERCIO DE PRODUTOS ALIMENTICIOS LTDA |2062             |1               |0.0                      |
|AZTECA MEXICAN BAR E RESTAURANTE LTDA                   |2062             |1               |0.0                      |
|BAR E RESTAURANTE KAYOMI LTDA                           |2062             |3               |5000.0                   |
|NAUKATRINETA RESTAURANTE E PETISCARIA LTDA              |2062             |5               |0.0                      |
|RESTAURANTE OPCAO REFEICOES LTDA       

### Ferramenta de busca

In [130]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
col_names = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, col_names)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [133]:
df.filter(df["nome"].like("C%")).show(truncate=False)

+--------------+---+----+
|nome          |mes|ano |
+--------------+---+----+
|CARMINA RABELO|4  |2010|
|CARLITO SOUZA |1  |2010|
+--------------+---+----+



## 05. Agregações e junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

### Sumarizando os dados

In [134]:
socios\
    .select(f.year("data_de_entrada_sociedade").alias("ano_de_entrada"))\
    .where("ano_de_entrada >= 2010")\
    .groupBy("ano_de_entrada")\
    .count()\
    .orderBy("ano_de_entrada", ascending=True)\
    .show()



+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



                                                                                

In [137]:
empresas\
    .select("cnpj_basico", "porte_da_empresa", "capital_social_da_empresa")\
    .groupBy("porte_da_empresa")\
    .agg(
        f.mean("capital_social_da_empresa").alias("capital_social_medio"),
        f.count("cnpj_basico").alias("frequencia")
    )\
    .orderBy("porte_da_empresa", ascending=True)\
    .show()



+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            NULL|    8.35421888053467|      5985|
|               1|  339994.53313507047|   3129043|
|               3|  2601001.7677092687|    115151|
|               5|   708660.4208249792|   1335500|
+----------------+--------------------+----------+



                                                                                

In [138]:
empresas\
    .select("capital_social_da_empresa")\
    .summary()\
    .show()

    # .summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max")



+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|       503694.54785426764|
| stddev|     2.1118691490537643E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



                                                                                

### Para saber mais: A função `when`

Uma ferramenta bastante interessante para criar novas colunas a partir de uma condição é a função `when` que combinada com o método `otherwise` torna o trabalho bastante simples.

Acompanhe o exemplo abaixo para entendermos melhor como seria um procedimento como este.
Suponha que temos um DataFrame com os nomes de alguns alunos, as matérias que eles cursaram e as respectivas notas em cada matéria:

In [139]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]
col_names = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, col_names)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



Neste caso seria interessante ter uma rotina que criasse um indicador para os alunos APROVADOS ou REPROVADOS. Com a função `when` podemos criar esta nova coluna de forma bastante simples.

In [141]:
df = df.withColumn("status", f.when(df["nota"] >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



A função `when` é bem simples de ser utilizada, bastando passar no primeiro argumento a condição que queremos testar e no segundo argumento qual valor atribuir a nova coluna caso esta condição seja verdadeira. O método `otherwise` pode ser utilizado para indicar o valor que a nova coluna deve ter caso a condição testada na função `when` não seja verdadeira.

A documentação da função `when` e do método `otherwise` podem ser acessados nos links abaixo:

- [pyspark.sql.functions.when(condition, value)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.when.html)
- [Column.otherwise(value)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.otherwise.html)

### Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [142]:
produtos = spark.createDataFrame(
    [
        ("1", "Bebidas", "Água mineral"),
        ("2", "Limpeza", "Sabão em pó"),
        ("3", "Frios", "Queijo"),
        ("4", "Bebidas", "Refrigerante"),
        ("5", "Pet", "Ração para cães"),
    ],
    ["id", "cat", "prod"]
)

impostos = spark.createDataFrame(
    [
        ("Bebidas", 0.15),
        ("Limpeza", 0.05),
        ("Frios", 0.065),
        ("Carnes", 0.08),
    ],
    ["cat", "tax"]
)

In [143]:
produtos.toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [144]:
impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [145]:
produtos\
    .join(impostos, "cat", how="inner")\
    .sort("id")\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [149]:
produtos\
    .join(impostos, "cat", how="left")\
    .sort("id")\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| NULL|
+-------+---+---------------+-----+



In [150]:
produtos\
    .join(impostos, "cat", how="right")\
    .sort("id")\
    .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|NULL|        NULL| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [151]:
produtos\
    .join(impostos, "cat", how="outer")\
    .sort("id")\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|NULL|           NULL| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| NULL|
+-------+----+---------------+-----+



In [152]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [153]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [154]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [155]:
empresas_join = estabelecimentos.join(empresas, "cnpj_basico", how="inner")
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [156]:
freq = empresas_join\
    .select(
        "cnpj_basico",
        f.year("data_de_inicio_atividade").alias("data_de_inicio")
    )\
    .where("data_de_inicio >= 2010")\
    .groupBy("data_de_inicio")\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy("data_de_inicio", acending=True)

In [157]:
freq.toPandas()

25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:03:21 WARN RowBasedKeyValueBatch: Calling spill() on

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [158]:
freq.union(
    freq.select(
        f.lit("Total").alias("data_de_inicio"),
        f.sum(freq["frequencia"]).alias("frequencia")
    )
).show()

25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:05:08 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



### SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [159]:
empresas.createOrReplaceTempView("empresas_view")

In [163]:
spark.sql("SELECT * FROM empresas_view ;").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        612|         LAR DOS IDOSOS AS...|             3999|                         16|                      0.0|               5|                       NULL|
|       5951|         DISTRIBUIDORA DE ...|             2062|                         49|                      0.0|               5|                       NULL|
|      10428|         C.R.P. & MASER. C...|             2062|                         49|                      0.0|               1|                       NULL|
|      11086|         H. P. TEC CO

In [164]:
spark\
    .sql("""
    SELECT *
    FROM empresas_view
    WHERE capital_social_da_empresa = 50 ;
    """)\
    .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   17380228|         NOELSON MOREIRA D...|             2135|                         50|                     50.0|               1|                       NULL|
|   19778517|         SUELY DOS SANTOS ...|             2135|                         50|                     50.0|               1|                       NULL|
|   21459899|         MARIA ELIETE BARB...|             2135|                         50|                     50.0|               1|                       NULL|
|   23201282|         LETICIA PIRE

In [168]:
spark\
    .sql("""
    SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media, COUNT(cnpj_basico) AS Frequencia
    FROM empresas_view
    GROUP BY porte_da_empresa ;
    """)\
    .show(5)



+----------------+------------------+----------+
|porte_da_empresa|             Media|Frequencia|
+----------------+------------------+----------+
|            NULL|  8.35421888053467|      5985|
|               1|339994.53313507047|   3129043|
|               3|2601001.7677092687|    115151|
|               5| 708660.4208249792|   1335500|
+----------------+------------------+----------+



                                                                                

In [169]:
empresas_join.createOrReplaceTempView("empresas_join_view")

In [170]:
freq = spark\
    .sql("""
    SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
    FROM empresas_join_view
    WHERE YEAR(data_de_inicio_atividade) >= 2010
    GROUP BY data_de_inicio
    ORDER BY data_de_inicio ;
    """)

freq.show()

25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:17:03 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



                                                                                

In [171]:
freq.createOrReplaceTempView("freq_view")

In [172]:
spark\
    .sql("""
    SELECT *
    FROM freq_view
    UNION ALL
    SELECT 'Total' AS data_de_inicio, SUM(count) AS count
    FROM freq_view ;
    """)\
    .show()

25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/08/29 14:20:52 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+-------+
|data_de_inicio|  count|
+--------------+-------+
|          2010| 154159|
|          2011| 172677|
|          2012| 232480|
|          2013| 198424|
|          2014| 202276|
|          2015| 212523|
|          2016| 265417|
|          2017| 237292|
|          2018| 275435|
|          2019| 325922|
|          2020| 400654|
|          2021| 153275|
|         Total|2830534|
+--------------+-------+



                                                                                

## 06. Formas de armazenamento

### Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [173]:
empresas.write.csv(
    path="./content/drive/curso-spark/empresas/csv",
    mode="overwrite",
    sep=";",
    header=True
)

                                                                                

In [174]:
empresas2 = spark.read.csv(
    path="./content/drive/curso-spark/empresas/csv",
    sep=";",
    inferSchema=True,
    header=True
)

                                                                                

In [177]:
empresas2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [178]:
socios.write.csv(
    path="./content/drive/curso-spark/socios/csv",
    mode="overwrite",
    sep=";",
    header=True
)

                                                                                

In [179]:
estabelecimentos.write.csv(
    path="./content/drive/curso-spark/estabelecimentos/csv",
    mode="overwrite",
    sep=";",
    header=True
)

                                                                                

### Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

Arquivos PARQUET salvam mas dão erro ao tentar ler.

In [None]:
# empresas.write.parquet(
#     path="./content/drive/curso-spark/empresas/parquet",
#     mode="overwrite"
# )

In [None]:
# empresas_parquet = spark.read.parquet(
#     path="./content/drive/curso-spark/empresas/parquet"
# )
# empresas_parquet.printSchema()

In [None]:
# socios.write.parquet(
#     path="./content/drive/curso-spark/socios/parquet",
#     mode="overwrite",
# )

In [None]:
# socios_parquet = spark.read.parquet(
#     path="./content/drive/curso-spark/socios/parquet"
# )
# socios_parquet.printSchema()

In [None]:
# estabelecimentos.write.parquet(
#     path="./content/drive/curso-spark/estabelecimentos/parquet",
#     mode="overwrite",
#     compression="snappy"
# )

### Para saber mais: Arquivos ORC

O projeto do ORC foi criado em 2013 como uma iniciativa de acelerar o Hive e reduzir o armazenamento no Hadoop. O foco era habilitar o processamento de alta velocidade e reduzir o tamanho dos arquivos.

Assim como o PARQUET, ORC é um formato de arquivo colunar. Ele é otimizado para grandes leituras de streaming, mas com suporte integrado para localizar as linhas necessárias rapidamente. O armazenamento de dados em formato colunar permite ler, descompactar e processar apenas os valores necessários para a consulta.

Muitos grandes usuários do Hadoop adotaram o ORC. Por exemplo, o Facebook usa ORC para salvar dezenas de petabytes em seu data warehouse e demonstrou que ORC é significativamente mais rápido do que RCFILE ou PARQUET. O Yahoo usa o ORC para armazenar seus dados de produção e divulgou alguns de seus [resultados de benchmark](https://www.slideshare.net/Hadoop_Summit/w-1205p230-aradhakrishnan-v3).

Para obter mais detalhes sobre as especificações do formato ORC, consulte a [documentação de especificação](https://orc.apache.org/specification/). Consulte também a [página do projeto](https://orc.apache.org/) para mais informações.

Para criar arquivos ORC com o Spark o procedimento é semelhante ao realizado para os arquivos PARQUET. Utilize o link para a documentação do método `orc`, [DataFrameWriter.orc(*args)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.orc.html).

In [199]:
empresas.write.orc(
    path="./content/drive/curso-spark/empresas/orc",
    mode="overwrite"
)

                                                                                

In [200]:
empresas_orc = spark.read.orc(
    path="./content/drive/curso-spark/empresas/orc"
)
empresas_orc.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [205]:
socios.write.orc(
    path="./content/drive/curso-spark/socios/orc"
)

                                                                                

In [206]:
socios_orc = spark.read.orc(
    path="./content/drive/curso-spark/socios/orc"
)
socios_orc.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [203]:
estabelecimentos.write.orc(
    path="./content/drive/curso-spark/estabelecimentos/orc"
)

                                                                                

In [204]:
estabelecimentos_orc = spark.read.orc(
    path="./content/drive/curso-spark/estabelecimentos/orc"
)
estabelecimentos_orc.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

### Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

In [207]:
empresas.coalesce(1).write.csv(
    path="./content/drive/curso-spark/empresas/csv-unico",
    mode="overwrite",
    sep=";",
    header=True
)

                                                                                

In [208]:
empresas.write.orc(
    path="./content/drive/curso-spark/empresas/orc-partition-by",
    mode="overwrite",
    partitionBy="porte_da_empresa"
)

                                                                                

In [209]:
spark.stop()