<a href="https://colab.research.google.com/github/ALXAVIER-DEV/learning-spark/blob/master/aula0_projeto_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Começando o Trabalho
---

## Apache Spark - Introdução

### [Apache Spark](https://spark.apache.org/)

Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

### [PySpark](https://spark.apache.org/docs/3.1.2/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

## Utilizando o Spark no Windows

[fonte](https://spark.apache.org/docs/3.1.2/api/python/getting_started/install.html)

#### Passo 1 - Instalando o Java

O PySpark requer a instalação do Java na versão 7 ou superior. Obtenha a versão mais recente clicando [aqui](https://www.java.com/pt-BR/download/). Para verificar a versão que está instalada em sua máquina execute a seguinte linha de código no seu *prompt*:

```
java -version
```

#### Passo 2 - Instalando o Python

O Python deve ser instalado em sua versão 2.6 ou superior. Para obter a versão mais recente clique [aqui](https://www.python.org/downloads/windows/). Para verificar a versão do Python que está instalada em sua máquina digite o seguinte comando em seu *prompt*:

```
python --version
```

#### Passo 3 - Instalando o Apache Spark

Selecione a versão mais estável clicando [aqui](http://spark.apache.org/downloads.html). Na criação deste projeto utilizamos a versão do Spark **3.1.2** e como tipo de pacote selecionamos **Pre-built for Apache Hadoop 2.7**.

Para instalar o Apache Spark não é necessário executar um instalador, basta descomprimir os arquivos em uma pasta de sua escolha.

<font color=red>Obs.: certifique-se de que o caminho onde os arquivos do Spark foram armazenados não contenham espaços (ex.: **"C:\spark\spark-3.1.2-bin-hadoop2.7"**).</font>

Para testar o funcionamento do Spark execute os comandos abaixo em seu *prompt* de comando. Esses comandos assumem que você extraiu os arquivos do Spark na pasta **"C:\spark\"**.

```
cd C:\spark\spark-3.1.2-bin-hadoop2.7
```

```
bin\pyspark
```

O comando acima inicia o *shell* do PySpark que permite trabalhar interativamente com o Spark.

Para sair basta digitar `exit()` e logo depois presionar *Enter*. Para voltar ao *prompt* pressione *Enter* novamente.

#### Passo 4 - Instalando o findspark

```
pip install findspark
```

#### Passo 5 - Instalando o winutils

Os arquivos do Spark não incluem o utilitário **winutils.exe** que é utilizado pelo Spark no Windows. Se não informar onde o Spark deve procurar este utilitário, veremos alguns erros no console e também não conseguiremos executar *scripts* Python utilizando o utilitário `spark-submit`.

Faça o [download](https://github.com/steveloughran/winutils) para a versão do Hadoop para a qual sua instalação do Spark foi construída. Em nosso exemplo foi utilizada a [versão 2.7](https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin). Faça o *download* apenas do arquivo **winutils.exe**.

Crie a pasta **"hadoop\bin"** dentro da pasta que contém os arquivos do Spark (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**) e copie o arquivo **winutils.exe** para dentro desta pasta.

Crie duas variáveis de ambiente no seu Windows. A primeira chamada **SPARK_HOME** que aponta para a pasta onde os arquivos Spark foram armazenados (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**). A segunda chamada **HADOOP_HOME** que aponta para **%SPARK_HOME%\hadoop** (assim podemos modificar **SPARK_HOME** sem precisar alterar **HADOOP_HOME**).

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark -qq

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [5]:
spark

## Acessando o [Spark UI](https://spark.apache.org/docs/3.1.2/web-ui.html) (Google Colab)

[Site ngrok](https://ngrok.com)

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [6]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)

In [7]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [8]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
>
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
>
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Montando nosso drive

In [9]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Carregando os dados das empresas

In [10]:
import zipfile

In [11]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/empresas.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [12]:
path = '/content/drive/MyDrive/curso-spark/empresas'
empresas = spark.read.csv(path, sep=';', inferSchema=True)

In [13]:
empresas.count()

4585679

In [14]:
empresas.show()

+-----+--------------------+----+---+-------+---+----+
|  _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+-----+--------------------+----+---+-------+---+----+
|  306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
| 1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
| 4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
| 5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
| 6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
| 8416|  ELETRICA RUBI LTDA|2062| 49|   0,00|  5|null|
| 8992|SHIROMA VEICULOS ...|2062| 49|   0,00|  5|null|
| 9091|CONTATOS BAR E LA...|2062| 49|   0,00|  5|null|
| 9614|ANTONIA APARECIDA...|2135| 50|   0,00|  5|null|
| 9896|DORACY CORAT DA C...|2135| 50|   0,00|  5|null|
|12112|LANCHONETE RIO VE...|2062| 49|   0,00|  5|null|
|12605|VALMAR JACAREI CO...|2062| 49|   0,00|  5|null|
|13407|ROSANA CRISTINA D...|2135| 50|   0,00|  5|null|
|13408|CELIO RODRIGUES D...|2135| 50|   0,00|  5|null|
|13721|MAQFRAN COMERCIO ...|2062| 49|   0,00|  1|null|
|21181|MOU

## Faça como eu fiz: Estabelecimentos e Sócios

### Carregando os dados dos socios

In [15]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/socios.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [16]:
path = '/content/drive/MyDrive/curso-spark/socios'
socios = spark.read.csv(path, sep=';', inferSchema=True)

In [17]:
socios.count()

2046430

In [18]:
socios.show()

+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+
|  _c0|_c1|                 _c2|        _c3|_c4|     _c5| _c6|        _c7| _c8|_c9|_c10|
+-----+---+--------------------+-----------+---+--------+----+-----------+----+---+----+
|  411|  2|LILIANA PATRICIA ...|***678188**| 22|19940725|null|***000000**|null|  0|   7|
|  411|  2|CRISTINA HUNDERTMARK|***637848**| 28|19940725|null|***000000**|null|  0|   7|
| 5813|  2|CELSO EDUARDO DE ...|***786068**| 49|19940516|null|***000000**|null|  0|   8|
| 5813|  2|EDUARDO BERRINGER...|***442348**| 49|19940516|null|***000000**|null|  0|   5|
|14798|  2| HANNE MAHFOUD FADEL|***760388**| 49|19940609|null|***000000**|null|  0|   8|
|14798|  2|    CLOD ASSAD FADEL|***205668**| 22|19940609|null|***000000**|null|  0|   6|
|17826|  2|   WALKYRIA ALGARVES|***689078**| 49|19970227|null|***000000**|null|  0|   7|
|17826|  2|SEBASTIAO JADIR T...|***904728**| 49|20090813|null|***000000**|null|  0|   5|
|19491|  2|     JOSE 

### Carregando os dados dos sócios

In [19]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/estabelecimentos.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [20]:
path = '/content/drive/MyDrive/curso-spark/estabelecimentos'
estabelecimentos = spark.read.csv(path, sep=';', inferSchema=True)

In [21]:
estabelecimentos.count()

4836219

In [22]:
estabelecimentos.show()

+-----+---+---+---+--------------------+---+--------+---+----+----+--------+-------+--------------------+-------+--------------------+------+-------------------+------------------+--------+----+----+----+--------+----+--------+----+--------+--------------------+----+----+
|  _c0|_c1|_c2|_c3|                 _c4|_c5|     _c6|_c7| _c8| _c9|    _c10|   _c11|                _c12|   _c13|                _c14|  _c15|               _c16|              _c17|    _c18|_c19|_c20|_c21|    _c22|_c23|    _c24|_c25|    _c26|                _c27|_c28|_c29|
+-----+---+---+---+--------------------+---+--------+---+----+----+--------+-------+--------------------+-------+--------------------+------+-------------------+------------------+--------+----+----+----+--------+----+--------+----+--------+--------------------+----+----+
| 1879|  1| 96|  1|      PIRAMIDE M. C.|  8|20011029|  1|null|null|19940509|1412602|                null|    RUA|     JOSE FIGLIOLINI|   608|               null|         VILA NILO| 

### Creating a dynamic extraction of data by type and loading.

In [23]:
# import os
# import zipfile
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import input_file_name

# # Criar uma sessão Spark
# spark = SparkSession.builder \
#     .appName("Extract and Consolidate Data") \
#     .getOrCreate()

# # Diretório contendo os arquivos zip
# path = "/content/drive/MyDrive/curso-spark"

# # Função para extrair e consolidar dados de cada arquivo zip
# def extract_and_consolidate(zip_file):
#     # Extrair nome do arquivo sem extensão
#     file_name = os.path.splitext(os.path.basename(zip_file))[0]
#     # Criar pasta com o mesmo nome do arquivo
#     extract_path = os.path.join(path, file_name)
#     os.makedirs(extract_path, exist_ok=True)
#     # Extrair conteúdo do arquivo zip
#     with zipfile.ZipFile(zip_file, 'r') as zip_ref:
#         zip_ref.extractall(extract_path)
#     # Ler todos os arquivos extraídos e consolidá-los em um DataFrame
#     df = spark.read.option("header", "true").csv(extract_path)
#     # Adicionar uma coluna com o nome do arquivo original
#     df = df.withColumn("source_file", input_file_name())
#     return df

# # Lista todos os arquivos zip no diretório
# zip_files = [os.path.join(path, f) for f in os.listdir(path) if f.endswith(".zip")]

# # Iterar sobre os arquivos zip, extrair e consolidar dados
# consolidated_df = None
# for zip_file in zip_files:
#     df = extract_and_consolidate(zip_file)
#     if consolidated_df is None:
#         consolidated_df = df
#     else:
#         consolidated_df = consolidated_df.union(df)

# # Mostrar esquema do DataFrame consolidado
# consolidated_df.printSchema()

# # Mostrar os dados do DataFrame consolidado
# consolidated_df.show()

# # Se desejar, você pode salvar o DataFrame consolidado em um único arquivo
# # Por exemplo, para salvar em formato CSV
# consolidated_df.write.csv("/content/drive/MyDrive/curso-spark/consolidado.csv", header=True, mode="overwrite")

# # Por fim, pare a sessão Spark
# spark.stop()


# Manipulando os Dados
---

## Operações básicas

In [24]:
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


### Renomeando as colunas do DataFrame

In [25]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [26]:
for index, colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [27]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [28]:
for index, colName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [29]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [30]:
for index, colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

## Analisando os dados

[Data Types](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#data-types)

In [31]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [32]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [33]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [34]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [35]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [36]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [37]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as  f

In [38]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [39]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [40]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',','.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [41]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [42]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [43]:
df.printSchema()

root
 |-- data: long (nullable = true)



In [44]:
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))
df.printSchema()

root
 |-- data: date (nullable = true)



In [45]:
df.toPandas()

Unnamed: 0,data
0,2020-09-24
1,2020-10-22
2,2021-02-15


In [46]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [47]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [48]:
estabelecimentos = estabelecimentos\
    .withColumn(
        "data_situacao_cadastral",
        f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
      'data_de_inicio_atividade',
      f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
      'data_da_situacao_especial',
      f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

estabelecimentos.printSchema()



root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [49]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [50]:
socios.limit(4).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5


In [51]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [52]:
socios = socios\
    .withColumn(
        'data_de_entrada_sociedade',
        f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )


In [53]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



# Seleções e consultas
---

## Selecionando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [54]:
empresas\
    .select('*')\
    .show(5, False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [55]:
empresas\
    .select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .show(5)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|             2240|               1|                      0.0|
|             2062|               5|                      0.0|
|             3034|               5|                      0.0|
|             2135|               5|                      0.0|
|             2062|               1|                   4000.0|
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [56]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .show(15, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
|CLOD ASSAD FADEL               |6           |1994          |
|WALKYRIA ALGARVES              |7           |1997          |
|SEBASTIAO JADIR TEIXEIRA NUNES |5           |2009          |
|JOSE JOAO ADAMO                |7           |1994          |
|ROSEMARY CANTUARIA AFONSO ADAMO|6           |1994          |
|MARCOS AURELIO MOTTA           |7           |1994          |
|EDVAN CANDIDO ALENCAR          |5           |1994          |
|JAIME MOURE COLINO             |6           |1994          |
|SANDRA 

## Faça como eu fiz

In [57]:
estabelecimentos\
    .select("nome_fantasia", "municipio", f.year('data_de_inicio_atividade').alias("ano_de_inicio_atividade "),\
                                                f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade '))\
    .show(5)

+-----------------+---------+------------------------+------------------------+
|    nome_fantasia|municipio|ano_de_inicio_atividade |mes_de_inicio_atividade |
+-----------------+---------+------------------------+------------------------+
|   PIRAMIDE M. C.|     7107|                    1994|                       5|
|             null|     7107|                    1994|                       5|
|             null|     7107|                    1994|                       5|
|             null|     7107|                    1994|                       5|
|EMBROIDERY & GIFT|     7075|                    1995|                       5|
+-----------------+---------+------------------------+------------------------+
only showing top 5 rows



## Identificando valores nulos

In [58]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [59]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [60]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [61]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [62]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [63]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [64]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [65]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [66]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [67]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [68]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [69]:
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [70]:
socios.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [71]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(10, False)

+------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social       |faixa_etaria|ano_de_entrada|
+------------------------------------+------------+--------------+
|EDUARDO DE ANDRADE PEIXOTO          |5           |2021          |
|JOSE DE RIBAMAR SILVA FILHO         |6           |2021          |
|LEILANE CRISTINA CARRIJO CLAUSING   |4           |2021          |
|JOSE ALCEU DO ROSARIO JUNIOR        |5           |2021          |
|MICHELE SANTOS DO NASCIMENTO ANDRADE|5           |2021          |
|MARCELO MOCELIN                     |5           |2021          |
|BENILDES BARBOSA RODRIGUES          |8           |2021          |
|PABLO BARTH KOECHE                  |2           |2021          |
|ELENICE SOUZA DA LUZ                |3           |2021          |
|ROSANA EMIKO YAMANAKA               |5           |2021          |
+------------------------------------+------------+--------------+
only showing top 10 rows



In [72]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada','faixa_etaria'], ascending=[False, False])\
    .show(10, False)

+---------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social    |faixa_etaria|ano_de_entrada|
+---------------------------------+------------+--------------+
|ANTONIO TAVARES DE ANDRADE       |9           |2021          |
|ANNA MARIA TELLES FERREIRA SANTOS|9           |2021          |
|ANTONIA DE SOUSA VIEIRA          |9           |2021          |
|AURA MARIA DE ANDRADE            |9           |2021          |
|SONIA MARQUES SAMAJA             |9           |2021          |
|CARLOS ERANE DE AGUIAR           |9           |2021          |
|MATILDE CONCEICAO DE JESUS       |9           |2021          |
|MANUEL TAVARES DE SOUSA          |9           |2021          |
|ALBERTO DE BANDOS MENDES         |9           |2021          |
|EMERSON AZEVEDO                  |9           |2021          |
+---------------------------------+------------+--------------+
only showing top 10 rows



## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [73]:
empresas\
    .filter("capital_social_da_empresa == 50")\
    .show(10, False)

+-----------+------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial             |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858       |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772        |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793       |2135             |50                         |50.0                     |1 

In [74]:
socios\
    .select('*')\
    .filter(socios.nome_do_socio_ou_razao_social.startswith('ALEXANDRE'))\
    .where(socios.nome_do_socio_ou_razao_social.endswith('XAVIER'))\
    .limit(100)\
    .toPandas()



Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,7019669,2,ALEXANDRE XAVIER,***377168**,49,2008-04-22,,***000000**,,0,5
1,4134469,2,ALEXANDRE ANTONIO MATIACCI XAVIER,***422958**,28,2000-10-31,,***000000**,,0,5
2,32916515,2,ALEXANDRE REZENDE PALMERSTON XAVIER,***408291**,10,2019-02-28,,***000000**,,0,4
3,15578456,2,ALEXANDRE REZENDE PALMERSTON XAVIER,***408291**,10,2012-05-16,,***000000**,,0,4
4,23541661,2,ALEXANDRE BERALDI XAVIER,***368889**,49,2016-04-29,,***000000**,,0,5
5,33601076,2,ALEXANDRE REZENDE PALMERSTON XAVIER,***408291**,49,2019-05-13,,***000000**,,0,4
6,21147317,2,ALEXANDRE XAVIER,***820038**,49,2018-08-31,,***000000**,,0,5
7,17555348,2,ALEXANDRE XAVIER,***988588**,22,2013-01-21,,***000000**,,0,5
8,6216937,2,ALEXANDRE MAXIMO XAVIER,***641186**,49,2004-03-30,,***000000**,,0,6
9,16886442,2,ALEXANDRE CESAR SILVA XAVIER,***113206**,49,2021-01-12,,***000000**,,0,4


## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [75]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [76]:
df\
    .where(f.upper(df.data).like('RESTAURANTE%'))\
    .show()

+------------------+
|              data|
+------------------+
|RESTAURANTE DO RUI|
+------------------+



In [77]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTE%'))\
    .show(15, False)

+-------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                          |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------+-----------------+----------------+-------------------------+
|RESTAURANTE IMIGRANTE PORTUGUES LTDA.                  |2062             |5               |0.0                      |
|MORAIS & CARVALHO RESTAURANTE E PIZZARIA LTDA          |2062             |1               |0.0                      |
|BAR E RESTAURANTE PAGANOTTO LTDA                       |2062             |5               |0.0                      |
|RODRIGUES & RODRIGUES RESTAURANTE LTDA                 |2062             |5               |0.0                      |
|TEXAS RANCH BAR RESTAURANTE PRODUCOES ARTISTICAS E CULT|2062             |1               |0.0                      |
|V V SANTOS RESTAURANTE BAR E ATIV DESPORTIVAS L

In [78]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [79]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada', ascending=True)\
    .toPandas()


Unnamed: 0,ano_de_entrada,count
0,2010,79337
1,2011,83906
2,2012,80101
3,2013,83919
4,2014,80590
5,2015,80906
6,2016,81587
7,2017,90221
8,2018,99935
9,2019,118248


In [80]:
empresas\
    .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg('capital_social_da_empresa').alias('capital_social_medio'),
        f.count('cnpj_basico').alias('frequencia')
    )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+



# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) |
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) |
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) |
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) |
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) |
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) |
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) |
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) |
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) |
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) |
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) |
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) |
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) |
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) |
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) |
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) |
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) |
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) |
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) |
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando os dados

In [81]:
empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



In [82]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [83]:

df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [84]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Agua Mineral'),
        ('2', 'Limpeza', 'Sabao em Po'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Racao para Caes')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [85]:
produtos.toPandas()


Unnamed: 0,id,cat,prod
0,1,Bebidas,Agua Mineral
1,2,Limpeza,Sabao em Po
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Racao para Caes


In [86]:
impostos.toPandas()

Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [87]:
produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Agua Mineral| 0.15|
|Limpeza|  2| Sabao em Po| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [88]:
produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Agua Mineral| 0.15|
|Limpeza|  2|    Sabao em Po| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Racao para Caes| null|
+-------+---+---------------+-----+



In [89]:
produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Agua Mineral| 0.15|
|Limpeza|   2| Sabao em Po| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [90]:
produtos.join(impostos, 'cat', how='OUTER')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Agua Mineral| 0.15|
|Limpeza|   2|    Sabao em Po| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Racao para Caes| null|
+-------+----+---------------+-----+



In [91]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [92]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [93]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

In [94]:
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [95]:
freq = empresas_join\
    .select(
        'cnpj_basico',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count('cnpj_basico').alias('frequencia'))\
    .orderBy('data_de_inicio', ascending=True)

In [96]:
freq.toPandas()

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [97]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()


+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



In [98]:
idades = spark.createDataFrame(
    [
        ('CARLOS', 15),
        ('IVO', 14),
        ('MÁRCIA', 16),
        ('LEILA', 17),
        ('LETÍCIA', 14)
    ],
    ['nomes', 'idades']
)

notas = spark.createDataFrame(
    [
        ('CARLOS', 10),
        ('MÁRCIA', 1),
        ('LEILA', 10),
        ('BRENO', 7),
        ('LETÍCIA', 9)
    ],
    ['nomes', 'notas']
)

In [99]:
idades.join(notas, 'nomes', how='outer')\
    .sort('nomes')\
    .show()

+-------+------+-----+
|  nomes|idades|notas|
+-------+------+-----+
|  BRENO|  null|    7|
| CARLOS|    15|   10|
|    IVO|    14| null|
|  LEILA|    17|   10|
|LETÍCIA|    14|    9|
| MÁRCIA|    16|    1|
+-------+------+-----+



## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [100]:
empresas.createOrReplaceTempView('empresasView')

In [101]:
spark.sql("SELECT * FROM empresasView").show(truncate=False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [102]:
spark\
    .sql("""
        SELECT *
            FROM empresasView
            WHERE capital_social_da_empresa = 50
    """).show(truncate=False)

+-----------+------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                   |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858             |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772              |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793             |2135             |50                  

In [103]:
spark\
    .sql("""
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media
        FROM empresasView
        GROUP BY porte_da_empresa
    """)\
    .show(truncate=False)

+----------------+------------------+
|porte_da_empresa|Media             |
+----------------+------------------+
|null            |8.35421888053467  |
|1               |339994.53313506936|
|3               |2601001.7677092673|
|5               |708660.4208249798 |
+----------------+------------------+



In [104]:
empresas_join.createOrReplaceTempView("empresasJoinView")

In [105]:
spark.sql("SHOW TABLES").show()

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
|        |empresasjoinview|       true|
|        |    empresasview|       true|
+--------+----------------+-----------+



In [106]:
freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
        from empresasjoinview
        WHERE YEAR(data_de_inicio_atividade) >= 2010
        GROUP BY data_de_inicio
        ORDER BY data_de_inicio
    """)

freq\
    .show(truncate=False)

+--------------+------+
|data_de_inicio|count |
+--------------+------+
|2010          |154159|
|2011          |172677|
|2012          |232480|
|2013          |198424|
|2014          |202276|
|2015          |212523|
|2016          |265417|
|2017          |237292|
|2018          |275435|
|2019          |325922|
|2020          |400654|
|2021          |153275|
+--------------+------+



In [107]:
freq.createOrReplaceTempView("freqView")

In [108]:
spark\
    .sql("""
        SELECT *
          FROM freqView
        UNION ALL
        SELECT 'Total' AS data_de_inicio, SUM(count) AS count
          FROM freqView
    """)\
    .show()

+--------------+-------+
|data_de_inicio|  count|
+--------------+-------+
|          2010| 154159|
|          2011| 172677|
|          2012| 232480|
|          2013| 198424|
|          2014| 202276|
|          2015| 212523|
|          2016| 265417|
|          2017| 237292|
|          2018| 275435|
|          2019| 325922|
|          2020| 400654|
|          2021| 153275|
|         Total|2830534|
+--------------+-------+



In [109]:
empresas_join.createOrReplaceTempView("empresasJoinView")

freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
            FROM empresasJoinView
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
    """)

freq\
    .show()


+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [110]:
empresas_join\
    .select(f.year(empresas_join.data_de_inicio_atividade).alias('data_de_inicio'), f.count(empresas_join.cnpj_basico).alias('count'))\
    .where("data_de_inicio >= 2010")\
    .groupBy('data_de_inicio')\
    .orderBy('data_de_inicio')\
    .show()

AnalysisException: grouping expressions sequence is empty, and '`data_de_inicio_atividade`' is not an aggregate function. Wrap '(count(`cnpj_basico`) AS `count`)' in windowing function(s) or wrap '`data_de_inicio_atividade`' in first() (or first_value) if you don't care which value you get.;
Aggregate [year(data_de_inicio_atividade#1738) AS data_de_inicio#2966, count(cnpj_basico#526) AS count#2968L]
+- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#1707, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#1738, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 12 more fields]
   +- Join Inner, (cnpj_basico#526 = cnpj_basico#470)
      :- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#1707, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#1738, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :  +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#1707, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, to_date(cast(data_de_inicio_atividade#836 as string), Some(yyyyMMdd)) AS data_de_inicio_atividade#1738, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :     +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, to_date(cast(data_situacao_cadastral#712 as string), Some(yyyyMMdd)) AS data_situacao_cadastral#1707, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :        +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :           +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :              +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :                 +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :                    +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :                       +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, ddd_2#1239, ... 6 more fields]
      :                          +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, telefone_1#1208, _c23#240 AS ddd_2#1239, ... 6 more fields]
      :                             +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, ddd_1#1177, _c22#239 AS telefone_1#1208, _c23#240, ... 6 more fields]
      :                                +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, municipio#1146, _c21#238 AS ddd_1#1177, _c22#239, _c23#240, ... 6 more fields]
      :                                   +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, uf#1115, _c20#237 AS municipio#1146, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                      +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, cep#1084, _c19#236 AS uf#1115, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                         +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, bairro#1053, _c18#235 AS cep#1084, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                            +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, complemento#1022, _c17#234 AS bairro#1053, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                               +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, numero#991, _c16#233 AS complemento#1022, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                  +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, logradouro#960, _c15#232 AS numero#991, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                     +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, tipo_de_logradouro#929, _c14#231 AS logradouro#960, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                        +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, cnae_fiscal_secundaria#898, _c13#230 AS tipo_de_logradouro#929, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                           +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, cnae_fiscal_principal#867, _c12#229 AS cnae_fiscal_secundaria#898, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                              +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, data_de_inicio_atividade#836, _c11#228 AS cnae_fiscal_principal#867, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                 +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, pais#805, _c10#227 AS data_de_inicio_atividade#836, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                    +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, nome_da_cidade_no_exterior#774, _c9#226 AS pais#805, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                       +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, motivo_situacao_cadastral#743, _c8#225 AS nome_da_cidade_no_exterior#774, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                          +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, data_situacao_cadastral#712, _c7#224 AS motivo_situacao_cadastral#743, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                             +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, situacao_cadastral#681, _c6#223 AS data_situacao_cadastral#712, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, nome_fantasia#650, _c5#222 AS situacao_cadastral#681, _c6#223, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                   +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, identificador_matriz_filial#619, _c4#221 AS nome_fantasia#650, _c5#222, _c6#223, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                      +- Project [cnpj_basico#526, cnpj_ordem#557, cnpj_dv#588, _c3#220 AS identificador_matriz_filial#619, _c4#221, _c5#222, _c6#223, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                         +- Project [cnpj_basico#526, cnpj_ordem#557, _c2#219 AS cnpj_dv#588, _c3#220, _c4#221, _c5#222, _c6#223, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                            +- Project [cnpj_basico#526, _c1#218 AS cnpj_ordem#557, _c2#219, _c3#220, _c4#221, _c5#222, _c6#223, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                               +- Project [_c0#217 AS cnpj_basico#526, _c1#218, _c2#219, _c3#220, _c4#221, _c5#222, _c6#223, _c7#224, _c8#225, _c9#226, _c10#227, _c11#228, _c12#229, _c13#230, _c14#231, _c15#232, _c16#233, _c17#234, _c18#235, _c19#236, _c20#237, _c21#238, _c22#239, _c23#240, ... 6 more fields]
      :                                                                                                  +- Relation[_c0#217,_c1#218,_c2#219,_c3#220,_c4#221,_c5#222,_c6#223,_c7#224,_c8#225,_c9#226,_c10#227,_c11#228,_c12#229,_c13#230,_c14#231,_c15#232,_c16#233,_c17#234,_c18#235,_c19#236,_c20#237,_c21#238,_c22#239,_c23#240,... 6 more fields] csv
      +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, natureza_juridica#486, qualificacao_do_responsavel#494, cast(capital_social_da_empresa#1643 as double) AS capital_social_da_empresa#1658, porte_da_empresa#510, ente_federativo_responsavel#518]
         +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, natureza_juridica#486, qualificacao_do_responsavel#494, regexp_replace(capital_social_da_empresa#502, ,, ., 1) AS capital_social_da_empresa#1643, porte_da_empresa#510, ente_federativo_responsavel#518]
            +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, natureza_juridica#486, qualificacao_do_responsavel#494, capital_social_da_empresa#502, porte_da_empresa#510, _c6#35 AS ente_federativo_responsavel#518]
               +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, natureza_juridica#486, qualificacao_do_responsavel#494, capital_social_da_empresa#502, _c5#34 AS porte_da_empresa#510, _c6#35]
                  +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, natureza_juridica#486, qualificacao_do_responsavel#494, _c4#33 AS capital_social_da_empresa#502, _c5#34, _c6#35]
                     +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, natureza_juridica#486, _c3#32 AS qualificacao_do_responsavel#494, _c4#33, _c5#34, _c6#35]
                        +- Project [cnpj_basico#470, razao_social_nome_empresarial#478, _c2#31 AS natureza_juridica#486, _c3#32, _c4#33, _c5#34, _c6#35]
                           +- Project [cnpj_basico#470, _c1#30 AS razao_social_nome_empresarial#478, _c2#31, _c3#32, _c4#33, _c5#34, _c6#35]
                              +- Project [_c0#29 AS cnpj_basico#470, _c1#30, _c2#31, _c3#32, _c4#33, _c5#34, _c6#35]
                                 +- Relation[_c0#29,_c1#30,_c2#31,_c3#32,_c4#33,_c5#34,_c6#35] csv


In [111]:
empresas_join\
    .select(empresas_join.data_de_inicio_atividade.alias('data_de_inicio'))\
    .where(empresas_join.data_de_inicio_atividade >= '2010')\
    .groupBy('data_de_inicio')\
    .count()\
    .orderBy('data_de_inicio')\
    .show()

+--------------+-----+
|data_de_inicio|count|
+--------------+-----+
|    2010-01-01|   12|
|    2010-01-02|    5|
|    2010-01-03|   10|
|    2010-01-04|  272|
|    2010-01-05|  303|
|    2010-01-06|  318|
|    2010-01-07|  317|
|    2010-01-08|  315|
|    2010-01-09|    2|
|    2010-01-10|    1|
|    2010-01-11|  337|
|    2010-01-12|  347|
|    2010-01-13|  361|
|    2010-01-14|  348|
|    2010-01-15|  351|
|    2010-01-16|    2|
|    2010-01-17|    1|
|    2010-01-18|  423|
|    2010-01-19|  377|
|    2010-01-20|  402|
+--------------+-----+
only showing top 20 rows



In [112]:
empresas_join\
    .select(f.year(empresas_join.data_de_inicio_atividade).alias('data_de_inicio'))\
    .where("data_de_inicio >= 2010")\
    .groupBy('data_de_inicio')\
    .count()\
    .orderBy('data_de_inicio')\
    .show()

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [113]:
empresas.write.csv(
    path= '/content/drive/MyDrive/curso-spark/empresas/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [114]:
empresas3 = spark.read.csv(
    '/content/drive/MyDrive/curso-spark/empresas/csv',
    sep=';',
    inferSchema=True,
    header= True
)

In [115]:
empresas3.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



## Faça como eu fiz

In [116]:
socios.write.csv(
    path= '/content/drive/MyDrive/curso-spark/socios/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [117]:
socios2 = spark.read.csv(
    '/content/drive/MyDrive/curso-spark/socios/csv',
    sep=';',
    inferSchema=True,
    header= True
)

In [118]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [119]:
estabelecimentos.write.csv(
    path= '/content/drive/MyDrive/curso-spark/estabelecimentos/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [120]:
estabelecimentos2 = spark.read.csv(
    '/content/drive/MyDrive/curso-spark/estabelecimentos/csv',
    sep=';',
    inferSchema=True,
    header= True
)

In [121]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [134]:
empresas.write.parquet(
    path= '/content/drive/MyDrive/curso-spark/empresas/parquet',
    mode='overwrite'
)

In [138]:
empresas_parquet = spark.read.parquet(
    '/content/drive/MyDrive/curso-spark/empresas/parquet'

)

In [139]:
empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

In [140]:
empresas.coalesce(1).write.csv(
    path= '/content/drive/MyDrive/curso-spark/empresas/csv-unico',
    mode='overwrite',
    sep = ';',
    header= True
)

In [141]:
empresas.write.parquet(
    path='/content/drive/MyDrive/curso-spark/empresas/parquet-partitionBy',
    mode= 'overwrite',
    partitionBy= 'porte_da_empresa'
)

In [142]:
spark.stop()