In [24]:
!apt-get update -qq
# !wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
# !tar xf spark-3.3.0-bin-hadoop3.tgz
# !pip install -q findspark

# import os
# os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

# import findspark
# findspark.init()

!pip install -q pyspark # isso funciona bem, não precisa de nada acima

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName("Iniciando com Spark").getOrCreate()

from pyspark.sql.types import DoubleType, StringType, DecimalType
from pyspark.sql import functions as f

In [3]:
from google.colab import drive
drive.mount('/content/drive') # experimentar adicionar o path abaixo direto

path_files = '/content/drive/MyDrive/apache_spark'

Mounted at /content/drive


# 0 Cria _DataFrames_ a partir (ou para?) arquivos CSV particionados

Essas etapas foram feitas ao longo sa seção 2, e estão resumidas aqui para um _quick start_.

In [4]:
df_empresas         = spark.read.csv(path_files+'/empresas', sep=';', inferSchema=True)
df_estabelecimentos = spark.read.csv(path_files+'/estabelecimentos/part-0000?-701c2cc9-d9db-469f-be12-341c24a77308-c000.csv', sep=';', inferSchema=True)
df_socios           = spark.read.csv(path_files+'/socios/part-0000*', sep=';', inferSchema=True)

In [5]:
col_names_empresas = ['cnpj_basico', 'razao_social', 'natureza_juridica', 'qualificacao_resp', 'capital_social', 'porte', 'ente_federativo_resp']
for i, column_name in enumerate(col_names_empresas):
    df_empresas = df_empresas.withColumnRenamed(f"_c{i}", column_name)

col_names_estabelecimentos = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'id_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 
                              'motivo_situacao_cadastral', 'nome_cidade_no_exterior', 'pais', 'data_inicio_atividade', 'cnae_fiscal_principal', 
                              'cnae_fiscal_secundaria', 'tipo_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 
                              'telefone_1', 'ddd_2', 'telefone_2', 'ddd_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_situacao_especial']
for i, column_name in enumerate(col_names_estabelecimentos):
    df_estabelecimentos = df_estabelecimentos.withColumnRenamed(f"_c{i}", column_name)

col_names_socios = ['cnpj_basico', 'id_socio', 'nome_socio_ou_razao_social', 'cnpj_ou_cpf_socio', 'qualificacao_socio', 'data_entrada_sociedade', 
                    'pais', 'representante_legal', 'nome_representante', 'qualificacao_representante_legal', 'faixa_etaria']
for i, column_name in enumerate(col_names_socios):
    df_socios = df_socios.withColumnRenamed(f"_c{i}", column_name)

In [6]:
df_empresas = df_empresas.withColumn('capital_social', f.regexp_replace('capital_social', ',', '.'))
df_empresas = df_empresas.withColumn('capital_social', df_empresas['capital_social'].cast(DoubleType()))

df_estabelecimentos = df_estabelecimentos\
    .withColumn('data_situacao_cadastral', f.to_date(df_estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyymmdd'))\
    .withColumn('data_inicio_atividade', f.to_date(df_estabelecimentos.data_inicio_atividade.cast(StringType()), 'yyyymmdd'))\
    .withColumn('data_situacao_especial', f.to_date(df_estabelecimentos.data_situacao_especial.cast(StringType()), 'yyyymmdd'))

df_socios = df_socios.withColumn('data_entrada_sociedade', f.to_date(df_socios.data_entrada_sociedade.cast(StringType()), 'yyyymmdd'))

# 1 Iniciando com Spark

## 1.1 Configurando ambientes

### 1.1.1 Utilizando o Spark no Windows

[fonte](https://spark.apache.org/docs/latest/api/python/getting_started/install.html)

#### Passo 1 - Instalando o Java

O PySpark requer a instalação do Java na versão 7 ou superior. Obtenha a versão mais recente clicando [aqui](https://www.java.com/pt-BR/download/). Para verificar a versão que está instalada em sua máquina execute a seguinte linha de código no seu *prompt*:

```
java -version
```

#### Passo 2 - Instalando o Python

O Python deve ser instalado em sua versão 2.6 ou superior. Para obter a versão mais recente clique [aqui](https://www.python.org/downloads/windows/). Para verificar a versão do Python que está instalada em sua máquina digite o seguinte comando em seu *prompt*:

```
python --version
```

#### Passo 3 - Instalando o Apache Spark 

Selecione a versão mais estável clicando [aqui](http://spark.apache.org/downloads.html). Na criação deste projeto utilizamos a versão do Spark **3.1.2** e como tipo de pacote selecionamos **Pre-built for Apache Hadoop 2.7**.

Para instalar o Apache Spark não é necessário executar um instalador, basta descomprimir os arquivos em uma pasta de sua escolha.

<font color=red>Obs.: certifique-se de que o caminho onde os arquivos do Spark foram armazenados não contenham espaços (ex.: **"C:\spark\spark-3.1.2-bin-hadoop2.7"**).</font>

Para testar o funcionamento do Spark execute os comandos abaixo em seu *prompt* de comando. Esses comandos assumem que você extraiu os arquivos do Spark na pasta **"C:\spark\"**.

```
cd C:\spark\spark-3.1.2-bin-hadoop2.7
```

```
bin\pyspark
```

O comando acima inicia o *shell* do PySpark que permite trabalhar interativamente com o Spark.

Para sair basta digitar `exit()` e logo depois presionar *Enter*. Para voltar ao *prompt* pressione *Enter* novamente.

#### Passo 4 - Instalando o findspark

```
pip install findspark
```

#### Passo 5 - Instalando o winutils

Os arquivos do Spark não incluem o utilitário **winutils.exe** que é utilizado pelo Spark no Windows. Se não informar onde o Spark deve procurar este utilitário, veremos alguns erros no console e também não conseguiremos executar *scripts* Python utilizando o utilitário `spark-submit`.

Faça o [download](https://github.com/steveloughran/winutils) para a versão do Hadoop para a qual sua instalação do Spark foi construída. Em nosso exemplo foi utilizada a [versão 2.7](https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin). Faça o *download* apenas do arquivo **winutils.exe**.

Crie a pasta **"hadoop\bin"** dentro da pasta que contém os arquivos do Spark (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**) e copie o arquivo **winutils.exe** para dentro desta pasta.

Crie duas variáveis de ambiente no seu Windows. A primeira chamada **SPARK_HOME** que aponta para a pasta onde os arquivos Spark foram armazenados (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**). A segunda chamada **HADOOP_HOME** que aponta para **%SPARK_HOME%\hadoop** (assim podemos modificar **SPARK_HOME** sem precisar alterar **HADOOP_HOME**).

### 1.1.2 Utilizando o Spark no Google Colab

Para configurar o PySpark no Google Colab basta executar os comandos abaixo na própria célula do seu _notebook_. Esses comandos vão instalar o Java Development Kit na versão 8, o Apache Spark 3.3.0 com Hadoop 3 e o pacote `findspark` (explicar pra que serve).

In [None]:
# instalar as dependências, o flag -q eh de 'quiet' e diminui a quantidade de info jogada na saída padrão
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # instala o java (testar sem isso depois)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz # faz download do spark com hadoop
# !wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz # descomprime o arquivo baixado acima
!pip install -q findspark # 

In [None]:
# definir variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# inicializar o findspark
import findspark
findspark.init()

## 1.2 SparkSession

O Spark possui um ponto de entrada que é a [**SparkSession**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html)  e ela deve ser criada antes de qualquer coisa, isso vai permitir programar o Spark com a API DataSet e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName("Iniciando com Spark").getOrCreate()

In [None]:
spark

## 1.3 Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [None]:
data         = [('Wall-E','705'), ('Eva', '1')] # note que eh uma lista com tuplas
column_names = ['Nome', 'Idade']

df_spark = spark.createDataFrame(data, column_names)
df_spark

DataFrame[Nome: string, Idade: string]

In [None]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [None]:
df_spark.show()

+------+-----+
|  Nome|Idade|
+------+-----+
|Wall-E|  705|
|   Eva|    1|
+------+-----+



In [None]:
df_pandas = df_spark.toPandas()
type(df_pandas)

pandas.core.frame.DataFrame

In [None]:
df_pandas

Unnamed: 0,Nome,Idade
0,Wall-E,705
1,Eva,1


In [None]:
# outra forma de criar um df
spark.createDataFrame([{'Nome': 'Wall-E', 'Idade': '705'}, {'Nome': 'Eva', 'Idade': '1'}]).show() # note que os dicionários então dentro de um iterável (lista)

+-----+------+
|Idade|  Nome|
+-----+------+
|  705|Wall-E|
|    1|   Eva|
+-----+------+



# 2 Carregando e manipulando dados

## 2.1 Carregando dados particionados

### 2.1.1 Obtendo dados

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark. Vamos utilizar dados de CNPJ que podem ser encontrados no site da [Receita Federal](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj).

Acessando o site da RV vemos que os dados são disponibilizados de forma particionada, ou seja, estão separados em vários arquivos. Isso ocorre por se tratar de uma quantidade muito grande de dados, e dessa forma a tarefa de recuperar os dados pode ser feita etapas. O instrutor da Alura montou arquivos `zip` com amostras dos dados originais para facilitar o acompanhamento do curso, eles estão disponíveis nos seguintes links:

- [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
- [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
- [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

### 2.1.2 Montando o _drive_

Como os arquivos carregados para um Notebook Colab são excluídos sempre que o ambiente é encerrado, faz mais sentido mantermos os _datasets_ em nosso GoogleDrive e criar um "_link_" na nossa árvore de arquivos do Notebook. Para isso basta rodar os comandos abaixo:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### 2.1.3 Carregando os dados dapa _DataFrames_

Com os arquivos de dados devidamente carregados e disponíves podemos verificar que se tratam de arquivos do tipo `zip`, portanto devemos extrair seus contúdos e para isso vamos usar a biblioteca `zipfile`:

In [None]:
import zipfile as zf

path_files = '/content/drive/MyDrive/apache_spark'
subjects = ['empresas', 'estabelecimentos', 'socios']

for i in subjects:
    zf.ZipFile(f"{path_files}/{i}.zip", 'r').extractall(path_files)

Após extrair os arquivos vemos que são do tipo `CSV` e então podemos usar o método [`csv`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html) do atributo `read` da SparkSession para criar uma referência a esses arquivos. Isso significa que diferente do Pandas não podemos apagar os arquivos após sua leitura, pois o _DataFrame_ Spark é apenas um "ponteiro"(?) para esses arquivos.

In [None]:
df_empresas = spark.read.csv(path_files+'/empresas', sep=';', inferSchema=True)

In [None]:
df_empresas.count() # quantidade de registros nos arquivos particionados de empresas

4585679

In [None]:
df_estabelecimentos = spark.read.csv(path_files+'/estabelecimentos/part-0000?-701c2cc9-d9db-469f-be12-341c24a77308-c000.csv', sep=';', inferSchema=True)
df_estabelecimentos.count()

4836219

In [None]:
df_socios = spark.read.csv(path_files+'/socios/part-0000*', sep=';', inferSchema=True)
df_socios.count()

2046430

## 2.2 Manipulando os Dados

### 2.2.1 Renomeando colunas

A primeira coisa que (geralmente) fazemos ao carregar dados para um _DataFrame_ Pandas é verificar o valor do atributo `shape` para saber a dimensão do df, e de certa forma foi o que fizemos ao chamar o método `count()` acima. A segunda coisa é chamar é método `head()` para termos um _preview_ do df, no Spark podemos fazer isso com o `limit()` antes de `show()`:

In [None]:
df_empresas.limit(5)

DataFrame[_c0: int, _c1: string, _c2: int, _c3: int, _c4: string, _c5: int, _c6: string]

In [None]:
df_empresas.limit(5).show()

+----+--------------------+----+---+-------+---+----+
| _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+----+--------------------+----+---+-------+---+----+
| 306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
|1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
|4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
|5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
|6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
+----+--------------------+----+---+-------+---+----+



Mas isso não retorna uma visualização tão bonitinha como a do Pandas, então:

In [None]:
df_empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
df_empresas.limit(5).toPandas()._c3.unique().sum()

131

Note que os nomes das colunas também não estão "bonitinhos", pois foram definidos de forma automática. E se formos verificar os arquivos particionados veremos que eles realmente não possuem um _header_, então vamos renomear as colunas com nomes que sejam significativos.

Para renomear a coluna de um df Spark preciso passar o rótulo original de cada coluna, ao contrário do Pandas que basta passar um contentor com os novos rótulos na devida ordem. Note que o nome das colunas seguem um padrão: `_c0`, `_c1`, ..., `_cn`, então basta pensar em uma forma de iterar sobre os rótulos mantendo índices numéricos dos atuais rótulos.

Vamos definir uma lista com os novos nomes para as colunas na ordem em que elas aparecem e iterar sobre a função `enumerate()` passando essa lista:

In [None]:
col_names_empresas = ['cnpj_basico', 'razao_social', 'natureza_juridica', 'qualificacao_resp', 'capital_social', 'porte', 'ente_federativo_resp']

for i in enumerate(col_names_empresas):
    print(i)

(0, 'cnpj_basico')
(1, 'razao_social')
(2, 'natureza_juridica')
(3, 'qualificacao_resp')
(4, 'capital_social')
(5, 'porte')
(6, 'ente_federativo_resp')


In [None]:
for i, column_name in enumerate(col_names_empresas):
    df_empresas = df_empresas.withColumnRenamed(f"_c{i}", column_name)

df_empresas.columns

['cnpj_basico',
 'razao_social',
 'natureza_juridica',
 'qualificacao_resp',
 'capital_social',
 'porte',
 'ente_federativo_resp']

In [None]:
df_empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social,natureza_juridica,qualificacao_resp,capital_social,porte,ente_federativo_resp
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
col_names_estabelecimentos = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'id_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 
                              'motivo_situacao_cadastral', 'nome_cidade_no_exterior', 'pais', 'data_inicio_atividade', 'cnae_fiscal_principal', 
                              'cnae_fiscal_secundaria', 'tipo_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 
                              'telefone_1', 'ddd_2', 'telefone_2', 'ddd_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_situacao_especial']

for i, column_name in enumerate(col_names_estabelecimentos):
    df_estabelecimentos = df_estabelecimentos.withColumnRenamed(f"_c{i}", column_name)

df_estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,id_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_fax,fax,correio_eletronico,situacao_especial,data_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [None]:
col_names_socios = ['cnpj_basico', 'id_socio', 'nome_socio_ou_razao_social', 'cnpj_ou_cpf_socio', 'qualificacao_socio', 'data_de_entrada_sociedade', 
                    'pais', 'representante_legal', 'nome_representante', 'qualificacao_representante_legal', 'faixa_etaria']

for i, column_name in enumerate(col_names_socios):
    df_socios = df_socios.withColumnRenamed(f"_c{i}", column_name)

df_socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,id_socio,nome_socio_ou_razao_social,cnpj_ou_cpf_socio,qualificacao_socio,data_de_entrada_sociedade,pais,representante_legal,nome_representante,qualificacao_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


### 2.2.2 Convertendo tipos de dados

Agora podemos começar analisar os dados e o próximo passo é verifiar os [tipos de dados](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types) em cada coluna, Para isso usamos o método `printSchema()`:

In [None]:
df_empresas.limit(5).toPandas().tail(1)

Unnamed: 0,cnpj_basico,razao_social,natureza_juridica,qualificacao_resp,capital_social,porte,ente_federativo_resp
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [None]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_resp: integer (nullable = true)
 |-- capital_social: string (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_resp: string (nullable = true)



In [None]:
df_socios.limit(2).toPandas()[[i for i in col_names_socios if i not in ('cnpj_ou_cpf_socio', 'representante_legal')]]

Unnamed: 0,cnpj_basico,id_socio,nome_socio_ou_razao_social,qualificacao_socio,data_de_entrada_sociedade,pais,nome_representante,qualificacao_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,22,19940725,,,0,7
1,411,2,CRISTINA HUNDERTMARK,28,19940725,,,0,7


In [None]:
df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- id_socio: integer (nullable = true)
 |-- nome_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_socio: string (nullable = true)
 |-- qualificacao_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_representante: string (nullable = true)
 |-- qualificacao_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- id_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (nullable = true

Com isso já podemos ver algumas transformações que serão necessárias, como por exemplo:

- `capital_social` das empresas: representa uma quantidade de dinheiro, mas está como _string_
- `data_de_entrada_sociedade` dos sócios: é uma data, mas está como um número inteiro. E esse problema se repete em alguns campos de estabelecimentos

#### 2.2.2.1 Convertendo String para Double

Para começar precisamos importar alguns objetos do Spark: as classes de tipos `Double` e `String` e o objeto [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions).

Antes de converter o campo do tipo string para numérico precisamos resolver o problema do seu separador decimal, trocando a vírgula por ponto, e faremos isso com a função `regexp_replace()`, disponível no objeto `functions`. E para aplicar essa função em uma coluna específica usamos o método [`withColumn()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html), que permite criar ou substituir uma coluna no df. Para converter o tipo de dado da coluna usaremos o método `cast()` passando o tipo como parâmetro.

Sempre que fizermos uma tranformação no df, teremos como retorno o df com a(s) nova coluna ou a coluna modificada (não tem o `inplace=True`???).

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as f

In [None]:
# cria nova coluna para o valor com substituição
df_empresas.withColumn('capital_social_2', f.regexp_replace('capital_social', ',', '.')).limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social,natureza_juridica,qualificacao_resp,capital_social,porte,ente_federativo_resp,capital_social_2
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,,0.0
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,,0.0
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,,0.0
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,,0.0
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,,4000.0


In [None]:
# faz a substiuição na coluna original e re-atribui o df
df_empresas = df_empresas.withColumn('capital_social', f.regexp_replace('capital_social', ',', '.'))

In [None]:
# converte o tipo de string para double usando o método cast()
df_empresas = df_empresas.withColumn('capital_social', df_empresas['capital_social'].cast(DoubleType()))

In [None]:
df_empresas.limit(5).toPandas().tail(1)

Unnamed: 0,cnpj_basico,razao_social,natureza_juridica,qualificacao_resp,capital_social,porte,ente_federativo_resp
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [None]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_resp: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_resp: string (nullable = true)



#### 2.2.2.2 Convertendo String para Date

Para conveter valores para o tipo `Date` podemos usar a função `to_date()` da classe `Funcions`. Como essa função recebe _strings_ como parâmetro, devemos converter o valor inteiro para _string_, seguindo o mesmo procedimento feito a pouco. Além da _string_ que será convetida, devemos passar o [padrão](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) em que a data está escrita.

Vamos criar um df _dummy_ para visualiar o que deve acontecer:

In [None]:
from pyspark.sql.types import StringType

In [None]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data_string'])
print(df.printSchema())
df.toPandas()

root
 |-- data_string: long (nullable = true)

None


Unnamed: 0,data_string
0,20200924
1,20201022
2,20210215


In [None]:
df = df.withColumn('data_date', f.to_date(df.data_string.cast(StringType()), 'yyyymmdd'))
print(df.printSchema())
df.toPandas()

root
 |-- data_string: long (nullable = true)
 |-- data_date: date (nullable = true)

None


Unnamed: 0,data_string,data_date
0,20200924,2020-01-24
1,20201022,2020-01-22
2,20210215,2021-01-15


In [None]:
df.printSchema()

root
 |-- data_string: long (nullable = true)
 |-- data_date: date (nullable = true)



Agora sim vamos fazer essas conversões em nossos _DataFrames_ com os dados públicos, mas vamos realizar essas operções de forma encadeada. No df de **estabelecimentos** temos três campos que devem ser convertidos: 'data_situacao_cadastral', 'data_inicio_atividade' e 'data_situacao_especial'.

In [None]:
df_estabelecimentos[['data_situacao_cadastral', 'data_inicio_atividade', 'data_situacao_especial']].printSchema()

root
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- data_inicio_atividade: integer (nullable = true)
 |-- data_situacao_especial: integer (nullable = true)



In [None]:
df_estabelecimentos = df_estabelecimentos.withColumn(
        'data_situacao_cadastral', 
        f.to_date(df_estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyymmdd')
    ).withColumn(
        'data_inicio_atividade', 
        f.to_date(df_estabelecimentos.data_inicio_atividade.cast(StringType()), 'yyyymmdd')
    ).withColumn(
        'data_situacao_especial', 
        f.to_date(df_estabelecimentos.data_situacao_especial.cast(StringType()), 'yyyymmdd')
    )

df_estabelecimentos[['data_situacao_cadastral', 'data_inicio_atividade', 'data_situacao_especial']].printSchema()

root
 |-- data_situacao_cadastral: date (nullable = true)
 |-- data_inicio_atividade: date (nullable = true)
 |-- data_situacao_especial: date (nullable = true)



In [None]:
df_estabelecimentos[['data_situacao_cadastral', 'data_inicio_atividade', 'data_situacao_especial']].limit(5).toPandas()

Unnamed: 0,data_situacao_cadastral,data_inicio_atividade,data_situacao_especial
0,2001-01-29,1994-01-09,
1,2008-01-31,1994-01-12,
2,1997-01-31,1994-01-12,
3,2008-01-31,1994-01-13,
4,1998-01-29,1995-01-09,


E agora para o df de sócios:

In [None]:
df_socios[['data_de_entrada_sociedade']].printSchema()

root
 |-- data_de_entrada_sociedade: integer (nullable = true)



In [None]:
df_socios = df_socios.withColumn('data_de_entrada_sociedade', f.to_date(df_socios.data_de_entrada_sociedade.cast(StringType()), 'yyyymmdd'))

df_socios[['data_de_entrada_sociedade']].printSchema()

root
 |-- data_de_entrada_sociedade: date (nullable = true)



In [None]:
df_socios[['data_de_entrada_sociedade']].limit(5).toPandas()

Unnamed: 0,data_de_entrada_sociedade
0,1994-01-25
1,1994-01-25
2,1994-01-16
3,1994-01-16
4,1994-01-09


Existe outra forma de escrever chamadas encadeadas no python que é usando o "recurso" (gambiarra) do caractér `\`:

```python
df_estabelecimentos = df_estabelecimentos \
    .withColumn('data_situacao_cadastral', f.to_date(df_estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyymmdd')) \
    .withColumn('data_inicio_atividade', f.to_date(df_estabelecimentos.data_inicio_atividade.cast(StringType()), 'yyyymmdd')) \
    .withColumn('data_situacao_especial', f.to_date(df_estabelecimentos.data_situacao_especial.cast(StringType()), 'yyyymmdd'))
```

Aparentemente essa é a forma mais "difundida" pra isso (não com minha aprovação).

**Errata**: quando escrevi isso pela primeira vez não tinha gostado dessa notação com a barra invertida pra quebrar linha, mas acontece que isso ajuda a visualização do código. Então agora eu sou "_team_ barra invertida pra pular linha no Spark".

**Obs.**: não pode ter nada a direita da barra! Pode ter quantos espaços quiser a esquerda, mas se tiver um espaço em branco ou comentário na direita, vai dar erro: `SyntaxError: unexpected character after line continuation character`.

# 3 Seleções e consultas

## 3.1 Selecionando informações

Ao fazer seleções em um df estamos criando um novo _DataFrame_ a partir de do original, tranzendo apenas as informações em que estamos interessados, também podemos criar novas colunas a partir das que já existem e como estamos trabalhando com dados estruturados, o Spark nos fornece método que são basicamente comandos SQL.

O primeiro que vamos conhecer é o [select()](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.select.html):

In [None]:
df_empresas.select('*').show(5)

+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_resp|capital_social|porte|ente_federativo_resp|
+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|        306|FRANCAMAR REFRIGE...|             2240|               49|           0.0|    1|                null|
|       1355|BRASILEIRO & OLIV...|             2062|               49|           0.0|    5|                null|
|       4820|REGISTRO DE IMOVE...|             3034|               32|           0.0|    5|                null|
|       5347|ROSELY APARECIDA ...|             2135|               50|           0.0|    5|                null|
|       6846|BADU E FILHOS TEC...|             2062|               49|        4000.0|    1|                null|
+-----------+--------------------+-----------------+-----------------+--------------+-----+-----

In [None]:
df_empresas.select('razao_social', 'natureza_juridica', 'porte', 'capital_social').show(5, truncate=False)

+--------------------------------------------------------------------------------------------+-----------------+-----+--------------+
|razao_social                                                                                |natureza_juridica|porte|capital_social|
+--------------------------------------------------------------------------------------------+-----------------+-----+--------------+
|FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |1    |0.0           |
|BRASILEIRO & OLIVEIRA LTDA                                                                  |2062             |5    |0.0           |
|REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E TABELIONATO E REGISTRO DE CONSTRATOS MARITIMOS|3034             |5    |0.0           |
|ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS                                                |2135             |5    |0.0           |
|BADU E FILHOS TECIDOS LTDA                                   

In [None]:
df_socios.select('nome_socio_ou_razao_social', 'faixa_etaria', f.year('data_entrada_sociedade').alias('ano_entrada')).show(5, truncate=False)

+-------------------------------+------------+-----------+
|nome_socio_ou_razao_social     |faixa_etaria|ano_entrada|
+-------------------------------+------------+-----------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994       |
|CRISTINA HUNDERTMARK           |7           |1994       |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994       |
|EDUARDO BERRINGER STEPHAN      |5           |1994       |
|HANNE MAHFOUD FADEL            |8           |1994       |
+-------------------------------+------------+-----------+
only showing top 5 rows



In [None]:
df_estabelecimentos.select(
    'nome_fantasia', 'municipio', 'nome_cidade_no_exterior', f.year('data_inicio_atividade').alias('ano_inicio_atividade'), f.month('data_inicio_atividade').alias('mes_inicio_atividade')
    ).show(5, truncate=False)

+-----------------+---------+-----------------------+--------------------+--------------------+
|nome_fantasia    |municipio|nome_cidade_no_exterior|ano_inicio_atividade|mes_inicio_atividade|
+-----------------+---------+-----------------------+--------------------+--------------------+
|PIRAMIDE M. C.   |7107     |null                   |1994                |1                   |
|null             |7107     |null                   |1994                |1                   |
|null             |7107     |null                   |1994                |1                   |
|null             |7107     |null                   |1994                |1                   |
|EMBROIDERY & GIFT|7075     |null                   |1995                |1                   |
+-----------------+---------+-----------------------+--------------------+--------------------+
only showing top 5 rows



A classe [Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions) do Spark oferece várias métodos que se parecem muito com os de SQL:

In [None]:
data = [('GISELLE PAULA GUIMARAES CASTRO', 15) 
    ,('ELAINE GARCIA DE OLIVEIRA', 22) 
    ,('JOAO CARLOS ABNER DE LOURDES', 43) 
    ,('MARTA ZELI FERREIRA', 24) 
    ,('LAUDENETE WIGGERS ROEDER', 51)]

df = spark.createDataFrame(data, ['nome', 'idade'])
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [None]:
df.select(f.concat_ws(', ', f.substring_index('nome', ' ', -1), f.substring_index('nome', ' ', 1)).alias('ident'), 'idade').show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



## 3.2 Identificando valores nulos

### 3.2.1 _DataFrames dummies_

Vamos criar _DataFrames_ para avaliar como cada um deles lida com valores nulos ou valores não-numéricos em campos numéricos.

Primeiro criamos um com valores numéricos inteiros + um valor nulo:

In [None]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   data    3 non-null      float64
dtypes: float64(1)
memory usage: 160.0 bytes


In [None]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [None]:
df.printSchema()

root
 |-- data: long (nullable = true)



Vemos que a representação em Pandas converte os valores para ponto flutuante e o valor vazio fica como `NaN` (_not a number_), enquanto que a representação em Spark mantém os valores como foram definidos.

Agora vamos ser explícitos em definir os valores como ponto flutuante + um _float NaN_:

In [None]:
df2 = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df2.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.toPandas().equals(df2.toPandas())

True

In [None]:
df2.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



a representação Pandas permanece exatamente igual a anterior, mas a Spark é "atualizada" para a nova definição.

Uma outra forma de definir o "mesmo" (bem entre aspas) _DataFrame_ é passando os valores como _strings_ + um valor nulo, temos:

In [None]:
df3 = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df3.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df3.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   data    3 non-null      object
dtypes: object(1)
memory usage: 160.0+ bytes


In [None]:
df3.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



### 3.2.2 _DataFrames_ de dados públicos

Vimos que é importante entender como cada tipo de valor vazio é representado e qual o tipo da coluna para determinar qual método deve ser aplicado para encontrar/tratá-los.

Voltando ao _DataFrame_ de sócios, focando nos campos `pais` e `nome_representante`, vamos verificar o resultado de aplicar funções para verificar se um valor é nulo ou _NaN_.

In [None]:
df_socios.select('pais', 'nome_representante').limit(5).toPandas()

Unnamed: 0,pais,nome_representante
0,,
1,,
2,,
3,,
4,,


In [None]:
df_socios.select('pais', 'nome_representante').limit(5).show()

+----+------------------+
|pais|nome_representante|
+----+------------------+
|null|              null|
|null|              null|
|null|              null|
|null|              null|
|null|              null|
+----+------------------+



In [None]:
df_socios.select('pais', 'nome_representante').printSchema()

root
 |-- pais: integer (nullable = true)
 |-- nome_representante: string (nullable = true)



Novamente, temos representações diferentes para cada tipo de _DataFrame_. E ao verificar o _Schema_ percebemos que já deveríamos esperar isso, como a coluna `país` é do tipo inteiro o Pandas tenta convertê-la para ponto flutuante.

Então vamos contar a quantidade de ocorrências dos casos verdadeiros (_truly_) nessas duas colunas, avaliando se o valor é nulo e _NaN_:

In [None]:
df_socios.select(
    [f.count(f.when(f.isnull(i), 1)).alias(f"{i}_null") for i in df_socios.columns if i in ('pais', 'nome_representante')]
    + [f.count(f.when(f.isnan(i), 1)).alias(f"{i}_NaN") for i in df_socios.columns if i in ('pais', 'nome_representante')]
    ).show()

+---------+-----------------------+--------+----------------------+
|pais_null|nome_representante_null|pais_NaN|nome_representante_NaN|
+---------+-----------------------+--------+----------------------+
|  2038255|                1995432|       0|                     0|
+---------+-----------------------+--------+----------------------+



Agora vamos preencher os valores vazios no _DataFrame_ selecionado, primeiro passando um valor númerico e depois uma _string_:

In [None]:
df_socios.select('pais', 'nome_representante').na.fill(0).limit(5).toPandas()

Unnamed: 0,pais,nome_representante
0,0,
1,0,
2,0,
3,0,
4,0,


In [None]:
df_socios.select('pais', 'nome_representante').na.fill('0').limit(5).toPandas()

Unnamed: 0,pais,nome_representante
0,,0
1,,0
2,,0
3,,0
4,,0


Note que os valores preenchidos precisam ser coerentes com o tipo da coluna, do contrário a ação não será realizada. E o método `na.fill()`, ou também `fillna()` deve ser aplicado no _DataFrame_:

In [None]:
df_socios.select(f.when(f.isnull('pais'), 0).alias('pais'), f.when(f.isnull('nome_representante'), '0').alias('nome_representante')).limit(5).toPandas()

Unnamed: 0,pais,nome_representante
0,0,0
1,0,0
2,0,0
3,0,0
4,0,0


In [None]:
df_socios.select('pais', 'nome_representante').na.fill('0', ['pais']).limit(5).toPandas()

Unnamed: 0,pais,nome_representante
0,,
1,,
2,,
3,,
4,,


## 3.3 Ordenando os dados

Basta utilizar o método [`orderBy()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html), passando os campos em que a ordenação será baseada.

In [None]:
df_socios.select(df_socios.nome_socio_ou_razao_social.alias('nome'), 'faixa_etaria', f.year('data_entrada_sociedade').alias('ano_entrada')).limit(10).show(truncate=False)

+-------------------------------+------------+-----------+
|nome                           |faixa_etaria|ano_entrada|
+-------------------------------+------------+-----------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994       |
|CRISTINA HUNDERTMARK           |7           |1994       |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994       |
|EDUARDO BERRINGER STEPHAN      |5           |1994       |
|HANNE MAHFOUD FADEL            |8           |1994       |
|CLOD ASSAD FADEL               |6           |1994       |
|WALKYRIA ALGARVES              |7           |1997       |
|SEBASTIAO JADIR TEIXEIRA NUNES |5           |2009       |
|JOSE JOAO ADAMO                |7           |1994       |
|ROSEMARY CANTUARIA AFONSO ADAMO|6           |1994       |
+-------------------------------+------------+-----------+



In [None]:
df_socios\
    .select(df_socios.nome_socio_ou_razao_social.alias('nome'), 'faixa_etaria', f.year('data_entrada_sociedade').alias('ano_entrada'))\
    .limit(10)\
    .orderBy('faixa_etaria', ascending=True)\
    .show(truncate=False)

+-------------------------------+------------+-----------+
|nome                           |faixa_etaria|ano_entrada|
+-------------------------------+------------+-----------+
|EDUARDO BERRINGER STEPHAN      |5           |1994       |
|SEBASTIAO JADIR TEIXEIRA NUNES |5           |2009       |
|CLOD ASSAD FADEL               |6           |1994       |
|ROSEMARY CANTUARIA AFONSO ADAMO|6           |1994       |
|LILIANA PATRICIA GUASTAVINO    |7           |1994       |
|CRISTINA HUNDERTMARK           |7           |1994       |
|WALKYRIA ALGARVES              |7           |1997       |
|JOSE JOAO ADAMO                |7           |1994       |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994       |
|HANNE MAHFOUD FADEL            |8           |1994       |
+-------------------------------+------------+-----------+



In [None]:
df_socios\
    .select(df_socios.nome_socio_ou_razao_social.alias('nome'), 'faixa_etaria', f.year('data_entrada_sociedade').alias('ano_entrada'))\
    .limit(10)\
    .orderBy(['faixa_etaria', 'nome'], ascending=[True, False])\
    .show(truncate=False)

+-------------------------------+------------+-----------+
|nome                           |faixa_etaria|ano_entrada|
+-------------------------------+------------+-----------+
|SEBASTIAO JADIR TEIXEIRA NUNES |5           |2009       |
|EDUARDO BERRINGER STEPHAN      |5           |1994       |
|ROSEMARY CANTUARIA AFONSO ADAMO|6           |1994       |
|CLOD ASSAD FADEL               |6           |1994       |
|WALKYRIA ALGARVES              |7           |1997       |
|LILIANA PATRICIA GUASTAVINO    |7           |1994       |
|JOSE JOAO ADAMO                |7           |1994       |
|CRISTINA HUNDERTMARK           |7           |1994       |
|HANNE MAHFOUD FADEL            |8           |1994       |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994       |
+-------------------------------+------------+-----------+



## 3.4 Filtrando os dados

Assim como a ordenação, a ação de filtrar dados é bem direta, temos os métodos 
[`where()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.where.html) e [`filter()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.filter.html), sendo que um é _alias_ para o outro.

In [None]:
df_empresas.where("capital_social == 50").show(5, truncate=False)

+-----------+------------------------------------+-----------------+-----------------+--------------+-----+--------------------+
|cnpj_basico|razao_social                        |natureza_juridica|qualificacao_resp|capital_social|porte|ente_federativo_resp|
+-----------+------------------------------------+-----------------+-----------------+--------------+-----+--------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50               |50.0          |1    |null                |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50               |50.0          |1    |null                |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50               |50.0          |1    |null                |
|22242856   |JOAO CESAR MESSIAS 08707149883      |2135             |50               |50.0          |1    |null                |
|23238540   |EVERTON ROBERTO DA SILVA 42101963809|2135             |50               |50.0       

In [None]:
df_socios\
    .select(df_socios.nome_socio_ou_razao_social.alias('nome'))\
    .filter(df_socios.nome_socio_ou_razao_social.startswith('JOAO'))         \
    .filter(df_socios.nome_socio_ou_razao_social.endswith('SILVA'))\
    .show(10, truncate=False)

+-----------------------------------+
|nome                               |
+-----------------------------------+
|JOAO AMARO DA SILVA                |
|JOAO PINTO DA SILVA                |
|JOAO BATISTA DA SILVA              |
|JOAO OLAVO DA SILVA                |
|JOAO MARIA DA SILVA                |
|JOAO MATEUS AMARAL AVELINO DA SILVA|
|JOAO RITA FERREIRA DA SILVA        |
|JOAO ROQUE DA SILVA                |
|JOAO DIAS DA SILVA                 |
|JOAO BATISTA DA SILVA              |
+-----------------------------------+
only showing top 10 rows



In [None]:
# mesmo dando um alias pra coluna, precisa usar o rótulo original pra aplicar uma função
df_socios.select(df_socios.nome_socio_ou_razao_social.alias('nome')).filter("nome = 'JOAO PINTO DA SILVA'").show(5)
# mas se a condição for avaliar o campo e não o retorno de uma função, ta susse
# note que usei um sinal de '=' e não dois '==' na string da query, são equivalentes

+-------------------+
|               nome|
+-------------------+
|JOAO PINTO DA SILVA|
|JOAO PINTO DA SILVA|
|JOAO PINTO DA SILVA|
|JOAO PINTO DA SILVA|
|JOAO PINTO DA SILVA|
+-------------------+
only showing top 5 rows



## 3.5 O comando LIKE

Assim como os métodos `orderBy()` e `where()` o [`like()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html) tem um funcionamento muito próximo do SQL, bastando adaptar a sintaxe do Spark.

In [None]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [None]:
df.where(f.lower(df.data).like('%restaurante%')).show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [None]:
df.where(df.data.like('%restaurante%')).show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|Juca restaurantes ltda|
+----------------------+



## 3.6 Os comandos WHEN e OTHERWISE

Um recurso bastante interessante e semântico para criar novos campos quando seu valor depende de outros é a combinação dos métodos **`when()`** e **`otherwise()`**:

In [None]:
df_socios.select('faixa_etaria')\
    .withColumn('paridade_faixa_etaria', f.when(df_socios.faixa_etaria % 2 == 1, 'IMPAR').otherwise('PAR'))\
    .show(10)

+------------+---------------------+
|faixa_etaria|paridade_faixa_etaria|
+------------+---------------------+
|           7|                IMPAR|
|           7|                IMPAR|
|           8|                  PAR|
|           5|                IMPAR|
|           8|                  PAR|
|           6|                  PAR|
|           7|                IMPAR|
|           5|                IMPAR|
|           7|                IMPAR|
|           6|                  PAR|
+------------+---------------------+
only showing top 10 rows



# 4 Agregações e Junções

Agora vamos entender como podemos realizar agrupamentos e sumarizar os _DataFrames_ Spark. Isso é muito útil pois seremos capazes de avaliar medidas de tendência dos nossos dados e entender como eles se comportam, que é basicamente o feijão com arroz do Ciêntista de Dados. A principais funções que usaremos são: [**groupBy()**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html), [**agg()**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.agg.html) (de _aggregate_) e [**summary()**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.summary.html). 

E além disso temos as funções usadas para calcular medidas estatísticas, as mais comuns são:

> [approx_count_distinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## 4.1 Sumarizando os dados

Vamos começar com o df de sócios, definindo um novo campo que contenha apenas o "ano de entrada". Vamos usar o **`groupBy()`** para agrupar os dados em função desse novo campo, e assim, dessa forma, todas as medidas estatísticas que calcularmos no df serão calculadas em função desse campo. Por exemplo, o **`count()`** vai retornar a contagem de sócios agrupados para cada ano:

In [None]:
df_socios\
    .select(f.year('data_entrada_sociedade').alias('ano_entrada'))\
    .where('ano_entrada >= 2010')\
    .groupBy('ano_entrada')\
    .count()\
    .orderBy('ano_entrada', ascending=True)\
    .show()

+-----------+------+
|ano_entrada| count|
+-----------+------+
|       2010| 79337|
|       2011| 83906|
|       2012| 80101|
|       2013| 83919|
|       2014| 80590|
|       2015| 80906|
|       2016| 81587|
|       2017| 90221|
|       2018| 99935|
|       2019|118248|
|       2020|125927|
|       2021| 56316|
+-----------+------+



Vamos mudar para o df de empresas agrupando os dados pelo "porte da empresa" e vamos também usar a função **`agg()`** para agregar mais de uma medida estatística em função do "porte da empresa". Nesse caso vamos calcular a média  (**`avg()`**), mínimo **`min()`** e o máximo **`max()`** para o "capital social", e além disso, novamente fazemos a contagem com  **`count()`**: 

In [None]:
df_empresas\
    .select('cnpj_basico', 'porte', 'capital_social')\
    .groupBy('porte')\
    .agg(
        f.avg('capital_social').alias('capital_social_medio'),
        f.min('capital_social').alias('capital_social_min'),
        f.max('capital_social').alias('capital_social_max'),
        f.count('cnpj_basico').alias('frequencia')
    )\
    .orderBy('porte', ascending=True)\
    .show()

+-----+--------------------+------------------+------------------+----------+
|porte|capital_social_medio|capital_social_min|capital_social_max|frequencia|
+-----+--------------------+------------------+------------------+----------+
| null|    8.35421888053467|               0.0|           50000.0|      5985|
|    1|  339994.53313506936|               0.0|  3.22014670262E11|   3129043|
|    3|  2601001.7677092673|               0.0|  2.52006125741E11|    115151|
|    5|   708660.4208249798|               0.0|            5.0E10|   1335500|
+-----+--------------------+------------------+------------------+----------+



In [None]:
from pyspark.sql.types import DecimalType

df_empresas.select('porte', 'capital_social')\
    .withColumn('capital_social', df_empresas.capital_social.cast(DecimalType(18, 2)))\
    .groupBy('porte').agg(f.max('capital_social').alias('capital_social_max'))\
    .show()

+-----+------------------+
|porte|capital_social_max|
+-----+------------------+
| null|          50000.00|
|    1|   322014670262.00|
|    3|   252006125741.00|
|    5|    50000000000.00|
+-----+------------------+



Bastante similair ao `describe()` do Pandas, temos o **`summary()`**:

In [None]:
df_empresas.select('capital_social').summary().show()

+-------+--------------------+
|summary|      capital_social|
+-------+--------------------+
|  count|             4585679|
|   mean|   503694.5478542675|
| stddev|2.1118691490537405E8|
|    min|                 0.0|
|    25%|                 0.0|
|    50%|              1000.0|
|    75%|              7000.0|
|    max|    3.22014670262E11|
+-------+--------------------+



In [None]:
df_empresas.select('capital_social').summary('mean', 'stddev').show()

+-------+--------------------+
|summary|      capital_social|
+-------+--------------------+
|   mean|   503694.5478542675|
| stddev|2.1118691490537405E8|
+-------+--------------------+



## 4.2 Juntando _DataFrames_ com JOIN e UNION

Existem duas formas que podemos juntar dados de dois _DataFrames_: as junções "laterais" com [**`join()`**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html), em que usamos uma ou mais variáveis de ligação; e as junções "verticais" com **`union()`**, em que apenas empilhamos os dados.

### 4.2.1 Juntando _DataFrames_ lateralmente com JOIN

Vamos começar com dois _DataFrames_ dummies:

In [None]:
produtos = spark.createDataFrame(
    [('1', 'Bebidas', 'Água mineral') 
    ,('2', 'Limpeza', 'Sabão em pó') 
    ,('3', 'Frios', 'Queijo') 
    ,('4', 'Bebidas', 'Refrigerante') 
    ,('5', 'Pet', 'Ração para cães')],
    ['id', 'categoria', 'produto']
)

produtos.show()

+---+---------+---------------+
| id|categoria|        produto|
+---+---------+---------------+
|  1|  Bebidas|   Água mineral|
|  2|  Limpeza|    Sabão em pó|
|  3|    Frios|         Queijo|
|  4|  Bebidas|   Refrigerante|
|  5|      Pet|Ração para cães|
+---+---------+---------------+



In [None]:
impostos = spark.createDataFrame(
    [('Bebidas', 0.15)
    ,('Limpeza', 0.05)
    ,('Frios', 0.065)
    ,('Carnes', 0.08)],
    ['categoria', 'aliquota']
)

impostos.show()

+---------+--------+
|categoria|aliquota|
+---------+--------+
|  Bebidas|    0.15|
|  Limpeza|    0.05|
|    Frios|   0.065|
|   Carnes|    0.08|
+---------+--------+



In [None]:
produtos\
    .join(impostos, 'categoria', how='inner')\
    .sort('id')\
    .show()

+---------+---+------------+--------+
|categoria| id|     produto|aliquota|
+---------+---+------------+--------+
|  Bebidas|  1|Água mineral|    0.15|
|  Limpeza|  2| Sabão em pó|    0.05|
|    Frios|  3|      Queijo|   0.065|
|  Bebidas|  4|Refrigerante|    0.15|
+---------+---+------------+--------+



In [None]:
produtos\
    .join(impostos, produtos['categoria']==impostos['categoria'], how='left')\
    .sort('id')\
    .show()

+---+---------+---------------+---------+--------+
| id|categoria|        produto|categoria|aliquota|
+---+---------+---------------+---------+--------+
|  1|  Bebidas|   Água mineral|  Bebidas|    0.15|
|  2|  Limpeza|    Sabão em pó|  Limpeza|    0.05|
|  3|    Frios|         Queijo|    Frios|   0.065|
|  4|  Bebidas|   Refrigerante|  Bebidas|    0.15|
|  5|      Pet|Ração para cães|     null|    null|
+---+---------+---------------+---------+--------+



In [None]:
produtos\
    .join(impostos, 'categoria', how='right')\
    .orderBy('id')\
    .show()

+---------+----+------------+--------+
|categoria|  id|     produto|aliquota|
+---------+----+------------+--------+
|   Carnes|null|        null|    0.08|
|  Bebidas|   1|Água mineral|    0.15|
|  Limpeza|   2| Sabão em pó|    0.05|
|    Frios|   3|      Queijo|   0.065|
|  Bebidas|   4|Refrigerante|    0.15|
+---------+----+------------+--------+



In [None]:
produtos\
    .join(impostos, produtos.categoria==impostos.categoria, how='outer')\
    .orderBy('id')\
    .show()

+----+---------+---------------+---------+--------+
|  id|categoria|        produto|categoria|aliquota|
+----+---------+---------------+---------+--------+
|null|     null|           null|   Carnes|    0.08|
|   1|  Bebidas|   Água mineral|  Bebidas|    0.15|
|   2|  Limpeza|    Sabão em pó|  Limpeza|    0.05|
|   3|    Frios|         Queijo|    Frios|   0.065|
|   4|  Bebidas|   Refrigerante|  Bebidas|    0.15|
|   5|      Pet|Ração para cães|     null|    null|
+----+---------+---------------+---------+--------+



Note que como a variável de ligação possui o mesmo nome nos dois _DataFrames_ podemos apenas passar o valor rótulo dessa variável. Mas para o caso em que elas são diferentes devemos indicar o df de onde cada uma está vindo. Perceba ainda que nessa situação ambas as colunas são "trazidas" para o resultado.

Outra coisa que vale pontuar é o uso de **`sort()`** para ordenação. Esse método é mais performático por separar o _DataFrame_ em _clusters_(?), porém menos confiável que o **`orderBy()`**.

Agora vamos passar para os _DataFrames_ de dados públicos:

In [None]:
df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- id_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (nullable = true)
 |--

In [None]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_resp: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_resp: string (nullable = true)



In [None]:
empresas_join = df_estabelecimentos.join(df_empresas, 'cnpj_basico', how='inner')
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- id_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (nullable = true)
 |--

### 4.2.2 Empilhando _DataFrames_ UNION

In [None]:
freq = empresas_join\
    .select('cnpj_basico', f.year('data_inicio_atividade').alias('ano_inicio'))\
    .where('ano_inicio >= 2015')\
    .groupBy('ano_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('ano_inicio', ascending=True)

freq.show()

+----------+----------+
|ano_inicio|frequencia|
+----------+----------+
|      2015|    212523|
|      2016|    265417|
|      2017|    237292|
|      2018|    275435|
|      2019|    325922|
|      2020|    400654|
|      2021|    153275|
+----------+----------+



In [None]:
empresas_join\
    .select(f.year('data_inicio_atividade').alias('ano_inicio'))\
    .where('ano_inicio >= 2015')\
    .groupBy('ano_inicio')\
    .count().alias("frequencia")\
    .orderBy('ano_inicio', ascending=True)\
    .show()

+----------+------+
|ano_inicio| count|
+----------+------+
|      2015|212523|
|      2016|265417|
|      2017|237292|
|      2018|275435|
|      2019|325922|
|      2020|400654|
|      2021|153275|
+----------+------+



Outra forma de escrever seria nessa última célula, note que mesmo atribuindo um _alias_ para o método `count()` essa ação não teve efeito e o nome da coluna ficou apenas `count`. Isso pode trazer problemas na etapa seguinte, que no caso é gerar o df que será adicionado em baixo, possuindo apenas uma linha com a contagem total:

In [None]:
freq.select(f.lit('Total').alias('ano_inicio'), f.sum(freq.frequencia).alias('frequencia')).show()

+----------+----------+
|ano_inicio|frequencia|
+----------+----------+
|     Total|   1870518|
+----------+----------+



Note o uso do método **`lit()`** para definir um valor literal e que ambos _DataFrames_ possuem os mesmos nomes nas colunas.

In [None]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'), f.sum(freq.frequencia).alias('frequencia')   
    )
).show()

+----------+----------+
|ano_inicio|frequencia|
+----------+----------+
|      2015|    212523|
|      2016|    265417|
|      2017|    237292|
|      2018|    275435|
|      2019|    325922|
|      2020|    400654|
|      2021|    153275|
|     Total|   1870518|
+----------+----------+



## 4.3 SparkSQL

A SparkSession possui o método [**`sql()`**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.sql.html) que nos permite realizar consultas em nosso df escrevendo código SQL. Mas para utilizar esse recurso precisamos criar uma _view_ temporária do df usando **`createOrReplaceTempView()`**, que serve como uma espécie de _link_ para o df(?).

Essa _view_ temporária vai existir enquanto a SparkSession existir ou até ser destruída com **`catalog.dropTempView()`**.

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [None]:
df_empresas.createOrReplaceTempView("vw_empresas")

In [None]:
spark.catalog.dropTempView("vw_empresas")

True

In [None]:
df_empresas.createOrReplaceTempView("vw_empresas")

In [None]:
spark.sql("SELECT * FROM vw_empresas").show(5)

+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_resp|capital_social|porte|ente_federativo_resp|
+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|        306|FRANCAMAR REFRIGE...|             2240|               49|           0.0|    1|                null|
|       1355|BRASILEIRO & OLIV...|             2062|               49|           0.0|    5|                null|
|       4820|REGISTRO DE IMOVE...|             3034|               32|           0.0|    5|                null|
|       5347|ROSELY APARECIDA ...|             2135|               50|           0.0|    5|                null|
|       6846|BADU E FILHOS TEC...|             2062|               49|        4000.0|    1|                null|
+-----------+--------------------+-----------------+-----------------+--------------+-----+-----

In [None]:
type(spark.sql("SELECT * FROM vw_empresas"))

pyspark.sql.dataframe.DataFrame

In [None]:
spark.sql("SELECT * FROM vw_empresas LIMIT 5").show() # top N não funciona

+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_resp|capital_social|porte|ente_federativo_resp|
+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|        306|FRANCAMAR REFRIGE...|             2240|               49|           0.0|    1|                null|
|       1355|BRASILEIRO & OLIV...|             2062|               49|           0.0|    5|                null|
|       4820|REGISTRO DE IMOVE...|             3034|               32|           0.0|    5|                null|
|       5347|ROSELY APARECIDA ...|             2135|               50|           0.0|    5|                null|
|       6846|BADU E FILHOS TEC...|             2062|               49|        4000.0|    1|                null|
+-----------+--------------------+-----------------+-----------------+--------------+-----+-----

### 4.3.1 _Sandbox_

In [None]:
# não eh case-sensitive
# aceita operador de igualdade do python
spark.sql(
    """
    SELECT * 
    FROM vw_empresas
    WHERE cApitaL_socIal = 50
        and porte == 1
    """
).show(5)

+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_resp|capital_social|porte|ente_federativo_resp|
+-----------+--------------------+-----------------+-----------------+--------------+-----+--------------------+
|   17350147|ERIK MARCELO DOS ...|             2135|               50|          50.0|    1|                null|
|   17833214|ALEXANDRE MACHADO...|             2135|               50|          50.0|    1|                null|
|   20860830|YASMIN MOURA DA F...|             2135|               50|          50.0|    1|                null|
|   22242856|JOAO CESAR MESSIA...|             2135|               50|          50.0|    1|                null|
|   23238540|EVERTON ROBERTO D...|             2135|               50|          50.0|    1|                null|
+-----------+--------------------+-----------------+-----------------+--------------+-----+-----

In [None]:
# aceita AVG e MEAN
spark.sql(
    """
    SELECT porte, mean(capital_social) as capital_social_medio, count(*) as frequencia
    FROM vw_empresas
    GROUP BY porte
    """
).show(5)

+-----+--------------------+----------+
|porte|capital_social_medio|frequencia|
+-----+--------------------+----------+
| null|    8.35421888053467|      5985|
|    1|  339994.53313506936|   3129043|
|    3|  2601001.7677092673|    115151|
|    5|   708660.4208249798|   1335500|
+-----+--------------------+----------+



In [None]:
# relembrando o JOIN feito sa seção 4.2.1 entre empresas e estabelecimentos
empresas_join = df_estabelecimentos.join(df_empresas, 'cnpj_basico', how='inner')

empresas_join.createOrReplaceTempView("vw_empresas_estabelecimentos")

In [None]:
df_freq = spark\
    .sql(
        """
        SELECT YEAR(data_inicio_atividade) AS ano_inicio 
            ,COUNT(cnpj_basico) AS count
        FROM vw_empresas_estabelecimentos 
        WHERE YEAR(data_inicio_atividade) >= 2015
        GROUP BY ano_inicio
        ORDER BY ano_inicio
        """
    )

df_freq.show()

+----------+------+
|ano_inicio| count|
+----------+------+
|      2015|212523|
|      2016|265417|
|      2017|237292|
|      2018|275435|
|      2019|325922|
|      2020|400654|
|      2021|153275|
+----------+------+



In [None]:
df_freq.createOrReplaceTempView("vw_freq")

In [None]:
spark.sql(
    """
    SELECT *
        FROM vw_freq
    UNION ALL
    SELECT 'Total' AS ano_inicio, SUM(count) AS count
        FROM vw_freq
""").show()

+----------+-------+
|ano_inicio|  count|
+----------+-------+
|      2015| 212523|
|      2016| 265417|
|      2017| 237292|
|      2018| 275435|
|      2019| 325922|
|      2020| 400654|
|      2021| 153275|
|     Total|1870518|
+----------+-------+



In [None]:
# posso criar a view direto sem problemas
spark.sql(
        """
        SELECT YEAR(data_inicio_atividade) AS ano_inicio 
            ,COUNT(cnpj_basico) AS count
        FROM vw_empresas_estabelecimentos 
        WHERE YEAR(data_inicio_atividade) >= 2015
        GROUP BY ano_inicio
        ORDER BY ano_inicio
        """
    ).createOrReplaceTempView("vw_freq_2")

spark.sql(
    """
    SELECT *
        FROM vw_freq_2
    UNION ALL
    SELECT 'Total' AS ano_inicio, SUM(count) AS count
        FROM vw_freq_2
    """).show()

+----------+-------+
|ano_inicio|  count|
+----------+-------+
|      2015| 212523|
|      2016| 265417|
|      2017| 237292|
|      2018| 275435|
|      2019| 325922|
|      2020| 400654|
|      2021| 153275|
|     Total|1870518|
+----------+-------+



# 5 Formas de Armazenamento

## 5.1 Arquivos CSV

Para exportar os dados de um _df_ para um arquivo CSV usamos o método [**`csv()`**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html) do atributo [**`write`**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.write.html) do _DataFrame_. Vamos fazer isso com os dados de empresa:


In [7]:
df_empresas.write.csv(
    path=path_files+'/empresas/csv', 
    mode='overwrite', 
    sep=';', 
    header=True
)

Note que o modo de escrita passado foi o `'overwrite'`, que vai sobrescrever o conteúdo do arquivo caso ele já exista; e também foi indicado que o cabeçalho do _df_ seja enviado para os arquivos. Sim, arquivos no plural mesmo, o próprio método decidiu particionar em três arquivos. Além disso foi gerado um arquivo chamado `_SUCCESS` (e mais uns ocultos), que serve apenas para indicar que a operação foi finalizada com sucesso.

Para ler esses arquivos exportados, diferente da primeira vez, precisamos apenas indicar que eles possuem cabeçalho agora:

In [8]:
df_empresas_csv_2 = spark.read.csv(path_files+'/empresas/csv', sep=';', inferSchema=True, header=True)

In [9]:
df_empresas_csv_2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_resp: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_resp: string (nullable = true)



In [10]:
df_empresas_csv_2.count()

4585679

## 5.2 Arquivos PARQUET

O [Apache Parquet](https://parquet.apache.org/) é um formato de armazenamento colunar disponível (olá BQ...) para todos os projetos do ecossistema Hadoop, diferente dos modelos tradicionais que são orientados por linhas. Esse paradigma de armazenamento torna mais eficiêntes a compreensão de colunas (é possível especificar um algoritmo de compreesão para cada uma) e _queries_ que utilizam mais de uma coluna (como?), pois é possível ler e processar apenas os valores necessários.

Agora vamos chamar o método [**`parquet()`**](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html) no atributo `write`:

In [11]:
df_empresas.write.parquet(
    path=path_files+'/empresas/parquet', 
    mode='overwrite'
)

In [12]:
df_empresas_parquet = spark.read.parquet(path_files+'/empresas/parquet')

In [13]:
df_empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_resp: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_resp: string (nullable = true)



In [14]:
df_empresas_parquet.count()

4585679

## 5.3 Arquivos ORC

Assim como o PARQUET, o ORC é um formato de armazenamento colunar. Foi desenvolvido para volumosas _streamings_ e com índices(?). otimizado para leituras

In [23]:
df_estabelecimentos.write.orc(
    path=path_files+'/estabelecimentos/orc', 
    mode='overwrite'
)

In [20]:
df_estabelecimentos_orc = spark.read.orc(path_files+'/estabelecimentos/orc')

In [21]:
df_estabelecimentos_orc.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_resp: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_resp: string (nullable = true)



In [22]:
df_estabelecimentos_orc.count()

4585679

## 5.4 Particionamento dos dados

Nos processos acima que geramos arquivos a partir de um _df_, obtivemos como resultado a criação de arquivos particionados e isso ocorreu de forma automática. Existem formas de controlar esse particionamento e vamos citar algumas delas aqui, começando com o [**`coalesce()`**](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.coalesce.html).

Este é um método de _DataFrame_ e retorna outro _df_ com N _partitions_, onde N é o inteiro que ele recebe. Se N for maior que o número de partições que o _df_ já possui (então isso é um atributo? na real faz todo sentido né... "paralelismo"), o número de partições continua o mesmo. Mas se N for menor que o atual, então a mágica acontece. Ou seja, esse método serve apenas para **diminuir**:

In [15]:
df_empresas.coalesce(1).write.parquet(
    path=path_files+'/empresas/parquet_unico', 
    mode='overwrite'
)

Outro método bastante utilizado é o [**`repartition()`**](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.repartition.html) usado tanto para aumentar ou diminuir o número de partições. Na prática é só aumentar, se for para diminuir `coalesce()` é mais performático.

In [16]:
df_empresas.repartition(20).rdd.getNumPartitions()

20

A última forma de particionar os dados que vamos citar é o [partitionBy()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html), que faz o particionamento em função das categorias de um campo do _df_. Diferente dos outros dois métodos citados, o `partitionBy()` não cria um arquivo para cada categoria, no caso de `'porte'` são quatro, mas sim uma pasta para categoria e dentro de cada pasta é feito o particionamento _default_:

In [17]:
df_empresas.write.parquet(
    path=path_files+'/empresas/parquet_partitionBy_porte_1', 
    mode='overwrite',
    partitionBy='porte'
)

In [18]:
df_empresas.write.partitionBy('porte').parquet(path=path_files+'/empresas/parquet_partitionBy_porte_2', mode='overwrite')

# 6 Encerrando a SparkSession

In [25]:
spark.stop()