In [None]:
%%sh
sudo pip install spark
sudo pip install pyspark

In [None]:
#Instalar e importar as bibliotecas do spark e pyspark (as mesmas disponibilizadas nas aulas).
import pandas as pd
#Importando o spark e o pyspark
import spark,pyspark,json
#Importando as bibliotecas do pyspark.sql 
from pyspark.sql import *
#Importando as funções sql do spark
#documentação https://spark.apache.org/docs/latest/api/sql/index.html
from pyspark.sql import functions as sf, Row
#Importando os tipos de dados do spark
#documentação https://spark.apache.org/docs/latest/sql-ref-datatypes.html
from pyspark.sql import types as t 
#Biblioteca datetime
from datetime import datetime, date
 
spark = SparkSession.builder.master("local").appName("window").getOrCreate()
 

In [None]:
#Order, pedido do cliente
pedido = [Row(order_id=1,sale_date='2022-01-01'),
      Row(order_id=2,sale_date='2022-01-02')]
df_pedido = spark.createDataFrame(pedido)

#Entrega, dentro de um pedido, temos diversas entregas
entrega= [Row(order_id=1,delivery_code= '1_A',status='entregue'),Row(order_id=1,delivery_code= '1_B',status='pendente'),
             Row(order_id=2,delivery_code= '2_A',status='entregue'),Row(order_id=2,delivery_code= '2_B',status='pendente')]
df_entrega = spark.createDataFrame(entrega)

#Produtos, dentro de uma entrega, temos diversos produtos
produto=[Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,),
            Row(sku='x1',produto='chuteira',delivery_code='1_A',valor=20),
            Row(sku='z',produto='camisa',delivery_code='1_B',valor=300),
            Row(sku='z1',produto='calça',delivery_code='1_B',valor=100),
            Row(sku='y',produto='luva',delivery_code='2_A',valor=300),
            Row(sku='y1',produto='bermuda',delivery_code='2_A',valor=200),
            Row(sku='r',produto='skate',delivery_code='2_B',valor=50),
            Row(sku='r1',produto='sapato',delivery_code='2_B',valor=1000)]
            
df_produto = spark.createDataFrame(produto)            

df_pedido_entrega_r = df_pedido.join(df_entrega,['order_id'])

df_f = df_pedido_entrega_r.join(df_produto,['delivery_code'])
df_f.show(truncate=False)

#order id = id do pedido
#delivery code

+-------------+--------+----------+--------+---+--------+-----+
|delivery_code|order_id|sale_date |status  |sku|produto |valor|
+-------------+--------+----------+--------+---+--------+-----+
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |
|1_A          |1       |2022-01-01|entregue|x1 |chuteira|20   |
|1_B          |1       |2022-01-01|pendente|z  |camisa  |300  |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |
|2_B          |2       |2022-01-02|pendente|r  |skate   |50   |
|2_B          |2       |2022-01-02|pendente|r1 |sapato  |1000 |
|2_A          |2       |2022-01-02|entregue|y  |luva    |300  |
|2_A          |2       |2022-01-02|entregue|y1 |bermuda |200  |
+-------------+--------+----------+--------+---+--------+-----+



In [None]:
df_3 = df_f.select(sf.sum("valor").alias("valor"),
                                    sf.count("*").alias("quantidade"),
                                    sf.max("sale_date").alias("ultima_data"),
                                    sf.min("valor").alias("valor_minimo"),sf.collect_list("delivery_code"))
df_3.show(truncate=False)

+-----+----------+-----------+------------+----------------------------------------+
|valor|quantidade|ultima_data|valor_minimo|collect_list(delivery_code)             |
+-----+----------+-----------+------------+----------------------------------------+
|2070 |8         |2022-01-02 |20          |[1_A, 1_A, 1_B, 1_B, 2_B, 2_B, 2_A, 2_A]|
+-----+----------+-----------+------------+----------------------------------------+



In [None]:
df_2 = df_f.groupBy("order_id","sale_date").agg(sf.sum("valor").alias("valor"),
                                    sf.count("*").alias("quantidade"),
                                    sf.max("sale_date").alias("ultima_data"),
                                    sf.min("valor").alias("valor_minimo"),sf.collect_list("delivery_code"))
df_2.show()

+--------+----------+-----+----------+-----------+------------+---------------------------+
|order_id| sale_date|valor|quantidade|ultima_data|valor_minimo|collect_list(delivery_code)|
+--------+----------+-----+----------+-----------+------------+---------------------------+
|       1|2022-01-01|  520|         4| 2022-01-01|          20|       [1_A, 1_A, 1_B, 1_B]|
|       2|2022-01-02| 1550|         4| 2022-01-02|          50|       [2_B, 2_B, 2_A, 2_A]|
+--------+----------+-----+----------+-----------+------------+---------------------------+



In [None]:
df_g = df_f.groupBy("delivery_code").agg(sf.sum("valor").alias("valor_agregado"))
df_g2 = df_f.groupBy("order_id").agg(sf.sum("valor").alias("valor_total"))

df_f2 = df_f.join(df_g,['delivery_code'])
df_f2 = df_f2.join(df_g2,['order_id'])

df_f2 = df_f2.withColumn("percentual_delivery",((sf.col("valor")/sf.col("valor_agregado"))*100).cast("decimal(10,4)")) #valor agregado não existe
df_f2 = df_f2.withColumn("percentual_total",((sf.col("valor")/sf.col("valor_total"))*100).cast("decimal(10,4)"))
df_f2.show()

+--------+-------------+----------+--------+---+--------+-----+--------------+-----------+-------------------+----------------+
|order_id|delivery_code| sale_date|  status|sku| produto|valor|valor_agregado|valor_total|percentual_delivery|percentual_total|
+--------+-------------+----------+--------+---+--------+-----+--------------+-----------+-------------------+----------------+
|       1|          1_A|2022-01-01|entregue|  x|   tenis|  100|           120|        520|            83.3333|         19.2308|
|       1|          1_A|2022-01-01|entregue| x1|chuteira|   20|           120|        520|            16.6667|          3.8462|
|       1|          1_B|2022-01-01|pendente|  z|  camisa|  300|           400|        520|            75.0000|         57.6923|
|       1|          1_B|2022-01-01|pendente| z1|   calça|  100|           400|        520|            25.0000|         19.2308|
|       2|          2_A|2022-01-02|entregue|  y|    luva|  300|           500|       1550|            60

In [None]:
janela1=  Window.partitionBy("delivery_code")
df_f2 = df_f.withColumn("valor_agregado",sf.sum("valor").over(janela1))

df_f2 = df_f2.withColumn("percentual_delivery",((sf.col("valor")/sf.col("valor_agregado"))*100).cast("decimal(10,4)"))
df_f2.orderBy("order_id","produto").show()

+-------------+--------+----------+--------+---+--------+-----+--------------+----------------------+-------------------+
|delivery_code|order_id| sale_date|  status|sku| produto|valor|valor_agregado|valor_agregado_corrido|percentual_delivery|
+-------------+--------+----------+--------+---+--------+-----+--------------+----------------------+-------------------+
|          1_B|       1|2022-01-01|pendente| z1|   calça|  100|           400|                   100|            25.0000|
|          1_B|       1|2022-01-01|pendente|  z|  camisa|  300|           400|                   300|            75.0000|
|          1_A|       1|2022-01-01|entregue| x1|chuteira|   20|           120|                    20|            16.6667|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100|           120|                   100|            83.3333|
|          2_A|       2|2022-01-02|entregue| y1| bermuda|  200|           500|                   200|            40.0000|
|          2_A|       2|

In [None]:
janela2=  Window.orderBy("order_id","delivery_code","produto")
df_f2 = df_f.withColumn("valor_corrido",sf.sum("valor").over(janela2))
df_f2.show()

+-------------+--------+----------+--------+---+--------+-----+-------------+
|delivery_code|order_id| sale_date|  status|sku| produto|valor|valor_corrido|
+-------------+--------+----------+--------+---+--------+-----+-------------+
|          1_A|       1|2022-01-01|entregue| x1|chuteira|   20|           20|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100|          120|
|          1_B|       1|2022-01-01|pendente| z1|   calça|  100|          220|
|          1_B|       1|2022-01-01|pendente|  z|  camisa|  300|          520|
|          2_A|       2|2022-01-02|entregue| y1| bermuda|  200|          720|
|          2_A|       2|2022-01-02|entregue|  y|    luva|  300|         1020|
|          2_B|       2|2022-01-02|pendente| r1|  sapato| 1000|         2020|
|          2_B|       2|2022-01-02|pendente|  r|   skate|   50|         2070|
+-------------+--------+----------+--------+---+--------+-----+-------------+



In [None]:
#Utilizando window functions Agregada - eliminamos um group by e um join
#utiliza a Classe Window - partition by delimitamos uma partição dos dados para criar agrupamentos
from pyspark.sql import Window
janela1=  Window.partitionBy("order_id")
janela3= Window.partitionBy("order_id").orderBy("order_id","delivery_code")
df_f2 = df_f.withColumn("soma_total",sf.sum("valor").over(janela1))

#utiliza a Classe Window - order by cria uma soma cumulátiva
janela2= Window.partitionBy("order_id").orderBy("order_id","delivery_code","produto")

df_f2 = df_f2.withColumn("soma_total_acumulada",sf.sum("valor").over(janela2))
df_f2.orderBy("order_id","delivery_code").show(truncate=False)

+-------------+--------+----------+--------+---+--------+-----+----------+--------------------+
|delivery_code|order_id|sale_date |status  |sku|produto |valor|soma_total|soma_total_acumulada|
+-------------+--------+----------+--------+---+--------+-----+----------+--------------------+
|1_A          |1       |2022-01-01|entregue|x1 |chuteira|20   |520       |20                  |
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |520       |120                 |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |520       |220                 |
|1_B          |1       |2022-01-01|pendente|z  |camisa  |300  |520       |520                 |
|2_A          |2       |2022-01-02|entregue|y1 |bermuda |200  |1550      |200                 |
|2_A          |2       |2022-01-02|entregue|y  |luva    |300  |1550      |500                 |
|2_B          |2       |2022-01-02|pendente|r1 |sapato  |1000 |1550      |1500                |
|2_B          |2       |2022-01-02|pende

In [None]:
#Utilizando window functions Analitica
janela2= Window.orderBy("order_id","delivery_code")

janela3= Window.partitionBy("order_id").orderBy("order_id","delivery_code")

df_f2 = df_f.withColumn("lag_column",sf.lag("valor",1).over(janela2))
df_f2 = df_f2.withColumn("lead_column",sf.lead("valor",1).over(janela2))


df_f2 = df_f2.withColumn("lag_column_p",sf.lag("valor",1).over(janela3))


df_f2.show(truncate=False)

+-------------+--------+----------+--------+---+--------+-----+----------+-----------+------------+
|delivery_code|order_id|sale_date |status  |sku|produto |valor|lag_column|lead_column|lag_column_p|
+-------------+--------+----------+--------+---+--------+-----+----------+-----------+------------+
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |null      |20         |null        |
|1_A          |1       |2022-01-01|entregue|x1 |chuteira|20   |100       |300        |100         |
|1_B          |1       |2022-01-01|pendente|z  |camisa  |300  |20        |100        |20          |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |300       |300        |300         |
|2_A          |2       |2022-01-02|entregue|y  |luva    |300  |100       |200        |null        |
|2_A          |2       |2022-01-02|entregue|y1 |bermuda |200  |300       |50         |300         |
|2_B          |2       |2022-01-02|pendente|r  |skate   |50   |200       |1000       |200         |


In [None]:
#Criando linhas repetidas de produtos
produto=[Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,data_update='2022-01-01'),
         Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,data_update='2022-01-01'),
         Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,data_update='2022-01-01'),
         Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,data_update='2022-01-02'),
         Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,data_update='2022-01-03'),
         Row(sku='x',produto='tenis',delivery_code='1_A',valor=100,data_update='2022-01-03'),
            Row(sku='x1',produto='chuteira',delivery_code='1_A',valor=20,data_update='2022-01-02'),
            Row(sku='z',produto='camisa',delivery_code='1_B',valor=300,data_update='2022-01-02'),
            Row(sku='z1',produto='calça',delivery_code='1_B',valor=100,data_update='2022-01-03'),
            Row(sku='z1',produto='calça',delivery_code='1_B',valor=100,data_update='2022-01-04'),
            Row(sku='z1',produto='calça',delivery_code='1_B',valor=100,data_update='2022-01-05'),
            Row(sku='z1',produto='calça',delivery_code='1_B',valor=100,data_update='2022-01-06'),
            Row(sku='z1',produto='calça',delivery_code='1_B',valor=100,data_update='2022-01-07'),
            Row(sku='y',produto='luva',delivery_code='2_A',valor=300,data_update='2022-01-02'),
            Row(sku='y1',produto='bermuda',delivery_code='2_A',valor=200,data_update='2022-01-02'),
            Row(sku='r',produto='skate',delivery_code='2_B',valor=50,data_update='2022-01-02'),
            Row(sku='r1',produto='sapato',delivery_code='2_B',valor=1000,data_update='2022-01-02'),
             Row(sku='r1',produto='sapato',delivery_code='2_B',valor=1000,data_update='2022-01-03')]
            
df_produto = spark.createDataFrame(produto)            

df_pedido_entrega_r = df_pedido.join(df_entrega,['order_id'])

df_f = df_pedido_entrega_r.join(df_produto,['delivery_code'])
df_f.distinct().show(truncate=False)

+-------------+--------+----------+--------+---+--------+-----+-----------+
|delivery_code|order_id|sale_date |status  |sku|produto |valor|data_update|
+-------------+--------+----------+--------+---+--------+-----+-----------+
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |2022-01-01 |
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |2022-01-02 |
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |2022-01-03 |
|1_A          |1       |2022-01-01|entregue|x1 |chuteira|20   |2022-01-02 |
|1_B          |1       |2022-01-01|pendente|z  |camisa  |300  |2022-01-02 |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-03 |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-04 |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-05 |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-06 |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-07 |
|2_B        

In [None]:
janela2= Window.partitionBy("order_id","delivery_code","sku").orderBy(sf.col("data_update").desc())

df_f2 = df_f.withColumn("row_number",sf.row_number().over(janela2))
df_f2 = df_f2.withColumn("rank",sf.rank().over(janela2))
df_f2 = df_f2.withColumn("dense_rank",sf.dense_rank().over(janela2)) 
df_f2.show()

+-------------+--------+----------+--------+---+--------+-----+-----------+----------+----+----------+
|delivery_code|order_id| sale_date|  status|sku| produto|valor|data_update|row_number|rank|dense_rank|
+-------------+--------+----------+--------+---+--------+-----+-----------+----------+----+----------+
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100| 2022-01-03|         1|   1|         1|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100| 2022-01-03|         2|   1|         1|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100| 2022-01-02|         3|   3|         2|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100| 2022-01-01|         4|   4|         3|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100| 2022-01-01|         5|   4|         3|
|          1_A|       1|2022-01-01|entregue|  x|   tenis|  100| 2022-01-01|         6|   4|         3|
|          1_A|       1|2022-01-01|entregue| x1|chuteira|   20| 2022-01-0

In [None]:
#Janela do tipo Classificação, criamos rankings utilizando uma janela, útil para deduplicar, grandeza, etc

#Deduplicar valores repetidos - ordenados por data update desc, ou seja, os ultimos serão rankeados como 1
janela2= Window.partitionBy("order_id","delivery_code","sku").orderBy(sf.col("data_update").desc())

df_f2 = df_f.withColumn("row_number",sf.row_number().over(janela2))

df_f2.orderBy("order_id","delivery_code","sku","row_number").show(truncate=False)
#filtra as repetidas
df_f2 = df_f2.filter("row_number = 1")

#Criar um ranking de produtos mais caros dentro de um pedido
janela3= Window.partitionBy("order_id").orderBy(sf.col("valor").desc(),sf.col("sku"))
df_f2 = df_f2.withColumn("rank_produtos_order",sf.rank().over(janela3))
df_f2 = df_f2.withColumn("dense_rank_produtos_order",sf.dense_rank().over(janela3))
df_f2 = df_f2.withColumn("row_number_produtos_order",sf.row_number().over(janela3))
df_f2.orderBy("order_id","rank_produtos_order").show(truncate=False)

+-------------+--------+----------+--------+---+--------+-----+-----------+----------+
|delivery_code|order_id|sale_date |status  |sku|produto |valor|data_update|row_number|
+-------------+--------+----------+--------+---+--------+-----+-----------+----------+
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |2022-01-02 |1         |
|1_A          |1       |2022-01-01|entregue|x  |tenis   |100  |2022-01-01 |2         |
|1_A          |1       |2022-01-01|entregue|x1 |chuteira|20   |2022-01-02 |1         |
|1_B          |1       |2022-01-01|pendente|z  |camisa  |300  |2022-01-02 |1         |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-07 |1         |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-06 |2         |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-05 |3         |
|1_B          |1       |2022-01-01|pendente|z1 |calça   |100  |2022-01-04 |4         |
|1_B          |1       |2022-01-01|pendente