In [0]:
%run ../../_utils

In [0]:
from pyspark.sql.functions import when, to_date, col, dayofweek, max, collect_set, sum
from pyspark.sql.types import StringType, BooleanType


# Camada GOLD

Na camada gold, as limpezas e ajustes já foram feitos, então essa camada é responsável por aplicar regras de negócio, agregações e junções de dados que convirjam para analises.

In [0]:
tb_name = "gold.olist_orders"
dataset_location = "olist_orders_dataset"
target_location = f"dbfs:/FileStore/delta/gold/brazilian_ecommerce/{dataset_location}"

## 1 - Data ingestion

Conforme o schema disponibilizado, iremos agregar os dados em uma big table que permitirá ~quase~ todas as analises subsequentes

Apenas para fins de teste, iremos agregar apenas reviews e payments à table "fact" orders;
Portanto, iremos carregar essas tabelas


In [0]:
df_orders = spark.read.table("silver.olist_orders") # leituira da delta table central, orders
df_order_reviews = spark.read.table("silver.olist_order_reviews") # leituira da delta table "dim" reviews
df_order_payments = spark.read.table("silver.olist_order_payments") # leituira da delta table "dim" payments
#df_order_items = spark.sql("select * from silver.olist_order_items") # leitura de outra maneira, da delta table "dim" items


## 2 - preparation


### 2.1 order_payments

uma order_id pode ter várias formas de pagamento (geralmente vouchs).  cada pagamento gera um registro

Então iremos agregar, somando em valor de pagamento e pegando o max payment_sequential

In [0]:
df_order_payments = df_order_payments.groupBy("order_id").agg(
    max("payment_sequential").alias("total_payment_sequential"),
    sum("payment_value").alias("total_payment_value"),
    collect_set("payment_type").alias("payment_types"),
)


### 2.2 order_reviews

podemos perceber que existem casos onde existe mais de um review para um mesmo order_id


## 2 - Data Join

In [0]:
print(f"Total de registros ANTES da agregação {df_orders.count()}")

In [0]:
df = (df_orders
      .join(df_order_payments, on=['order_id'], how='left')
      .join(df_order_reviews, on=['order_id'], how='left'))

In [0]:
print(f"Total de registros DEPOIS da agregação {df.count()}")

In [0]:
display(df.take(10))


## Saving data

In [0]:
save_dataframe(df, format_mode="delta", table_name=tb_name, target_location=target_location)


## create delta table

TODO: implementar UPSERT

o upsert serve para não precisar reescrever todos os dados, mas aproveitar do Delta para fazer um MERGE, caso um registro antigo tenha uma nova versão e INSERT para os dados que são novos

In [0]:
create_table(table_name=tb_name, target_location=target_location)

In [0]:
# exit para fechar a execução
dbutils.notebook.exit("OK")

In [0]:
%sql

select * from gold.olist_orders limit 10