# Começando o Trabalho
---

## Apache Spark - Introdução

### [Apache Spark](https://spark.apache.org/)

Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

---

### [PySpark](https://spark.apache.org/docs/3.1.2/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

---

## Utilizando o Spark no Windows

[fonte](https://spark.apache.org/docs/3.1.2/api/python/getting_started/install.html)

#### Passo 1 - Instalando o Java

O PySpark requer a instalação do Java na versão 7 ou superior. Obtenha a versão mais recente clicando [aqui](https://www.java.com/pt-BR/download/). Para verificar a versão que está instalada em sua máquina execute a seguinte linha de código no seu *prompt*:

```
java -version
```

#### Passo 2 - Instalando o Python

O Python deve ser instalado em sua versão 2.6 ou superior. Para obter a versão mais recente clique [aqui](https://www.python.org/downloads/windows/). Para verificar a versão do Python que está instalada em sua máquina digite o seguinte comando em seu *prompt*:

```
python --version
```

#### Passo 3 - Instalando o Apache Spark 

Selecione a versão mais estável clicando [aqui](http://spark.apache.org/downloads.html). Na criação deste projeto utilizamos a versão do Spark **3.1.2** e como tipo de pacote selecionamos **Pre-built for Apache Hadoop 2.7**.

Para instalar o Apache Spark não é necessário executar um instalador, basta descomprimir os arquivos em uma pasta de sua escolha.

<font color=red>Obs.: certifique-se de que o caminho onde os arquivos do Spark foram armazenados não contenham espaços (ex.: **"C:\spark\spark-3.1.2-bin-hadoop2.7"**).</font>

Para testar o funcionamento do Spark execute os comandos abaixo em seu *prompt* de comando. Esses comandos assumem que você extraiu os arquivos do Spark na pasta **"C:\spark\"**.

```
cd C:\spark\spark-3.1.2-bin-hadoop2.7
```

```
bin\pyspark
```

O comando acima inicia o *shell* do PySpark que permite trabalhar interativamente com o Spark.

Para sair basta digitar `exit()` e logo depois presionar *Enter*. Para voltar ao *prompt* pressione *Enter* novamente.

#### Passo 4 - Instalando o findspark

```
pip install findspark
```

#### Passo 5 - Instalando o winutils

Os arquivos do Spark não incluem o utilitário **winutils.exe** que é utilizado pelo Spark no Windows. Se não informar onde o Spark deve procurar este utilitário, veremos alguns erros no console e também não conseguiremos executar *scripts* Python utilizando o utilitário `spark-submit`.

Faça o [download](https://github.com/steveloughran/winutils) para a versão do Hadoop para a qual sua instalação do Spark foi construída. Em nosso exemplo foi utilizada a [versão 2.7](https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin). Faça o *download* apenas do arquivo **winutils.exe**.

Crie a pasta **"hadoop\bin"** dentro da pasta que contém os arquivos do Spark (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**) e copie o arquivo **winutils.exe** para dentro desta pasta.

Crie duas variáveis de ambiente no seu Windows. A primeira chamada **SPARK_HOME** que aponta para a pasta onde os arquivos Spark foram armazenados (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**). A segunda chamada **HADOOP_HOME** que aponta para **%SPARK_HOME%\hadoop** (assim podemos modificar **SPARK_HOME** sem precisar alterar **HADOOP_HOME**).

---

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [None]:
# Instalar as dependências
# Só utilize caso queira fazer o desenvolvimento no Google Colab

!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Só utilize caso queira fazer o desenvolvimento no Google Colab

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

---

# Iniciando uma sessão Spark com Jupyter Notebook

In [6]:
# Verificar versão do python na máquina

!python --version

Python 3.9.12


In [7]:
# Verificar versão do java na máquina

!java --version

java 11.0.19 2023-04-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.19+9-LTS-224)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.19+9-LTS-224, mixed mode)


In [8]:
# Importa uma biblioteca para conseguir acessar uma variável direto do sistema operacional

import os

os.environ['SPARK_HOME'] = 'D:\Programming\Spark\spark-3.5.1-bin-hadoop3'

In [9]:
# Importa uma biblioteca que encontra e possibilita a utilização do spark no notebook

import findspark

findspark.init()

In [10]:
# Importa a biblioteca para conseguir iniciar uma sessão do spark
# Cria uma sessão do spark para iniciar os trabalhos

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .getOrCreate()

In [11]:
# Iniciar a sessão do spark, informando a versão, local de execução e o nome do aplicativo

spark

---

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [12]:
# Criando dados ficticios para demonstração

data = [('Zeca', '35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)

In [13]:
%%time

# Visualizar os dados com o Data Frame nativo do Spark
# O comando (%%time) é nativo do Jupyter Notebook, para visualizar o tempo de execução dos comandos

df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+

CPU times: total: 0 ns
Wall time: 12.4 s


In [14]:
%%time

# Visualizar os dados com o Data Frame nativo do Pandas
# O comando (%%time) é nativo do Jupyter Notebook, para visualizar o tempo de execução dos comandos

df.toPandas()

CPU times: total: 31.2 ms
Wall time: 8.27 s


Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


---

# Carregamento de Dados

### Iremos utilizar o Jupyter notebook a partir daqui e desde a criação da sessão anteriormente

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [None]:
# Caminho para caso o diretório que contém os arquivos estejam no mesmo local do script do notebook
# Realiza a contagem de registros do arquivo

path = 'gov-open-data/empresas/part-00000-58983ad4-8444-4405-aec6-9cd3e5413d1b-c000.csv'

empresas = spark.read.csv(path, sep=';', inferSchema = True)

empresas.count()

In [None]:
# Caminho para caso o diretório que contém os arquivos estejam em algum local do (C:)
# Realiza a contagem de registros do arquivo

path = '/C:/Users/pheli/Desktop/gov-open-data/empresas/part-00000-58983ad4-8444-4405-aec6-9cd3e5413d1b-c000.csv'

empresas = spark.read.csv(path, sep=';', inferSchema = True)

empresas.count()

In [None]:
# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:)
# Realiza a contagem de registros do arquivo

path = '/D:/Programming/Spark/gov-open-data/empresas/part-00000-58983ad4-8444-4405-aec6-9cd3e5413d1b-c000.csv'

empresas = spark.read.csv(path, sep=';', inferSchema = True)

empresas.count()

In [214]:
# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo

path = '/D:/Archives/tests-tables/gov-open-data/empresas/part-00000-58983ad4-8444-4405-aec6-9cd3e5413d1b-c000.csv'

empresas = spark.read.csv(path, sep=';', inferSchema = True)

empresas.count()

458549

In [None]:
# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:)
# Realiza a contagem de registros do arquivo
# Neste caso, será realizada a leitura de todos arquivos que comecem com (part-).
# Lembre-se, só realize esse comando caso os arquivos pertençam e sejam iguais ao mesmo conjunto de dados.

# Exemplo de formato
path = '/D:/Archives/tests-tables/gov-open-data/empresas/part-?.csv'

# Outro exemplo de formato
path = '/D:/Archives/tests-tables/gov-open-data/empresas/part-*'

empresas = spark.read.csv(path, sep=';', inferSchema = True)

empresas.count()

### Carregamento de dados de (empresas)

In [15]:
%%time

# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo
# Essa é a maneira para carregar os dados de (empresas) do jeito mais tradicional

import glob
from pyspark.sql import SparkSession

# Caminho do diretório e padrão de nome dos arquivos
path = 'D:/Archives/tests-tables/gov-open-data/empresas/*.csv'

# Listar todos os caminhos dos arquivos CSV no diretório
file_paths = glob.glob(path)

# Inicializar uma lista para armazenar os DataFrames
dfs = []

# Ler cada arquivo CSV em um DataFrame e adicionar à lista dfs
for file in file_paths:
    df = spark.read.csv(file, sep=';', inferSchema=True)
    dfs.append(df)

# Unir todos os DataFrames em um único DataFrame
empresas = None
for df in dfs:
    if empresas is None:
        empresas = df
    else:
        empresas = empresas.union(df)

# Contar o número total de linhas
total_linhas = empresas.count()

print(f"Total de linhas lidas: {total_linhas}")

Total de linhas lidas: 4585679
CPU times: total: 0 ns
Wall time: 13.3 s


In [16]:
%%time

# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo
# Essa é a maneira para carregar os dados de (empresas) do jeito mais simplificado

import glob
from functools import reduce

path = 'D:/Archives/tests-tables/gov-open-data/empresas/*.csv'

arquivos = glob.glob(path)

# Ler cada arquivo CSV em um DataFrame e armazenar na lista dfs
dfs = [spark.read.csv(file, sep=';', inferSchema=True) for file in arquivos]

# Unir todos os DataFrames em um único DataFrame
empresas = reduce(lambda df1, df2: df1.union(df2), dfs)

print(f'Total de linhas lidas: ', empresas.count())

Total de linhas lidas:  4585679
CPU times: total: 15.6 ms
Wall time: 5.93 s


### Carregamento de dados de (estabelecimentos)

In [17]:
%%time

# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo
# Essa é a maneira para carregar os dados de (estabelecimentos) do jeito mais tradicional

import glob
from pyspark.sql import SparkSession

# Caminho do diretório e padrão de nome dos arquivos
path = 'D:/Archives/tests-tables/gov-open-data/estabelecimentos/*.csv'

# Listar todos os caminhos dos arquivos CSV no diretório
file_paths = glob.glob(path)

# Inicializar uma lista para armazenar os DataFrames
dfs = []

# Ler cada arquivo CSV em um DataFrame e adicionar à lista dfs
for file in file_paths:
    df = spark.read.csv(file, sep=';', inferSchema=True)
    dfs.append(df)

# Unir todos os DataFrames em um único DataFrame
estabelecimentos = None
for df in dfs:
    if estabelecimentos is None:
        estabelecimentos = df
    else:
        estabelecimentos = estabelecimentos.union(df)

# Contar o número total de linhas
total_linhas = estabelecimentos.count()

print(f"Total de linhas lidas: {total_linhas}")

Total de linhas lidas: 4836219
CPU times: total: 0 ns
Wall time: 21.9 s


In [18]:
%%time

# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo
# Essa é a maneira para carregar os dados de (estabelecimentos) do jeito mais simplificado

import glob
from functools import reduce

path = 'D:/Archives/tests-tables/gov-open-data/estabelecimentos/*.csv'

arquivos = glob.glob(path)

# Ler cada arquivo CSV em um DataFrame e armazenar na lista dfs
dfs = [spark.read.csv(file, sep=';', inferSchema=True) for file in arquivos]

# Unir todos os DataFrames em um único DataFrame
estabelecimentos = reduce(lambda df1, df2: df1.union(df2), dfs)

print(f'Total de linhas lidas: ', estabelecimentos.count())

Total de linhas lidas:  4836219
CPU times: total: 15.6 ms
Wall time: 8.08 s


### Carregamento de dados de (socios)

In [19]:
%%time

# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo
# Essa é a maneira para carregar os dados de (socios) do jeito mais tradicional

import glob
from pyspark.sql import SparkSession

# Caminho do diretório e padrão de nome dos arquivos
path = 'D:/Archives/tests-tables/gov-open-data/socios/*.csv'

# Listar todos os caminhos dos arquivos CSV no diretório
file_paths = glob.glob(path)

# Inicializar uma lista para armazenar os DataFrames
dfs = []

# Ler cada arquivo CSV em um DataFrame e adicionar à lista dfs
for file in file_paths:
    df = spark.read.csv(file, sep=';', inferSchema=True)
    dfs.append(df)

# Unir todos os DataFrames em um único DataFrame
socios = None
for df in dfs:
    if socios is None:
        socios = df
    else:
        socios = socios.union(df)

# Contar o número total de linhas
total_linhas = socios.count()

print(f"Total de linhas lidas: {total_linhas}")

Total de linhas lidas: 2046430
CPU times: total: 15.6 ms
Wall time: 6.76 s


In [20]:
%%time

# Caminho para caso o diretório que contém os arquivos estejam em algum local do (D:), diferente do acima
# Realiza a contagem de registros do arquivo
# Essa é a maneira para carregar os dados de (socios) do jeito mais simplificado

import glob
from functools import reduce

path = 'D:/Archives/tests-tables/gov-open-data/socios/*.csv'

arquivos = glob.glob(path)

# Ler cada arquivo CSV em um DataFrame e armazenar na lista dfs
dfs = [spark.read.csv(file, sep=';', inferSchema=True) for file in arquivos]

# Unir todos os DataFrames em um único DataFrame
socios = reduce(lambda df1, df2: df1.union(df2), dfs)

print(f'Total de linhas lidas: ', socios.count())

Total de linhas lidas:  2046430
CPU times: total: 46.9 ms
Wall time: 3.69 s


---

# Manipulando os Dados

## Operações básicas

In [21]:
# Carregamento de Data Frame nativo do Spark

empresas.limit(15).show()

+-----+--------------------+----+---+---------+---+----+
|  _c0|                 _c1| _c2|_c3|      _c4|_c5| _c6|
+-----+--------------------+----+---+---------+---+----+
| 4519|DANIELA DA SILVA ...|2135| 50|     0,00|  5|NULL|
| 8638|JOAO DOS SANTOS F...|2135| 50|     0,00|  5|NULL|
|11748|PANIFICADORA E CO...|2062| 49|     0,00|  1|NULL|
|12027| L G SORVETERIA LTDA|2062| 49|     0,00|  5|NULL|
|13289|ANDREIA CRISTINA ...|2305| 65|100000,00|  1|NULL|
|13623|MARISTELA INDUSTR...|2062| 49|     0,00|  5|NULL|
|17389|DAICICOM MARKETIN...|2240| 49|     0,00|  1|NULL|
|18944|SAO GOTARDO-DISTR...|2062| 49|     0,00|  5|NULL|
|19204|TORTARIA CAMPINAS...|2062| 49|     0,00|  1|NULL|
|22223|S R FARIAS DA SIL...|2062| 49|     0,00|  5|NULL|
|23015|EVANGELINA P DE J...|2135| 50|     0,00|  1|NULL|
|24354|JUCELIA PEREIRA D...|2135| 50|     0,00|  1|NULL|
|26708|DONEIR RODRIGUES ...|2135| 50|     0,00|  1|NULL|
|28664|M ROCHA COML IMPO...|2062| 49|100000,00|  5|NULL|
|28759|LULETEF CONFECCAO...|206

In [22]:
# Carregamento do Data Frame com caractiristicas do Pandas

empresas.limit(15).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,
5,13623,MARISTELA INDUSTRIA E COMERCIO DE SORVETES LTDA,2062,49,0,5,
6,17389,DAICICOM MARKETING DIGITAL S/C LTDA,2240,49,0,1,
7,18944,SAO GOTARDO-DISTRIBUICAO E COMERCIO LTDA,2062,49,0,5,
8,19204,TORTARIA CAMPINAS COMERCIO DE ALIMENTOS LTDA,2062,49,0,1,
9,22223,S R FARIAS DA SILVA E CIA LTDA,2062,49,0,5,


In [23]:
# Carregamento do Data Frame com caractiristicas do Pandas

estabelecimentos.limit(15).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,
5,13623,1,7,1,MARISTELA INDUSTRIA E COMERCIO DE SORVETES,8,20081231,71,,,...,9701,,,,,,,,,
6,17389,1,88,1,,4,20181004,63,,,...,7107,11.0,5352348.0,,,11.0,5352348.0,,,
7,18944,1,96,1,MONTANHES,8,19960506,1,,,...,7071,,,,,,,,,
8,19204,1,74,1,,8,20121004,1,,,...,6291,,,,,,,,,
9,22223,1,50,1,,8,20161010,1,,,...,6469,,,,,,,,,


In [24]:
# Carregamento do Data Frame com caractiristicas do Pandas

socios.limit(15).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7
5,19204,2,EDMIR CARLOS CAPELLINI,***633158**,49,19980908,,***000000**,,0,7
6,22223,2,SILVIA REGINA FARIAS,***203598**,49,19940627,,***000000**,,0,6
7,22223,2,RUBENS BATISTA DA SILVA,***464638**,22,19990111,,***000000**,,0,7
8,28664,2,CLARICE YURIKO MAEDA FERREIRA,***979398**,49,19940802,,***000000**,,0,6
9,28664,2,MARIA CLARA ROCHA FERREIRA,***387348**,22,20210203,,***000000**,,0,3


---

## Renomeando as colunas do DataFrame

In [25]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [26]:
# Um loop simples para validar a posição e o nomes das colunas

for item in enumerate(empresasColNames):
    print(item)

(0, 'cnpj_basico')
(1, 'razao_social_nome_empresarial')
(2, 'natureza_juridica')
(3, 'qualificacao_do_responsavel')
(4, 'capital_social_da_empresa')
(5, 'porte_da_empresa')
(6, 'ente_federativo_responsavel')


In [27]:
# Um loop que irá capturar o index e o nome de (empresasColNames), que irá substituir o Data Frame (empresas)
# Nesse caso, deixamos um print antes de realizar a ação, para validar como irá antes das alterações

for index, colName in enumerate(empresasColNames):
    print(index, colName)

0 cnpj_basico
1 razao_social_nome_empresarial
2 natureza_juridica
3 qualificacao_do_responsavel
4 capital_social_da_empresa
5 porte_da_empresa
6 ente_federativo_responsavel


In [28]:
# Um loop que irá capturar o index e o nome de (empresasColNames), que irá substituir o Data Frame (empresas)

for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f'_c{index}', colName)
    
empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [29]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [30]:
# Um loop simples para validar a posição e o nomes das colunas

for item in enumerate(estabsColNames):
    print(item)

(0, 'cnpj_basico')
(1, 'cnpj_ordem')
(2, 'cnpj_dv')
(3, 'identificador_matriz_filial')
(4, 'nome_fantasia')
(5, 'situacao_cadastral')
(6, 'data_situacao_cadastral')
(7, 'motivo_situacao_cadastral')
(8, 'nome_da_cidade_no_exterior')
(9, 'pais')
(10, 'data_de_inicio_atividade')
(11, 'cnae_fiscal_principal')
(12, 'cnae_fiscal_secundaria')
(13, 'tipo_de_logradouro')
(14, 'logradouro')
(15, 'numero')
(16, 'complemento')
(17, 'bairro')
(18, 'cep')
(19, 'uf')
(20, 'municipio')
(21, 'ddd_1')
(22, 'telefone_1')
(23, 'ddd_2')
(24, 'telefone_2')
(25, 'ddd_do_fax')
(26, 'fax')
(27, 'correio_eletronico')
(28, 'situacao_especial')
(29, 'data_da_situacao_especial')


In [31]:
# Um loop que irá capturar o index e o nome de (empresasColNames), que irá substituir o Data Frame (estabelecimentos)

for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f'_c{index}', colName)
    
estabelecimentos.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [32]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [33]:
# Um loop simples para validar a posição e o nomes das colunas

for item in enumerate(sociosColNames):
    print(item)

(0, 'cnpj_basico')
(1, 'identificador_de_socio')
(2, 'nome_do_socio_ou_razao_social')
(3, 'cnpj_ou_cpf_do_socio')
(4, 'qualificacao_do_socio')
(5, 'data_de_entrada_sociedade')
(6, 'pais')
(7, 'representante_legal')
(8, 'nome_do_representante')
(9, 'qualificacao_do_representante_legal')
(10, 'faixa_etaria')


In [34]:
# Um loop que irá capturar o index e o nome de (empresasColNames), que irá substituir o Data Frame (socios)

for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f'_c{index}', colName)
    
socios.columns

['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [35]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [36]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [37]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


---

## Analisando os dados

[Data Types](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#data-types)

In [38]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas nativas do Spark

empresas.limit(5).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                     0,00|               5|                       NULL|
|       8638|         JOAO DOS SANTOS F...|             2135|                         50|                     0,00|               5|                       NULL|
|      11748|         PANIFICADORA E CO...|             2062|                         49|                     0,00|               1|                       NULL|
|      12027|          L G SORVETE

In [39]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [40]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [41]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas nativas do Spark

estabelecimentos.limit(5).show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+------------------+------+-------------------+----------------+--------+---+---------+-----+----------+-----+----------+----------+----+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|        logradouro|numero|        complemento|          bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+----------------------

In [42]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [43]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [44]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas nativas do Spark

estabelecimentos.limit(5).show()

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+------------------+------+-------------------+----------------+--------+---+---------+-----+----------+-----+----------+----------+----+--------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|        logradouro|numero|        complemento|          bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|  correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+----------------------

In [45]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


In [46]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [47]:
# Filtrar um valor especifico da coluna (nome_do_socio_ou_razao_social)
# Visualizar com as caracteristicas nativas do Spark

socios.filter('nome_do_socio_ou_razao_social == "MARCIA DO CANTO ARRUDA DAIER"').show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|      17389|                     2|         MARCIA DO CANTO A...|         ***920408**|                   49|                 19940613|NULL|        ***000000**|                 NULL|                                  0|           7|
+-----------+----------------------+-----------------------------+------

In [48]:
# Filtrar um valor especifico da coluna (nome_do_socio_ou_razao_social)
# Visualizar com as caracteristicas do Pandas

socios.filter('nome_do_socio_ou_razao_social == "MARCIA DO CANTO ARRUDA DAIER"').toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7


---

## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [49]:
# Importação de bibliotecas para utilizar nas conversões dos atributos

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [50]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [51]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [52]:
# Substituir a vírgula por ponto no atributo (capital_social_da_empresa)
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))

empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,


In [53]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [54]:
# Converter o atributo (capital_social_da_empresa) de string para um double.
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,


In [55]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



---

### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [56]:
# Criar um Data Frame com Spark

df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])

In [57]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

df.printSchema()

root
 |-- data: long (nullable = true)



In [58]:
# Converter o atributo (data) de string para um date.

df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))

In [59]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

df.printSchema()

root
 |-- data: date (nullable = true)



In [60]:
# Visualizar as linhas do Data Frame, e com as caracteristicas do Pandas

df.toPandas()

Unnamed: 0,data
0,2020-09-24
1,2020-10-22
2,2021-02-15


In [61]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [62]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [63]:
# Converter o atributo (data_situacao_cadastral) de string para um date.

estabelecimentos = estabelecimentos\
    .withColumn(
        'data_situacao_cadastral',
        f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        'data_de_inicio_atividade',
        f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        'data_da_situacao_especial',
        f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

In [64]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [65]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,1995-03-31,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,2015-02-09,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,2018-12-19,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,2004-01-23,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [66]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


In [67]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [68]:
# Converter o atributo (data_situacao_cadastral) de string para um date.

socios = socios\
    .withColumn(
        'data_de_entrada_sociedade',
        f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
)

In [69]:
# Visualizar somente 5 linhas do Data Frame, e com as caracteristicas do Pandas

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,1994-05-30,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,1994-05-30,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,2018-06-15,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,1994-06-13,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,1998-09-08,,***000000**,,0,7


In [70]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



---

# Seleções e consultas

## Selecionando informações
 
[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [71]:
# Visualizar os dados por meio de um select com o Spark
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

empresas\
    .select('*')\
    .show(5, truncate=False)

+-----------+---------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                      |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+---------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|4519       |DANIELA DA SILVA CRUZ                              |2135             |50                         |0.0                      |5               |NULL                       |
|8638       |JOAO DOS SANTOS FAGUNDES                           |2135             |50                         |0.0                      |5               |NULL                       |
|11748      |PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO LTDA|2062             |49

In [72]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [73]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

empresas\
    .select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .show(5, truncate=False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2135             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |0.0                      |
|2062             |5               |0.0                      |
|2305             |1               |100000.0                 |
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [74]:
# Visualizar os dados por meio de um select com o Spark
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

socios\
    .select('*')\
    .show(5, truncate=False)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|11748      |2                     |MARIO KATUMI HOSI            |***504158**         |49                   |1994-05-30               |NULL|***000000**        |NULL                 |0                                  |7           |
|11748      |2                     |ROBERTO YUKIO HOSI           |***241

In [75]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [76]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', 'data_de_entrada_sociedade')\
    .show(5, truncate=False)

+-----------------------------+------------+-------------------------+
|nome_do_socio_ou_razao_social|faixa_etaria|data_de_entrada_sociedade|
+-----------------------------+------------+-------------------------+
|MARIO KATUMI HOSI            |7           |1994-05-30               |
|ROBERTO YUKIO HOSI           |7           |1994-05-30               |
|ANDREIA CRISTINA DELSIN      |3           |2018-06-15               |
|MARCIA DO CANTO ARRUDA DAIER |7           |1994-06-13               |
|ALMIR CARLOS CAPELLINI       |7           |1998-09-08               |
+-----------------------------+------------+-------------------------+
only showing top 5 rows



In [77]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas
# Também transformamos o atributo de data completa (data_de_entrada_sociedade) somente em (ano)

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade'))\
    .show(5, truncate=False)

+-----------------------------+------------+-------------------------------+
|nome_do_socio_ou_razao_social|faixa_etaria|year(data_de_entrada_sociedade)|
+-----------------------------+------------+-------------------------------+
|MARIO KATUMI HOSI            |7           |1994                           |
|ROBERTO YUKIO HOSI           |7           |1994                           |
|ANDREIA CRISTINA DELSIN      |3           |2018                           |
|MARCIA DO CANTO ARRUDA DAIER |7           |1994                           |
|ALMIR CARLOS CAPELLINI       |7           |1998                           |
+-----------------------------+------------+-------------------------------+
only showing top 5 rows



In [78]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas
# Também transformamos o atributo de data completa (data_de_entrada_sociedade) somente em (ano)
# Também renomeamos o atributo (data_de_entrada_sociedade) para (ano_entrada)

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_entrada'))\
    .show(5, False)

+-----------------------------+------------+-----------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_entrada|
+-----------------------------+------------+-----------+
|MARIO KATUMI HOSI            |7           |1994       |
|ROBERTO YUKIO HOSI           |7           |1994       |
|ANDREIA CRISTINA DELSIN      |3           |2018       |
|MARCIA DO CANTO ARRUDA DAIER |7           |1994       |
|ALMIR CARLOS CAPELLINI       |7           |1998       |
+-----------------------------+------------+-----------+
only showing top 5 rows



In [79]:
# Visualizar os dados por meio de um select com o Spark
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

estabelecimentos\
    .select('*')\
    .show(5, truncate=False)

+-----------+----------+-------+---------------------------+--------------------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+------------------+------+-------------------+----------------+--------+---+---------+-----+----------+-----+----------+----------+----+----------------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia             |situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|logradouro        |numero|complemento        |bairro          |cep     |uf |municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|fax |correio_eletronico          |situacao_especial|data_da_situacao_especial|
+-----------+----------+--

In [80]:
# Visualizar o esquema do Data Frame, onde conseguimos ver as colunas, tipos dos dados e caracteristicas

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [81]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

estabelecimentos\
    .select('cnpj_basico', 'data_de_inicio_atividade', 'bairro', 'cep')\
    .show(5, truncate=False)

+-----------+------------------------+----------------+--------+
|cnpj_basico|data_de_inicio_atividade|bairro          |cep     |
+-----------+------------------------+----------------+--------+
|4519       |1994-05-16              |JARDIM AEROPORTO|17012205|
|8638       |1994-05-24              |CENTRO          |19275000|
|11748      |1994-05-31              |VILA DINIZ      |15013150|
|12027      |1994-06-08              |SAUDE           |4062003 |
|13289      |1994-06-01              |PORTO BELLO I   |13660000|
+-----------+------------------------+----------------+--------+
only showing top 5 rows



In [82]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

estabelecimentos\
    .select('nome_fantasia', 'municipio')\
    .show(5, truncate=False)

+--------------------------+---------+
|nome_fantasia             |municipio|
+--------------------------+---------+
|GIRAFFAS                  |6219     |
|AGROPECUARIA FAGUNDES     |7255     |
|NULL                      |7097     |
|NULL                      |7107     |
|JS MATERIAIS DE CONSTRUCAO|6915     |
+--------------------------+---------+
only showing top 5 rows



In [83]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

estabelecimentos\
    .select('nome_fantasia', 'municipio', 'data_situacao_cadastral', 'data_de_inicio_atividade', 'data_da_situacao_especial')\
    .show(5, truncate=False)

+--------------------------+---------+-----------------------+------------------------+-------------------------+
|nome_fantasia             |municipio|data_situacao_cadastral|data_de_inicio_atividade|data_da_situacao_especial|
+--------------------------+---------+-----------------------+------------------------+-------------------------+
|GIRAFFAS                  |6219     |1995-03-31             |1994-05-16              |NULL                     |
|AGROPECUARIA FAGUNDES     |7255     |2015-02-09             |1994-05-24              |NULL                     |
|NULL                      |7097     |2018-12-19             |1994-05-31              |NULL                     |
|NULL                      |7107     |2008-12-31             |1994-06-08              |NULL                     |
|JS MATERIAIS DE CONSTRUCAO|6915     |2004-01-23             |1994-06-01              |NULL                     |
+--------------------------+---------+-----------------------+------------------------+-

In [84]:
# Visualizar os dados por meio de um select com o Spark, selecionando somente algumas colunas do Data Frame
# O parâmetro (truncate) é para que seja possível visualizar os dados completos das colunas, sem que sejam abreviadas

estabelecimentos\
    .select('nome_fantasia', 'municipio', 'data_de_inicio_atividade', f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'), f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
    .show(5, truncate=False)

+--------------------------+---------+------------------------+-----------------------+-----------------------+
|nome_fantasia             |municipio|data_de_inicio_atividade|ano_de_inicio_atividade|mes_de_inicio_atividade|
+--------------------------+---------+------------------------+-----------------------+-----------------------+
|GIRAFFAS                  |6219     |1994-05-16              |1994                   |5                      |
|AGROPECUARIA FAGUNDES     |7255     |1994-05-24              |1994                   |5                      |
|NULL                      |7097     |1994-05-31              |1994                   |5                      |
|NULL                      |7107     |1994-06-08              |1994                   |6                      |
|JS MATERIAIS DE CONSTRUCAO|6915     |1994-06-01              |1994                   |6                      |
+--------------------------+---------+------------------------+-----------------------+-----------------

In [85]:
# Criar um Data Frame com Spark

data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [86]:
# Selecionar todas as colunas do Data Frame com Spark

df\
    .select('*')\
    .show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [87]:
# Selecionar somente algumas colunas do Data Frame com Spark

df\
    .select('nome', 'idade')\
    .show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [88]:
# Criar uma nova coluna a partir da (nome), onde, somente os dois primeiros caracteres de uma string são selecionados

df\
    .select('nome', f.substring(df.nome, 1, 2).alias('nome-substr'))\
    .show()

+--------------------+-----------+
|                nome|nome-substr|
+--------------------+-----------+
|GISELLE PAULA GUI...|         GI|
|ELAINE GARCIA DE ...|         EL|
|JOAO CARLOS ABNER...|         JO|
| MARTA ZELI FERREIRA|         MA|
|LAUDENETE WIGGERS...|         LA|
+--------------------+-----------+



In [89]:
# Criar uma nova coluna a partir da (nome), onde, realizamos a seleção do último e primeiro nome
# Também realizamos uma concatenação e dividimos por uma vírgula, formando um novo atributo

df \
    .select(
        f.concat_ws(
            ', ', 
            f.substring_index('nome', ' ', -1), 
            f.substring_index('nome', ' ', 1)
        ).alias('ident'), 
        'idade') \
    .show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



---

## Identificando valores nulos

In [90]:
# Criação de um DataFrame com Spark, onde temos uma coluna criada como (None)
# Notar que esse atributo com valor informado de (None), aparece como (NaN), igual a (Not a Number)

df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [91]:
# Notar que quando executamos em um Data Frame convencional do Spak, esse valor aparece como (Null)

df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [92]:
# Criação de um DataFrame com Spark, onde temos uma coluna criada como (flot(nan))
# Notar que esse atributo com valor informado de (None), aparece como (NaN), igual a (Not a Number)

df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [93]:
# Notar que quando executamos em um Data Frame convencional do Spak, esse valor aparece como (NaN) também

df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [94]:
# Criação de um DataFrame com Spark, onde temos uma coluna criada como (None) e os números são do tipo (string)
# Notar que esse atributo com valor informado de (None), aparece como (None) também

df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [95]:
# Notar que quando executamos em um Data Frame convencional do Spak, esse valor aparece como (Null)

df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [96]:
# Visualizar os 5 primeiros valores de um Data Frame com caracteristicas do Pandas

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,1994-05-30,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,1994-05-30,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,2018-06-15,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,1994-06-13,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,1998-09-08,,***000000**,,0,7


In [97]:
# Visualizar somente o primeiro registro de um Data Frame com caracteristicas do Spark
# Notar que esse Data Frame está pivotado na vertical

socios.limit(1).show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------
 cnpj_basico                         | 11748             
 identificador_de_socio              | 2                 
 nome_do_socio_ou_razao_social       | MARIO KATUMI HOSI 
 cnpj_ou_cpf_do_socio                | ***504158**       
 qualificacao_do_socio               | 49                
 data_de_entrada_sociedade           | 1994-05-30        
 pais                                | NULL              
 representante_legal                 | ***000000**       
 nome_do_representante               | NULL              
 qualificacao_do_representante_legal | 0                 
 faixa_etaria                        | 7                 



In [98]:
# Visualizar somente o primeiro registro de um Data Frame com caracteristicas do Spark

# socios.limit(5).show(truncate=False)

In [99]:
# Comando para realizar a contagem de quantas vezes o valor (Null) aparece nos atributos do Data Frame
# Visualizar os 5 primeiros valores de um Data Frame com caracteristicas do Pandas

socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns ]).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,0,0,208,1234,0,1,2038255,0,1995432,0,0


In [100]:
# Visualizar os 5 primeiros valores de um Data Frame com caracteristicas do Pandas

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,1994-05-30,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,1994-05-30,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,2018-06-15,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,1994-06-13,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,1998-09-08,,***000000**,,0,7


In [101]:
# Visualizar o schema do Data Frame

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [102]:
# Substituir todos os valores (na) por (0) no Data Frame
# Notar que só haverá a substituição de valores que são numéricos
# Visualizar os 5 primeiros valores de um Data Frame com caracteristicas do Pandas

socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,1994-05-30,0,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,1994-05-30,0,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,2018-06-15,0,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,1994-06-13,0,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,1998-09-08,0,***000000**,,0,7


In [103]:
# Substituir todos os valores (na) por (0) no Data Frame
# Notar que só haverá a substituição de valores que são do tipo string
# Visualizar os 5 primeiros valores de um Data Frame com caracteristicas do Pandas

socios.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,1994-05-30,,***000000**,-,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,1994-05-30,,***000000**,-,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,2018-06-15,,***000000**,-,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,1994-06-13,,***000000**,-,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,1998-09-08,,***000000**,-,0,7


---

## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [104]:
# Visualizar e ordenar os dados com caracteristicas de um Data Frame em Spark

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, False)

+-----------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social      |faixa_etaria|ano_de_entrada|
+-----------------------------------+------------+--------------+
|VERSONE DEOVANE JACOBS             |5           |2021          |
|RAFAELE TOMAZ DE OLIVEIRA SCHNEIDER|3           |2021          |
|KARINA JORDANI MELO VIEIRA         |4           |2021          |
|JOSE AYRTON FERNANDES XAVIER       |3           |2021          |
|LUCAS ELIAQUIM CARDOSO SANTANA     |3           |2021          |
+-----------------------------------+------------+--------------+
only showing top 5 rows



In [105]:
# Visualizar e ordenar os dados com caracteristicas de um Data Frame em Spark

socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
    .show(20, False)

+------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social |faixa_etaria|ano_de_entrada|
+------------------------------+------------+--------------+
|FRANCISCO MONTEIRO DE CARVALHO|9           |2021          |
|MARIA ASSUNCAO DE OLIM MAROTE |9           |2021          |
|ALTIVO DE SOUZA               |9           |2021          |
|MANOEL LUIZ GOMES DA CUNHA    |9           |2021          |
|MOISES ARAUJO DOS SANTOS      |9           |2021          |
|FERNANDO COUTO MARQUES LISBOA |9           |2021          |
|MIGUEL CARLOS CALASANS SIMOES |9           |2021          |
|HARUE YAMAMOTO                |9           |2021          |
|ETELVINA DE FREITAS LIMA      |9           |2021          |
|DOMINGOS TEIXEIRA             |9           |2021          |
|JOAO ARTUR                    |9           |2021          |
|ADECRESCIO PEDRO DE AGUIAR    |9           |2021          |
|MIGUEL BERTOZZI               |9           |2021          |
|ARISTARCO ACIOLI DE OLI

In [106]:
# Criação de um Data Frame com Spark

data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [107]:
# Visualizar e ordenar os dados com caracteristicas de um Data Frame em Spark

df\
    .select('*')\
    .orderBy(['ano', 'mes'], ascending=[True, True])\
    .show()

+--------------------+---+----+
|                nome|mes| ano|
+--------------------+---+----+
|     BRENO VENTUROSO|  1|2009|
|JOAO BOSCO DA FON...|  3|2009|
|    ADELINA TEIXEIRA|  5|2009|
|   HERONDINA PEREIRA|  6|2009|
|         WALTER DIAS|  9|2009|
|       CARLITO SOUZA|  1|2010|
|      CARMINA RABELO|  4|2010|
|       DENIS FONSECA|  6|2010|
|          ELIO SILVA|  7|2010|
|    IRANI DOS SANTOS| 12|2010|
+--------------------+---+----+



---

## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [108]:
# Visualizar e realizar um filtro com where

empresas\
    .where('capital_social_da_empresa==50')\
    .limit(5)\
    .toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,14715041,LIDIANE MARIA DO NASCIMENTO 14542418707,2135,50,50.0,1,
1,20601885,CRISTIANO AKIHITO BORDIN 04370949955,2135,50,50.0,1,
2,23661983,VITOR ALOISIO DO NASCIMENTO GUIA 12663882739,2135,50,50.0,1,
3,23714726,JOSELINA PANSINI CORREIA 05265948708,2135,50,50.0,1,
4,26022246,SUZANE SILVA DE OLIVEIRA 06850709992,2135,50,50.0,1,


In [109]:
# Visualizar e realizar um filtro com filter

socios\
    .select('nome_do_socio_ou_razao_social')\
    .filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
    .filter(socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\
    .limit(10)\
    .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO ALVES DIAS
1,RODRIGO PEREIRA DIAS
2,RODRIGO SANTOS DIAS
3,RODRIGO OLIVEIRA DIAS
4,RODRIGO ROSENBLIT COLACO DIAS
5,RODRIGO PEDRO DIAS
6,RODRIGO FERNANDO DE MEDEIROS DIAS
7,RODRIGO DA RESSURREICAO DIAS
8,RODRIGO PINHEIRO DIAS
9,RODRIGO SOUZA DIAS


In [110]:
# Criação de um Data Frame com Spark

data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [111]:
# Visualizar o schema do Data Frame

df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- mes: long (nullable = true)
 |-- ano: long (nullable = true)



In [112]:
# Visualizar e filtrar os dados

df\
    .select('*')\
    .filter(df.ano==2009)\
    .filter(df.mes<=6)\
    .orderBy(['mes'], ascending=[True])\
    .toPandas()

Unnamed: 0,nome,mes,ano
0,BRENO VENTUROSO,1,2009
1,JOAO BOSCO DA FONSECA,3,2009
2,ADELINA TEIXEIRA,5,2009
3,HERONDINA PEREIRA,6,2009


In [113]:
# Visualizar e filtrar os dados de outra maneira

df\
    .filter("mes<=6")\
    .filter("ano=2009")\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [114]:
# Visualizar e filtrar os dados de outra maneira

df\
    .filter((df.mes <= 6) & (df.ano == 2009))\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



---

## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [115]:
# Criação de um Data Frame com Spark

df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [116]:
# Visualizar e buscar somente por strings que contenham o valor (RESTAURANTES)

df\
    .where(f.upper(df.data).like('%RESTAURANTE%'))\
    .show(truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [117]:
# Visualizar e buscar somente por strings que comecem com o valor (RESTAURANTES)

df\
    .where(f.upper(df.data).like('RESTAURANTE%'))\
    .show(truncate=False)

+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+



In [118]:
# Visualizar e buscar somente por strings que terminem com o valor (RESTAURANTES)

df\
    .where(f.upper(df.data).like('%RESTAURANTE'))\
    .show(truncate=False)

+----------------+
|data            |
+----------------+
|Joca Restaurante|
+----------------+



In [119]:
# Visualizar e buscar somente por strings que contenham o valor (RESTAURANTES)

empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTES%'))\
    .show(15, False)

+-------------------------------------------------------------------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                                                                                      |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------------------------------------------------------------------+-----------------+----------------+-------------------------+
|SABOR & ARTES RESTAURANTES EIRELI                                                                                  |2305             |1               |100000.0                 |
|NKM RESTAURANTES LTDA                                                                                              |2062             |1               |150000.0                 |
|GIULIA ROBERTA SANTOS BATISTA RESTAURANTES                                                              

In [120]:
# Criação de um Data Frame com Spark

data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [121]:
# Visualizar e filtrar os nomes dos alunos que iniciem com a letra (C)

df\
    .filter(df.nome.like('C%'))\
    .show(truncate=False)

+--------------+---+----+
|nome          |mes|ano |
+--------------+---+----+
|CARMINA RABELO|4  |2010|
|CARLITO SOUZA |1  |2010|
+--------------+---+----+



---

# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

In [129]:
# Visualizar os dados de forma filtrada, agrupada e ordenada

socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy(['ano_de_entrada'], ascending=True)\
    .toPandas()

Unnamed: 0,ano_de_entrada,count
0,2010,79337
1,2011,83906
2,2012,80101
3,2013,83919
4,2014,80590
5,2015,80906
6,2016,81587
7,2017,90221
8,2018,99935
9,2019,118248


In [135]:
# Visualizar os dados de forma agrupada, agregadas com cálculos e ordenadas

empresas\
    .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg('capital_social_da_empresa').alias('capital_social_medio'),
        f.count('cnpj_basico').alias('frequencia')
    )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            NULL|    8.35421888053467|      5985|
|               1|  339994.53313507035|   3129043|
|               3|   2601001.767709269|    115151|
|               5|   708660.4208249795|   1335500|
+----------------+--------------------+----------+



In [137]:
# Visualizar os dados com estatisticas já realizadas

empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .toPandas()

# .sumary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')

Unnamed: 0,summary,capital_social_da_empresa
0,count,4585679.0
1,mean,503694.5478542674
2,stddev,211186914.9053763
3,min,0.0
4,25%,0.0
5,50%,1000.0
6,75%,7000.0
7,max,322014670262.0


---

In [138]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [139]:
# Visualizar os dados com estatisticas já realizadas

empresas\
    .select('capital_social_da_empresa', 'porte_da_empresa')\
    .summary()\
    .toPandas()

# .sumary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')

Unnamed: 0,summary,capital_social_da_empresa,porte_da_empresa
0,count,4585679.0,4579694.0
1,mean,503694.5478542674,2.216741118511412
2,stddev,211186914.9053763,1.8127137014595116
3,min,0.0,1.0
4,25%,0.0,1.0
5,50%,1000.0,1.0
6,75%,7000.0,5.0
7,max,322014670262.0,5.0


In [140]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [141]:
# Visualizar os dados com estatisticas já realizadas

socios\
    .select('faixa_etaria')\
    .summary()\
    .toPandas()

# .sumary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')

Unnamed: 0,summary,faixa_etaria
0,count,2046430.0
1,mean,5.396815918453111
2,stddev,1.6877995261894378
3,min,0.0
4,25%,4.0
5,50%,5.0
6,75%,7.0
7,max,9.0


In [142]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [143]:
# Visualizar os dados com estatisticas já realizadas

estabelecimentos\
    .select('situacao_cadastral')\
    .summary()\
    .toPandas()

# .sumary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')

Unnamed: 0,summary,situacao_cadastral
0,count,4836219.0
1,mean,4.946726564698579
2,stddev,2.7877529010684787
3,min,1.0
4,25%,2.0
5,50%,4.0
6,75%,8.0
7,max,8.0


In [144]:
# Criação de um Data Frame com Spark

data = [
    ('CARLOS', 'MATEMÁTICA', 7), 
    ('IVO', 'MATEMÁTICA', 9), 
    ('MÁRCIA', 'MATEMÁTICA', 8), 
    ('LEILA', 'MATEMÁTICA', 9), 
    ('BRENO', 'MATEMÁTICA', 7), 
    ('LETÍCIA', 'MATEMÁTICA', 8), 
    ('CARLOS', 'FÍSICA', 2), 
    ('IVO', 'FÍSICA', 8), 
    ('MÁRCIA', 'FÍSICA', 10), 
    ('LEILA', 'FÍSICA', 9), 
    ('BRENO', 'FÍSICA', 1), 
    ('LETÍCIA', 'FÍSICA', 6), 
    ('CARLOS', 'QUÍMICA', 10), 
    ('IVO', 'QUÍMICA', 8), 
    ('MÁRCIA', 'QUÍMICA', 1), 
    ('LEILA', 'QUÍMICA', 10), 
    ('BRENO', 'QUÍMICA', 7), 
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [146]:
# Visualizar os dados do Data Frame
# Criar uma condição, e a partir do resultado da condição, é criado um novo atributo

df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [147]:
# Criação de um Data Frame com Spark

data = [
    ('CARLOS', 'MATEMÁTICA', 7), 
    ('IVO', 'MATEMÁTICA', 9), 
    ('MÁRCIA', 'MATEMÁTICA', 8), 
    ('LEILA', 'MATEMÁTICA', 9), 
    ('BRENO', 'MATEMÁTICA', 7), 
    ('LETÍCIA', 'MATEMÁTICA', 8), 
    ('CARLOS', 'FÍSICA', 2), 
    ('IVO', 'FÍSICA', 8), 
    ('MÁRCIA', 'FÍSICA', 10), 
    ('LEILA', 'FÍSICA', 9), 
    ('BRENO', 'FÍSICA', 1), 
    ('LETÍCIA', 'FÍSICA', 6), 
    ('CARLOS', 'QUÍMICA', 10), 
    ('IVO', 'QUÍMICA', 8), 
    ('MÁRCIA', 'QUÍMICA', 1), 
    ('LEILA', 'QUÍMICA', 10), 
    ('BRENO', 'QUÍMICA', 7), 
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [148]:
# Visualizar e criar uma sumarização

df\
    .select('nota')\
    .summary('min', '25%', '50%', '75%', 'max')\
    .show()

+-------+----+
|summary|nota|
+-------+----+
|    min|   1|
|    25%|   7|
|    50%|   8|
|    75%|   9|
|    max|  10|
+-------+----+



In [149]:
# Visualizar e agrupar valores com uma contagem simples

df\
    .groupBy('status')\
    .count()\
    .orderBy('status', ascending=True)\
    .show()

+---------+-----+
|   status|count|
+---------+-----+
| APROVADO|   14|
|REPROVADO|    4|
+---------+-----+



---

## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [157]:
# Criação e visualização de um Data Frame

produtos = spark.createDataFrame(
    [
     ('1', 'Bebidas', 'Água mineral'),
     ('2', 'Limpeza', 'Sabão em pó'),
     ('3', 'Frios', 'Queijo'),
     ('4', 'Bebidas', 'Refrigerante'),
     ('5', 'Pet', 'Ração para cães'), 
    ],
    ['id', 'cat', 'prod']
)

produtos\
    .select('*')\
    .orderBy('id', ascending=True)\
    .toPandas()

Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [158]:
# Criação e visualização de um Data Frame

impostos = spark.createDataFrame(
    [
     ('Bebidas', 0.15),
     ('Limpeza', 0.05),
     ('Frios', 0.065),
     ('Carnes', 0.08),
    ],
    ['cat', 'tax',]
)

impostos\
    .select('*')\
    .orderBy('tax', ascending=True)\
    .toPandas()

Unnamed: 0,cat,tax
0,Limpeza,0.05
1,Frios,0.065
2,Carnes,0.08
3,Bebidas,0.15


In [159]:
# Visualização e junção (join) de Data Frames

produtos\
    .join(impostos, 'cat', how='inner')\
    .sort('id')\
    .toPandas()

Unnamed: 0,cat,id,prod,tax
0,Bebidas,1,Água mineral,0.15
1,Limpeza,2,Sabão em pó,0.05
2,Frios,3,Queijo,0.065
3,Bebidas,4,Refrigerante,0.15


In [160]:
# Visualização e junção (join) de Data Frames

produtos\
    .join(impostos, 'cat', how='left')\
    .sort('id')\
    .toPandas()

Unnamed: 0,cat,id,prod,tax
0,Bebidas,1,Água mineral,0.15
1,Limpeza,2,Sabão em pó,0.05
2,Frios,3,Queijo,0.065
3,Bebidas,4,Refrigerante,0.15
4,Pet,5,Ração para cães,


In [161]:
# Visualização e junção (join) de Data Frames

produtos\
    .join(impostos, 'cat', how='right')\
    .sort('id')\
    .toPandas()

Unnamed: 0,cat,id,prod,tax
0,Carnes,,,0.08
1,Bebidas,1.0,Água mineral,0.15
2,Limpeza,2.0,Sabão em pó,0.05
3,Frios,3.0,Queijo,0.065
4,Bebidas,4.0,Refrigerante,0.15


In [162]:
# Visualização e junção (join) de Data Frames

produtos\
    .join(impostos, 'cat', how='outer')\
    .sort('id')\
    .toPandas()

Unnamed: 0,cat,id,prod,tax
0,Carnes,,,0.08
1,Bebidas,1.0,Água mineral,0.15
2,Limpeza,2.0,Sabão em pó,0.05
3,Frios,3.0,Queijo,0.065
4,Bebidas,4.0,Refrigerante,0.15
5,Pet,5.0,Ração para cães,


In [163]:
# Visualizar o schema do Data Frame

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [164]:
# Visualizar o schema do Data Frame

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [165]:
# Visualizar o schema do Data Frame

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [167]:
# Criação e visualização de um novo Data Frame, a partir do relacionamento de dois Data Frames

empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

In [168]:
# Visualizar o schema do Data Frame

empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [173]:
# Criação e visualização de um novo Data Frame, a partir do relacionamento de dois Data Frames

freq = empresas_join\
    .select(
        'cnpj_basico'
        ,f.year('data_de_inicio_atividade').alias('data_de_inicio')
    
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count('cnpj_basico').alias('frequencia'))\
    .orderBy('data_de_inicio', ascending = True)

In [174]:
# Visualização de um Data Frame

freq.toPandas()

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [176]:
# Criação e visualização de um Data Frame com criação de um atributo literal e uma soma

freq.union(
    freq.select(
    f.lit('Total').alias('data_de_inicio'),
    f.sum(freq.frequencia).alias('frequencia')
    )
).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



---

## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [178]:
# Criação de view a partir do Data Frame

empresas.createOrReplaceTempView('empresasView')

In [182]:
# Selecionar e visualizar os dados da view, com o limite de 10 linhas

spark.sql('SELECT * FROM empresasView LIMIT 10').toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,
5,13623,MARISTELA INDUSTRIA E COMERCIO DE SORVETES LTDA,2062,49,0.0,5,
6,17389,DAICICOM MARKETING DIGITAL S/C LTDA,2240,49,0.0,1,
7,18944,SAO GOTARDO-DISTRIBUICAO E COMERCIO LTDA,2062,49,0.0,5,
8,19204,TORTARIA CAMPINAS COMERCIO DE ALIMENTOS LTDA,2062,49,0.0,1,
9,22223,S R FARIAS DA SILVA E CIA LTDA,2062,49,0.0,5,


In [186]:
# Selecionar e visualizar os dados da view, com o limite de 5 linhas

spark\
    .sql("""
        SELECT *
        
        FROM empresasView
        
        WHERE capital_social_da_empresa = 50
        
        LIMIT 5
    """)\
    .toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,14715041,LIDIANE MARIA DO NASCIMENTO 14542418707,2135,50,50.0,1,
1,20601885,CRISTIANO AKIHITO BORDIN 04370949955,2135,50,50.0,1,
2,23661983,VITOR ALOISIO DO NASCIMENTO GUIA 12663882739,2135,50,50.0,1,
3,23714726,JOSELINA PANSINI CORREIA 05265948708,2135,50,50.0,1,
4,26022246,SUZANE SILVA DE OLIVEIRA 06850709992,2135,50,50.0,1,


In [188]:
# Selecionar e visualizar os dados da view, com o limite de 5 linhas

spark\
    .sql("""
        SELECT
             porte_da_empresa
            ,mean(capital_social_da_empresa) as media
        
        FROM empresasView
        
        GROUP BY
            porte_da_empresa
        
        LIMIT 5
    """)\
    .show()

+----------------+------------------+
|porte_da_empresa|             media|
+----------------+------------------+
|            NULL|  8.35421888053467|
|               1|339994.53313507035|
|               3| 2601001.767709269|
|               5| 708660.4208249795|
+----------------+------------------+



In [190]:
# Criação de view a partir do Data Frame

empresas_join.createOrReplaceTempView('empresasJoinView')

In [191]:
# Selecionar e visualizar os dados da view, com o limite de 10 linhas

spark.sql('SELECT * FROM empresasJoinView LIMIT 10').toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,fax,correio_eletronico,situacao_especial,data_da_situacao_especial,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,243,1,20,1,,2,2005-08-27,0,,,...,,,,,IN FOCO PRODUCOES FOTOGRAFICAS S/S LTDA,2240,49,500.0,1,
1,362,1,82,1,,8,2008-12-31,71,,,...,,,,,SOFT TRAINNING TREINAMENTO EM INFORMATICA S/C ...,2240,49,0.0,5,
2,451,1,29,1,CASA DO PASTOR,8,2008-12-31,71,,,...,,,,,CONSELHO PASTORAL BATISTA FUNDAMENTALISTA DO B...,3999,16,0.0,5,
3,458,1,40,1,,2,2005-11-03,0,,,...,,,,,AMERICO REGATIERI NETO REPRESENTACOES,2135,50,5000.0,1,
4,481,1,35,1,WM & C,8,2008-12-31,71,,,...,,,,,WM & C INFORMATICA LTDA,2240,49,0.0,5,
5,513,1,0,1,TABELIONATO FERNANDES,2,1998-07-28,0,,,...,,CARTORIOMAIRI@HOTMAIL.COM,,,CARTORIO 1 OFICIO E REGISTRO GERAL DE IMOVEIS,3034,42,0.0,5,
6,633,1,8,1,,2,1998-06-23,0,,,...,144691.0,IQE@IQE.ORG.BR,,,INSTITUTO QUALIDADE NO ENSINO,3999,16,0.0,5,
7,633,2,80,2,,2,2000-01-28,0,,,...,4248841.0,,,,INSTITUTO QUALIDADE NO ENSINO,3999,16,0.0,5,
8,642,1,90,1,CONDOMINIO EM CONSTRUCAO,3,2021-03-22,21,,,...,,,,,CONDOMINIO NOVA AMERICA,3085,5,0.0,5,
9,796,1,82,1,,3,2012-08-09,21,,,...,,,,,R.S.O. SERVICOS DE INFORMATICA S/C LTDA,2240,49,0.0,1,


In [194]:
# Selecionar e visualizar os dados da view

freq = spark\
    .sql("""
        SELECT
             year(data_de_inicio_atividade) as data_de_inicio
            ,count(cnpj_basico) as contagem
        
        FROM empresasJoinView
        
        GROUP BY
            data_de_inicio
        
        ORDER BY
            data_de_inicio
    """)

freq\
    .toPandas()

Unnamed: 0,data_de_inicio,contagem
0,,1
1,1199.0,1
2,1901.0,8
3,1903.0,1
4,1906.0,1
...,...,...
99,2017.0,237292
100,2018.0,275435
101,2019.0,325922
102,2020.0,400654


In [195]:
# Criação de view a partir do Data Frame

freq.createOrReplaceTempView('freqView')

In [196]:
# Selecionar e visualizar os dados da view, com o limite de 10 linhas

spark.sql('SELECT * FROM freqView LIMIT 10').toPandas()

Unnamed: 0,data_de_inicio,contagem
0,,1
1,1199.0,1
2,1901.0,8
3,1903.0,1
4,1906.0,1
5,1907.0,1
6,1912.0,1
7,1913.0,2
8,1917.0,1
9,1918.0,2


In [214]:
# Selecionar e visualizar os dados da view

freq = spark\
    .sql("""
        SELECT *
        FROM freqView
        UNION ALL
        SELECT 
            'Total' as data_de_inicio
            ,SUM(contagem) as count
        FROM freqView
        LIMIT 100
            
    """)

freq\
    .toPandas()

Unnamed: 0,data_de_inicio,contagem
0,,1
1,1199,1
2,1901,8
3,1903,1
4,1906,1
...,...,...
95,2013,198424
96,2014,202276
97,2015,212523
98,2016,265417


---

# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [216]:
# Criação e visualização de um Data Frame

importacao = spark.createDataFrame(
    [
     ('Bebidas', 0.15),
     ('Limpeza', 0.05),
     ('Frios', 0.065),
     ('Carnes', 0.08),
    ],
    ['cat', 'tax',]
)

importacao\
    .select('*')\
    .orderBy('tax', ascending=True)\
    .toPandas()

Unnamed: 0,cat,tax
0,Limpeza,0.05
1,Frios,0.065
2,Carnes,0.08
3,Bebidas,0.15


In [None]:
# Criação de um arquivo no formato CSV

importacao.write.csv(
    path='/D:/Archives/tests-tables/gov-open-data/empresas/csv',
    mode='overwrite',
    sep=';',
    header=True
)

In [224]:
# Visualizar os dados de um arquivo CSV

importacao = spark.read.csv(
    path='/D:/Archives/tests-tables/gov-open-data/empresas/part-00000-58983ad4-8444-4405-aec6-9cd3e5413d1b-c000.csv',
    sep=';',
    inferSchema=True,
    header=True
)

importacao.limit(10).toPandas()

Unnamed: 0,4519,DANIELA DA SILVA CRUZ,2135,50,"0,00",5,_c6
0,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
1,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
2,12027,L G SORVETERIA LTDA,2062,49,0,5,
3,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,
4,13623,MARISTELA INDUSTRIA E COMERCIO DE SORVETES LTDA,2062,49,0,5,
5,17389,DAICICOM MARKETING DIGITAL S/C LTDA,2240,49,0,1,
6,18944,SAO GOTARDO-DISTRIBUICAO E COMERCIO LTDA,2062,49,0,5,
7,19204,TORTARIA CAMPINAS COMERCIO DE ALIMENTOS LTDA,2062,49,0,1,
8,22223,S R FARIAS DA SILVA E CIA LTDA,2062,49,0,5,
9,23015,EVANGELINA P DE JESUS OSASCO,2135,50,0,1,


---

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [225]:
# Criação e visualização de um Data Frame

mercados = spark.createDataFrame(
    [
     ('Bebidas', 0.15),
     ('Limpeza', 0.05),
     ('Frios', 0.065),
     ('Carnes', 0.08),
    ],
    ['cat', 'tax',]
)

mercados\
    .select('*')\
    .orderBy('tax', ascending=True)\
    .toPandas()

Unnamed: 0,cat,tax
0,Limpeza,0.05
1,Frios,0.065
2,Carnes,0.08
3,Bebidas,0.15


In [None]:
# Criação de um arquivo no formato parquet

mercados.write.parquet(
    path='/D:/Archives/tests-tables/gov-open-data/empresas/parquet',
    mode='overwrite',
)

In [None]:
# Visualizar os dados de um arquivo parquet

mercados_parquet= spark.read.parquet(
    '/D:/Archives/tests-tables/gov-open-data/empresas/parquet'
)

In [None]:
# Visualizar o schema do Data Frame

mercados_parquet.printSchema()

---

## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

In [229]:
# Criação e visualização de um Data Frame

restaurantes_sp = spark.createDataFrame(
    [
     ('Bebidas', 0.15),
     ('Limpeza', 0.05),
     ('Frios', 0.065),
     ('Carnes', 0.08),
    ],
    ['cat', 'tax',]
)

restaurantes_sp\
    .select('*')\
    .orderBy('tax', ascending=True)\
    .toPandas()

Unnamed: 0,cat,tax
0,Limpeza,0.05
1,Frios,0.065
2,Carnes,0.08
3,Bebidas,0.15


In [None]:
# Criação de um arquivo no formato CSV
# Nesse caso, o arquivo CSV será criado em um único arquivo, e não particionado

restaurantes_sp.coalesce(1).write.csv(
    path='/D:/Archives/tests-tables/gov-open-data/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [None]:
# Criação de um arquivo no formato parquet, com a partição pela coluna (porte_da_empresa)

restaurantes_sp.write.parquet(
    path='/D:/Archives/tests-tables/gov-open-data/empresas/parquet-partition',
    mode='overwrite',
    partitionBy = 'porte_da_empresa'
)

In [230]:
# Fechar a sessão spark

spark.stop()