In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = (
    SparkSession.builder
    .appName("Curso de PySpark")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

In [3]:
spark

In [5]:
compras = spark.read.parquet("C:/Users/marlos.barros/Cursos/pyspark_na_pratica/DATASETS/COMPRAS.parquet").select("id","data","cd_livro","cd_cliente").alias("compras")
livros = spark.read.parquet("C:/Users/marlos.barros/Cursos/pyspark_na_pratica/DATASETS/LIVROS.parquet").select("id", "data_lancamento", "preco").alias("livros")
autores = spark.read.parquet("C:/Users/marlos.barros/Cursos/pyspark_na_pratica/DATASETS/AUTORES.parquet").alias("autores")

### Join entre tabelas e Drop de IDs repetidos
- Função: Realiza um **join** entre as tabelas **compras**, **livros** e **autores**, usando chaves de ligação entre as tabelas **(compras.cd_livro == livros.id e livros.id == autores.id)**. Depois, **remove as colunas id duplicadas de livros e autores**, que já não são mais necessárias no DataFrame resultante.

In [9]:
df = compras.join(livros, compras.cd_livro == livros.id, "inner").join(autores, livros.id == autores.id, "inner").drop("livros.id","autores.id")
df

id,data,cd_livro,cd_cliente,id.1,data_lancamento,preco,id.2,titulo,autor
12389,2021-07-24,30334762,3339828,30334762,2013-05-08,123.47,30334762,Em Busca do Tempo...,Marcel Proust
12534,2021-07-15,14347542,7799936,14347542,2007-04-05,182.43,14347542,Fogo Morto,José Lins do Rego
12574,2020-05-19,10325500,6273720,10325500,2000-09-09,30.62,10325500,A Obscena Senhora D,Hilda Hilst
12675,2021-07-11,35940339,649001,35940339,2014-08-11,213.6,35940339,Adeus às Armas,Ernest Hemingway
13457,2021-10-26,21006591,4312106,21006591,2019-02-24,96.84,21006591,Paulicéia Desvair...,Mário de Andrade
13478,2020-05-10,19488257,670483,19488257,2013-05-19,17.11,19488257,O Ódio que Você S...,Angie Thomas
13562,2021-10-28,35940339,1275633,35940339,2014-08-11,213.6,35940339,Adeus às Armas,Ernest Hemingway
13679,2020-10-19,30144651,1261140,30144651,2011-04-14,242.2,30144651,Zero,Ignácio De Loyola...
13687,2020-05-29,21923195,4098904,21923195,2015-02-22,38.89,21923195,O Ex-Mágico,Murilo Rubião
13796,2021-10-15,16778973,4024706,16778973,2017-06-23,99.81,16778973,Crime e Castigo,Fiódor Dostoiévski


### Janela com row_number para numerar linhas por ID de compra e ordem de compra por cliente
- **window_1**: Cria uma janela para **ordenar as linhas pelo ID da compra**.
- **window_2**: Cria uma janela que **agrupa os dados pelo código do cliente (cd_cliente) e ordena as compras pela data**.
- **num_linha**: Adiciona uma nova **coluna** com a **numeração de cada linha**, usando **row_number()** sobre **window_1**, que numera as linhas de acordo com o **ID da compra**.
- **ordem_compra**: Adiciona uma nova **coluna** para **indicar a ordem** em que cada **cliente** fez suas **compras**, de acordo com a **data**.

In [10]:
from pyspark.sql.window import Window

In [23]:
window_1 = Window.orderBy("compras.id")
window_2 = Window.partitionBy("cd_cliente").orderBy("data")

In [24]:
(
    df
    .withColumn("num_linha", F.row_number().over(window_1))
    .withColumn("ordem_compra", F.row_number().over(window_2))
)

id,data,cd_livro,cd_cliente,id.1,data_lancamento,preco,id.2,titulo,autor,num_linha,ordem_compra
184365,2020-10-31,100520231,1010444,100520231,2009-08-17,35.55,100520231,Galáxias,Haroldo de Campos,3330,1
162408,2021-06-16,36059407,1010444,36059407,2021-01-20,222.98,36059407,As Histórias Comp...,Franz Kafka,2874,2
629783,2021-09-14,16581063,1010444,16581063,2013-05-16,145.25,16581063,Macunaíma – O Her...,Mário de Andrade,11521,3
594608,2021-10-11,30099528,1010444,30099528,2000-02-06,208.39,30099528,Retrato do Artist...,James Joyce,10824,4
561293,2021-11-02,30144651,1010444,30144651,2011-04-14,242.2,30144651,Zero,Ignácio De Loyola...,10118,5
648950,2022-01-15,22112497,1010444,22112497,2021-01-06,10.84,22112497,Mrs Dalloway,Virginia Woolf,11896,6
184567,2022-03-29,20414016,1010444,20414016,2011-06-09,212.1,20414016,Vidas Secas,Graciliano Ramos,3335,7
26947,2022-05-04,36030824,1010444,36030824,2005-01-26,226.36,36030824,Tremor de Terra,Luiz Vilela,317,8
472081,2022-06-19,26925428,1010444,26925428,2011-09-04,193.19,26925428,"Sing, Unburied, S...",Jesmyn Ward,8520,9
27189,2022-06-26,12489208,1010444,12489208,2011-10-30,160.53,12489208,Triste Fim de Pol...,Lima Barreto,322,10


### Janela para ordenação por autor e data de lançamento
- **window_3**: Cria uma janela que **agrupa os dados pelo autor** e **ordena os livros de cada autor** pela **data de lançamento (data_lancamento)**.
- **ordem_lancamento**: Adiciona uma **coluna** que **indica a ordem de lançamento dos livros de cada autor**.

In [29]:
window_1 = Window.orderBy("compras.id")
window_3 = Window.partitionBy("autor").orderBy("data_lancamento")

In [30]:
(
    df
    .withColumn("num_linha", F.row_number().over(window_1))
    .withColumn("ordem_lancamento", F.row_number().over(window_3))
)

id,data,cd_livro,cd_cliente,id.1,data_lancamento,preco,id.2,titulo,autor,num_linha,ordem_lancamento
16345,2021-05-14,22360906,7760116,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,113,1
16935,2020-09-08,22360906,5112548,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,137,2
17835,2021-06-17,22360906,3136976,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,153,3
26719,2022-07-03,22360906,3956557,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,301,4
26893,2020-01-12,22360906,4297935,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,314,5
31462,2022-06-15,22360906,613234,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,399,6
34792,2020-09-09,22360906,1442665,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,457,7
38126,2022-11-29,22360906,7241902,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,543,8
41567,2020-04-22,22360906,742676,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,607,9
43275,2022-06-11,22360906,5348711,22360906,2000-08-21,26.58,22360906,Bagagem,Adélia Prado,649,10


### Total acumulado por cliente com janela
- **total_acumulado_cliente**: Adiciona uma **coluna** que **calcula o total acumulado gasto por cada cliente em suas compras**. Usando a função **F.sum("preco").over(window_2)**, o **total é somado de forma incremental para cada cliente**, sendo **arredondado para duas casas decimais com F.round()**.

In [33]:
window_1 = Window.orderBy("compras.id")
window_2 = Window.partitionBy("cd_cliente").orderBy("data")

In [35]:
(
    df
    .withColumn("num_linha", F.row_number().over(window_1))
    .withColumn("ordem_compra", F.row_number().over(window_2))
    .withColumn("total_acumulado_cliente", F.round(F.sum("preco").over(window_2), 2))
)

id,data,cd_livro,cd_cliente,id.1,data_lancamento,preco,id.2,titulo,autor,num_linha,ordem_compra,total_acumulado_cliente
184365,2020-10-31,100520231,1010444,100520231,2009-08-17,35.55,100520231,Galáxias,Haroldo de Campos,3330,1,35.55
162408,2021-06-16,36059407,1010444,36059407,2021-01-20,222.98,36059407,As Histórias Comp...,Franz Kafka,2874,2,258.53
629783,2021-09-14,16581063,1010444,16581063,2013-05-16,145.25,16581063,Macunaíma – O Her...,Mário de Andrade,11521,3,403.78
594608,2021-10-11,30099528,1010444,30099528,2000-02-06,208.39,30099528,Retrato do Artist...,James Joyce,10824,4,612.17
561293,2021-11-02,30144651,1010444,30144651,2011-04-14,242.2,30144651,Zero,Ignácio De Loyola...,10118,5,854.37
648950,2022-01-15,22112497,1010444,22112497,2021-01-06,10.84,22112497,Mrs Dalloway,Virginia Woolf,11896,6,865.21
184567,2022-03-29,20414016,1010444,20414016,2011-06-09,212.1,20414016,Vidas Secas,Graciliano Ramos,3335,7,1077.31
26947,2022-05-04,36030824,1010444,36030824,2005-01-26,226.36,36030824,Tremor de Terra,Luiz Vilela,317,8,1303.67
472081,2022-06-19,26925428,1010444,26925428,2011-09-04,193.19,26925428,"Sing, Unburied, S...",Jesmyn Ward,8520,9,1496.86
27189,2022-06-26,12489208,1010444,12489208,2011-10-30,160.53,12489208,Triste Fim de Pol...,Lima Barreto,322,10,1657.39
