### **Workshop PySpark**




#### **Uma Breve Introdução ao PySpark**

**PySpark** é um linguagem de programação criada 
PySpark é a API Python para Apache Spark, framework criado pela *Apache Software Foundation* que permite a análise e processamento de grandes conjuntos de dados de forma distribuída em um cluster de computadores. 

O PySpark combina a capacidade de aprendizado e a facilidade de uso do Python com o poder do Apache Spark para permitir o processamento e a análise de dados de qualquer tamanho para todos que estão familiarizados com o Python.

<br>
Isso torna o PySpark uma excelente opção para lidar com dados em grande escala (Big Data) e para realizar tarefas como:
1. Análise exploratória de dados;
2. Construção de pipelines de dados;
3. Criação de modelos de aprendizado de máquina;
4. Criação de ETLs.

A principal vantagem do **PySpark** é sua capacidade de lidar com grandes conjuntos de dados de maneira eficiente e escalável. Ele usa o ***Apache Spark***, um motor de **computação distribuído**, para dividir o processamento de dados em várias máquinas. Isso permite que o PySpark processe grandes quantidades de dados muito mais rapidamente do que seria possível com uma única máquina.


#### **Sistema distribuído**

Um motor de computação distribuído é um software que permite que uma tarefa ou processamento seja dividido em várias partes, para ser executado simultaneamente em diferentes computadores de um cluster.
<br> 
Isso permite que o processamento seja feito de forma mais rápida e eficiente do que seria possível em um único computador, já que várias máquinas trabalham juntas para realizar a tarefa em paralelo.

### Ementa

*Professor Ronisson Lucas Calmon da Conceição*

Cientista de Dados no setor bancário, doutorando em Economia Aplicada pela UFBA e professor de Data Science/Analytics.

* Comandos Básicos
* Filtro
* Manipulação de colunas
* SQL no Spark
* Agregação
* Agrupamento
* Visualização de Dados

### Load Data

In [6]:
# imports
# biblioteca com funções SQL
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Inicialize a SparkSession
spark = SparkSession.builder.appName("NomeDoApp").getOrCreate()

# Caminho absoluto para o arquivo microdados_enem
caminho_arquivo = "D:/Estudos/Arquivos para analise/microdados_enem.csv"

# Crie o DataFrame usando o caminho absoluto para o arquivo CSV
df = spark.read.csv(caminho_arquivo, header=True, inferSchema=True)

In [7]:
display(df)

DataFrame[NU_INSCRICAO: bigint, NU_ANO: int, TP_FAIXA_ETARIA: int, TP_SEXO: string, TP_ESTADO_CIVIL: int, TP_COR_RACA: int, TP_NACIONALIDADE: int, TP_ST_CONCLUSAO: int, TP_ANO_CONCLUIU: int, TP_ESCOLA: int, TP_ENSINO: double, IN_TREINEIRO: int, CO_MUNICIPIO_ESC: double, NO_MUNICIPIO_ESC: string, CO_UF_ESC: double, SG_UF_ESC: string, TP_DEPENDENCIA_ADM_ESC: double, TP_LOCALIZACAO_ESC: double, TP_SIT_FUNC_ESC: double, CO_MUNICIPIO_PROVA: int, NO_MUNICIPIO_PROVA: string, CO_UF_PROVA: int, SG_UF_PROVA: string, TP_PRESENCA_CN: int, TP_PRESENCA_CH: int, TP_PRESENCA_LC: int, TP_PRESENCA_MT: int, CO_PROVA_CN: double, CO_PROVA_CH: double, CO_PROVA_LC: double, CO_PROVA_MT: double, NU_NOTA_CN: double, NU_NOTA_CH: double, NU_NOTA_LC: double, NU_NOTA_MT: double, TX_RESPOSTAS_CN: string, TX_RESPOSTAS_CH: string, TX_RESPOSTAS_LC: string, TX_RESPOSTAS_MT: string, TP_LINGUA: int, TX_GABARITO_CN: string, TX_GABARITO_CH: string, TX_GABARITO_LC: string, TX_GABARITO_MT: string, TP_STATUS_REDACAO: double, N

### Comandos básicos

In [8]:
# estrutura da base
df.printSchema()

root
 |-- NU_INSCRICAO: long (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- TP_FAIXA_ETARIA: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_NACIONALIDADE: integer (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: double (nullable = true)
 |-- IN_TREINEIRO: integer (nullable = true)
 |-- CO_MUNICIPIO_ESC: double (nullable = true)
 |-- NO_MUNICIPIO_ESC: string (nullable = true)
 |-- CO_UF_ESC: double (nullable = true)
 |-- SG_UF_ESC: string (nullable = true)
 |-- TP_DEPENDENCIA_ADM_ESC: double (nullable = true)
 |-- TP_LOCALIZACAO_ESC: double (nullable = true)
 |-- TP_SIT_FUNC_ESC: double (nullable = true)
 |-- CO_MUNICIPIO_PROVA: integer (nullable = true)
 |-- NO_MUNICIPIO_PROVA: string (nullable = true)
 |-- CO_UF_PROVA: integer (nulla

In [15]:
# type
type(df)

pyspark.sql.dataframe.DataFrame

In [16]:
# conversão para Pandas
df_pandas = df.toPandas()

In [17]:
# conversão Pandas para Spark
df_spark = spark.createDataFrame(df_pandas)
display(df_spark)

DataFrame[NU_INSCRICAO: bigint, NU_ANO: bigint, TP_FAIXA_ETARIA: bigint, TP_SEXO: string, TP_ESTADO_CIVIL: bigint, TP_COR_RACA: bigint, TP_NACIONALIDADE: bigint, TP_ST_CONCLUSAO: bigint, TP_ANO_CONCLUIU: bigint, TP_ESCOLA: bigint, TP_ENSINO: double, IN_TREINEIRO: bigint, CO_MUNICIPIO_ESC: double, NO_MUNICIPIO_ESC: string, CO_UF_ESC: double, SG_UF_ESC: string, TP_DEPENDENCIA_ADM_ESC: double, TP_LOCALIZACAO_ESC: double, TP_SIT_FUNC_ESC: double, CO_MUNICIPIO_PROVA: bigint, NO_MUNICIPIO_PROVA: string, CO_UF_PROVA: bigint, SG_UF_PROVA: string, TP_PRESENCA_CN: bigint, TP_PRESENCA_CH: bigint, TP_PRESENCA_LC: bigint, TP_PRESENCA_MT: bigint, CO_PROVA_CN: double, CO_PROVA_CH: double, CO_PROVA_LC: double, CO_PROVA_MT: double, NU_NOTA_CN: double, NU_NOTA_CH: double, NU_NOTA_LC: double, NU_NOTA_MT: double, TX_RESPOSTAS_CN: string, TX_RESPOSTAS_CH: string, TX_RESPOSTAS_LC: string, TX_RESPOSTAS_MT: string, TP_LINGUA: bigint, TX_GABARITO_CN: string, TX_GABARITO_CH: string, TX_GABARITO_LC: string, TX_G

* Seleção de colunas

In [18]:
df.select('NU_INSCRICAO').show()

+------------+
|NU_INSCRICAO|
+------------+
|210053270998|
|210051914571|
|210053592727|
|210051819501|
|210052271642|
|210051334162|
|210054090746|
|210051018671|
|210052350881|
|210051588838|
|210052314230|
|210051923564|
|210052563255|
|210054208012|
|210051016383|
|210052752600|
|210054040152|
|210051423784|
|210054226624|
|210051936472|
+------------+
only showing top 20 rows



In [19]:
df.select(['NU_INSCRICAO']).show()

+------------+
|NU_INSCRICAO|
+------------+
|210053270998|
|210051914571|
|210053592727|
|210051819501|
|210052271642|
|210051334162|
|210054090746|
|210051018671|
|210052350881|
|210051588838|
|210052314230|
|210051923564|
|210052563255|
|210054208012|
|210051016383|
|210052752600|
|210054040152|
|210051423784|
|210054226624|
|210051936472|
+------------+
only showing top 20 rows



In [20]:
df.select(F.col('NU_INSCRICAO')).show()

+------------+
|NU_INSCRICAO|
+------------+
|210053270998|
|210051914571|
|210053592727|
|210051819501|
|210052271642|
|210051334162|
|210054090746|
|210051018671|
|210052350881|
|210051588838|
|210052314230|
|210051923564|
|210052563255|
|210054208012|
|210051016383|
|210052752600|
|210054040152|
|210051423784|
|210054226624|
|210051936472|
+------------+
only showing top 20 rows



Selecionando várias colunas


In [22]:
df.select(F.col('NU_INSCRICAO'), F.col('TP_SEXO')).show()

+------------+-------+
|NU_INSCRICAO|TP_SEXO|
+------------+-------+
|210053270998|      M|
|210051914571|      M|
|210053592727|      F|
|210051819501|      M|
|210052271642|      F|
|210051334162|      F|
|210054090746|      M|
|210051018671|      F|
|210052350881|      F|
|210051588838|      M|
|210052314230|      F|
|210051923564|      M|
|210052563255|      F|
|210054208012|      F|
|210051016383|      F|
|210052752600|      F|
|210054040152|      F|
|210051423784|      F|
|210054226624|      F|
|210051936472|      M|
+------------+-------+
only showing top 20 rows



In [23]:
df.select(['NU_INSCRICAO','TP_SEXO']).show()

+------------+-------+
|NU_INSCRICAO|TP_SEXO|
+------------+-------+
|210053270998|      M|
|210051914571|      M|
|210053592727|      F|
|210051819501|      M|
|210052271642|      F|
|210051334162|      F|
|210054090746|      M|
|210051018671|      F|
|210052350881|      F|
|210051588838|      M|
|210052314230|      F|
|210051923564|      M|
|210052563255|      F|
|210054208012|      F|
|210051016383|      F|
|210052752600|      F|
|210054040152|      F|
|210051423784|      F|
|210054226624|      F|
|210051936472|      M|
+------------+-------+
only showing top 20 rows



In [64]:
df.select('NU_INSCRICAO','TP_SEXO').show()

+------------+-------+
|NU_INSCRICAO|TP_SEXO|
+------------+-------+
|210053270998|      M|
|210051914571|      M|
|210053592727|      F|
|210051819501|      M|
|210052271642|      F|
|210051334162|      F|
|210054090746|      M|
|210051018671|      F|
|210052350881|      F|
|210051588838|      M|
|210052314230|      F|
|210051923564|      M|
|210052563255|      F|
|210054208012|      F|
|210051016383|      F|
|210052752600|      F|
|210054040152|      F|
|210051423784|      F|
|210054226624|      F|
|210051936472|      M|
+------------+-------+
only showing top 20 rows



Sumário Estatístico

In [24]:
(df
 .select(['NU_NOTA_MT', 'NU_NOTA_CH', 'NU_NOTA_CN', 'NU_NOTA_LC'])
 .describe()
 .show()
)

+-------+-----------------+-----------------+-----------------+-----------------+
|summary|       NU_NOTA_MT|       NU_NOTA_CH|       NU_NOTA_CN|       NU_NOTA_LC|
+-------+-----------------+-----------------+-----------------+-----------------+
|  count|               31|               32|               31|               32|
|   mean|503.8612903225805|        505.26875|458.3548387096774|483.3624999999999|
| stddev|90.29810141016016|83.38392526102068|70.41272062494427|61.81394951658329|
|    min|            363.5|            361.7|            350.2|            344.9|
|    max|            746.0|            670.7|            671.5|            577.8|
+-------+-----------------+-----------------+-----------------+-----------------+



In [25]:
(df
 .select(['NU_NOTA_MT', 'NU_NOTA_CH', 'NU_NOTA_CN', 'NU_NOTA_LC'])
 .summary()
 .show()
)

+-------+-----------------+-----------------+-----------------+-----------------+
|summary|       NU_NOTA_MT|       NU_NOTA_CH|       NU_NOTA_CN|       NU_NOTA_LC|
+-------+-----------------+-----------------+-----------------+-----------------+
|  count|               31|               32|               31|               32|
|   mean|503.8612903225805|        505.26875|458.3548387096774|483.3624999999999|
| stddev|90.29810141016016|83.38392526102068|70.41272062494427|61.81394951658329|
|    min|            363.5|            361.7|            350.2|            344.9|
|    25%|            432.8|            438.3|            400.9|            442.3|
|    50%|            492.5|            487.0|            448.1|            488.3|
|    75%|            553.6|            568.1|            493.5|            532.8|
|    max|            746.0|            670.7|            671.5|            577.8|
+-------+-----------------+-----------------+-----------------+-----------------+



Para obter os valores únicos de uma coluna podemos usar o comando `.distinct()`.

In [27]:
# distinct - similar ao unique do Pandas
df.select(F.col('TP_SEXO')).distinct().show()

+-------+
|TP_SEXO|
+-------+
|      F|
|      M|
+-------+



### Filtro

Um procedimento muito comum ao manipular dados é a necessidade de filtrar bases por algum condição ou mesmo por um conjunto de condições. Para tanto, podemos usar os comandos `.filter()` e `.where()`.

In [28]:
# condição simples
df.filter(F.col('TP_SEXO') == 'F').show()

# Outras sintaxes:
# df.filter(df.TP_SEXO == 'F').display()
# df.filter('TP_SEXO == "F"').display()

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+----------------+---------+---------+----------------------+------------------+---------------+------------------+------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO|TP_AN

Vejamos agora como filtrar os dados a partir de uma condição múltipla. Podemos usar os operadores `AND` (**&**) ou `OR` (**|**). Lembre-se que o operador `AND` retorna True se todas as condições forem verdadeiras, ao passo que o operador `OR` retorna True se pelo menos uma condição for True.

In [30]:
subset = (
    df
    .filter(
        (F.col('TP_SEXO') == 'F') &
        (F.col('NU_NOTA_MT') >= 300 ) &
        (F.col('NO_MUNICIPIO_PROVA') == 'Salvador')
    )
    .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
)

display(subset)

DataFrame[NU_INSCRICAO: bigint, TP_SEXO: string, NU_NOTA_MT: double]

Para filtrar missing values podemos usar `isNull()` e `isNotNull()`.

In [31]:
# filtrando missing values - isNull()
subset2 = (
    df
    .filter(F.col('NU_NOTA_MT').isNull())
    .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
)
display(subset2)

DataFrame[NU_INSCRICAO: bigint, TP_SEXO: string, NU_NOTA_MT: double]

In [0]:
# filtrando missing values - isNotNull()
subset3 = (
    df
    .filter(F.col('NU_NOTA_MT').isNotNull())
    .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
)
display(subset3)

NU_INSCRICAO,TP_SEXO,NU_NOTA_MT
210053270998,M,431.5
210053592727,F,528.0
210051819501,M,480.8
210052271642,F,477.7
210051334162,F,504.8
210052350881,F,398.1
210051588838,M,719.8
210051923564,M,419.2
210054208012,F,420.5
210051016383,F,416.5


Na próxima query vamos usar o operador `.isin()` para filtrar os candidatos que residem apenas nas cidades que iremos definir dentro da lista.

In [33]:
(
    df
    .filter((F.col('NO_MUNICIPIO_PROVA').isin(['Salvador', 'Feira de Santana'])) &
            (F.col('NU_NOTA_MT').isNotNull()) &
            (F.col('TP_SEXO') == 'F') &
            (F.col('NU_NOTA_MT') >= 450)
           )
    .select(['NU_INSCRICAO', 'TP_SEXO', 'NO_MUNICIPIO_PROVA', 'NU_NOTA_MT'])
    .show()
)

+------------+-------+------------------+----------+
|NU_INSCRICAO|TP_SEXO|NO_MUNICIPIO_PROVA|NU_NOTA_MT|
+------------+-------+------------------+----------+
|210053768668|      F|          Salvador|     499.9|
|210052542394|      F|          Salvador|     492.5|
+------------+-------+------------------+----------+



Podemos fazer o mesmo comando no SQL:

In [83]:
%sql
select
  NU_INSCRICAO as id_aluno, 
  TP_SEXO      as sexo, 
  NO_MUNICIPIO_PROVA as municipio, 
  NU_NOTA_MT as nota_mt
from default.microdados_enem
where 
  NO_MUNICIPIO_PROVA IN ('Salvador', 'Feira de Santana')
  AND NU_NOTA_MT IS NOT NULL
  AND TP_SEXO = 'F'
  AND NU_NOTA_MT >= 450
;

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `default`.`microdados_enem` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 7 pos 8;
'Project ['NU_INSCRICAO AS id_aluno#4990, 'TP_SEXO AS sexo#4991, 'NO_MUNICIPIO_PROVA AS municipio#4992, 'NU_NOTA_MT AS nota_mt#4993]
+- 'Filter (('NO_MUNICIPIO_PROVA IN (Salvador,Feira de Santana) AND isnotnull('NU_NOTA_MT)) AND (('TP_SEXO = F) AND ('NU_NOTA_MT >= 450)))
   +- 'UnresolvedRelation [default, microdados_enem], [], false


Podemos negar uma condição com o operador `~` (similar ao pandas).

In [0]:
(
    df.filter(~(F.col('NO_MUNICIPIO_PROVA').isin(['Salvador'])))
    .select('NO_MUNICIPIO_PROVA').distinct().display()
)

# outra sintaxe possível:
# (
#     df.filter(F.col('NO_MUNICIPIO_PROVA').isin(['Salvador']) == False)
#     .select('NO_MUNICIPIO_PROVA').distinct().display()
# )

NO_MUNICIPIO_PROVA
Porto Seguro
Ipiaú
Ilhéus
Cachoeira
Eunápolis
Lapão
Cansanção
Valença
Candeias
Cícero Dantas


Podemos usar ainda a função `where()` para efetuar fazer filtros. Ambas as funções produzem os mesmos resultados.

In [38]:
(df.filter((F.col('TP_SEXO') == 'M') & (F.col('NU_NOTA_MT') >= 650))
 .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
 .show()
)

+------------+-------+----------+
|NU_INSCRICAO|TP_SEXO|NU_NOTA_MT|
+------------+-------+----------+
|210051588838|      M|     719.8|
|210051085130|      M|     746.0|
+------------+-------+----------+



In [39]:
(
    df
    .where('(TP_SEXO == "M") AND (NU_NOTA_MT >= 650)')
    .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
    .show()
)

+------------+-------+----------+
|NU_INSCRICAO|TP_SEXO|NU_NOTA_MT|
+------------+-------+----------+
|210051588838|      M|     719.8|
|210051085130|      M|     746.0|
+------------+-------+----------+



In [40]:
# Outra sintaxe:
(
    df
    .where((F.col('TP_SEXO') == 'M') & (F.col('NU_NOTA_MT') >= 650))
    .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
    .show()
)

+------------+-------+----------+
|NU_INSCRICAO|TP_SEXO|NU_NOTA_MT|
+------------+-------+----------+
|210051588838|      M|     719.8|
|210051085130|      M|     746.0|
+------------+-------+----------+



In [41]:
# Outra sintaxe:
(
    df
    .where((df.TP_SEXO == "M") & (df.NU_NOTA_MT >= 650))
    .select('NU_INSCRICAO', 'TP_SEXO', 'NU_NOTA_MT')
    .show()
)

+------------+-------+----------+
|NU_INSCRICAO|TP_SEXO|NU_NOTA_MT|
+------------+-------+----------+
|210051588838|      M|     719.8|
|210051085130|      M|     746.0|
+------------+-------+----------+



Podemos usar o operador LIKE de forma análoga ao que fizemos no SQL.

In [44]:
display(
    (
    df.filter(
        F.lower(F.col('NO_MUNICIPIO_PROVA')).like('%salvador%')
                   )
    .select('NO_MUNICIPIO_PROVA')
    .distinct()
)

)

DataFrame[NO_MUNICIPIO_PROVA: string]

Para filtrar um dado textual que inicia ou finaliza com um determinado padrão podemos usar as funções `startswith()` ou `endswith()`.

Com o operador `BETWEEN` podemos filtrar um intervalo. Vamos utilizar este comando para filtrar os candidatos cque tenham idade dentro de um determinado intervalo.

In [45]:
display((
    df.filter(F.col('TP_FAIXA_ETARIA').between(1, 3))
))

DataFrame[NU_INSCRICAO: bigint, NU_ANO: int, TP_FAIXA_ETARIA: int, TP_SEXO: string, TP_ESTADO_CIVIL: int, TP_COR_RACA: int, TP_NACIONALIDADE: int, TP_ST_CONCLUSAO: int, TP_ANO_CONCLUIU: int, TP_ESCOLA: int, TP_ENSINO: double, IN_TREINEIRO: int, CO_MUNICIPIO_ESC: double, NO_MUNICIPIO_ESC: string, CO_UF_ESC: double, SG_UF_ESC: string, TP_DEPENDENCIA_ADM_ESC: double, TP_LOCALIZACAO_ESC: double, TP_SIT_FUNC_ESC: double, CO_MUNICIPIO_PROVA: int, NO_MUNICIPIO_PROVA: string, CO_UF_PROVA: int, SG_UF_PROVA: string, TP_PRESENCA_CN: int, TP_PRESENCA_CH: int, TP_PRESENCA_LC: int, TP_PRESENCA_MT: int, CO_PROVA_CN: double, CO_PROVA_CH: double, CO_PROVA_LC: double, CO_PROVA_MT: double, NU_NOTA_CN: double, NU_NOTA_CH: double, NU_NOTA_LC: double, NU_NOTA_MT: double, TX_RESPOSTAS_CN: string, TX_RESPOSTAS_CH: string, TX_RESPOSTAS_LC: string, TX_RESPOSTAS_MT: string, TP_LINGUA: int, TX_GABARITO_CN: string, TX_GABARITO_CH: string, TX_GABARITO_LC: string, TX_GABARITO_MT: string, TP_STATUS_REDACAO: double, N

### Manipulação de colunas

Renomear colunas

In [47]:
# altera o nome de todas as colunas do dF para o formato lower()
new_cols_name = [col.lower() for col in df.columns]
newDF = df.toDF(*new_cols_name)
newDF.show()

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+----------------+---------+---------+----------------------+------------------+---------------+------------------+------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|nu_inscricao|nu_ano|tp_faixa_etaria|tp_sexo|tp_estado_civil|tp_cor_raca|tp_nacionalidade|tp_st_conclusao|tp_an

Para renomear colunas podemos usar o comando `.withColumnRenamed(old_name, new_name)`.

In [48]:
(
    df
    .withColumnRenamed('NU_INSCRICAO', 'id')
    .withColumnRenamed('NU_ANO', 'ano')
    .withColumnRenamed('TP_SEXO', 'sexo')
    .select('id', 'ano', 'sexo')
    .show()
)

+------------+----+----+
|          id| ano|sexo|
+------------+----+----+
|210053270998|2021|   M|
|210051914571|2021|   M|
|210053592727|2021|   F|
|210051819501|2021|   M|
|210052271642|2021|   F|
|210051334162|2021|   F|
|210054090746|2021|   M|
|210051018671|2021|   F|
|210052350881|2021|   F|
|210051588838|2021|   M|
|210052314230|2021|   F|
|210051923564|2021|   M|
|210052563255|2021|   F|
|210054208012|2021|   F|
|210051016383|2021|   F|
|210052752600|2021|   F|
|210054040152|2021|   F|
|210051423784|2021|   F|
|210054226624|2021|   F|
|210051936472|2021|   M|
+------------+----+----+
only showing top 20 rows



Remoção de colunas

In [50]:
cols_to_drop = [
 'TX_RESPOSTAS_CN',
 'TX_RESPOSTAS_CH',
 'TX_RESPOSTAS_LC',
 'TX_RESPOSTAS_MT',
 'TX_GABARITO_CN',
 'TX_GABARITO_CH',
 'TX_GABARITO_LC',
 'TX_GABARITO_MT',
 'CO_PROVA_CN',
 'CO_PROVA_CH',
 'CO_PROVA_LC',
 'CO_PROVA_MT'
]

df = df.drop(*cols_to_drop)
display(df)

DataFrame[NU_INSCRICAO: bigint, NU_ANO: int, TP_FAIXA_ETARIA: int, TP_SEXO: string, TP_ESTADO_CIVIL: int, TP_COR_RACA: int, TP_NACIONALIDADE: int, TP_ST_CONCLUSAO: int, TP_ANO_CONCLUIU: int, TP_ESCOLA: int, TP_ENSINO: double, IN_TREINEIRO: int, CO_MUNICIPIO_ESC: double, NO_MUNICIPIO_ESC: string, CO_UF_ESC: double, SG_UF_ESC: string, TP_DEPENDENCIA_ADM_ESC: double, TP_LOCALIZACAO_ESC: double, TP_SIT_FUNC_ESC: double, CO_MUNICIPIO_PROVA: int, NO_MUNICIPIO_PROVA: string, CO_UF_PROVA: int, SG_UF_PROVA: string, TP_PRESENCA_CN: int, TP_PRESENCA_CH: int, TP_PRESENCA_LC: int, TP_PRESENCA_MT: int, NU_NOTA_CN: double, NU_NOTA_CH: double, NU_NOTA_LC: double, NU_NOTA_MT: double, TP_LINGUA: int, TP_STATUS_REDACAO: double, NU_NOTA_COMP1: double, NU_NOTA_COMP2: double, NU_NOTA_COMP3: double, NU_NOTA_COMP4: double, NU_NOTA_COMP5: double, NU_NOTA_REDACAO: double, Q001: string, Q002: string, Q003: string, Q004: string, Q005: double, Q006: string, Q007: string, Q008: string, Q009: string, Q010: string, Q

Criando colunas

A função `.withColumn()` possibilia a criação de colunas, vejamos alguns exemplos. Com a função `lit()` podemos criar uma coluna com uma constante ou valor literal.

In [52]:
(
    df
    .withColumn('log_nota_mt', F.log1p('NU_NOTA_MT'))
    .withColumn('flag_enem', F.lit(1))
    .withColumnRenamed('NU_INSCRICAO', 'id_aluno')
    .withColumnRenamed('NU_NOTA_MT', 'nota_mt')
    .select('id_aluno', 'nota_mt', 'log_nota_mt', 'flag_enem')
    .show()
)

+------------+-------+------------------+---------+
|    id_aluno|nota_mt|       log_nota_mt|flag_enem|
+------------+-------+------------------+---------+
|210053270998|  431.5| 6.069582326371934|        1|
|210051914571|   NULL|              NULL|        1|
|210053592727|  528.0| 6.270988431858299|        1|
|210051819501|  480.8| 6.177529090180771|        1|
|210052271642|  477.7| 6.171074096398463|        1|
|210051334162|  504.8| 6.226141334235865|        1|
|210054090746|   NULL|              NULL|        1|
|210051018671|   NULL|              NULL|        1|
|210052350881|  398.1| 5.989212012054688|        1|
|210051588838|  719.8| 6.580361706294128|        1|
|210052314230|   NULL|              NULL|        1|
|210051923564|  419.2|   6.0407307884109|        1|
|210052563255|   NULL|              NULL|        1|
|210054208012|  420.5|  6.04381977744191|        1|
|210051016383|  416.5|6.0342845442909105|        1|
|210052752600|   NULL|              NULL|        1|
|21005404015

A instrução `when()` permite criar uma estrutura `if-else-then` de forma análoga ao SQL. 

In [53]:
# TP_SEXO
# F - Feminino
# M - Masculino

# TP_ESCOLA
# 1 - Não respondeu
# 2 - Pública
# 3 - Privada

(
    df
    .withColumn('TP_SEXO', F.when(F.col('TP_SEXO') == 'F', 'Feminino')
                .when(F.col('TP_SEXO') == 'M', 'Masculino')
                .when(F.col('TP_SEXO').isNull(), 'missing')
                .otherwise(F.lit('verificar'))
               )
    .withColumn('TP_ESCOLA', F.when(F.col('TP_ESCOLA') == 1, 'Não respondeu')
                .when(F.col('TP_ESCOLA') == 2, 'Pública')
                .when(F.col('TP_ESCOLA') == 3, 'Privada')
                .when(F.col('TP_ESCOLA').isNull(), 'missing')
                .otherwise(F.lit('verificar'))
               )
    .select('NU_INSCRICAO', 'TP_SEXO', 'TP_ESCOLA')
    .show()
)

+------------+---------+-------------+
|NU_INSCRICAO|  TP_SEXO|    TP_ESCOLA|
+------------+---------+-------------+
|210053270998|Masculino|Não respondeu|
|210051914571|Masculino|Não respondeu|
|210053592727| Feminino|Não respondeu|
|210051819501|Masculino|      Pública|
|210052271642| Feminino|Não respondeu|
|210051334162| Feminino|      Pública|
|210054090746|Masculino|      Pública|
|210051018671| Feminino|Não respondeu|
|210052350881| Feminino|      Pública|
|210051588838|Masculino|      Privada|
|210052314230| Feminino|Não respondeu|
|210051923564|Masculino|      Pública|
|210052563255| Feminino|Não respondeu|
|210054208012| Feminino|      Pública|
|210051016383| Feminino|      Pública|
|210052752600| Feminino|Não respondeu|
|210054040152| Feminino|Não respondeu|
|210051423784| Feminino|      Pública|
|210054226624| Feminino|Não respondeu|
|210051936472|Masculino|      Pública|
+------------+---------+-------------+
only showing top 20 rows



No SQL temos que:

In [0]:
%sql
select 
  NU_INSCRICAO,
  case
    when TP_SEXO = 'F' then 'Feminino'
    when TP_SEXO = 'M' then 'Masculino'
    when TP_SEXO IS NULL then 'missing'
    else 'verificar'
  end as TP_SEXO,
  case 
    when TP_ESCOLA = 1 then 'Não respondeu'
    when TP_ESCOLA = 2 then 'Pública'
    when TP_ESCOLA = 3 then 'Privada'
    when TP_ESCOLA IS NULL then 'missing'
    else 'verificar'
  end as TP_ESCOLA
from default.microdados_enem;

NU_INSCRICAO,TP_SEXO,TP_ESCOLA
210053270998,Masculino,Não respondeu
210051914571,Masculino,Não respondeu
210053592727,Feminino,Não respondeu
210051819501,Masculino,Pública
210052271642,Feminino,Não respondeu
210051334162,Feminino,Pública
210054090746,Masculino,Pública
210051018671,Feminino,Não respondeu
210052350881,Feminino,Pública
210051588838,Masculino,Privada


Outra possibilidade para o CASE WHEN no PySpark seria utilizando a função `expr()`, que permite executar expressões SQL em um DataFrame PySpark. Iremos implementar o CASE WHEN utilizando esta função.

In [54]:
(
    df
    .select('NU_INSCRICAO', 'TP_SEXO', 'TP_ESCOLA')
    .withColumn('TP_SEXO', F.expr("""
    case
         when TP_SEXO = 'M' then 'Masculino'
         when TP_SEXO = 'F' then 'Feminino'
         when TP_SEXO is null then 'missing'
         else 'verificar' 
    end
    """))
    .withColumn('TP_ESCOLA', F.expr("""
    case 
        when TP_ESCOLA = 1 then 'Não respondeu'
        when TP_ESCOLA = 2 then 'Pública'
        when TP_ESCOLA = 3 then 'Privada'
        when TP_ESCOLA IS NULL then 'missing'
        else 'verificar'
    end
    """))
    .show()
)

+------------+---------+-------------+
|NU_INSCRICAO|  TP_SEXO|    TP_ESCOLA|
+------------+---------+-------------+
|210053270998|Masculino|Não respondeu|
|210051914571|Masculino|Não respondeu|
|210053592727| Feminino|Não respondeu|
|210051819501|Masculino|      Pública|
|210052271642| Feminino|Não respondeu|
|210051334162| Feminino|      Pública|
|210054090746|Masculino|      Pública|
|210051018671| Feminino|Não respondeu|
|210052350881| Feminino|      Pública|
|210051588838|Masculino|      Privada|
|210052314230| Feminino|Não respondeu|
|210051923564|Masculino|      Pública|
|210052563255| Feminino|Não respondeu|
|210054208012| Feminino|      Pública|
|210051016383| Feminino|      Pública|
|210052752600| Feminino|Não respondeu|
|210054040152| Feminino|Não respondeu|
|210051423784| Feminino|      Pública|
|210054226624| Feminino|Não respondeu|
|210051936472|Masculino|      Pública|
+------------+---------+-------------+
only showing top 20 rows




O PySpark fornece as funções fillna() e fill() para substituir valores NULL/None.

* fillna() e fill()

```python
fillna(value, subset)

fill(value, subset)
```


In [58]:
(
    df
    .fillna({'NU_NOTA_MT': -1, 'NU_NOTA_CH': -1})
    .select('NU_INSCRICAO', 'NU_NOTA_MT', 'NU_NOTA_CH')
    .show()
)

+------------+----------+----------+
|NU_INSCRICAO|NU_NOTA_MT|NU_NOTA_CH|
+------------+----------+----------+
|210053270998|     431.5|     426.7|
|210051914571|      -1.0|      -1.0|
|210053592727|     528.0|     653.4|
|210051819501|     480.8|     425.9|
|210052271642|     477.7|     518.8|
|210051334162|     504.8|     498.8|
|210054090746|      -1.0|      -1.0|
|210051018671|      -1.0|      -1.0|
|210052350881|     398.1|     447.2|
|210051588838|     719.8|     670.7|
|210052314230|      -1.0|      -1.0|
|210051923564|     419.2|     482.9|
|210052563255|      -1.0|      -1.0|
|210054208012|     420.5|     542.1|
|210051016383|     416.5|     521.1|
|210052752600|      -1.0|      -1.0|
|210054040152|     484.1|     568.1|
|210051423784|     440.0|     371.6|
|210054226624|     569.8|     476.1|
|210051936472|      -1.0|     524.8|
+------------+----------+----------+
only showing top 20 rows



In [59]:
(
    df
    .na.fill({'NU_NOTA_MT': -1, 'NU_NOTA_CH': -1})
    .select('NU_INSCRICAO', 'NU_NOTA_MT', 'NU_NOTA_CH')
    .show()
)

+------------+----------+----------+
|NU_INSCRICAO|NU_NOTA_MT|NU_NOTA_CH|
+------------+----------+----------+
|210053270998|     431.5|     426.7|
|210051914571|      -1.0|      -1.0|
|210053592727|     528.0|     653.4|
|210051819501|     480.8|     425.9|
|210052271642|     477.7|     518.8|
|210051334162|     504.8|     498.8|
|210054090746|      -1.0|      -1.0|
|210051018671|      -1.0|      -1.0|
|210052350881|     398.1|     447.2|
|210051588838|     719.8|     670.7|
|210052314230|      -1.0|      -1.0|
|210051923564|     419.2|     482.9|
|210052563255|      -1.0|      -1.0|
|210054208012|     420.5|     542.1|
|210051016383|     416.5|     521.1|
|210052752600|      -1.0|      -1.0|
|210054040152|     484.1|     568.1|
|210051423784|     440.0|     371.6|
|210054226624|     569.8|     476.1|
|210051936472|      -1.0|     524.8|
+------------+----------+----------+
only showing top 20 rows



### SQL no Spark

In [0]:
# cria uma tabela cujo nome é df_enem
df.createOrReplaceTempView('df_enem')

In [0]:
newSubset = sqlContext.sql("""
select * from df_enem
where TP_SEXO = 'F'
""")

In [0]:
newSubset.display()

NU_INSCRICAO,NU_ANO,TP_FAIXA_ETARIA,TP_SEXO,TP_ESTADO_CIVIL,TP_COR_RACA,TP_NACIONALIDADE,TP_ST_CONCLUSAO,TP_ANO_CONCLUIU,TP_ESCOLA,TP_ENSINO,IN_TREINEIRO,CO_MUNICIPIO_ESC,NO_MUNICIPIO_ESC,CO_UF_ESC,SG_UF_ESC,TP_DEPENDENCIA_ADM_ESC,TP_LOCALIZACAO_ESC,TP_SIT_FUNC_ESC,CO_MUNICIPIO_PROVA,NO_MUNICIPIO_PROVA,CO_UF_PROVA,SG_UF_PROVA,TP_PRESENCA_CN,TP_PRESENCA_CH,TP_PRESENCA_LC,TP_PRESENCA_MT,NU_NOTA_CN,NU_NOTA_CH,NU_NOTA_LC,NU_NOTA_MT,TP_LINGUA,TP_STATUS_REDACAO,NU_NOTA_COMP1,NU_NOTA_COMP2,NU_NOTA_COMP3,NU_NOTA_COMP4,NU_NOTA_COMP5,NU_NOTA_REDACAO,Q001,Q002,Q003,Q004,Q005,Q006,Q007,Q008,Q009,Q010,Q011,Q012,Q013,Q014,Q015,Q016,Q017,Q018,Q019,Q020,Q021,Q022,Q023,Q024,Q025
210053592727,2021,10,F,1,3,1,1,7,1,,0,,,,,,,,2928208,Santana,29,BA,1,1,1,1,572.3,653.4,577.8,528.0,1,1.0,120.0,200.0,160.0,140.0,120.0,740.0,E,G,B,D,3.0,F,A,B,D,A,A,B,B,A,A,A,A,A,B,A,A,D,A,B,B
210052271642,2021,2,F,1,1,1,3,0,1,,1,,,,,,,,2916401,Itapetinga,29,BA,1,1,1,1,443.9,518.8,490.4,477.7,0,1.0,120.0,180.0,120.0,160.0,140.0,720.0,C,E,C,B,4.0,B,A,B,D,B,A,B,B,B,A,A,A,A,C,A,A,D,A,A,B
210051334162,2021,3,F,1,2,2,2,0,2,1.0,0,,,,,,,,2918209,Jiquiriçá,29,BA,1,1,1,1,461.9,498.8,475.6,504.8,1,1.0,100.0,180.0,120.0,160.0,120.0,680.0,B,C,A,A,4.0,B,A,B,D,A,B,B,A,A,A,A,A,A,B,A,A,C,A,A,B
210051018671,2021,7,F,1,3,1,1,3,1,,0,,,,,,,,2927408,Salvador,29,BA,0,0,0,0,,,,,0,,,,,,,,C,E,B,B,3.0,C,A,B,D,A,A,B,B,B,B,B,A,A,B,A,A,E,A,B,B
210052350881,2021,5,F,1,3,1,2,0,2,1.0,0,2927408.0,Salvador,29.0,BA,2.0,1.0,1.0,2927408,Salvador,29,BA,1,1,1,1,493.5,447.2,510.7,398.1,0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,H,D,B,D,4.0,D,A,C,D,A,A,B,B,B,A,B,A,B,C,A,A,E,A,A,B
210052314230,2021,8,F,1,3,1,1,5,1,,0,,,,,,,,2907509,Catu,29,BA,0,0,0,0,,,,,1,,,,,,,,B,F,D,A,4.0,D,A,B,C,B,B,B,A,B,A,B,A,A,B,A,A,D,A,B,B
210052563255,2021,8,F,1,3,1,1,3,1,,0,,,,,,,,2910503,Entre Rios,29,BA,0,0,0,0,,,,,1,,,,,,,,H,D,F,B,2.0,A,A,B,C,A,A,B,A,A,A,A,A,A,A,A,A,B,A,A,B
210054208012,2021,2,F,1,3,1,2,0,2,1.0,0,2910909.0,Firmino Alves,29.0,BA,2.0,1.0,1.0,2916401,Itapetinga,29,BA,1,1,1,1,408.0,542.1,446.0,420.5,0,1.0,140.0,160.0,120.0,120.0,140.0,680.0,E,E,C,C,5.0,C,A,B,C,A,A,B,B,A,A,A,A,A,B,B,A,C,A,B,B
210051016383,2021,3,F,1,3,1,2,0,2,1.0,0,,,,,,,,2919157,Lapão,29,BA,1,1,1,1,566.3,521.1,566.5,416.5,0,1.0,100.0,140.0,100.0,120.0,180.0,640.0,F,E,D,F,2.0,B,A,B,C,A,A,B,A,A,A,A,A,A,B,A,A,C,A,B,A
210052752600,2021,8,F,1,3,1,1,5,1,,0,,,,,,,,2906873,Capim Grosso,29,BA,0,0,0,0,,,,,1,,,,,,,,E,E,A,B,1.0,A,A,B,B,A,A,B,A,A,A,A,A,A,A,A,A,B,A,A,B


In [0]:
type(newSubset)

Out[137]: pyspark.sql.dataframe.DataFrame

#### Agregação

O PySpark fornece um conjunto de funções para efetuarmos operaçõres de agregação. Algumas funções:
    
Função|Descrição|
------|---------|
mean(column)|Média dos valores|
max(column)|Valor máximo|
min(column)|Valor mínimo|
sum(column)|Soma|
avg(column)|Média|
agg(column)|Calcular mais de um valor agregado por vez|
count(column)|Contagem de valores|
countDistinct(column)|Contagem de valores distintos|

In [0]:
# aplica funções de agregação:

(
    df.select(F.mean('NU_NOTA_MT').alias('avg_nu_nota_mt'),
              F.max('NU_NOTA_MT').alias('max_nu_nota_mt')
             ).display()
)

avg_nu_nota_mt,max_nu_nota_mt
503.8612903225805,746.0


In [0]:
# aplica mais de uma função de agregação usando .agg():
(
    df
    .agg(
        F.count('NU_INSCRICAO').alias('QuantidadeInscritos'),
        F.countDistinct('NU_INSCRICAO').alias('QuantidadeInscritosDistintos'),
        F.min('NU_NOTA_MT').alias('min_nota_mt'),
        F.mean(F.col('NU_NOTA_MT')).alias('mean_nota_mt'),
        F.median(F.col('NU_NOTA_MT')).alias('median_nota_mt'),
        F.stddev_samp(F.col('NU_NOTA_MT')).alias('std_nota_mt'),
        F.max(F.col('NU_NOTA_MT')).alias('max_nota_mt') 
    )
    .display()
)

QuantidadeInscritos,QuantidadeInscritosDistintos,min_nota_mt,mean_nota_mt,median_nota_mt,std_nota_mt,max_nota_mt
50,50,363.5,503.8612903225806,492.5,90.29810141016014,746.0


#### Agrupamento

De forma parecida com o `GROUP BY` do SQL podemos usar a função `groupBy()` do PySpark para criar grupos e aplicar funções, como soma, máximo, mínimo, por grupos.

A lógica por trás do Group By é similar ao Pandas, em que usamos o método split()-apply()-combine().

<img src = 'https://www.w3resource.com/w3r_images/pandas-groupby-split-apply-combine.svg' width = 500/>

In [0]:
# estatística descritiva do desempenho em Matemática por gênero e tipo de escola
(
    df
    .filter(F.col('TP_ESCOLA') != 1)
    .groupBy('TP_SEXO', 'TP_ESCOLA')
    .agg(
        F.min('NU_NOTA_MT').alias('min_nota_mt'),
        F.mean('NU_NOTA_MT').alias('mean_nota_mt'),
        F.median('NU_NOTA_MT').alias('median_nota_mt'),
        F.max('NU_NOTA_MT').alias('max_nota_mt')
    )
    .display()
)

TP_SEXO,TP_ESCOLA,min_nota_mt,mean_nota_mt,median_nota_mt,max_nota_mt
F,3,499.9,499.9,499.9,499.9
F,2,363.5,443.7875,430.25,541.9
M,2,419.2,489.76666666666665,480.8,569.3
M,3,479.9,564.5999999999999,494.1,719.8


### Avançando

Para finalizar a aula iremos criar um DataFrame com métricas do ENEM agrupadas por município.

In [60]:
capitais_brasileiras = [
    "Rio Branco",
    "Maceió",
    "Macapá",
    "Manaus",
    "Salvador",
    "Fortaleza",
    "Brasília",
    "Vitória",
    "Goiânia",
    "São Luís",
    "Cuiabá",
    "Campo Grande",
    "Belo Horizonte",
    "Belém",
    "João Pessoa",
    "Curitiba",
    "Recife",
    "Teresina",
    "Rio de Janeiro",
    "Natal",
    "Porto Alegre",
    "Porto Velho",
    "Boa Vista",
    "Florianópolis",
    "São Paulo",
    "Aracaju",
    "Palmas"
]

In [61]:
df_enem = (
    df

    .withColumn('TP_SEXO_DESC', F.when(F.col('TP_SEXO') == 'M', 'Masculino')
                  .when(F.col('TP_SEXO') == 'F', 'Feminino')
                  .when(F.col('TP_SEXO').isNull(), 'missing')
                  .otherwise(F.lit('other'))
                  )
    
    .withColumn('TP_ESTADO_CIVIL_DESC', F.when(F.col('TP_ESTADO_CIVIL') == 0, 'Não informado')
                  .when(F.col('TP_ESTADO_CIVIL') == 1, 'Solteiro(a)')
                  .when(F.col('TP_ESTADO_CIVIL') == 2, 'Casado(a)/Mora com companheiro(a)')
                  .when(F.col('TP_ESTADO_CIVIL') == 3, 'Divorciado(a)/Desquitado(a)/Separado(a)')
                  .when(F.col('TP_ESTADO_CIVIL') == 4, 'Viúvo(a)')
                  .when(F.col('TP_ESTADO_CIVIL').isNull(), 'missing')
                  .otherwise(F.lit('other'))
                  )
    
    .withColumn('TP_ESCOLA_DESC', F.when(F.col('TP_ESCOLA') == 1, 'Não Respondeu')
                .when(F.col('TP_ESCOLA') == 2, 'Pública')
                .when(F.col('TP_ESCOLA') == 3, 'Privada')
                .when(F.col('TP_ESCOLA') == 4, 'Exterior')
                .when(F.col('TP_ESCOLA').isNull(), 'missing')
                .otherwise(F.lit('other'))
                )
    
    .withColumn('IN_ABSTENCAO_MT', F.when(F.col('TP_PRESENCA_MT') == 0, 1).otherwise(F.lit(0))
                )
    
    .withColumn('IN_ABSTENCAO_CN', F.when(F.col('TP_PRESENCA_CN') == 0, 1).otherwise(F.lit(0))
                )
    
    .withColumn('IN_ABSTENCAO_CH', F.when(F.col('TP_PRESENCA_CH') == 0, 1).otherwise(F.lit(0))
                )
    
    .withColumn('IN_ABSTENCAO_LC', F.when(F.col('TP_PRESENCA_LC') == 0, 1).otherwise(F.lit(0))
                )
    
    .withColumn('IN_ABSTENCAO_01d', F.when((F.col('IN_ABSTENCAO_LC') == 1) & (F.col('IN_ABSTENCAO_CH') == 1), 1).otherwise(F.lit(0))
                )
    
    .withColumn('IN_ABSTENCAO_02d', F.when((F.col('IN_ABSTENCAO_MT') == 1) & (F.col('IN_ABSTENCAO_CN') == 1), 1).otherwise(F.lit(0))
                )
    
    .withColumn('IN_CAPITAL_PROVA', F.when(F.col('NO_MUNICIPIO_PROVA').isin(capitais_brasileiras), 1).otherwise(F.lit(0))
                )

)


df_enem_metrics = (
    df_enem
    .groupBy('SG_UF_PROVA', 'CO_UF_PROVA', 'NO_MUNICIPIO_PROVA', 'CO_MUNICIPIO_PROVA', 'NU_ANO')
    .agg(
        F.count('NU_INSCRICAO').alias('NU_TOTAL_INSCRITOS'),

        F.count(F.when(F.col('TP_SEXO') == 'F', F.lit(1))).alias('NU_RATIO_MULHER'),

        F.count(F.when(F.col('TP_ESCOLA') == 1, F.lit(1))).alias('NU_ESCOLA_MISSING'),

        F.count(F.when(F.col('TP_ESCOLA') == 2, F.lit(1))).alias('NU_ESCOLA_PUBLICA'),

        F.count(F.when(F.col('TP_ESCOLA') == 3, F.lit(1))).alias('NU_ESCOLA_PRIVADA'),

        F.count(F.when(F.col('TP_ESCOLA') == 4, F.lit(1))).alias('NU_ESCOLA_EXTERIOR'),

        F.mean('IN_ABSTENCAO_01d').alias('NU_ABSTENCAO_01d'),

        F.mean('IN_ABSTENCAO_02d').alias('NU_ABSTENCAO_02d'),

        F.mean(
            F.when(
                (F.col('NU_NOTA_MT') != 0) 
                & (F.col('NU_NOTA_MT').isNotNull()), F.col('NU_NOTA_MT'))).alias('AVG_NOTA_MT'),
        
        F.mean(
            F.when(
                (F.col('NU_NOTA_CN') != 0) 
                & (F.col('NU_NOTA_CN').isNotNull()), F.col('NU_NOTA_CN'))).alias('AVG_NOTA_CN'),
        
        F.mean(
            F.when(
                (F.col('NU_NOTA_CH') != 0) 
                & (F.col('NU_NOTA_CH').isNotNull()), F.col('NU_NOTA_CH'))).alias('AVG_NOTA_CH'),
        
        F.mean(
            F.when(
                (F.col('NU_NOTA_LC') != 0) 
                & (F.col('NU_NOTA_LC').isNotNull()), F.col('NU_NOTA_LC'))).alias('AVG_NOTA_LC'),
        )
    .na.fill(value = -1, subset = ['AVG_NOTA_MT', 'AVG_NOTA_CN', 'AVG_NOTA_CH', 'AVG_NOTA_LC'])
)

In [62]:
display(df_enem_metrics)

DataFrame[SG_UF_PROVA: string, CO_UF_PROVA: int, NO_MUNICIPIO_PROVA: string, CO_MUNICIPIO_PROVA: int, NU_ANO: int, NU_TOTAL_INSCRITOS: bigint, NU_RATIO_MULHER: bigint, NU_ESCOLA_MISSING: bigint, NU_ESCOLA_PUBLICA: bigint, NU_ESCOLA_PRIVADA: bigint, NU_ESCOLA_EXTERIOR: bigint, NU_ABSTENCAO_01d: double, NU_ABSTENCAO_02d: double, AVG_NOTA_MT: double, AVG_NOTA_CN: double, AVG_NOTA_CH: double, AVG_NOTA_LC: double]

In [63]:
df_enem_metrics.write.saveAsTable(
    name = 'default.microdados_enem_metrics',
    mode = 'overwrite'
    )

Py4JJavaError: An error occurred while calling o711.saveAsTable.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:511)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:229)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:183)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:700)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:678)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:571)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [0]:
_df_enem_metrics = spark.table('default.microdados_enem_metrics')
display(_df_enem_metrics)

SG_UF_PROVA,CO_UF_PROVA,NO_MUNICIPIO_PROVA,CO_MUNICIPIO_PROVA,NU_ANO,NU_TOTAL_INSCRITOS,NU_RATIO_MULHER,NU_ESCOLA_MISSING,NU_ESCOLA_PUBLICA,NU_ESCOLA_PRIVADA,NU_ESCOLA_EXTERIOR,NU_ABSTENCAO_01d,NU_ABSTENCAO_02d,AVG_NOTA_MT,AVG_NOTA_CN,AVG_NOTA_CH,AVG_NOTA_LC
BA,29,Porto Seguro,2925303,2021,1,1,1,0,0,0,0.0,0.0,382.3,403.6,433.6,442.3
BA,29,Caculé,2905008,2021,1,0,0,1,0,0,0.0,0.0,569.3,502.7,612.8,502.3
BA,29,Entre Rios,2910503,2021,1,1,1,0,0,0,1.0,1.0,-1.0,-1.0,-1.0,-1.0
BA,29,Santana,2928208,2021,1,1,1,0,0,0,0.0,0.0,528.0,572.3,653.4,577.8
BA,29,Macaúbas,2919801,2021,1,1,0,1,0,0,0.0,0.0,465.0,393.5,378.9,354.6
BA,29,Senhor do Bonfim,2930105,2021,1,0,0,1,0,0,0.0,0.0,419.2,448.1,482.9,532.8
BA,29,Cachoeira,2904902,2021,1,1,1,0,0,0,0.0,0.0,510.1,430.5,597.3,560.5
BA,29,Barreiras,2903201,2021,1,1,1,0,0,0,1.0,1.0,-1.0,-1.0,-1.0,-1.0
BA,29,Itaberaba,2914703,2021,2,0,1,1,0,0,1.0,1.0,-1.0,-1.0,-1.0,-1.0
BA,29,Capim Grosso,2906873,2021,2,2,1,1,0,0,1.0,1.0,-1.0,-1.0,-1.0,-1.0
