# Dataframes e SparkSQL

Spark oferece abstrações de mais alto nível chamadas APIs estruturadas. Ao todo, são três: Dataframes, Datasets e SparkSQL. 

![Pilha de APIs, disponível em: https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/](https://izhangzhihao.github.io/assets/images/spark-05.png)

Datasets não são implementados em PySpark, porém veremos nesse notebook exemplos de uso das outras duas.

**Dataframes** são estruturas tabulares, assim como os DataFrames de Pandas. A diferença, aqui, é que Dataframes em Spark são _distribuídos_ e construídos em cima de RDDs. 

**SparkSQL** é um conjunto de funcionalidades que são operadas em Dataframes. Veremos que podemos manipular Dataframes tanto programaticamente (através de _transformações_) quanto por linguagem SQL. Além disso, SparkSQL oferece uma série de outras ferramentas para a realização de operações tabulares distribuídas.

Vamos começar importando PySpark e criando um objeto `SparkContext`, assim como fizemos anteriormente. Lembre-se que `SparkContext` é a porta de entrada para manipulação de RDDs.

In [1]:
import findspark
findspark.init()

import pyspark
import random

sc = pyspark.SparkContext(appName='Dataframes e SparkSQL')

2021-12-11 09:45:33,761 WARN util.Utils: Your hostname, bigdatavm-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2021-12-11 09:45:33,763 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-12-11 09:45:34,923 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# O objeto SparkSession

Para manipularmos Dataframes e utilizar as funções do SparkSQL, porém, precisamos criar um objeto `SparkSession`. O `SparkSession` é um objeto que é construído em cima do SparkContext, e é o ponto de entrada para as APIs estruturadas do Spark. 
https://imgs.developpaper.com/imgs/147289973-592685e12263f_articlex.png ![SparkSession vs SparkContext em: https://developpaper.com/application-sparksession-sparkcontext-and-rdd-in-spark-and-their-extensions/](https://imgs.developpaper.com/imgs/147289973-592685e12263f_articlex.png)

Para inicializar um SparkSession, importamos sua definição de `pyspark.sql` e criamos um objeto com `SparkSession.builder.getOrCreate()`. Esse método checa se já existe um `SparkSession` ativo, e se não existir, cria um novo. Observe que não precisei mencionar o `SparkContext`. Ele recupera sua instância automaticamente, ou a cria se não existir também.

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x7fc654950e20>


# Criando Dataframes

Dataframes são objetos tabulares (lembre-se do _Pandas_) distribuídos. Você pode criar um dataframe lendo arquivos csv, JSON, [ORC](https://orc.apache.org/) e [Parquet](https://github.com/apache/parquet-format). Além disso, você pode criar Dataframes a partir de RDDs também. 

Vamos fazer alguns exemplos com a versão simplificada de leitura e criação de arquivos. Para mais opções, consultar a [documentação](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html):

In [6]:
# importando de um arquivo csv

flights_df = spark.read.csv('file:///home/bigdata-vm/Desktop/BigDataAulasPUC/Datasets/Flights_old/flights.csv')
flights_df.show(10)

+----------+-----+---+---+---+----+------+----+------+------+-------+
|       _c0|  _c1|_c2|_c3|_c4| _c5|   _c6| _c7|   _c8|   _c9|   _c10|
+----------+-----+---+---+---+----+------+----+------+------+-------+
|2014-04-01|19805|  1|JFK|LAX|0854| -6.00|1217|  2.00|355.00|2475.00|
|2014-04-01|19805|  2|LAX|JFK|0944| 14.00|1736|-29.00|269.00|2475.00|
|2014-04-01|19805|  3|JFK|LAX|1224| -6.00|1614| 39.00|371.00|2475.00|
|2014-04-01|19805|  4|LAX|JFK|1240| 25.00|2028|-27.00|264.00|2475.00|
|2014-04-01|19805|  5|DFW|HNL|1300| -5.00|1650| 15.00|510.00|3784.00|
|2014-04-01|19805|  6|OGG|DFW|1901|126.00|0640| 95.00|385.00|3711.00|
|2014-04-01|19805|  7|DFW|OGG|1410|125.00|1743|138.00|497.00|3711.00|
|2014-04-01|19805|  8|HNL|DFW|1659|  4.00|0458|-22.00|398.00|3784.00|
|2014-04-01|19805|  9|JFK|LAX|0648| -7.00|1029| 19.00|365.00|2475.00|
|2014-04-01|19805| 10|LAX|JFK|2156| 21.00|0556|  1.00|265.00|2475.00|
+----------+-----+---+---+---+----+------+----+------+------+-------+
only showing top 10 

In [9]:
# criando a partir de um RDD

text_rdd = sc.textFile('file:///home/bigdata-vm/Desktop/BigDataAulasPUC/Datasets/shakespeare.txt')
print(text_rdd.take(5))

def extract_key(line):
    split = line.split('\t', 1)
    return (split[0], split[1].replace('\t', ' ').strip())

text_rdd = text_rdd.map(extract_key)
print(text_rdd.take(5))

text_df = spark.createDataFrame(text_rdd)
text_df.show(5)

['hamlet@0\t\tHAMLET', 'hamlet@8\t', 'hamlet@9\t', 'hamlet@10\t\tDRAMATIS PERSONAE', 'hamlet@29\t']
[('hamlet@0', 'HAMLET'), ('hamlet@8', ''), ('hamlet@9', ''), ('hamlet@10', 'DRAMATIS PERSONAE'), ('hamlet@29', '')]
+---------+-----------------+
|       _1|               _2|
+---------+-----------------+
| hamlet@0|           HAMLET|
| hamlet@8|                 |
| hamlet@9|                 |
|hamlet@10|DRAMATIS PERSONAE|
|hamlet@29|                 |
+---------+-----------------+
only showing top 5 rows



# Schemas

Quando lendo de um csv ou criando um Dataframe a partir de um RDD, o Spark pode fazer múltiplas leituras para automaticamente tentar inferir qual é o _Schema_ do Dataframe, ou seja, qual é a tipagem de cada coluna e se ela pode ter ou não valores _null_. 

In [12]:
import time

flights_df.printSchema()

#checar schema
start_time = time.time()
flights_df = spark.read.csv('file:///home/bigdata-vm/Desktop/BigDataAulasPUC/Datasets/Flights_old/flights.csv', inferSchema=True)
print(time.time() - start_time)
flights_df.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)

0.722050666809082
root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)





A inferência de _Schema_ pode causar lentidão na leitura dos dados, devido a necessidade de eles serem lidos múltiplas vezes. Você pode, alternativamente, definir um _Schema_ e utilizá-lo na leitura. 

In [18]:
#criar um Schema e reimportar flights.csv

import pyspark.sql.types as types

flights_headers_file = '/home/bigdata-vm/Desktop/BigDataAulasPUC/Datasets/Flights_old/flights_header.csv'
with open(flights_headers_file) as f:
    flights_header = f.read()

list_headers = flights_header.split(',')

flight_schema = types.StructType() \
                        .add(list_headers[0], types.DateType(), True) \
                        .add(list_headers[1], types.IntegerType(), True) \
                        .add(list_headers[2], types.IntegerType(), True) \
                        .add(list_headers[3], types.StringType(), True) \
                        .add(list_headers[4], types.StringType(), True) \
                        .add(list_headers[5], types.IntegerType(), True) \
                        .add(list_headers[6], types.DoubleType(), True) \
                        .add(list_headers[7], types.IntegerType(), True) \
                        .add(list_headers[8], types.DoubleType(), True) \
                        .add(list_headers[9], types.DoubleType(), True) \
                        .add(list_headers[10].strip(), types.DoubleType(), True) \

start_time = time.time()
flight_schema_df = spark.read.format('csv').schema(flight_schema).load('file:///home/bigdata-vm/Desktop/BigDataAulasPUC/Datasets/Flights_old/flights.csv')
print(time.time() - start_time)

0.020491600036621094


# Operações em Dataframes

Vamos começar a realizar operações em Dataframes. Nesse primeiro momento, iremos ver programaticamente algumas das principais funções para manipulação de Dataframes. A maioria aqui é _transformação_, exceto aquelas que precisam devolver algum retorno para o Programa _Driver_, como `count()`, `show()` e `collect()`.

### Visualizar informações sobre um Dataframe

Vamos ver algumas funções básicas para retornar informações a respeito do Dataframe como um todo. São elas:
- `count()`: retorna o número de linhas
- `len(df.columns)`: `df.columns` retorna uma lista com o nome das colunas. Se aplicarmos `len()`, podemos ver a quantidade de colunas em um Dataframe.
- `describe()`: retorna estatísticas sobre as colunas de um Dataframe. Podemos definir quais colunas queremos, ou a tabela inteira se não passarmos parâmetros.



In [21]:
print(f'Quantidade de linhas {flight_schema_df.count()}')
print(f'Quantidade de colunas {len(flight_schema_df.columns)}')
print('Estatisticas gerais')
flight_schema_df.describe().show()
flight_schema_df.describe(['min_atraso_partida', 'min_atraso_chegada']).show()

Quantidade de linhas 476881
Quantidade de colunas 11
Estatisticas gerais


                                                                                

+-------+-----------------+-----------------+------+-------+------------------+------------------+------------------+------------------+------------------+-----------------+
|summary|     id_companhia|         num_voos|origem|destino|      hora_partida|min_atraso_partida|      hora_chegada|min_atraso_chegada|           duracao|        distancia|
+-------+-----------------+-----------------+------+-------+------------------+------------------+------------------+------------------+------------------+-----------------+
|  count|           476881|           476881|476881| 476881|            476881|            476881|            476881|            476881|            476881|           476881|
|   mean|19990.46749818089| 2245.92602766728|  null|   null|1334.1795374527399| 8.313877046894298| 1485.241848595352| 4.728577989058067|111.04921353545224|794.8585013871385|
| stddev|398.4945595986355|1841.066184447977|  null|   null|483.03628051072656| 33.34531644580256|509.40814513813245|35.5070451051

### Select

O comando `select()` opera em nível de **coluna**, filtrando quais colunas você quer exibir. Ele pode ainda criar novas colunas, com métodos que analisam valores de uma ou mais colunas linha a linha. É o equivalente ao comando `SELECT` de uma consulta SQL. 

In [25]:
print('Selecionando id da companhia, origem e destino')
flight_schema_df.select(flight_schema_df.id_companhia, flight_schema_df.origem, flight_schema_df.destino).show()
print('Selecionando id da companhia, origem e destino e colocando o tempo total de atraso do vôo')
flight_schema_df.select('id_companhia', 'origem', 'destino', (flight_schema_df.min_atraso_partida + flight_schema_df.min_atraso_chegada).alias('atraso_total')).show()
print('Selecionando data, origem, destino e o tempo de voo em horas')


Selecionando id da companhia, origem e destino
+------------+------+-------+
|id_companhia|origem|destino|
+------------+------+-------+
|       19805|   JFK|    LAX|
|       19805|   LAX|    JFK|
|       19805|   JFK|    LAX|
|       19805|   LAX|    JFK|
|       19805|   DFW|    HNL|
|       19805|   OGG|    DFW|
|       19805|   DFW|    OGG|
|       19805|   HNL|    DFW|
|       19805|   JFK|    LAX|
|       19805|   LAX|    JFK|
|       19805|   LAX|    JFK|
|       19805|   OGG|    LAX|
|       19805|   BOS|    ORD|
|       19805|   SFO|    JFK|
|       19805|   ATL|    MIA|
|       19805|   SFO|    JFK|
|       19805|   JFK|    LAX|
|       19805|   SFO|    JFK|
|       19805|   JFK|    LAX|
|       19805|   LAX|    JFK|
+------------+------+-------+
only showing top 20 rows

Selecionando id da companhia, origem e destino e colocando o tempo total de atraso do vôo
+------------+------+-------+------------+
|id_companhia|origem|destino|atraso_total|
+------------+------+-------+--

### Filter

A função `filter()` opera em nível de **linhas**, e utiliza operações lógicas que retornam resultados _binários_. É similar ao comando `WHERE` de uma consulta SQL.

In [None]:
print('Apresentar apenas os vôos com saída adiantada')
print('Apresentar todos os vôos que não saíram nem chegaram em JFK')
print('Selecionar as colunas dataVoo, origem, destino e exibir apenas os vôos com menos de 4 horas de duração')

### GroupBy

A função `groupBy()` permite realizar operações de agregação no conjunto de dados. Ele equivale ao comando `GROUP BY` de consultas SQL. Pode receber como parâmetro colunas (onde opera a agregação). Se não receber nenhum parâmetro, realiza a agregação na _tabela inteira_. `groupBy()` retorna um objeto `GroupedData`, que contém várias funções de agregação como `sum()`, `mean()`, `max()`, `min()`, entre outros. A função `agg()` permite que seja feita múltiplas agregações (por exemplo, média e desvio padrão) nos dados.

In [None]:

print('Ver o máximo de cada coluna numérica')
print('Ver o tempo médio de atraso de chegada para cada companhia')
print('Ver o tempo médio e desvio padrão de duracao para cada vôo específico')
print('Ver apenas o vôo com menor atraso total (partida + chegada) de cada par origem-destino')

## Adicionar e remover colunas/linhas

Mais algumas funções que manipulam linhas e colunas, porém não criam _subconjuntos_ dos dados como as funções acima.

- `withColumn()`: adiciona uma coluna nova ao Dataframe e o retorna
- `drop()`: remove uma coluna do Dataframe e o retorna
- `distinct()`: remove linhas duplicadas  
- `dropDuplicates()`: remove linhas duplicadas, porém é possível passar um subconjunto de colunas para realizar a checagem

In [None]:
print('Adicionando uma nova coluna com a duração da da viagem em horas')
print('Removendo data, hora_partida e hora_chegada e duracao')
print('Removendo duplicatas')
print('Mantendo apenas um vôo por origem-destino')

### OrderBy

A função `orderBy()` aceita dois argumentos: uma coluna (ou lista de colunas) que deverão ser usadas na ordenação, e um _boolean_ (ou lista de _booleans_) definindo ordem ascendente ou não.

In [None]:
print("Ordenando decrescentemente por duração")
print("Ordenando ascendentemente por origem e destino")

### Join

A função `join()` une dois Dataframes a partir de uma coluna em comum. Há vários tipos diferentes de joins no Spark:

![Tipos de join. Disponível em: https://medium.com/bild-journal/pyspark-joins-explained-9c4fba124839](https://miro.medium.com/max/622/1*6d7MzkjxS0eBWjOJN5TaAQ.jpeg)

Todos os tipos de join mostrados acima podem ser usados junto com o método. Vamos ver dois exemplos: `'left_outer'` e `'inner'`. Os demais tipos estão listados na [documentação](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.join.html).

In [None]:
print('Lendo novos datasets')
print('Fazendo left outer join')
print('Fazendo inner join')

# Usando SQL

Praticamente todas as operações que vimos acima podem ser descritas diretamente em SQL e aplicadas em um Dataframe. Spark possui um método `sql()` que recebe uma string com sintaxe SQL e a processa de maneira distribuída. O resultado é idêntico ao que fizemos programaticamente. No fim, vai de gosto e familiaridade do programador para escolher qual opção prefere.

Antes de realizarmos consultas SQL, é importante que criemos uma visualização temporária SQL com `createOrReplaceTempView()`. Vamos fazer alguns exemplos:

In [None]:

print('Distancia media de todos voos')
print('Tempo medio de cada itinerario')
print('Apenas itinerários com tempo medio  menor que 100')

# Usando funções com pyspark.sql.functions

O pacote `pyspark.sql.functions` fornece uma série de implementações de funções padrões para a manipulação de Dataframes, Datasets e consultas SQL. Podemos utilizar neste pacote, dentre muitas outras, funções aritméticas e de transformação de `Strings` e `Arrays`. Vamos importar o pacote e usá-lo da seguinte forma:

`import pyspark.sql.functions as F`


### Transformações de String

Vamos começar com algumas funções de manipulação de String:

- `upper()` e `lower()`: transformam o texto inteiro em caixa alta ou baixa, respectivamente.

- `split()`: quebram um String, transformando ele em uma lista dentro de uma célula de um Dataframe (coluna se torna do tipo `ArrayType()`.

- `contains()`: checa se uma célula contém uma substring

- `cast()`: converte um String em outro tipo

In [None]:

print('Caixa alta e caixa baixa')
print('Quebrando texto em tokens')
print('Filtrando por KING')

### Funções com ArrayType()

Dataframes conseguem guardar `Arrays` em suas células, mas não conseguem operar diretamente com elas. A manipulação deve ser feita através de funções padrões em `pyspark.sql.functions` ou de UDFs criadas por usuários (veremos elas logo).

- `size()`: retorna o tamanho do Array em uma célula.

- `getItem()`: retorna um elemento de um Array em uma célula.

In [None]:
print('Descobrindo o numero de elementos em cada linha:')
print('Recuperando o primeiro token de cada linha')

### Condicionais

O pacote `pyspark.sql.functions` fornece também as funções condicionais `when()` e `otherwise()`. 

- `when()`: Age como uma cláusula `if`.
- `otherwise()`: Corresponde ao `else`.

In [None]:
print('Retornar colunas Split, Primeiro_token e criar uma nova verificando se há a palavra "king"')
print('Criar coluna dizendo se há "Claudius" (valor 1), "Polonius" (valor 2) ou outra pessoa (valor 0)')

### Funções definidas por usuários

O pacote `pyspark.sql.functions` fornece também a possibilidade de criar funções que ele não tem implementado por padrão. Essas são chamadas de UDFs, ou _User-Defined Functions_, e têm o seguinte formato:

`F.udf(funcão, tipo_de_retorno)`

In [None]:
print('Contar numero de menções "hamlet" na linha')

# Exercício: modificando `key_text_df`

Vamos transformar o nosso Dataframe usado nos exemplos acima. Faça os seguintes passos:
- Remova as linhas onde o campo chave esteja em branco 
- Crie uma UDF para remover tudo que houver antes do @ na coluna chave e salve em uma coluna chamada 'id_linha'
- Crie uma coluna com o texto em caixa baixa e a nomeie de 'texto'
- Remova as colunas de chave e valor antigas
- Crie uma nova coluna com os tokens de 'texto'. A nomeie de 'tokens'
- Crie uma coluna com a quantidade de tokens. A nomeie de 'num_tokens'
- Remova as linhas em branco e que contém apenas '|'
- Extraia o primeiro e o ultimo token de cada linha, e os guarde em colunas nomeadas por 'primeiro' e 'ultimo'
- Filtre o dataframe para manter somente linhas que não mencionam 'hamlet' (utilize o sinal ~ como negação)

# Visualizando Dados graficamente

Infelizmente, Spark não suporta visualização de dados nativamente. Para visualizá-los, podemos:
- Utilizar bibliotecas de terceiros, como a `pyspark_dist_explore` (não recomendado, última versão em 2019), ou
- Retirar uma amostra dos dados e transformá-la em Pandas Dataframes, e aproveitar as bibliotecas `Matplotlib` ou `Seaborn`

# Particionamento e otimização

Veremos algumas funções aqui que lidam diretamente com configurações relacionadas com o formato distribuído dos Dataframes em Spark.


### Criando IDs

Em um ambiente distribuído, criar IDs seguindo abordagens como contadores e geradores aleatórios não são ideais, dado que múltiplas partições irão lidar com múltiplos _pedaços_ dos dados. Utilizando a função `monotonically_increasing_id()` do pacote `pyspark.sql.functions` possibilita ao Spark criar IDs de forma distribuída e eficiente.

### Manipulando partições

Muitas das vezes precisamos manipular em quantas partições dividimos nossos dados, para aumentar eficiência. Para manipular partições usamos as seguintes funções:

- `rdd.getNumPartitions()`: recupera a quantidade de partições seu RDD ou Dataframe está alocando.

- `repartition()`: modifica a quantidade de partições que seu RDD ou Dataframe está alocando.

- `coalesce()`: reduz a quantidade de partições (mais eficiente que `repartition()`.

### Caching

_Caching_ é a operação de guardar Dataframes na memória ou em disco. Essa operação pode ajudar a melhorar o desempenho em transformações tardias e reduzir a quantidade de recursos gastos. **Cuidado:** bases de dados muito grandes podem não caber na memória do cluster, e _swap_ com o disco pode ser custoso.

Para guardar em cache seu Dataframe, basta invocar a função `cache()`. Para tirá-lo do cache, utilize a função `unpersist()`.

# Exemplo prático: pipeline de dados

Vamos fazer uma análise um pouco mais detalhada de um dataset. Vamos explorar o _"2017 St Paul MN Real Estate Dataset"_ e iremos tentar limpar e transformar ele para que seja utilizável por algum algoritmo de aprendizado de máquina para prever o **preço de venda** de um imóvel. Começaremos importando ele e checando seu Schema.

Olhando o Schema, conseguimos ver que a variável que gostaríamos de prever é a `SalesClosePrice`. Vamos ver sua distribuição.

Precisamos escolher características (colunas) que possam ajudar o sistema a aprender o preço de cada venda. Vamos começar escolhendo variáveis que tenham uma boa correlação com `SalesClosePrice`. Correlação nem sempre implica em características relevantes, mas é um bom lugar pra começar.

Mas apenas a correlação não me convence. Quero analisar também estas colunas tentando traçar modelos lineares entre tais variáveis e `SalesClosePrice`. 

Analisando os gráficos, estou convencido que algumas dessas colunas são realmente importantes. Vou escolhê-las e filtrar `real_estate_df` de modo que mantenha apenas elas.

Há duplicatas? E valores nulos? Se sim, vamos removê-los. 

Ótimo, não tinhamos nenhum problema com dados faltantes. O próximo passo é normalizarmos os dados. Eles estão com magnitudes diferentes, o que pode interferir no aprendizado do modelo. Podemos utilizar a normalização em z-score para aproximá-los a uma distribuição normal. Vamos fazer o exemplo com `LISTPRICE`.

Uma vez que todas as colunas estão normalizadas, o dataset está pronto para ser consumido por algum algoritmo de aprendizado de máquina! Bom trabalho!