# Capítulo 13 - Mini Projeto 4

In [1]:
# Importando bibliotecas:
from platform import python_version
print('Versão Python:', python_version(), '\n')

import findspark
findspark.init() # Inicializando o findspark

import pyspark
from pyspark import SparkContext # SparkContext é a conexão principal para criar RDDs
from pyspark.sql import SparkSession # SparkSession é a entrada principal para trabalhar com dados estruturados em Spark
from pyspark.sql import Window # Window é uma classe para criar janelas de dados
from pyspark.sql.functions import col # col é uma função para acessar uma coluna em um DataFrame
from pyspark.sql.functions import row_number # row_number é uma função para adicionar um número de linha a um DataFrame
from pyspark.sql.functions import lead # lead é uma função para acessar a próxima linha em um DataFrame
from pyspark.sql.functions import min, max # min e max são funções para encontrar o valor mínimo e máximo de uma coluna
from pyspark.sql.functions import unix_timestamp # unix_timestamp é uma função para converter uma string em um timestamp    

%reload_ext watermark
%watermark -a "gustavogzr" --iversions

Versão Python: 3.11.1 

Author: gustavogzr

findspark: 2.0.1
pyspark  : 3.5.2



## Preparando o Ambiente Spark

In [2]:
# Criar o Spark Context
sc = SparkContext(appName="Mini-Projeto4")
# Criar a Spark Session
spark = SparkSession.builder.getOrCreate()
spark # imprime a sessão criada

## Carregando os dados para DataFrame Spark

In [3]:
# Nome do arquivo:
arquivo = '.arquivos_DSA/dados/dataset.txt'
# Carregando os dados para um DataFrame Spark:
df = spark.read.csv(arquivo, header=True)
df.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+
only showing top 5 rows



In [4]:
type(df)

pyspark.sql.dataframe.DataFrame

## Criando tabela temporária para executar queries SQL

In [5]:
# Criar tabela temporária:
df.createOrReplaceTempView("tb_logistica")

## Executando queries SQL

In [6]:
# Verificando as colunas da tabela:
spark.sql("SHOW COLUMNS FROM tb_logistica").show()

+----------+
|  col_name|
+----------+
|id_veiculo|
|   entrega|
|   horario|
+----------+



In [7]:
# Visualizando os 5 primeiros registros:
spark.sql("SELECT * FROM tb_logistica LIMIT 5").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+



In [8]:
# Realizar describe da tabela:
spark.sql("DESCRIBE tb_logistica").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|id_veiculo|   string|   NULL|
|   entrega|   string|   NULL|
|   horario|   string|   NULL|
+----------+---------+-------+



## Queries SQL vs Dot Notation no Spark SQL

In [9]:
# Query SQL
spark.sql('SELECT id_veiculo AS veiculo, entrega FROM tb_logistica LIMIT 5').show()

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+



In [10]:
# Dot Notation - vantagem: não precisa de tabelas temporárias
df.select(
    col('id_veiculo').alias('veiculo'),
    col('entrega')
).limit(5).show()

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+



## Usando funções SQL no Spark SQL

In [11]:
df.columns # visualizando as colunas

['id_veiculo', 'entrega', 'horario']

In [12]:
# converter o dataframe Spark para um dataframe Pandas
pandasDF = df.toPandas()

In [13]:
type(pandasDF)

pandas.core.frame.DataFrame

In [14]:
pandasDF

Unnamed: 0,id_veiculo,entrega,horario
0,298,Entrega 1,7:58a
1,298,Entrega 2,8:04a
2,298,Entrega 3,8:17a
3,298,Entrega 4,8:28a
4,298,Entrega 5,8:33a
5,298,Entrega 6,8:39a
6,298,Entrega 7,9:07a
7,315,Entrega 1,6:05a
8,315,Entrega 2,6:14a
9,315,Entrega 3,6:24a


### Métodos Select e Collect

In [15]:
# Selecionando dados de duas colunas:
df.select('id_veiculo', 'entrega').show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [16]:
# Método alternativo:
df.select(df.id_veiculo, df.entrega).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [17]:
# A função col é outra alternativa:
df.select(col('id_veiculo'), col('entrega')).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [18]:
# Podemos selecionar todas as colunas presentes em uma lista:
lista_colunas = ['id_veiculo', 'entrega']
df.select(*lista_colunas).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [19]:
# Mesmo exemplo anterior, mas agora utilizando o list comprehension:
df.select([coluna for coluna in lista_colunas]).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [20]:
# Renomeando colunas para facilitar a manipulação:
df.select('id_veiculo', 'entrega').withColumnRenamed('id_veiculo', 'veiculo').show(10)

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
|    298|Entrega 6|
|    298|Entrega 7|
|    315|Entrega 1|
|    315|Entrega 2|
|    315|Entrega 3|
+-------+---------+
only showing top 10 rows



In [21]:
# Também é possível renomear colunas com a função alias:
df.select(
    col('id_veiculo').alias('veiculo'),
    'entrega'
).show(10)

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
|    298|Entrega 6|
|    298|Entrega 7|
|    315|Entrega 1|
|    315|Entrega 2|
|    315|Entrega 3|
+-------+---------+
only showing top 10 rows



In [22]:
# Selecionando colunas pelo índice
df.select(df.columns[2:]).show(3)

+-------+
|horario|
+-------+
|  7:58a|
|  8:04a|
|  8:17a|
+-------+
only showing top 3 rows



In [23]:
# Selecionando colunas através de expressões regulares:
df.select(df.colRegex("`^.*Entrega*`")).show()

+---------+
|  entrega|
+---------+
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
|Entrega 6|
|Entrega 7|
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
|Entrega 6|
|Entrega 7|
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
|Entrega 6|
+---------+
only showing top 20 rows



In [24]:
# O Dataframe original não é alterado:
df.show(30)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
|       315|Entrega 5|  6:45a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 7|  7:32a|
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       457|Entrega 6|  6:21a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [25]:
# Retornando cada linha do dataframe como um objeto do tipo Row:
df.collect()

[Row(id_veiculo='298', entrega='Entrega 1', horario='7:58a'),
 Row(id_veiculo='298', entrega='Entrega 2', horario='8:04a'),
 Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a'),
 Row(id_veiculo='298', entrega='Entrega 4', horario='8:28a'),
 Row(id_veiculo='298', entrega='Entrega 5', horario='8:33a'),
 Row(id_veiculo='298', entrega='Entrega 6', horario='8:39a'),
 Row(id_veiculo='298', entrega='Entrega 7', horario='9:07a'),
 Row(id_veiculo='315', entrega='Entrega 1', horario='6:05a'),
 Row(id_veiculo='315', entrega='Entrega 2', horario='6:14a'),
 Row(id_veiculo='315', entrega='Entrega 3', horario='6:24a'),
 Row(id_veiculo='315', entrega='Entrega 4', horario='6:38a'),
 Row(id_veiculo='315', entrega='Entrega 5', horario='6:45a'),
 Row(id_veiculo='315', entrega='Entrega 6', horario='6:56a'),
 Row(id_veiculo='315', entrega='Entrega 7', horario='7:32a'),
 Row(id_veiculo='457', entrega='Entrega 1', horario='5:04a'),
 Row(id_veiculo='457', entrega='Entrega 2', horario='5:13a'),
 Row(id_

In [26]:
# O método collect() retorna uma lista de linhas:
new_df = df.collect()
type(new_df)

list

In [27]:
# Podemos fatiar as estruturas de dados retornadas pelo collect():
df.collect()[0][2] # acessando a primeira linha e a terceira coluna

'7:58a'

In [28]:
# Como collect() retorna uma lista, podemos percorrer a lista com um loop:
for linha in df.collect():
    print(linha['id_veiculo'] + ' - ' + str(linha['entrega']))

298 - Entrega 1
298 - Entrega 2
298 - Entrega 3
298 - Entrega 4
298 - Entrega 5
298 - Entrega 6
298 - Entrega 7
315 - Entrega 1
315 - Entrega 2
315 - Entrega 3
315 - Entrega 4
315 - Entrega 5
315 - Entrega 6
315 - Entrega 7
457 - Entrega 1
457 - Entrega 2
457 - Entrega 3
457 - Entrega 4
457 - Entrega 5
457 - Entrega 6
457 - Entrega 7


In [34]:
# Podemos combinar select() com collect() para filtrar colunas e linhas:
dataCollect = df.select('id_veiculo').collect()[0:4]
dataCollect

[Row(id_veiculo='298'),
 Row(id_veiculo='298'),
 Row(id_veiculo='298'),
 Row(id_veiculo='298')]

In [54]:
# Podemos extrair primeiro uma amostra do dataframe e então coletar o resultado:
df.sample(0.6).collect()

[Row(id_veiculo='298', entrega='Entrega 2', horario='8:04a'),
 Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a'),
 Row(id_veiculo='298', entrega='Entrega 4', horario='8:28a'),
 Row(id_veiculo='298', entrega='Entrega 6', horario='8:39a'),
 Row(id_veiculo='298', entrega='Entrega 7', horario='9:07a'),
 Row(id_veiculo='315', entrega='Entrega 1', horario='6:05a'),
 Row(id_veiculo='457', entrega='Entrega 1', horario='5:04a'),
 Row(id_veiculo='457', entrega='Entrega 3', horario='5:27a'),
 Row(id_veiculo='457', entrega='Entrega 4', horario='5:39a'),
 Row(id_veiculo='457', entrega='Entrega 5', horario='5:47a'),
 Row(id_veiculo='457', entrega='Entrega 6', horario='6:21a')]

### Métodos Filter e Where

In [55]:
# Podemos filtrar os dados retornados com a função filter():
df.filter("entrega == 'Entrega 2'").show() 

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 2|  5:13a|
+----------+---------+-------+



In [56]:
# Usando negação no filtro:
df.filter("entrega != 'Entrega 2'").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
|       315|Entrega 5|  6:45a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 7|  7:32a|
|       457|Entrega 1|  5:04a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       457|Entrega 6|  6:21a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [57]:
# Podemos filtrar usando múltiplas condições:
df.filter(
    (df.entrega == 'Entrega 2') & 
    (df.id_veiculo == 298)
).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
+----------+---------+-------+



In [58]:
# Filtrar baseado em uma lista
lista_id_veiculos = [298, 300, 400]
df.filter(
    df.id_veiculo.isin(lista_id_veiculos)
).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
+----------+---------+-------+



In [59]:
# Alguma entrega ocorreu no minuto 38 de qualquer hora?
df.filter(
    df.horario.like('%:38%')
).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       315|Entrega 4|  6:38a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [60]:
# Podemos filtrar utilizando a função where():
df.where("entrega == 'Entrega 2'").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 2|  5:13a|
+----------+---------+-------+



In [61]:
df.where("id_veiculo > 400").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       457|Entrega 6|  6:21a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [62]:
# Usando lista:
df.where(df.id_veiculo.isin(lista_id_veiculos)).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
+----------+---------+-------+



### Métodos Order By e Sort

In [63]:
# Ordenando os dados:
df.sort('horario', 'entrega').show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 6|  6:21a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
+----------+---------+-------+
only showing top 10 rows



In [64]:
# Ordenando usando col:
df.sort(col('horario'), col('entrega')).show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 6|  6:21a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
+----------+---------+-------+
only showing top 10 rows



In [65]:
# Ou usando o OrderBy:
df.orderBy(col('horario'), col('entrega')).show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 6|  6:21a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
+----------+---------+-------+
only showing top 10 rows



In [67]:
# Ordenando em ordem decrescente:
df.orderBy(col('horario').desc(), col('entrega').asc()).show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 7|  9:07a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 1|  7:58a|
|       315|Entrega 7|  7:32a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 5|  6:45a|
+----------+---------+-------+
only showing top 10 rows



In [69]:
# Lembrando que podemos usar linguagem SQL:
spark.sql(
    "SELECT * FROM tb_logistica ORDER BY horario DESC, entrega ASC LIMIT 10"
).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 7|  9:07a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 1|  7:58a|
|       315|Entrega 7|  7:32a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 5|  6:45a|
+----------+---------+-------+



### Método Map, FlatMap e Explode

In [74]:
# Função Map é muito parecida com a função apply do Pandas
# Map são aplicados em RDDs (Resilient Distributed Datasets) e, por isso, precisamos converter o DataFrame para RDD
# O método Map retorna um novo RDD aplicando uma função a cada elemento do RDD original, sendo necessário retornar para DataFrame
rdd2 = df.rdd # Convertendo DataFrame para RDD
rdd2 = rdd2.map(
    lambda x: (x[0] + ',' + x[1], x[2])
) # Aplicando a função lambda
df2 = rdd2.toDF(['novo_id', 'entrega']) # Convertendo RDD para DataFrame
df2.show()

+-------------+-------+
|      novo_id|entrega|
+-------------+-------+
|298,Entrega 1|  7:58a|
|298,Entrega 2|  8:04a|
|298,Entrega 3|  8:17a|
|298,Entrega 4|  8:28a|
|298,Entrega 5|  8:33a|
|298,Entrega 6|  8:39a|
|298,Entrega 7|  9:07a|
|315,Entrega 1|  6:05a|
|315,Entrega 2|  6:14a|
|315,Entrega 3|  6:24a|
|315,Entrega 4|  6:38a|
|315,Entrega 5|  6:45a|
|315,Entrega 6|  6:56a|
|315,Entrega 7|  7:32a|
|457,Entrega 1|  5:04a|
|457,Entrega 2|  5:13a|
|457,Entrega 3|  5:27a|
|457,Entrega 4|  5:39a|
|457,Entrega 5|  5:47a|
|457,Entrega 6|  6:21a|
+-------------+-------+
only showing top 20 rows



In [75]:
# Podemos realizar a mesma operação, mas utilizando o nome das colunas:
rdd2 = df.rdd.map(
    lambda x: (x.id_veiculo + ',' + x.entrega, x.horario)
)
df2 = rdd2.toDF(['novo_id', 'entrega'])
df2.show()

+-------------+-------+
|      novo_id|entrega|
+-------------+-------+
|298,Entrega 1|  7:58a|
|298,Entrega 2|  8:04a|
|298,Entrega 3|  8:17a|
|298,Entrega 4|  8:28a|
|298,Entrega 5|  8:33a|
|298,Entrega 6|  8:39a|
|298,Entrega 7|  9:07a|
|315,Entrega 1|  6:05a|
|315,Entrega 2|  6:14a|
|315,Entrega 3|  6:24a|
|315,Entrega 4|  6:38a|
|315,Entrega 5|  6:45a|
|315,Entrega 6|  6:56a|
|315,Entrega 7|  7:32a|
|457,Entrega 1|  5:04a|
|457,Entrega 2|  5:13a|
|457,Entrega 3|  5:27a|
|457,Entrega 4|  5:39a|
|457,Entrega 5|  5:47a|
|457,Entrega 6|  6:21a|
+-------------+-------+
only showing top 20 rows



In [76]:
# Criando uma função que manipula as colunas:
def manipula_colunas(x):
    coluna1 = x.id_veiculo
    coluna2 = x.entrega
    novo_id = coluna1 + '-' + coluna2
    coluna3 = x.horario
    return (novo_id, coluna3)

In [77]:
# Usando o map para aplicar a função:
rdd2 = df.rdd.map(lambda x: manipula_colunas(x))
# Collect rdd2 para visualizar o resultado:
rdd2.collect()

[('298-Entrega 1', '7:58a'),
 ('298-Entrega 2', '8:04a'),
 ('298-Entrega 3', '8:17a'),
 ('298-Entrega 4', '8:28a'),
 ('298-Entrega 5', '8:33a'),
 ('298-Entrega 6', '8:39a'),
 ('298-Entrega 7', '9:07a'),
 ('315-Entrega 1', '6:05a'),
 ('315-Entrega 2', '6:14a'),
 ('315-Entrega 3', '6:24a'),
 ('315-Entrega 4', '6:38a'),
 ('315-Entrega 5', '6:45a'),
 ('315-Entrega 6', '6:56a'),
 ('315-Entrega 7', '7:32a'),
 ('457-Entrega 1', '5:04a'),
 ('457-Entrega 2', '5:13a'),
 ('457-Entrega 3', '5:27a'),
 ('457-Entrega 4', '5:39a'),
 ('457-Entrega 5', '5:47a'),
 ('457-Entrega 6', '6:21a'),
 ('457-Entrega 7', '6:38a')]

In [78]:
# O método flatMap requer uma lista no formato RDD
# Criando uma lista:
data = ["A Data Science Academy",
        "oferece cursos realmente incríveis",
        "orientados às necessidades",
        "do mercado de trabalho",
        "e tudo mostrado passo a passo"]

In [79]:
# Converter a lista para RDD
rdd = spark.sparkContext.parallelize(data) # Convertendo a lista para RDD
type(rdd)

pyspark.rdd.RDD

In [80]:
# Imprimir os elementos do RDD
for elemento in rdd.collect():
    print(elemento)

A Data Science Academy
oferece cursos realmente incríveis
orientados às necessidades
do mercado de trabalho
e tudo mostrado passo a passo


In [81]:
# Agora aplicamos a função flatMap, que cria outro RDD:
rdd2 = rdd.flatMap(lambda x: x.split(" ")) # Aplicando a função flatMap. O split() divide a string em palavras
# Imprimir os elementos do novo RDD
for elemento in rdd2.collect():
    print(elemento)

A
Data
Science
Academy
oferece
cursos
realmente
incríveis
orientados
às
necessidades
do
mercado
de
trabalho
e
tudo
mostrado
passo
a
passo


In [82]:
# Explode deve receber uma lista como argumento, mas essa lista pode estar em uma coluna do DataFrame
from pyspark.sql.functions import explode

In [83]:
# Criando a lista:
array_estudantes = [('Bob', ['Python', 'R', 'Scala']),
                    ('Maria', ['Java','Julia']),
                    ('Zico', ['JavaScript', '']),
                    ('Ana', [None, None])]
array_estudantes

[('Bob', ['Python', 'R', 'Scala']),
 ('Maria', ['Java', 'Julia']),
 ('Zico', ['JavaScript', '']),
 ('Ana', [None, None])]

In [84]:
type(array_estudantes)

list

In [85]:
# Converter a lista para um DataFrame:
df_estudantes = spark.createDataFrame(data = array_estudantes, schema = ['aluno', 'linguagem'])
df_estudantes.show()

+-----+------------------+
|aluno|         linguagem|
+-----+------------------+
|  Bob|[Python, R, Scala]|
|Maria|     [Java, Julia]|
| Zico|    [JavaScript, ]|
|  Ana|      [NULL, NULL]|
+-----+------------------+



In [86]:
type(df_estudantes)

pyspark.sql.dataframe.DataFrame

In [88]:
# Select com Explode
df2 = df_estudantes.select(df_estudantes.aluno, explode(df_estudantes.linguagem).alias('linguagem'))
df2.printSchema()
df2.show()

root
 |-- aluno: string (nullable = true)
 |-- linguagem: string (nullable = true)

+-----+----------+
|aluno| linguagem|
+-----+----------+
|  Bob|    Python|
|  Bob|         R|
|  Bob|     Scala|
|Maria|      Java|
|Maria|     Julia|
| Zico|JavaScript|
| Zico|          |
|  Ana|      NULL|
|  Ana|      NULL|
+-----+----------+



In [92]:
# Foreeach - Aplica uma função a cada linha do DataFrame
df.foreach(
    lambda x: print(x['id_veiculo'] + ',' + x['entrega'] + ',' + x['horario']))

### Agregação com Spark SQL

In [94]:
df.groupBy("id_veiculo").count().show() # Contagem de entregas por veículo

+----------+-----+
|id_veiculo|count|
+----------+-----+
|       298|    7|
|       457|    7|
|       315|    7|
+----------+-----+



In [95]:
df.groupBy('id_veiculo').agg({'horario': 'min'}).show() # Menor horário de entrega por veículo

+----------+------------+
|id_veiculo|min(horario)|
+----------+------------+
|       298|       7:58a|
|       315|       6:05a|
|       457|       5:04a|
+----------+------------+



In [96]:
df.groupBy('id_veiculo').agg({'horario': 'max'}).show() # Maior horário de entrega por veículo

+----------+------------+
|id_veiculo|max(horario)|
+----------+------------+
|       298|       9:07a|
|       315|       7:32a|
|       457|       6:38a|
+----------+------------+



In [97]:
df.groupBy('id_veiculo').agg({'horario': 'count'}).show() # Contagem de entregas por veículo

+----------+--------------+
|id_veiculo|count(horario)|
+----------+--------------+
|       298|             7|
|       457|             7|
|       315|             7|
+----------+--------------+



In [98]:
df.groupBy('id_veiculo').agg({'horario': 'count'}).withColumnRenamed('count(horario)', 'numero_entregas').show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       298|              7|
|       457|              7|
|       315|              7|
+----------+---------------+



In [99]:
df.groupBy('id_veiculo').agg({'horario': 'min'}).withColumnRenamed('min(horario)', 'hora_primeira_entrega').show()

+----------+---------------------+
|id_veiculo|hora_primeira_entrega|
+----------+---------------------+
|       298|                7:58a|
|       315|                6:05a|
|       457|                5:04a|
+----------+---------------------+



In [100]:
# Agregando com expressões SQL:
query = """
SELECT id_veiculo, COUNT(*) AS numero_entregas
FROM tb_logistica
GROUP BY id_veiculo
"""
# Executando a query:
spark.sql(query).show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       298|              7|
|       457|              7|
|       315|              7|
+----------+---------------+



In [101]:
# Agregando com expressões SQL:
query = """
SELECT id_veiculo, MIN(horario) AS hora_primeira_entrega, MAX(horario) AS hora_ultima_entrega
FROM tb_logistica
GROUP BY id_veiculo
"""
# Executando a query:
spark.sql(query).show()

+----------+---------------------+-------------------+
|id_veiculo|hora_primeira_entrega|hora_ultima_entrega|
+----------+---------------------+-------------------+
|       298|                7:58a|              9:07a|
|       315|                6:05a|              7:32a|
|       457|                5:04a|              6:38a|
+----------+---------------------+-------------------+



In [103]:
query = """
SELECT horario, COUNT(*) AS hora_ultima_entrega
FROM tb_logistica
WHERE id_veiculo = 298
GROUP BY horario
"""
# Executando a query:
spark.sql(query).show()

+-------+-------------------+
|horario|hora_ultima_entrega|
+-------+-------------------+
|  8:33a|                  1|
|  8:28a|                  1|
|  8:17a|                  1|
|  8:39a|                  1|
|  8:04a|                  1|
|  7:58a|                  1|
|  9:07a|                  1|
+-------+-------------------+



In [104]:
query = """
SELECT horario, COUNT(*) AS hora_ultima_entrega
FROM tb_logistica
GROUP BY horario
"""
# Executando a query:
spark.sql(query).show()

+-------+-------------------+
|horario|hora_ultima_entrega|
+-------+-------------------+
|  8:33a|                  1|
|  6:45a|                  1|
|  8:28a|                  1|
|  5:39a|                  1|
|  8:17a|                  1|
|  8:39a|                  1|
|  7:32a|                  1|
|  6:56a|                  1|
|  5:04a|                  1|
|  6:14a|                  1|
|  5:13a|                  1|
|  8:04a|                  1|
|  6:05a|                  1|
|  5:27a|                  1|
|  6:24a|                  1|
|  7:58a|                  1|
|  5:47a|                  1|
|  6:38a|                  2|
|  9:07a|                  1|
|  6:21a|                  1|
+-------+-------------------+



In [105]:
query = """
SELECT horario, COUNT(*) AS hora_ultima_entrega
FROM tb_logistica
GROUP BY horario
HAVING COUNT(*) > 1
"""
# Executando a query:
spark.sql(query).show()

+-------+-------------------+
|horario|hora_ultima_entrega|
+-------+-------------------+
|  6:38a|                  2|
+-------+-------------------+



In [106]:
# Lista de horários de entrega:
lista_horarios = ['5:13a', '6:38a', '7:32a', '8:04a', '9:07a']

In [107]:
# Testando o filtro
df.filter(df.horario.isin(lista_horarios)).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       298|Entrega 7|  9:07a|
|       315|Entrega 4|  6:38a|
|       315|Entrega 7|  7:32a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [111]:
df_pivot = df.filter(df.horario.isin(lista_horarios)).groupBy('id_veiculo').pivot('horario').count()
df_pivot.show()

+----------+-----+-----+-----+-----+-----+
|id_veiculo|5:13a|6:38a|7:32a|8:04a|9:07a|
+----------+-----+-----+-----+-----+-----+
|       298| NULL| NULL| NULL|    1|    1|
|       457|    1|    1| NULL| NULL| NULL|
|       315| NULL|    1|    1| NULL| NULL|
+----------+-----+-----+-----+-----+-----+



In [112]:
# Vamos converter para Pandas para visualizar melhor
pandasDF = df_pivot.toPandas()
pandasDF

Unnamed: 0,id_veiculo,5:13a,6:38a,7:32a,8:04a,9:07a
0,298,,,,1.0,1.0
1,457,1.0,1.0,,,
2,315,,1.0,1.0,,


In [114]:
query = """
SELECT * FROM(
    SELECT id_veiculo, horario
    FROM tb_logistica
)
PIVOT(
    COUNT(*)
    FOR horario in (
    '5:13a', '6:38a', '7:32a', '8:04a', '9:07a'
    )
)
ORDER BY id_veiculo
"""
# Executando a query:
spark.sql(query).show()

+----------+-----+-----+-----+-----+-----+
|id_veiculo|5:13a|6:38a|7:32a|8:04a|9:07a|
+----------+-----+-----+-----+-----+-----+
|       298| NULL| NULL| NULL|    1|    1|
|       315| NULL|    1|    1| NULL| NULL|
|       457|    1|    1| NULL| NULL| NULL|
+----------+-----+-----+-----+-----+-----+



### SQL Window Function para Agregação ao Longo do Tempo