# Problema de Negócio

O objetivo desteMini-Projeto é analisar um conjunto  de  dados  de  uma  transportadora  responsável  por  entrega  de  produtos.  Diversos veículos realizam diversas entregas em diferentes horários do dia. Usando Spark SQL vamos extrair insights e compreender como está a performance da logística de entrega da empresa.

In [3]:
import findspark
findspark.init()

In [4]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Window # Função Window para realizar análise ao longo do tempo
from pyspark.sql.functions import col
from pyspark.sql.functions import row_number
from pyspark.sql.functions import lead
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp

# preparando o ambiente SPARK

In [5]:
# Criando o contexto 
sc = SparkContext(appName = 'Mini-Projeto4')

In [6]:
# Cria a sessão
spark = SparkSession.builder.getOrCreate()

In [7]:
spark

## Carregando os Dados como Dataframe do Spark

In [8]:
# nome do arquivo
arquivo = "C:/formacao_dataScience_DSA_DADOS/01_bigData_RealTime_Python_Spark/cap13_Apache_Spark_SQL_2/dataset.txt"

In [9]:
# arrega o dataframe do Spark
df = spark.read.csv(arquivo,header = True)

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
df.show(5)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+
only showing top 5 rows



## Criando Tabela temporária

Criamos a tabela temporaria para executar consutas SQL nos dados. A tabela temporária existe somente nesta sessão spark

In [12]:
# Criando tabela temporaria
df.createOrReplaceTempView('tb_logistica')

#### Executando Querys SQL

In [13]:
spark.sql("show columns from tb_logistica").show()

+----------+
|  col_name|
+----------+
|id_veiculo|
|   entrega|
|   horario|
+----------+



In [14]:
spark.sql("select * from tb_logistica limit 5").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
+----------+---------+-------+



In [15]:
spark.sql("select id_veiculo from tb_logistica where entrega like '%1%'").show()

+----------+
|id_veiculo|
+----------+
|       298|
|       315|
|       457|
+----------+



In [16]:
spark.sql("describe tb_logistica").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|id_veiculo|   string|   null|
|   entrega|   string|   null|
|   horario|   string|   null|
+----------+---------+-------+



# Queries SQL x Dot Notaton no Spark SQL

In [17]:
# Query SQL
spark.sql("select id_veiculo as veiculo, entrega from tb_logistica limit 5").show()

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+



In [18]:
df.select(col('id_veiculo').alias('veiculo'), 'entrega').limit(5).show()

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
+-------+---------+



# Funções SQL do Sark SQL

Embora seja mais fácil usar direto linguagem SQL, as funções do Spark SQL são otimizadas para trabalho em ambinete distribuido. Se estiver com problemas de performance ao processar grandes volumes de dados, é o caso testar o sparkSQL

In [19]:
df.columns

['id_veiculo', 'entrega', 'horario']

In [20]:
dfpandas = df.toPandas()

In [21]:
type(dfpandas)

pandas.core.frame.DataFrame

In [22]:
dfpandas.head()

Unnamed: 0,id_veiculo,entrega,horario
0,298,Entrega 1,7:58a
1,298,Entrega 2,8:04a
2,298,Entrega 3,8:17a
3,298,Entrega 4,8:28a
4,298,Entrega 5,8:33a


## Métodos Select e Collect 

#### Função select()

In [23]:
df.select('id_veiculo', 'entrega').show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [24]:
df.select(df.id_veiculo, df.entrega).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [25]:
# Função col é uma forma de realizar as queries
from pyspark.sql.functions import col
df.select(col('id_veiculo'), col('entrega')).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [26]:
# Podemos juntar as colunas do df cujos nomes estejam em lista
nomes_colunas = ['id_veiculo', 'entrega']
df.select(*nomes_colunas).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [27]:
# Mesmo exemplo anterior, porém com list comprehension
df.select([coluna for coluna in nomes_colunas]).show(10)

+----------+---------+
|id_veiculo|  entrega|
+----------+---------+
|       298|Entrega 1|
|       298|Entrega 2|
|       298|Entrega 3|
|       298|Entrega 4|
|       298|Entrega 5|
|       298|Entrega 6|
|       298|Entrega 7|
|       315|Entrega 1|
|       315|Entrega 2|
|       315|Entrega 3|
+----------+---------+
only showing top 10 rows



In [28]:
# Seleção de colunas pelo indice
df.select(df.columns[:1]).show(10)

+----------+
|id_veiculo|
+----------+
|       298|
|       298|
|       298|
|       298|
|       298|
|       298|
|       298|
|       315|
|       315|
|       315|
+----------+
only showing top 10 rows



In [29]:
# Renomear coluna para melhorar compreensão
df.select('id_veiculo','entrega').withColumnRenamed('id_veiculo', 'veiculo').show(10)

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
|    298|Entrega 6|
|    298|Entrega 7|
|    315|Entrega 1|
|    315|Entrega 2|
|    315|Entrega 3|
+-------+---------+
only showing top 10 rows



In [30]:
# renomear colunas com alias
df.select(col('id_veiculo').alias('veiculo'), 'entrega').show(10)

+-------+---------+
|veiculo|  entrega|
+-------+---------+
|    298|Entrega 1|
|    298|Entrega 2|
|    298|Entrega 3|
|    298|Entrega 4|
|    298|Entrega 5|
|    298|Entrega 6|
|    298|Entrega 7|
|    315|Entrega 1|
|    315|Entrega 2|
|    315|Entrega 3|
+-------+---------+
only showing top 10 rows



In [31]:
# Selecionando colunas através de REGEX
df.select(df.colRegex("`^.*Entrega*`")).show()

+---------+
|  entrega|
+---------+
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
|Entrega 6|
|Entrega 7|
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
|Entrega 6|
|Entrega 7|
|Entrega 1|
|Entrega 2|
|Entrega 3|
|Entrega 4|
|Entrega 5|
|Entrega 6|
+---------+
only showing top 20 rows



#### Função Collect()

In [32]:
df.collect()

[Row(id_veiculo='298', entrega='Entrega 1', horario='7:58a'),
 Row(id_veiculo='298', entrega='Entrega 2', horario='8:04a'),
 Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a'),
 Row(id_veiculo='298', entrega='Entrega 4', horario='8:28a'),
 Row(id_veiculo='298', entrega='Entrega 5', horario='8:33a'),
 Row(id_veiculo='298', entrega='Entrega 6', horario='8:39a'),
 Row(id_veiculo='298', entrega='Entrega 7', horario='9:07a'),
 Row(id_veiculo='315', entrega='Entrega 1', horario='6:05a'),
 Row(id_veiculo='315', entrega='Entrega 2', horario='6:14a'),
 Row(id_veiculo='315', entrega='Entrega 3', horario='6:24a'),
 Row(id_veiculo='315', entrega='Entrega 4', horario='6:38a'),
 Row(id_veiculo='315', entrega='Entrega 5', horario='6:45a'),
 Row(id_veiculo='315', entrega='Entrega 6', horario='6:56a'),
 Row(id_veiculo='315', entrega='Entrega 7', horario='7:32a'),
 Row(id_veiculo='457', entrega='Entrega 1', horario='5:04a'),
 Row(id_veiculo='457', entrega='Entrega 2', horario='5:13a'),
 Row(id_

In [33]:
# Verificando tipo
new_df = df.collect()
type(new_df)

list

In [34]:
# Fatiando a lista 
df.collect()[:3]

[Row(id_veiculo='298', entrega='Entrega 1', horario='7:58a'),
 Row(id_veiculo='298', entrega='Entrega 2', horario='8:04a'),
 Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a')]

In [35]:
df.collect()[8][2]

'6:14a'

In [36]:
# Com Collect retorna uma lista, podemos percorrer a lista com um loop e concatenar as colunas
for row in df.collect():
    print(row['id_veiculo'] + "," + str(row['entrega']))

298,Entrega 1
298,Entrega 2
298,Entrega 3
298,Entrega 4
298,Entrega 5
298,Entrega 6
298,Entrega 7
315,Entrega 1
315,Entrega 2
315,Entrega 3
315,Entrega 4
315,Entrega 5
315,Entrega 6
315,Entrega 7
457,Entrega 1
457,Entrega 2
457,Entrega 3
457,Entrega 4
457,Entrega 5
457,Entrega 6
457,Entrega 7


In [37]:
# Filtrar colunas com select e filtrar linhas com collect
dataCollect = df.select('id_veiculo').collect()[0:4][:]
dataCollect

[Row(id_veiculo='298'),
 Row(id_veiculo='298'),
 Row(id_veiculo='298'),
 Row(id_veiculo='298')]

In [38]:
df.sample(0.6).collect()

[Row(id_veiculo='298', entrega='Entrega 3', horario='8:17a'),
 Row(id_veiculo='298', entrega='Entrega 6', horario='8:39a'),
 Row(id_veiculo='315', entrega='Entrega 2', horario='6:14a'),
 Row(id_veiculo='315', entrega='Entrega 3', horario='6:24a'),
 Row(id_veiculo='315', entrega='Entrega 5', horario='6:45a'),
 Row(id_veiculo='315', entrega='Entrega 6', horario='6:56a'),
 Row(id_veiculo='315', entrega='Entrega 7', horario='7:32a'),
 Row(id_veiculo='457', entrega='Entrega 1', horario='5:04a'),
 Row(id_veiculo='457', entrega='Entrega 2', horario='5:13a'),
 Row(id_veiculo='457', entrega='Entrega 3', horario='5:27a'),
 Row(id_veiculo='457', entrega='Entrega 7', horario='6:38a')]

#### Método Filter e Where

Filter faz a mesma coisa que o where, só que em sintaxe spark

In [39]:
# Filtrando os dados retornados com a função filter()
df.filter("entrega == 'Entrega 2'").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 2|  5:13a|
+----------+---------+-------+



In [40]:
df.filter("entrega != 'Entrega 2'").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 1|  7:58a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 7|  9:07a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
|       315|Entrega 5|  6:45a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 7|  7:32a|
|       457|Entrega 1|  5:04a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       457|Entrega 6|  6:21a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [41]:
# Filtrando com múltiplas condições
df.filter((df.entrega == 'Entrega 2') & (df.id_veiculo == 298)).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
+----------+---------+-------+



In [42]:
df.filter(df.entrega.like('%4%')).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 4|  8:28a|
|       315|Entrega 4|  6:38a|
|       457|Entrega 4|  5:39a|
+----------+---------+-------+



In [43]:
df.filter(df.horario.isin('8:28a')).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 4|  8:28a|
+----------+---------+-------+



In [44]:
df.where("entrega == 'Entrega 2'").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 2|  5:13a|
+----------+---------+-------+



### Métodos Order By e Sort

Order By é da linguagem padrão SQL - Sort é o equivalente da sintaxe SPARK

In [45]:
# Ordenação a seleção das linhas
df.sort("horario", "entrega").show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 6|  6:21a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
|       457|Entrega 7|  6:38a|
|       315|Entrega 5|  6:45a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 7|  7:32a|
|       298|Entrega 1|  7:58a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 6|  8:39a|
+----------+---------+-------+
only showing top 20 rows



In [46]:
# Mesmo resultado, porém com a função col()
df.sort(col('horario'), col('entrega')).show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 6|  6:21a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
+----------+---------+-------+
only showing top 10 rows



In [47]:
# Utilizando Order By
df.orderBy(col('horario'), col('entrega')).show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       457|Entrega 1|  5:04a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 3|  5:27a|
|       457|Entrega 4|  5:39a|
|       457|Entrega 5|  5:47a|
|       315|Entrega 1|  6:05a|
|       315|Entrega 2|  6:14a|
|       457|Entrega 6|  6:21a|
|       315|Entrega 3|  6:24a|
|       315|Entrega 4|  6:38a|
+----------+---------+-------+
only showing top 10 rows



In [48]:
df.sort(df.horario.desc(), df.entrega.desc()).show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 7|  9:07a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 1|  7:58a|
|       315|Entrega 7|  7:32a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 5|  6:45a|
+----------+---------+-------+
only showing top 10 rows



In [49]:
# lembrando que é possível utilizar direto a linguagem SQL
spark.sql('select * from tb_logistica order by horario desc').show(10)

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 7|  9:07a|
|       298|Entrega 6|  8:39a|
|       298|Entrega 5|  8:33a|
|       298|Entrega 4|  8:28a|
|       298|Entrega 3|  8:17a|
|       298|Entrega 2|  8:04a|
|       298|Entrega 1|  7:58a|
|       315|Entrega 7|  7:32a|
|       315|Entrega 6|  6:56a|
|       315|Entrega 5|  6:45a|
+----------+---------+-------+
only showing top 10 rows



### Métodos Map, FlatMap e Explode

Mapeamento de dados (apply)

In [50]:
#  maps são aplicados em RDDs e por isso precisamos converter os DF para RDD
# o Método Map retorna um RDD e por isso temos que converter de volta para DF
rdd2 = df.rdd.map(lambda x: (x[0] + ',' + x[1], x[2]))
df2 = rdd2.toDF(['novo_id', 'entrega'])
df2.show(5)

+-------------+-------+
|      novo_id|entrega|
+-------------+-------+
|298,Entrega 1|  7:58a|
|298,Entrega 2|  8:04a|
|298,Entrega 3|  8:17a|
|298,Entrega 4|  8:28a|
|298,Entrega 5|  8:33a|
+-------------+-------+
only showing top 5 rows



In [51]:
df.columns

['id_veiculo', 'entrega', 'horario']

In [52]:
# Mesmo exemplo do anterior, só mudando o slicing
rdd2 = df.rdd.map(lambda x: (x['id_veiculo'] + ',' + x['entrega'], x['horario']))
df2 = rdd2.toDF(['novo_id', 'entrega'])
df2.show(5)

+-------------+-------+
|      novo_id|entrega|
+-------------+-------+
|298,Entrega 1|  7:58a|
|298,Entrega 2|  8:04a|
|298,Entrega 3|  8:17a|
|298,Entrega 4|  8:28a|
|298,Entrega 5|  8:33a|
+-------------+-------+
only showing top 5 rows



In [53]:
# Criando uma função que manipula as colunas

def manipula_colunas(x):
    coluna1 = x.id_veiculo
    coluna2 = x.entrega
    novo_id = coluna1 + '-' + coluna2
    coluna3 = x.horario
    return (novo_id, coluna3)

In [54]:
rdd2 = df.rdd.map(lambda x: manipula_colunas(x))

In [55]:
# Collect no RDD
rdd2.collect()

[('298-Entrega 1', '7:58a'),
 ('298-Entrega 2', '8:04a'),
 ('298-Entrega 3', '8:17a'),
 ('298-Entrega 4', '8:28a'),
 ('298-Entrega 5', '8:33a'),
 ('298-Entrega 6', '8:39a'),
 ('298-Entrega 7', '9:07a'),
 ('315-Entrega 1', '6:05a'),
 ('315-Entrega 2', '6:14a'),
 ('315-Entrega 3', '6:24a'),
 ('315-Entrega 4', '6:38a'),
 ('315-Entrega 5', '6:45a'),
 ('315-Entrega 6', '6:56a'),
 ('315-Entrega 7', '7:32a'),
 ('457-Entrega 1', '5:04a'),
 ('457-Entrega 2', '5:13a'),
 ('457-Entrega 3', '5:27a'),
 ('457-Entrega 4', '5:39a'),
 ('457-Entrega 5', '5:47a'),
 ('457-Entrega 6', '6:21a'),
 ('457-Entrega 7', '6:38a')]

##### Explode
Explode deve receber uma lista como argumento, mas essa lista pode estar em uma coluna de um dataframe

In [56]:
from pyspark.sql.functions import explode

In [57]:
# Cria lista
array_estudantes = [('Bob', ['Python', 'R', 'Scala']),
                   ('Maria', ['Java', 'Julia']),
                   ('Zico', ['Javascript','']),
                   ('Ana', [None, None])]

In [58]:
type(array_estudantes)

list

In [59]:
# Convertendo uma lista para dataframe
df_estudantes = spark.createDataFrame(data = array_estudantes, schema = ['aluno', 'linguagem'])

In [60]:
# Sem utilizar o método explode()
df_estudantes.show()

+-----+------------------+
|aluno|         linguagem|
+-----+------------------+
|  Bob|[Python, R, Scala]|
|Maria|     [Java, Julia]|
| Zico|    [Javascript, ]|
|  Ana|      [null, null]|
+-----+------------------+



In [61]:
# Com utilização do Explode
df2 = df_estudantes.select(df_estudantes.aluno, explode(df_estudantes.linguagem))
df2.printSchema()
df2.show()

root
 |-- aluno: string (nullable = true)
 |-- col: string (nullable = true)

+-----+----------+
|aluno|       col|
+-----+----------+
|  Bob|    Python|
|  Bob|         R|
|  Bob|     Scala|
|Maria|      Java|
|Maria|     Julia|
| Zico|Javascript|
| Zico|          |
|  Ana|      null|
|  Ana|      null|
+-----+----------+



# Agregação com Spark SQL

- Agregação com Funções do Spark SQL

In [62]:
df.columns

['id_veiculo', 'entrega', 'horario']

In [63]:
df.groupBy('id_veiculo').count().show()

+----------+-----+
|id_veiculo|count|
+----------+-----+
|       298|    7|
|       457|    7|
|       315|    7|
+----------+-----+



In [64]:
df.groupBy('id_veiculo').agg({'horario': 'min'}).show()

+----------+------------+
|id_veiculo|min(horario)|
+----------+------------+
|       298|       7:58a|
|       315|       6:05a|
|       457|       5:04a|
+----------+------------+



In [65]:
df.groupBy('id_veiculo').agg({'horario': 'max'}).show()

+----------+------------+
|id_veiculo|max(horario)|
+----------+------------+
|       298|       9:07a|
|       315|       7:32a|
|       457|       6:38a|
+----------+------------+



In [66]:
df.groupBy('id_veiculo').agg({'horario': 'count'}).show()

+----------+--------------+
|id_veiculo|count(horario)|
+----------+--------------+
|       298|             7|
|       457|             7|
|       315|             7|
+----------+--------------+



In [67]:
df.groupBy('id_veiculo').agg({'horario': 'count'}).withColumnRenamed('count(horario)','numero_entregas').show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       298|              7|
|       457|              7|
|       315|              7|
+----------+---------------+



In [68]:
df.groupBy('id_veiculo').agg({'horario': 'min'}).withColumnRenamed('min(horario)','primeira_entrega').show()

+----------+----------------+
|id_veiculo|primeira_entrega|
+----------+----------------+
|       298|           7:58a|
|       315|           6:05a|
|       457|           5:04a|
+----------+----------------+



#### Queries SQL

In [69]:
spark.sql("""
select id_veiculo, count(*) as numero_entregas 
from tb_logistica 
GROUP BY id_veiculo
""").show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       298|              7|
|       457|              7|
|       315|              7|
+----------+---------------+



In [70]:
spark.sql("""
select id_veiculo, min(horario) as hora_primeira_entrega, max(horario) as ultima_entrega 
from tb_logistica 
GROUP BY id_veiculo
""").show()

+----------+---------------------+--------------+
|id_veiculo|hora_primeira_entrega|ultima_entrega|
+----------+---------------------+--------------+
|       298|                7:58a|         9:07a|
|       315|                6:05a|         7:32a|
|       457|                5:04a|         6:38a|
+----------+---------------------+--------------+



In [71]:
spark.sql("""
select horario, count(*) as numero_entregas 
from tb_logistica
where id_veiculo = 298
GROUP BY horario
""").show()

+-------+---------------+
|horario|numero_entregas|
+-------+---------------+
|  8:33a|              1|
|  8:28a|              1|
|  8:17a|              1|
|  8:39a|              1|
|  8:04a|              1|
|  7:58a|              1|
|  9:07a|              1|
+-------+---------------+



In [72]:
spark.sql("""
select horario, count(*) as hora_ultima_entrega 
from tb_logistica
GROUP BY horario
having count(*)>1
""").show()

+-------+-------------------+
|horario|hora_ultima_entrega|
+-------+-------------------+
|  6:38a|                  2|
+-------+-------------------+



### Realizando pivot de um Dataframe

In [73]:
# Lista de horários de entregas
lista_horarios = ['5:13a', '6:38a', '7:32a', '8:04a', '9:07a']

In [74]:
# Testamos o filtro
df.filter(df.horario.isin(lista_horarios)).show()

+----------+---------+-------+
|id_veiculo|  entrega|horario|
+----------+---------+-------+
|       298|Entrega 2|  8:04a|
|       298|Entrega 7|  9:07a|
|       315|Entrega 4|  6:38a|
|       315|Entrega 7|  7:32a|
|       457|Entrega 2|  5:13a|
|       457|Entrega 7|  6:38a|
+----------+---------+-------+



In [75]:
# Pivot é uma função de agregação SPARK
df_pivot = df.filter(df.horario.isin(lista_horarios)).groupBy('id_veiculo').pivot('horario').count()

In [76]:
df_pivot.show()

+----------+-----+-----+-----+-----+-----+
|id_veiculo|5:13a|6:38a|7:32a|8:04a|9:07a|
+----------+-----+-----+-----+-----+-----+
|       298| null| null| null|    1|    1|
|       457|    1|    1| null| null| null|
|       315| null|    1|    1| null| null|
+----------+-----+-----+-----+-----+-----+



In [77]:
# Converter para DF pandas para visualizar melhor o DF
pandasDF = df_pivot.toPandas()

In [78]:
pandasDF.head()

Unnamed: 0,id_veiculo,5:13a,6:38a,7:32a,8:04a,9:07a
0,298,,,,1.0,1.0
1,457,1.0,1.0,,,
2,315,,1.0,1.0,,


In [79]:
# Pivot com sintaxe SQL
query = """
select * from (
    select id_veiculo, horario from tb_logistica
    )
pivot (
    count(*)
    for horario in (
        '5:13a', '6:38a', '7:32a', '8:04a', '9:07a')
)
order by id_veiculo
"""

In [80]:
spark.sql(query).show()

+----------+-----+-----+-----+-----+-----+
|id_veiculo|5:13a|6:38a|7:32a|8:04a|9:07a|
+----------+-----+-----+-----+-----+-----+
|       298| null| null| null|    1|    1|
|       315| null|    1|    1| null| null|
|       457|    1|    1| null| null| null|
+----------+-----+-----+-----+-----+-----+



# ---------

# Funções WINDOW

### SQL Window FUnctionc para Agregação ao Longo do Tempo

In [81]:
# Isso é gregaão por colunas
spark.sql("""
select id_veiculo, count(*) as numero_entregas
from tb_logistica
group by id_veiculo
""").show()

+----------+---------------+
|id_veiculo|numero_entregas|
+----------+---------------+
|       298|              7|
|       457|              7|
|       315|              7|
+----------+---------------+



In [82]:
# Montar um 'ranking' das entregas por número da entrega de cada veículo, com base no horário
spark.sql("""
select id_veiculo, entrega, horario,
row_number() over (partition by entrega order by horario) as ranking
from tb_logistica
""").show(21)

+----------+---------+-------+-------+
|id_veiculo|  entrega|horario|ranking|
+----------+---------+-------+-------+
|       457|Entrega 1|  5:04a|      1|
|       315|Entrega 1|  6:05a|      2|
|       298|Entrega 1|  7:58a|      3|
|       457|Entrega 2|  5:13a|      1|
|       315|Entrega 2|  6:14a|      2|
|       298|Entrega 2|  8:04a|      3|
|       457|Entrega 3|  5:27a|      1|
|       315|Entrega 3|  6:24a|      2|
|       298|Entrega 3|  8:17a|      3|
|       457|Entrega 4|  5:39a|      1|
|       315|Entrega 4|  6:38a|      2|
|       298|Entrega 4|  8:28a|      3|
|       457|Entrega 5|  5:47a|      1|
|       315|Entrega 5|  6:45a|      2|
|       298|Entrega 5|  8:33a|      3|
|       457|Entrega 6|  6:21a|      1|
|       315|Entrega 6|  6:56a|      2|
|       298|Entrega 6|  8:39a|      3|
|       457|Entrega 7|  6:38a|      1|
|       315|Entrega 7|  7:32a|      2|
|       298|Entrega 7|  9:07a|      3|
+----------+---------+-------+-------+



In [83]:
# Sintaxe do mesmo exemplo utilizando apenas sintaxe do sparkSQL
df.withColumn('id', row_number().over(Window.partitionBy('entrega').orderBy('horario'))).show(21)

+----------+---------+-------+---+
|id_veiculo|  entrega|horario| id|
+----------+---------+-------+---+
|       457|Entrega 1|  5:04a|  1|
|       315|Entrega 1|  6:05a|  2|
|       298|Entrega 1|  7:58a|  3|
|       457|Entrega 2|  5:13a|  1|
|       315|Entrega 2|  6:14a|  2|
|       298|Entrega 2|  8:04a|  3|
|       457|Entrega 3|  5:27a|  1|
|       315|Entrega 3|  6:24a|  2|
|       298|Entrega 3|  8:17a|  3|
|       457|Entrega 4|  5:39a|  1|
|       315|Entrega 4|  6:38a|  2|
|       298|Entrega 4|  8:28a|  3|
|       457|Entrega 5|  5:47a|  1|
|       315|Entrega 5|  6:45a|  2|
|       298|Entrega 5|  8:33a|  3|
|       457|Entrega 6|  6:21a|  1|
|       315|Entrega 6|  6:56a|  2|
|       298|Entrega 6|  8:39a|  3|
|       457|Entrega 7|  6:38a|  1|
|       315|Entrega 7|  7:32a|  2|
|       298|Entrega 7|  9:07a|  3|
+----------+---------+-------+---+



In [86]:
# Para cada entrega, mostrar o horário da entrega anterior por id_veiculo
spark.sql("""
select id_veiculo, entrega, horario,
lead(horario, 1) over (partition by id_veiculo order by horario) as proxima_entrega
from tb_logistica
""").show(21)

+----------+---------+-------+---------------+
|id_veiculo|  entrega|horario|proxima_entrega|
+----------+---------+-------+---------------+
|       298|Entrega 1|  7:58a|          8:04a|
|       298|Entrega 2|  8:04a|          8:17a|
|       298|Entrega 3|  8:17a|          8:28a|
|       298|Entrega 4|  8:28a|          8:33a|
|       298|Entrega 5|  8:33a|          8:39a|
|       298|Entrega 6|  8:39a|          9:07a|
|       298|Entrega 7|  9:07a|           null|
|       315|Entrega 1|  6:05a|          6:14a|
|       315|Entrega 2|  6:14a|          6:24a|
|       315|Entrega 3|  6:24a|          6:38a|
|       315|Entrega 4|  6:38a|          6:45a|
|       315|Entrega 5|  6:45a|          6:56a|
|       315|Entrega 6|  6:56a|          7:32a|
|       315|Entrega 7|  7:32a|           null|
|       457|Entrega 1|  5:04a|          5:13a|
|       457|Entrega 2|  5:13a|          5:27a|
|       457|Entrega 3|  5:27a|          5:39a|
|       457|Entrega 4|  5:39a|          5:47a|
|       457|E

# Parse de Data para Agregação ao Longo do Tempo

Caulcule o tempo (em minutos) para a próxima entrega de cada veículo

In [87]:
# Define o time parser policy
spark.sql("""
set spark.sql.legacy.timeParserPolicy=LEGACY
""")

DataFrame[key: string, value: string]

In [88]:
# Cria janela
window = Window.partitionBy('id_veiculo').orderBy('horario')

In [89]:
# Agregação por linha para calcular a diferença entre os horários
dot_df = df.withColumn('tempo_proxima_entrega',
                      (unix_timestamp(lead('horario',1).over(window), 'H:m') - 
                      unix_timestamp('horario', 'H:m'))/60).show(21)

+----------+---------+-------+---------------------+
|id_veiculo|  entrega|horario|tempo_proxima_entrega|
+----------+---------+-------+---------------------+
|       298|Entrega 1|  7:58a|                  6.0|
|       298|Entrega 2|  8:04a|                 13.0|
|       298|Entrega 3|  8:17a|                 11.0|
|       298|Entrega 4|  8:28a|                  5.0|
|       298|Entrega 5|  8:33a|                  6.0|
|       298|Entrega 6|  8:39a|                 28.0|
|       298|Entrega 7|  9:07a|                 null|
|       315|Entrega 1|  6:05a|                  9.0|
|       315|Entrega 2|  6:14a|                 10.0|
|       315|Entrega 3|  6:24a|                 14.0|
|       315|Entrega 4|  6:38a|                  7.0|
|       315|Entrega 5|  6:45a|                 11.0|
|       315|Entrega 6|  6:56a|                 36.0|
|       315|Entrega 7|  7:32a|                 null|
|       457|Entrega 1|  5:04a|                  9.0|
|       457|Entrega 2|  5:13a|                