# <span style="color: green; font-size: 40px; font-weight: bold;"> Projeto (Usando PySpark e Spark SQL)</span>

<br> <br>


# Analisando a Performance da Logística de Entrega com PySpark e Spark SQL

<br>

### Contexto

Neste mini-projeto, vamos explorar um contexto de negócios na área de logística: a **análise da performance de entregas de uma transportadora**. O projeto será desenvolvido desde a concepção do problema de negócio até a entrega de insights extraídos dos dados utilizando ferramentas comuns de análise de dados no dia a dia de um Cientista de Dados. Utilizaremos o Apache Spark SQL para realizar consultas e análises nos dados, com foco em agregação, funções Window e parse de data.

<br>

### Objetivo

O objetivo deste mini-projeto é **analisar um conjunto de dados de uma transportadora responsável por entregas de produtos**. Utilizaremos o Spark SQL para extrair insights e compreender como está a performance da logística de entrega da empresa. As soluções para cada pergunta de negócio serão apresentadas em Linguagem SQL e com o uso de funções do Spark SQL.

<br>

### Pergunta de Negócio Principal

> "Como podemos analisar a performance de entregas dos veículos de uma transportadora usando dados de horários de entregas?"

<br>

### Entregável

O entregável deste mini-projeto será uma **série de análises e insights sobre a performance de entregas de uma transportadora**. As análises serão realizadas utilizando o Spark SQL em um conjunto de dados fictício que simula os registros de entregas. O processo incluirá a concepção do problema de negócio, preparação dos dados e extração de insights.

<br>

### Sobre o Conjunto de Dados

Os dados utilizados neste mini-projeto são fictícios e representam os registros de entregas de uma transportadora. Cada registro contém o ID do veículo, a entrega realizada e o horário da entrega. O dataset é pequeno para permitir a realização rápida das consultas e demonstração de diversas funções e notações do Spark SQL.

<br>

O dataset possui 3 colunas:

<br>

<table border="2">
  <tr>
    <th style="text-align: center; font-size: 16px;">Nome da Coluna</th>
    <th style="text-align: center; font-size: 16px;">Tipo de Dado</th>
    <th style="text-align: center; font-size: 16px;">Descrição</th>
  </tr>
  <tr>
    <td>id_veiculo</td>
    <td>integer</td>
    <td>Identificação do veículo que realizou a entrega.</td>
  </tr>
  <tr>
    <td>entrega</td>
    <td>string</td>
    <td>Nome da entrega realizada.</td>
  </tr>
  <tr>
    <td>horario</td>
    <td>string</td>
    <td>Horário em que a entrega foi realizada (formato HH:MMa).</td>
  </tr>
</table>

<br> <br> <br>

# Importando Pacotes

In [1]:
# Imports

# Biblioteca principal do Apache Spark para processamento distribuído de grandes volumes de dados.
import pyspark

# Cria e gerencia a conexão com o cluster Spark.
from pyspark import SparkContext, SparkConf

# Ponto de entrada para criar DataFrames e utilizar funcionalidades do Spark SQL.
from pyspark.sql import SparkSession

# Cria especificações de janela para realizar operações como funções de ranking e agregação sobre partições 
# de dados.
from pyspark.sql import Window

# Refere-se a uma coluna em um DataFrame, permitindo manipulação de colunas.
from pyspark.sql.functions import col

# Atribui números únicos a linhas dentro de uma partição, com base na ordem especificada.
from pyspark.sql.functions import row_number

# Função de janela que retorna o valor da próxima linha em relação à linha atual dentro de uma partição.
from pyspark.sql.functions import lead  

# Funções agregadas que calculam o valor mínimo e máximo de uma coluna, respectivamente.
from pyspark.sql.functions import min, max

# Converte uma string de data/hora para um formato de timestamp Unix.
from pyspark.sql.functions import unix_timestamp

# Biblioteca para cálculos numéricos e manipulação de arrays
import numpy as np

<br> <br>

# <span style="color: green; font-size: 38px; font-weight: bold;">Preparando o Ambiente Spark</span>

In [2]:
# Definindo semente aleatória (seed) para reprodutibilidade do notebook
rnd_seed = 23
np.random.seed = rnd_seed
np.random.set_state = rnd_seed

# Se houver uma sessão Spark ativa, encerre-a
if 'sc' in globals():
    sc.stop()

if 'spark' in globals():
    spark.stop()


# Criando o Spark Context
conf = SparkConf().setAppName("Mini-Projeto4") \
                  .set("spark.ui.showConsoleProgress", "false") \
                  .set("spark.executor.heartbeatInterval", "20s") \
                  .set("spark.eventLog.enabled", "false") \
                  .set("spark.sql.shuffle.partitions", "2") \
                  .set("spark.sql.debug.maxToStringFields", "100") \
                  .set("spark.executor.memory", "4g") \
                  .set("spark.driver.memory", "4g") \
                  .set("spark.driver.maxResultSize", "2g")  # Configuração adicional para limitar o tamanho do resultado

# Criar o Spark Context e a Spark Session
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Ajustar o nível de log para ERROR
sc.setLogLevel("ERROR")

# Configurar log4j para suprimir avisos (deixar como comentário e volta ao normal)
log4j_logger = sc._jvm.org.apache.log4j
log4j_logger.LogManager.getLogger("org").setLevel(log4j_logger.Level.ERROR)
log4j_logger.LogManager.getLogger("akka").setLevel(log4j_logger.Level.ERROR)

# Visualizar o objeto spark_session
spark

24/07/30 12:18:46 WARN Utils: Your hostname, eduardo-Inspiron-15-3520 resolves to a loopback address: 127.0.1.1; using 192.168.0.13 instead (on interface wlp0s20f3)
24/07/30 12:18:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/30 12:18:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<br><br>

# <span style="color: green; font-size: 38px; font-weight: bold;">Carregando os Dados</span>

- Vamos carregar os Dados diretamente como **Dataframe do Spark** pois **não vamos fazer análise exploratória de dados**.

In [9]:
# Nome do arquivo
arquivo = 'projeto04/dados/dataset.txt'

# Carrega como dataframe do Spark
df = spark.read.csv(arquivo, header = True)

# Tipo
print(type(df), '\n')

# Visualiza primeiras 5 linhas
df.show(5)

<class 'pyspark.sql.dataframe.DataFrame'> 

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+
only showing top 5 rows



<br> <br>

# Criando Tabela Temporária

<br>

- A criação de uma **tabela temporária** permite realizar consultas SQL nos dados, o que é útil para análises rápidas e interativas.
- A tabela temporária existe somente nesta sessão.

In [4]:
# Cria tabela temporária
df.createOrReplaceTempView("tb_logistica")

<br>

## Executando Queries SQL

- Vamos através de Queries visualizar os dados

In [5]:
# Verificando nome das colunas da tabela
spark.sql("SHOW COLUMNS FROM tb_logistica").show()

+----------+
|  col_name|
+----------+
|id_veiculo|
|   entrega|
|   horario|
+----------+



In [6]:
# Descreve a tabela para visualizar os tipos de dados das colunas
spark.sql("DESCRIBE TABLE tb_logistica").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|id_veiculo|   string|   NULL|
|   entrega|   string|   NULL|
|   horario|   string|   NULL|
+----------+---------+-------+



In [7]:
# Visualizando os 5 primeiros registros
spark.sql("SELECT * FROM tb_logistica LIMIT 5").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+



In [8]:
# Describe da tabela (retornará null por causa do tipo dos dados das colunas)
spark.sql("DESCRIBE tb_logistica").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|id_veiculo|   string|   NULL|
|   entrega|   string|   NULL|
|   horario|   string|   NULL|
+----------+---------+-------+



<br> <br>

# Queries SQL x Dot Notation no Spark SQL

<br>

Utilizando <i>tabela temporária **tb_logistica</i>** criada na etapa anterior

In [10]:
# Query SQL
spark.sql('SELECT id_veiculo AS veiculo, entrega FROM tb_logistica LIMIT 5').show()

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+



Utilizando <i>método **Dot Notattion**</i> 

In [11]:
# Dot Notation
df.select(col('id_veiculo').alias('veiculo'), 'entrega').limit(5).show()

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+



### O que foi feito no código acima?

O código acima mostra duas maneiras de selecionar e renomear colunas em um DataFrame do Spark e limitar o resultado a 5 registros. A primeira abordagem usa SQL e a segunda usa notação de ponto (dot notation) do PySpark.

<br>

### Diferença entre Queries SQL e Dot Notation no Spark SQL

#### 1. Queries SQL:
- **Sintaxe SQL**: A abordagem usa sintaxe SQL padrão para manipulação de dados.
- **Criação de Tabela Temporária**: Necessita que os dados sejam registrados como uma tabela temporária antes de executar consultas SQL. (criada na etapa anterior)
- **Flexibilidade**: Familiar para aqueles que conhecem SQL, facilita a escrita de consultas complexas.

#### 2. Dot Notation (Notação de Ponto):

- **API do PySpark**: Usa a API de DataFrame do PySpark para manipulação de dados.
- **Sem Necessidade de Tabela Temporária**: Operações são realizadas diretamente no DataFrame sem a necessidade de criar uma tabela temporária.
- **Integração com Python**: Combina bem com outras bibliotecas e funcionalidades do Python.

<br>

Ambas as abordagens têm seus próprios méritos e a escolha entre elas pode depender da familiaridade do usuário com SQL ou APIs do PySpark e do tipo de operações que se deseja realizar.

<br> <br> <br>

# Usando Funções SQL do Spark SQL com Dot Notation

<br>

- A etapa seguinte enfatiza o uso de Dot Notation para manipulação de dados no Spark SQL.
- As funções do Spark SQL são otimizadas para trabalhar em ambiente distribuído e podem oferecer melhor desempenho ao processar grandes conjuntos de dados. Essa abordagem é demonstrada por meio de vários exemplos que mostram como selecionar, renomear, acessar, filtrar e coletar dados diretamente no DataFrame do PySpark.

In [13]:
# Visualizando dataframe carregado anteriormente
df.show(2)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
+----------+---------+-------+
only showing top 2 rows



In [14]:
# Colunas do dataframe
df.columns

['id_veiculo', 'entrega', 'horario']

<br>

#### Transformando em objeto Pandas (apenas para exemplo)

In [18]:
# Podemos converter um Spark DataFrame para um Pandas DataFrame (e assim usar os métodos do Pandas)
pandasDF = df.toPandas()

# Tipo do objeto
print(type(pandasDF), '\n')

# Tipo das colunas
print(pandasDF.dtypes, '\n')

# Visualizando
display(pandasDF.head(2))

<class 'pandas.core.frame.DataFrame'> 

id_veiculo    object
entrega       object
horario       object
dtype: object 



Unnamed: 0,id_veiculo,entrega,horario
0,298,Entrega 1,7:58a
1,298,Entrega 2,8:04a


<br>

## Métodos para Manipulação de Dados no Spark SQL

> Abaixo, serão apresentados diversos métodos de manipulação de dados no Spark SQL usando Dot Notation, incluindo seleções, filtragens, ordenações e transformações, entre outros.

<br>

### Métodos Select e Collect

> Os métodos select e collect permitem a manipulação e coleta de dados em DataFrames do Spark. Abaixo são apresentados diversos exemplos de como esses métodos podem ser utilizados para selecionar colunas, renomear colunas, acessar dados por índice e expressões regulares, entre outras operações.

<br><br>


### Método select()

- **Tipo**: Transformação
- **Descrição**: O método select() é utilizado para selecionar colunas específicas de um DataFrame. Ele retorna um novo DataFrame contendo apenas as colunas selecionadas, sem modificar o DataFrame original.
- **Uso**: select() é ideal para manipular e filtrar as colunas de interesse em um DataFrame antes de realizar outras operações de transformação ou ação.

<br>

### Exemplos de Uso de select()

<br>

#### Selecionando dados de 2 colunas (3 formas diferentes)

In [26]:
# Selecionando dados de 2 colunas (1)
df.select('id_veiculo', 'entrega').show(5)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
+----------+---------+
only showing top 5 rows



In [27]:
# Alternativamente, podemos usar esta notação (2)
df.select(df.id_veiculo, df.entrega).show(5)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
+----------+---------+
only showing top 5 rows



In [28]:
# A função col é outra alternativa (3)
from pyspark.sql.functions import col
df.select(col('id_veiculo'), col('entrega')).show(5)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
+----------+---------+
only showing top 5 rows



<br>

#### Podemos selecionar todas as colunas do DataFrame cujos nomes estejam em uma lista (2 formas)

In [29]:
# Seleciona todas as colunas do dataframe cujos nomes estejam em uma lista (1)
nomes_colunas = ["id_veiculo", "entrega"]
df.select(*nomes_colunas).show(5)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
+----------+---------+
only showing top 5 rows



In [30]:
# Mesmo exemplo anterior mas agora com list comprehension (2)
df.select([coluna for coluna in nomes_colunas]).show(5)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
+----------+---------+
only showing top 5 rows



<br>

#### Podemos renomear colunas para facilitar a consulta aos dados (2 formas)

In [31]:
# Renomeando colunas (1)
df.select('id_veiculo', 'entrega').withColumnRenamed('id_veiculo', 'veiculo').show(5)

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+
only showing top 5 rows



In [32]:
# Também podemos usar um alias para renomear coluna (2)
df.select(col('id_veiculo').alias('veiculo'), 'entrega').show(5)

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+
only showing top 5 rows



<br>

#### Selecionando colunas pelo índice

In [34]:
# A partir da coluna de índice 2 retorne as 3 primeiras linhas
df.select(df.columns[2:]).show(3)

+-------+
|horario|
+-------+
|  7:58a|
|  8:04a|
|  8:17a|
+-------+
only showing top 3 rows



<br>

#### Selecionando colunas através de expressões regulares

In [36]:
# Selecionando colunas através de expressões regulares
# https://docs.python.org/3.9/library/re.html
df.select(df.colRegex("`^.*Entrega*`")).show(5)

+---------+
|  entrega|
+---------+
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
+---------+
only showing top 5 rows



<br>

#### O DataFrame original segue intacto

- O DataFrame original df permanece inalterado durante todo o processo. As transformações são usadas para criar novos DataFrames modificados, que são então utilizados para visualização ou outras operações subsequentes.

In [33]:
# O dataframe original segue intacto
df.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+
only showing top 5 rows



<br><br>

### Método collect()

- **Tipo**: Ação
- **Descrição**: O método collect() é usado para recuperar todos os elementos de um DataFrame (ou RDD) para o nó do driver (master) do cluster Spark. Ele coleta os dados de todos os nós do cluster e os traz para o driver como uma lista de objetos Row.
- **Uso**: collect() deve ser utilizado com cautela e geralmente é recomendado para conjuntos de dados pequenos. Isso porque trazer grandes volumes de dados para o driver pode resultar em um erro de falta de memória (OutOfMemoryError). É útil para ações que necessitam acessar os dados no driver para visualização ou processamento adicional em Python.

<br>

### Exemplos de Uso de collect()

<br>

#### Retornando cada linha do DataFrame como um objeto do tipo Row

In [37]:
# Retornando cada linha do dataframe como um objeto do tipo Row
df.collect()

[Row(id_veiculo='298', entrega='Entrega 1', horario='7:58a'),
 Row(id_veiculo='298', entrega='Entrega 2', horario='8:04a'),
 Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a'),
 Row(id_veiculo='298', entrega='Entrega 4', horario='8:28a'),
 Row(id_veiculo='298', entrega='Entrega 5', horario='8:33a'),
 Row(id_veiculo='298', entrega='Entrega 6', horario='8:39a'),
 Row(id_veiculo='298', entrega='Entrega 7', horario='9:07a'),
 Row(id_veiculo='315', entrega='Entrega 1', horario='6:05a'),
 Row(id_veiculo='315', entrega='Entrega 2', horario='6:14a'),
 Row(id_veiculo='315', entrega='Entrega 3', horario='6:24a'),
 Row(id_veiculo='315', entrega='Entrega 4', horario='6:38a'),
 Row(id_veiculo='315', entrega='Entrega 5', horario='6:45a'),
 Row(id_veiculo='315', entrega='Entrega 6', horario='6:56a'),
 Row(id_veiculo='315', entrega='Entrega 7', horario='7:32a'),
 Row(id_veiculo='457', entrega='Entrega 1', horario='5:04a'),
 Row(id_veiculo='457', entrega='Entrega 2', horario='5:13a'),
 Row(id_

<br>

#### O método collect() retorna uma lista de linhas

In [38]:
# Verificando tipo do objeto retornado
new_df = df.collect()
type(new_df)

list

<br>

#### Podemos "fatiar" as estruturas retornadas por collect()

In [39]:
# Neste caso retornamos o elemento da primeira linha e terceira coluna
df.collect()[0][2]

'7:58a'

<br>

#### Como collect() retorna uma lista, podemos percorrer a lista com um loop e concatenar as colunas, por exemplo

In [40]:
# loop
for row in df.collect():
    print(row['id_veiculo'] + "," + str(row['entrega']))

298,Entrega 1
298,Entrega 2
298,Entrega 3
298,Entrega 4
298,Entrega 5
298,Entrega 6
298,Entrega 7
315,Entrega 1
315,Entrega 2
315,Entrega 3
315,Entrega 4
315,Entrega 5
315,Entrega 6
315,Entrega 7
457,Entrega 1
457,Entrega 2
457,Entrega 3
457,Entrega 4
457,Entrega 5
457,Entrega 6
457,Entrega 7


<br>

#### Podemos ainda combinar select e collect

In [41]:
# Filtramos colunas com select e filtramos linhas com collect
dataCollect = df.select("id_veiculo").collect()[0:4][:]
dataCollect

[Row(id_veiculo='298'),
 Row(id_veiculo='298'),
 Row(id_veiculo='298'),
 Row(id_veiculo='298')]

<br>

#### Podemos extrair primeiro uma amostra do DataFrame e então coletar o resultado

In [42]:
# Podemos extrair primeiro uma amostra do dataframe e então coletar o resultado
df.sample(0.6).collect()

[Row(id_veiculo='298', entrega='Entrega 2', horario='8:04a'),
 Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a'),
 Row(id_veiculo='298', entrega='Entrega 4', horario='8:28a'),
 Row(id_veiculo='298', entrega='Entrega 6', horario='8:39a'),
 Row(id_veiculo='298', entrega='Entrega 7', horario='9:07a'),
 Row(id_veiculo='315', entrega='Entrega 2', horario='6:14a'),
 Row(id_veiculo='315', entrega='Entrega 4', horario='6:38a'),
 Row(id_veiculo='457', entrega='Entrega 1', horario='5:04a'),
 Row(id_veiculo='457', entrega='Entrega 3', horario='5:27a'),
 Row(id_veiculo='457', entrega='Entrega 6', horario='6:21a'),
 Row(id_veiculo='457', entrega='Entrega 7', horario='6:38a')]

<br>

### Resumo:

- **select()**: Transformação que retorna um novo DataFrame contendo as colunas selecionadas.
- **collect()**: Ação que coleta todos os dados de um DataFrame e os traz para o nó do driver como uma lista de objetos Row. Ideal para pequenos conjuntos de dados.

<br><br><br>

### Métodos Filter e Where

> Os métodos filter() e where() são utilizados para filtrar linhas em um DataFrame com base em uma condição especificada. Ambos os métodos funcionam de forma semelhante e são intercambiáveis. Eles retornam um novo DataFrame que contém apenas as linhas que atendem à condição de filtragem, sem modificar o DataFrame original.

> Esses métodos são ideais para selecionar subconjuntos de dados que atendem a condições específicas, como valores em determinadas colunas ou expressões booleanas complexas. O DataFrame original df permanece inalterado durante todo o processo. As transformações são usadas para criar novos DataFrames modificados, que são então utilizados para visualização ou outras operações subsequentes.

<br><br>


### Método filter()

- **Tipo**: Transformação
- **Descrição**: O método filter() aplica uma condição de filtragem ao DataFrame e retorna um novo DataFrame contendo apenas as linhas que atendem a essa condição. Ele pode utilizar expressões SQL como strings ou expressões de coluna PySpark.
- **Uso**: filter() é útil para aplicar condições de filtragem de forma programática, especialmente quando combinada com outras transformações PySpark.

### Exemplos de Uso de filter()

<br>

In [45]:
# Podemos filtrar os dados retornados com a função filter()
df_filtered_1 = df.filter("entrega == 'Entrega 2'")
df_filtered_1.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 2|  5:13a|
+----------+---------+-------+



In [44]:
# Podemos filtrar os dados retornados com a função filter() usando a negação
df_filtered_2 = df.filter("entrega != 'Entrega 2'")
df_filtered_2.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
+----------+---------+-------+
only showing top 5 rows



In [46]:
# Podemos filtrar usando múltiplas condições
df_filtered_3 = df.filter((df.entrega == "Entrega 2") & (df.id_veiculo == 298))
df_filtered_3.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
+----------+---------+-------+



In [47]:
# Filtro baseado em lista
lista_id_veiculos = [298, 300, 400]

df_filtered_4 = df.filter(df.id_veiculo.isin(lista_id_veiculos))
df_filtered_4.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
+----------+---------+-------+



In [48]:
# Alguma entrega ocorreu no minuto 38 de qualquer hora?
df_filtered_5 = df.filter(df.horario.like("%38%"))
df_filtered_5.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       315|Entrega 4|  6:38a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



<br>

### Método where()

- **Tipo**: Transformação
- **Descrição**: O método where() é funcionalmente equivalente ao método filter(), aplicando uma condição de filtragem ao DataFrame e retornando um novo DataFrame contendo apenas as linhas que atendem a essa condição. Ele aceita as mesmas expressões SQL como strings ou expressões de coluna PySpark.
- **Uso**: where() é frequentemente usado em cenários onde a sintaxe SQL é mais intuitiva ou preferida.
    
<br>

### Exemplos de Uso de where()

<br>

In [49]:
# Podemos filtrar os dados retornados com a função where() também
df_where_1 = df.where("entrega == 'Entrega 2'")
df_where_1.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 2|  5:13a|
+----------+---------+-------+



In [50]:
# Podemos filtrar os dados retornados com a função where()
df_where_2 = df.where("id_veiculo > 400")
df_where_2.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       457|Entrega 6|  6:21a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [51]:
# Where baseado em lista
lista_id_veiculos = [298, 300, 400]

df_where_3 = df.where(df.id_veiculo.isin(lista_id_veiculos))
df_where_3.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
+----------+---------+-------+



<br><br>

### Métodos Order By e Sort

> Os métodos orderBy() e sort() são utilizados para ordenar as linhas de um DataFrame com base em uma ou mais colunas. Ambos os métodos são funcionalmente equivalentes e podem ser usados para especificar a ordem de classificação de forma ascendente ou descendente.

> Esses métodos são ideais para organizar os dados em uma ordem específica antes de realizar operações subsequentes ou visualizar os dados ordenados. O DataFrame original df permanece inalterado durante todo o processo. As transformações são usadas para criar novos DataFrames modificados, que são então utilizados para visualização ou outras operações subsequentes.

#### Métodos orderBy() e sort()

- **Tipo**: Transformação
- **Descrição**: Os métodos orderBy() e sort() são utilizados para ordenar as linhas de um DataFrame com base em uma ou mais colunas. Eles retornam um novo DataFrame com as linhas ordenadas de acordo com a ordem especificada.
- **Uso**: Esses métodos são úteis para organizar os dados antes de realizar outras operações ou visualizar os resultados de forma ordenada.
    
<br>

### Exemplos de Uso de orderBy() e sort()

<br>

In [53]:
# Ordenando a seleção das linhas
df_sorted_1 = df.sort("horario", "entrega")
df_sorted_1.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
+----------+---------+-------+
only showing top 5 rows



In [54]:
# Mesmo resultado anterior mas com a função col()
df_sorted_2 = df.sort(col("horario"), col("entrega"))
df_sorted_2.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
+----------+---------+-------+
only showing top 5 rows



In [56]:
# Ou usamos orderBy
df_sorted_3 = df.orderBy(col("horario"), col("entrega"))
df_sorted_3.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
+----------+---------+-------+
only showing top 5 rows



In [57]:
# Ordena o resultado em ordem decrescente 
df_sorted_4 = df.sort(df.horario.desc(), df.entrega.desc())
df_sorted_4.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 7|  9:07a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 3|  8:17a|
+----------+---------+-------+
only showing top 5 rows



In [58]:
# Lembre-se que podemos usar SQL
spark.sql("select id_veiculo, entrega, horario from tb_logistica ORDER BY horario desc").show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 7|  9:07a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 3|  8:17a|
+----------+---------+-------+
only showing top 5 rows



<br><br>

### Métodos map(), flatMap() e explode()

> Os métodos map(), flatMap() e explode() são usados para transformar e manipular os dados em um DataFrame ou RDD. Abaixo estão exemplos de como utilizar cada um desses métodos em PySpark.

> Esses métodos são ideais para realizar transformações complexas, desmembrar elementos ou manipular listas dentro de colunas. O DataFrame original df permanece inalterado durante todo o processo. As transformações são usadas para criar novos DataFrames ou RDDs modificados, que são então utilizados para visualização ou outras operações subsequentes. 

<br>

### Método map()

- **Tipo**: Transformação
- **Descrição**: O método map() é usado para aplicar uma função a cada elemento de um RDD, retornando um novo RDD com os resultados da função aplicada.
- **Uso**: map() é útil quando se deseja transformar cada elemento de um RDD de forma independente.
    
<br>

### Exemplos de Uso de map()

<br>

In [59]:
# Maps são aplicados em RDDs e por isso precisamos converter o dataframe para RDD
rdd2 = df.rdd.map(lambda x: (x[0] + "," + x[1], x[2]))  

# O método Map retorna um RDD e por isso temos que converter de volta para dataframe
df2 = rdd2.toDF(["novo_id", "entrega"])
df2.show(5)

+-------------+-------+
|      novo_id|entrega|
+-------------+-------+
|298,Entrega 1|  7:58a|
|298,Entrega 2|  8:04a|
|298,Entrega 3|  8:17a|
|298,Entrega 4|  8:28a|
|298,Entrega 5|  8:33a|
+-------------+-------+
only showing top 5 rows



In [60]:
# Mesmo exemplo anterior mas usando o nome da coluna e não o índice
rdd2 = df.rdd.map(lambda x: (x['id_veiculo'] + "," + x['entrega'], x['horario']))  
df2 = rdd2.toDF(["novo_id", "entrega"])
df2.show(5)

+-------------+-------+
|      novo_id|entrega|
+-------------+-------+
|298,Entrega 1|  7:58a|
|298,Entrega 2|  8:04a|
|298,Entrega 3|  8:17a|
|298,Entrega 4|  8:28a|
|298,Entrega 5|  8:33a|
+-------------+-------+
only showing top 5 rows



In [61]:
# Criando uma função que manipula as colunas
def manipula_colunas(x):
    coluna1 = x.id_veiculo
    coluna2 = x.entrega
    novo_id = coluna1 + "-" + coluna2
    coluna3 = x.horario
    return (novo_id, coluna3)

# Usamos a função map para aplicar a função lambda, que aplica a função manipula_colunas a cada linha do RDD
rdd2 = df.rdd.map(lambda x: manipula_colunas(x))

# Collect no RDD
rdd2.collect()

[('298-Entrega 1', '7:58a'),
 ('298-Entrega 2', '8:04a'),
 ('298-Entrega 3', '8:17a'),
 ('298-Entrega 4', '8:28a'),
 ('298-Entrega 5', '8:33a'),
 ('298-Entrega 6', '8:39a'),
 ('298-Entrega 7', '9:07a'),
 ('315-Entrega 1', '6:05a'),
 ('315-Entrega 2', '6:14a'),
 ('315-Entrega 3', '6:24a'),
 ('315-Entrega 4', '6:38a'),
 ('315-Entrega 5', '6:45a'),
 ('315-Entrega 6', '6:56a'),
 ('315-Entrega 7', '7:32a'),
 ('457-Entrega 1', '5:04a'),
 ('457-Entrega 2', '5:13a'),
 ('457-Entrega 3', '5:27a'),
 ('457-Entrega 4', '5:39a'),
 ('457-Entrega 5', '5:47a'),
 ('457-Entrega 6', '6:21a'),
 ('457-Entrega 7', '6:38a')]

<br><br>

### Método flatMap()

- **Tipo**: Transformação
- **Descrição**: O método flatMap() é semelhante ao map(), mas cada elemento do RDD pode ser mapeado para zero ou mais elementos, resultando em um RDD "achatado" (flat).
- **Uso**: é útil para dividir elementos em vários elementos ou desmembrar listas.
    
<br>

### Exemplos de Uso de flatMap()

<br>

In [63]:
# O método flatMap requer uma lista no formato RDD
# Vamos criar uma lista
data = ["A Data Science Academy",
        "oferece cursos realmente incríveis",
        "orientados às necessidades",
        "do mercado de trabalho",
        "e tudo mostrado passo a passo"]

# Convertemos a lista em um RDD
rdd = spark.sparkContext.parallelize(data)
type(rdd)

pyspark.rdd.RDD

In [64]:
# Imprime os elementos do RDD
for element in rdd.collect():
    print(element)

A Data Science Academy
oferece cursos realmente incríveis
orientados às necessidades
do mercado de trabalho
e tudo mostrado passo a passo


In [65]:
# Agora aplicamos o flatMap, que cria outro RDD  
rdd2 = rdd.flatMap(lambda x: x.split(" "))
# Imprime os elementos do RDD
for element in rdd2.collect():
    print(element)

A
Data
Science
Academy
oferece
cursos
realmente
incríveis
orientados
às
necessidades
do
mercado
de
trabalho
e
tudo
mostrado
passo
a
passo


<br><br>

### Método explode()

- **Tipo**: Transformação
- **Descrição**: O método explode() é usado para transformar uma coluna contendo listas ou arrays em várias linhas, criando uma nova linha para cada elemento da lista.
- **Uso**: explode() é útil para lidar com colunas que contêm listas e transformar esses dados em um formato mais plano.
    
<br>

### Exemplos de Uso de explode()

<br>

In [66]:
from pyspark.sql.functions import explode

## Explode deve receber uma lista como argumento, mas essa lista pode estar em uma coluna de um dataframe

# Cria uma lista
array_estudantes = [('Bob', ['Python', 'R', 'Scala']),
                    ('Maria', ['Java','Julia']),
                    ('Zico', ['JavaScript', '']),
                    ('Ana', [None, None])]

type(array_estudantes)

list

In [67]:
# Converte a lista para dataframe
df_estudantes = spark.createDataFrame(data = array_estudantes, schema = ['aluno', 'linguagem'])

# Select com explode
df2 = df_estudantes.select(df_estudantes.aluno, explode(df_estudantes.linguagem))
df2.printSchema()
df2.show()

root
 |-- aluno: string (nullable = true)
 |-- col: string (nullable = true)

+-----+----------+
|aluno|       col|
+-----+----------+
|  Bob|    Python|
|  Bob|         R|
|  Bob|     Scala|
|Maria|      Java|
|Maria|     Julia|
| Zico|JavaScript|
| Zico|          |
|  Ana|      NULL|
|  Ana|      NULL|
+-----+----------+



<br><br>

### Método foreach()

> O método foreach em PySpark é usado para aplicar uma função a cada elemento de um RDD ou DataFrame. Ao contrário dos métodos de transformação, foreach é uma ação e não retorna um novo RDD ou DataFrame, mas executa a função fornecida em cada elemento do RDD/DataFrame.

- **Tipo**: Ação
- **Descrição**: Aplica uma função a cada elemento do DataFrame (ou RDD). A função especificada é executada em cada elemento do DataFrame, mas os resultados não são coletados ou retornados.
- **Uso**: O método foreach é útil para operações onde o objetivo é executar uma ação em cada elemento, como escrever em um banco de dados, atualizar um sistema externo ou simplesmente imprimir os elementos. No entanto, deve-se ter cuidado ao usar foreach para operações que afetam o estado global, pois o PySpark executa essas funções em paralelo em diferentes nós do cluster.
    
<br>

### Exemplos de Uso de foreach()

<br>

Neste exemplo, a função lambda concatena os valores das colunas id_veiculo, entrega e horario em uma única string e imprime essa string. O método foreach aplica essa função a cada linha do DataFrame df.

In [68]:
# Aplica uma função lambda para imprimir cada linha do DataFrame
df.foreach(lambda x: print(x["id_veiculo"] + "," + x["entrega"] + "," + x["horario"])) 

298,Entrega 1,7:58a
298,Entrega 2,8:04a
298,Entrega 3,8:17a
298,Entrega 4,8:28a
298,Entrega 5,8:33a
298,Entrega 6,8:39a
298,Entrega 7,9:07a
315,Entrega 1,6:05a
315,Entrega 2,6:14a
315,Entrega 3,6:24a
315,Entrega 4,6:38a
315,Entrega 5,6:45a
315,Entrega 6,6:56a
315,Entrega 7,7:32a
457,Entrega 1,5:04a
457,Entrega 2,5:13a
457,Entrega 3,5:27a
457,Entrega 4,5:39a
457,Entrega 5,5:47a
457,Entrega 6,6:21a
457,Entrega 7,6:38a


### Resumo

- Neste exemplo, foreach é utilizado para imprimir os valores de cada linha do DataFrame. Cada linha é formatada como uma string com os valores das colunas separadas por vírgulas.

<br><br><br><br>

# Agregação com Spark SQL

<br><br>

### Agregação com Funções do Spark SQL

In [69]:
# Colunas do DataFrame
df.columns

['id_veiculo', 'entrega', 'horario']

In [71]:
# Tipo
type(df)

pyspark.sql.dataframe.DataFrame

In [72]:
# Contagem de entregas por id_veiculo
df.groupBy("id_veiculo").count().show()

+----------+-----+
|id_veiculo|count|
+----------+-----+
|       315|    7|
|       457|    7|
|       298|    7|
+----------+-----+



In [None]:
# A linha de código abaixo não funciona. LEIA A MENSAGEM DE ERRO!!!
# df.groupBy("id_veiculo").min("horario").show()

In [73]:
# Agregação mínima de horário por id_veiculo
df.groupBy('id_veiculo').agg({'horario':'min'}).show()

+----------+------------+
|id_veiculo|min(horario)|
+----------+------------+
|       298|       7:58a|
|       315|       6:05a|
|       457|       5:04a|
+----------+------------+



In [74]:
# Agregação máxima de horário por id_veiculo
df.groupBy('id_veiculo').agg({'horario':'max'}).show()

+----------+------------+
|id_veiculo|max(horario)|
+----------+------------+
|       298|       9:07a|
|       315|       7:32a|
|       457|       6:38a|
+----------+------------+



In [75]:
# Contagem de horários por id_veiculo
df.groupBy('id_veiculo').agg({'horario':'count'}).show()

+----------+--------------+
|id_veiculo|count(horario)|
+----------+--------------+
|       315|             7|
|       457|             7|
|       298|             7|
+----------+--------------+



In [76]:
# Renomeando a coluna de contagem de horários para numero_entregas
df.groupBy('id_veiculo').agg({'horario':'count'}).withColumnRenamed('count(horario)', 'numero_entregas').show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       315|              7|
|       457|              7|
|       298|              7|
+----------+---------------+



In [84]:
# Renomeando a coluna de mínimo horário para hora_primeira_entrega
df_novo = df.groupBy('id_veiculo').agg({'horario':'min'}).withColumnRenamed(
    'min(horario)', 'hora_primeira_entrega')

df_novo.show()

+----------+---------------------+
|id_veiculo|hora_primeira_entrega|
+----------+---------------------+
|       298|                7:58a|
|       315|                6:05a|
|       457|                5:04a|
+----------+---------------------+



In [85]:
# Visualizando dataframe original e modificado acima
df.show(2)
df_novo.show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
+----------+---------+-------+
only showing top 2 rows

+----------+---------------------+
|id_veiculo|hora_primeira_entrega|
+----------+---------------------+
|       298|                7:58a|
|       315|                6:05a|
|       457|                5:04a|
+----------+---------------------+



<br>

### Agregação com Queries SQL

In [87]:
# Usando função SQL do SparkSQL para contagem de entregas por id_veiculo
df.groupBy("id_veiculo").count().withColumnRenamed('count', 'numero_entregas').show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       315|              7|
|       457|              7|
|       298|              7|
+----------+---------------+



In [88]:
# Define a query para contar número de entregas por id_veiculo
query = """
SELECT id_veiculo, COUNT(*) AS numero_entregas
FROM tb_logistica
GROUP BY id_veiculo
"""
# Executa a query
spark.sql(query).show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       315|              7|
|       457|              7|
|       298|              7|
+----------+---------------+



In [89]:
# Define a query para obter o primeiro e último horário de entrega por id_veiculo
query = """
SELECT id_veiculo, MIN(horario) AS hora_primeira_entrega, MAX(horario) AS hora_ultima_entrega
FROM tb_logistica
GROUP BY id_veiculo
"""
# Executa a query
spark.sql(query).show()

+----------+---------------------+-------------------+
|id_veiculo|hora_primeira_entrega|hora_ultima_entrega|
+----------+---------------------+-------------------+
|       298|                7:58a|              9:07a|
|       315|                6:05a|              7:32a|
|       457|                5:04a|              6:38a|
+----------+---------------------+-------------------+



In [90]:
# Define a query para contar o número de entregas por horário para o id_veiculo 298
query = """
SELECT horario, COUNT(*) AS hora_ultima_entrega
FROM tb_logistica
WHERE id_veiculo = 298
GROUP BY horario
"""
# Executa a query
spark.sql(query).show()

+-------+-------------------+
|horario|hora_ultima_entrega|
+-------+-------------------+
|  8:04a|                  1|
|  8:28a|                  1|
|  8:39a|                  1|
|  9:07a|                  1|
|  7:58a|                  1|
|  8:17a|                  1|
|  8:33a|                  1|
+-------+-------------------+



In [91]:
# Define a query para contar o número de entregas por horário
query = """
SELECT horario, COUNT(*) AS numero_entregas
FROM tb_logistica
GROUP BY horario
"""
# Executa a query
spark.sql(query).show()

+-------+---------------+
|horario|numero_entregas|
+-------+---------------+
|  8:04a|              1|
|  8:28a|              1|
|  8:39a|              1|
|  9:07a|              1|
|  6:05a|              1|
|  6:24a|              1|
|  6:38a|              2|
|  6:45a|              1|
|  7:32a|              1|
|  5:04a|              1|
|  5:13a|              1|
|  7:58a|              1|
|  8:17a|              1|
|  8:33a|              1|
|  6:14a|              1|
|  6:56a|              1|
|  5:27a|              1|
|  5:39a|              1|
|  5:47a|              1|
|  6:21a|              1|
+-------+---------------+



In [92]:
# Define a query para identificar horários que tiveram mais de uma entrega
query = """
SELECT horario, COUNT(*) AS hora_ultima_entrega
FROM tb_logistica
GROUP BY horario
HAVING COUNT(*) > 1
"""
# Executa a query
spark.sql(query).show()

+-------+-------------------+
|horario|hora_ultima_entrega|
+-------+-------------------+
|  6:38a|                  2|
+-------+-------------------+



<br>

### Fazendo um Pivot de um DataFrame

In [93]:
# Lista de horários de entregas
lista_horarios = ['5:13a', '6:38a', '7:32a', '8:04a', '9:07a']

# Testamos o filtro
df.filter(df.horario.isin(lista_horarios)).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       298|Entrega 7|  9:07a|
|       315|Entrega 4|  6:38a|
|       315|Entrega 7|  7:32a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [94]:
# Pivot é uma função de agregação no Spark
df_pivot = df.filter(df.horario.isin(lista_horarios)).groupBy("id_veiculo").pivot("horario").count()
df_pivot.show()

+----------+-----+-----+-----+-----+-----+
|id_veiculo|5:13a|6:38a|7:32a|8:04a|9:07a|
+----------+-----+-----+-----+-----+-----+
|       315| NULL|    1|    1| NULL| NULL|
|       457|    1|    1| NULL| NULL| NULL|
|       298| NULL| NULL| NULL|    1|    1|
+----------+-----+-----+-----+-----+-----+



In [95]:
# Converte o Spark DataFrame para Pandas DataFrame a fim de facilitar a visualização
pandasDF = df_pivot.toPandas()
pandasDF.head()

Unnamed: 0,id_veiculo,5:13a,6:38a,7:32a,8:04a,9:07a
0,315,,1.0,1.0,,
1,457,1.0,1.0,,,
2,298,,,,1.0,1.0


In [96]:
# Define a query para criar um pivot
query = """
SELECT * FROM (
  SELECT id_veiculo, horario
  FROM tb_logistica
)
PIVOT (
  COUNT(*)
  FOR horario in (
    '5:13a', '6:38a', '7:32a', '8:04a', '9:07a'
  )
)
ORDER BY id_veiculo
"""
# Executa a query
spark.sql(query).show()

+----------+-----+-----+-----+-----+-----+
|id_veiculo|5:13a|6:38a|7:32a|8:04a|9:07a|
+----------+-----+-----+-----+-----+-----+
|       298| NULL| NULL| NULL|    1|    1|
|       315| NULL|    1|    1| NULL| NULL|
|       457|    1|    1| NULL| NULL| NULL|
+----------+-----+-----+-----+-----+-----+



### Resumo

- Os exemplos acima mostram como utilizar as funções de agregação e consultas SQL no PySpark para realizar operações complexas e transformar os dados de várias formas. O DataFrame original df permanece inalterado durante todo o processo, enquanto novas transformações são aplicadas para criar DataFrames modificados para visualização ou operações subsequentes.

<br><br><br><br>

# SQL Window Function Para Agregação ao Longo do Tempo

- Quando usamos a cláusula GROUP BY em SQL, nosso objetivo é fazer agregações no nível de coluna. Por exemplo, considere a tabela tb_funcionarios abaixo:

<table>
  <tr>
    <th>ID_Funcionario</th>
    <th>Nome_Funcionario</th>
    <th>Departamento</th>
    <th>Salario (R$)</th>
  </tr>
  <tr>
    <td>1001</td>
    <td>Bob</td>
    <td>Marketing</td>
    <td>8.000,00</td>
  </tr>
  <tr>
    <td>1002</td>
    <td>Zico</td>
    <td>Finanças</td>
    <td>7.500,00</td>
  </tr>
  <tr>
    <td>1003</td>
    <td>Bete</td>
    <td>Marketing</td>
    <td>8.200,00</td>
  </tr>
  <tr>
    <td>1004</td>
    <td>Josias</td>
    <td>RH</td>
    <td>6.000,00</td>
  </tr>
  <tr>
    <td>1005</td>
    <td>Maria</td>
    <td>RH</td>
    <td>6.700,00</td>
  </tr>
</table>

<br>

Se quisermos calcular a média de salário por departamento, a query SQL seria:

**SELECT departamento, AVG(salario)**<br>
**FROM tb_funcionarios**<br>
**GROUP BY departamento;**

Essa query agrupa os dados por departamento e calcula a média dos salários. Podemos usar outras funções de agregação como SUM(), MIN(), MAX() e COUNT().

<br>

> No entanto, quando precisamos fazer agregações no nível de linha, seguindo ou não uma ordem temporal, usamos funções Window. Elas permitem calcular resultados em um intervalo de linhas, proporcionando uma forma mais granular de agregação.

<br>

#### Exemplo de Agregação por Coluna

In [97]:
# Agregação por coluna usando GROUP BY
query = """
SELECT id_veiculo, COUNT(horario) AS numero_entregas
FROM tb_logistica
GROUP BY id_veiculo
"""
spark.sql(query).show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       315|              7|
|       457|              7|
|       298|              7|
+----------+---------------+



<br>

#### Exemplo de Agregação por Linha usando Funções Window

In [98]:
# Agregação por linha usando função Window para criar um ranking
query = """
SELECT id_veiculo, entrega, horario,
ROW_NUMBER() OVER (PARTITION BY entrega ORDER BY horario) AS ranking
FROM tb_logistica
"""
spark.sql(query).show(21)

+----------+---------+-------+-------+
|id_veiculo|  entrega|horario|ranking|
+----------+---------+-------+-------+
|       457|Entrega 1|  5:04a|      1|
|       315|Entrega 1|  6:05a|      2|
|       298|Entrega 1|  7:58a|      3|
|       457|Entrega 2|  5:13a|      1|
|       315|Entrega 2|  6:14a|      2|
|       298|Entrega 2|  8:04a|      3|
|       457|Entrega 3|  5:27a|      1|
|       315|Entrega 3|  6:24a|      2|
|       298|Entrega 3|  8:17a|      3|
|       457|Entrega 4|  5:39a|      1|
|       315|Entrega 4|  6:38a|      2|
|       298|Entrega 4|  8:28a|      3|
|       457|Entrega 5|  5:47a|      1|
|       315|Entrega 5|  6:45a|      2|
|       298|Entrega 5|  8:33a|      3|
|       457|Entrega 6|  6:21a|      1|
|       315|Entrega 6|  6:56a|      2|
|       298|Entrega 6|  8:39a|      3|
|       457|Entrega 7|  6:38a|      1|
|       315|Entrega 7|  7:32a|      2|
|       298|Entrega 7|  9:07a|      3|
+----------+---------+-------+-------+



<br>

#### Usando Funções Window com SparkSQL

In [99]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lag, lead

# Criando um ranking com SparkSQL
df.withColumn("id", row_number().over(Window.partitionBy('entrega').orderBy('horario'))).show(21)

+----------+---------+-------+---+
|id_veiculo|  entrega|horario| id|
+----------+---------+-------+---+
|       457|Entrega 1|  5:04a|  1|
|       315|Entrega 1|  6:05a|  2|
|       298|Entrega 1|  7:58a|  3|
|       457|Entrega 2|  5:13a|  1|
|       315|Entrega 2|  6:14a|  2|
|       298|Entrega 2|  8:04a|  3|
|       457|Entrega 3|  5:27a|  1|
|       315|Entrega 3|  6:24a|  2|
|       298|Entrega 3|  8:17a|  3|
|       457|Entrega 4|  5:39a|  1|
|       315|Entrega 4|  6:38a|  2|
|       298|Entrega 4|  8:28a|  3|
|       457|Entrega 5|  5:47a|  1|
|       315|Entrega 5|  6:45a|  2|
|       298|Entrega 5|  8:33a|  3|
|       457|Entrega 6|  6:21a|  1|
|       315|Entrega 6|  6:56a|  2|
|       298|Entrega 6|  8:39a|  3|
|       457|Entrega 7|  6:38a|  1|
|       315|Entrega 7|  7:32a|  2|
|       298|Entrega 7|  9:07a|  3|
+----------+---------+-------+---+



<br>

#### Usando LAG e LEAD para Acessar Linhas Anteriores e Posteriores

In [100]:
# Usando LAG para acessar a entrega anterior
query = """
SELECT id_veiculo, entrega, horario, 
LAG(horario, 1) OVER (PARTITION BY id_veiculo ORDER BY horario) AS entrega_anterior 
FROM tb_logistica
"""
spark.sql(query).show(21)

+----------+---------+-------+----------------+
|id_veiculo|  entrega|horario|entrega_anterior|
+----------+---------+-------+----------------+
|       298|Entrega 1|  7:58a|            NULL|
|       298|Entrega 2|  8:04a|           7:58a|
|       298|Entrega 3|  8:17a|           8:04a|
|       298|Entrega 4|  8:28a|           8:17a|
|       298|Entrega 5|  8:33a|           8:28a|
|       298|Entrega 6|  8:39a|           8:33a|
|       298|Entrega 7|  9:07a|           8:39a|
|       315|Entrega 1|  6:05a|            NULL|
|       315|Entrega 2|  6:14a|           6:05a|
|       315|Entrega 3|  6:24a|           6:14a|
|       315|Entrega 4|  6:38a|           6:24a|
|       315|Entrega 5|  6:45a|           6:38a|
|       315|Entrega 6|  6:56a|           6:45a|
|       315|Entrega 7|  7:32a|           6:56a|
|       457|Entrega 1|  5:04a|            NULL|
|       457|Entrega 2|  5:13a|           5:04a|
|       457|Entrega 3|  5:27a|           5:13a|
|       457|Entrega 4|  5:39a|          

In [101]:
# Usando LEAD para acessar a próxima entrega
query = """
SELECT 
id_veiculo, entrega, horario, 
LEAD(horario, 1) OVER (PARTITION BY id_veiculo ORDER BY horario) AS proxima_entrega 
FROM tb_logistica
"""
spark.sql(query).show(21)

+----------+---------+-------+---------------+
|id_veiculo|  entrega|horario|proxima_entrega|
+----------+---------+-------+---------------+
|       298|Entrega 1|  7:58a|          8:04a|
|       298|Entrega 2|  8:04a|          8:17a|
|       298|Entrega 3|  8:17a|          8:28a|
|       298|Entrega 4|  8:28a|          8:33a|
|       298|Entrega 5|  8:33a|          8:39a|
|       298|Entrega 6|  8:39a|          9:07a|
|       298|Entrega 7|  9:07a|           NULL|
|       315|Entrega 1|  6:05a|          6:14a|
|       315|Entrega 2|  6:14a|          6:24a|
|       315|Entrega 3|  6:24a|          6:38a|
|       315|Entrega 4|  6:38a|          6:45a|
|       315|Entrega 5|  6:45a|          6:56a|
|       315|Entrega 6|  6:56a|          7:32a|
|       315|Entrega 7|  7:32a|           NULL|
|       457|Entrega 1|  5:04a|          5:13a|
|       457|Entrega 2|  5:13a|          5:27a|
|       457|Entrega 3|  5:27a|          5:39a|
|       457|Entrega 4|  5:39a|          5:47a|
|       457|E

### Resumo

- Os exemplos acima mostram como utilizar funções de agregação e consultas SQL no PySpark para realizar operações complexas e transformar dados de várias formas. O DataFrame original df permanece inalterado durante todo o processo, enquanto novas transformações são aplicadas para criar DataFrames modificados para visualização ou operações subsequentes.

<br><br><br><br>

# Usando Partições com Spark SQL
- A função over() no Spark SQL corresponde a cláusula OVER em SQL.
- Ela é utilizada para realizar agregações ou operações sobre uma janela de dados especificada.

In [102]:
# Abre a janela nos dados
janela = Window.partitionBy('id_veiculo').orderBy('horario')
type(janela)

pyspark.sql.window.WindowSpec

In [103]:
# Aplica o Lead (desloca os dados no tempo) sobre (over) a janela (window)
dfx = df.withColumn('proxima_entrega', lead('horario', 1).over(janela))
dfx.show(21)

+----------+---------+-------+---------------+
|id_veiculo|  entrega|horario|proxima_entrega|
+----------+---------+-------+---------------+
|       298|Entrega 1|  7:58a|          8:04a|
|       298|Entrega 2|  8:04a|          8:17a|
|       298|Entrega 3|  8:17a|          8:28a|
|       298|Entrega 4|  8:28a|          8:33a|
|       298|Entrega 5|  8:33a|          8:39a|
|       298|Entrega 6|  8:39a|          9:07a|
|       298|Entrega 7|  9:07a|           NULL|
|       315|Entrega 1|  6:05a|          6:14a|
|       315|Entrega 2|  6:14a|          6:24a|
|       315|Entrega 3|  6:24a|          6:38a|
|       315|Entrega 4|  6:38a|          6:45a|
|       315|Entrega 5|  6:45a|          6:56a|
|       315|Entrega 6|  6:56a|          7:32a|
|       315|Entrega 7|  7:32a|           NULL|
|       457|Entrega 1|  5:04a|          5:13a|
|       457|Entrega 2|  5:13a|          5:27a|
|       457|Entrega 3|  5:27a|          5:39a|
|       457|Entrega 4|  5:39a|          5:47a|
|       457|E

In [104]:
# Mesmo exemplo anterior, mas em uma linha de código
df_dot = df.withColumn('proxima_entrega', lead('horario', 1)
                       .over(Window.partitionBy('id_veiculo')
                       .orderBy('horario'))).show(21)

+----------+---------+-------+---------------+
|id_veiculo|  entrega|horario|proxima_entrega|
+----------+---------+-------+---------------+
|       298|Entrega 1|  7:58a|          8:04a|
|       298|Entrega 2|  8:04a|          8:17a|
|       298|Entrega 3|  8:17a|          8:28a|
|       298|Entrega 4|  8:28a|          8:33a|
|       298|Entrega 5|  8:33a|          8:39a|
|       298|Entrega 6|  8:39a|          9:07a|
|       298|Entrega 7|  9:07a|           NULL|
|       315|Entrega 1|  6:05a|          6:14a|
|       315|Entrega 2|  6:14a|          6:24a|
|       315|Entrega 3|  6:24a|          6:38a|
|       315|Entrega 4|  6:38a|          6:45a|
|       315|Entrega 5|  6:45a|          6:56a|
|       315|Entrega 6|  6:56a|          7:32a|
|       315|Entrega 7|  7:32a|           NULL|
|       457|Entrega 1|  5:04a|          5:13a|
|       457|Entrega 2|  5:13a|          5:27a|
|       457|Entrega 3|  5:27a|          5:39a|
|       457|Entrega 4|  5:39a|          5:47a|
|       457|E

<br>

### Parse de Data Para Agregação ao Longo do Tempo
- Calcule o tempo (em minutos) para a próxima entrega de cada veículo!

In [105]:
# Define o time parser policy
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Cria a janela
window = Window.partitionBy('id_veiculo').orderBy('horario')

# Agregação por linha para calcular a diferença entre os horários
dot_df = df.withColumn('tempo_proxima_entrega', 
                       (unix_timestamp(lead('horario', 1).over(window),'H:m') 
                        - unix_timestamp('horario', 'H:m'))/60).show(21)

+----------+---------+-------+---------------------+
|id_veiculo|  entrega|horario|tempo_proxima_entrega|
+----------+---------+-------+---------------------+
|       298|Entrega 1|  7:58a|                  6.0|
|       298|Entrega 2|  8:04a|                 13.0|
|       298|Entrega 3|  8:17a|                 11.0|
|       298|Entrega 4|  8:28a|                  5.0|
|       298|Entrega 5|  8:33a|                  6.0|
|       298|Entrega 6|  8:39a|                 28.0|
|       298|Entrega 7|  9:07a|                 NULL|
|       315|Entrega 1|  6:05a|                  9.0|
|       315|Entrega 2|  6:14a|                 10.0|
|       315|Entrega 3|  6:24a|                 14.0|
|       315|Entrega 4|  6:38a|                  7.0|
|       315|Entrega 5|  6:45a|                 11.0|
|       315|Entrega 6|  6:56a|                 36.0|
|       315|Entrega 7|  7:32a|                 NULL|
|       457|Entrega 1|  5:04a|                  9.0|
|       457|Entrega 2|  5:13a|                

<br>

#### Query para calcular a diferença de tempo de uma entrega para outra por id_veiculo

In [106]:
# Define a query
query = """
SELECT *, 
(UNIX_TIMESTAMP(LEAD(horario, 1) OVER (PARTITION BY id_veiculo ORDER BY horario),'H:m') 
- UNIX_TIMESTAMP(horario, 'H:m'))/60 AS tempo_proxima_entrega
FROM tb_logistica 
"""
sql_df = spark.sql(query)
sql_df.show(21)

+----------+---------+-------+---------------------+
|id_veiculo|  entrega|horario|tempo_proxima_entrega|
+----------+---------+-------+---------------------+
|       298|Entrega 1|  7:58a|                  6.0|
|       298|Entrega 2|  8:04a|                 13.0|
|       298|Entrega 3|  8:17a|                 11.0|
|       298|Entrega 4|  8:28a|                  5.0|
|       298|Entrega 5|  8:33a|                  6.0|
|       298|Entrega 6|  8:39a|                 28.0|
|       298|Entrega 7|  9:07a|                 NULL|
|       315|Entrega 1|  6:05a|                  9.0|
|       315|Entrega 2|  6:14a|                 10.0|
|       315|Entrega 3|  6:24a|                 14.0|
|       315|Entrega 4|  6:38a|                  7.0|
|       315|Entrega 5|  6:45a|                 11.0|
|       315|Entrega 6|  6:56a|                 36.0|
|       315|Entrega 7|  7:32a|                 NULL|
|       457|Entrega 1|  5:04a|                  9.0|
|       457|Entrega 2|  5:13a|                

<br><br><br>

# FIM!