In [0]:
from pyspark.sql.functions import col, count

In [0]:
# Leitura das tabelas Bronze e Silver
df_orders = spark.read.table("bronze.olist_orders")
df_order_items = spark.read.table("bronze.olist_order_items")
df_dim_customers = spark.read.table("silver.dim_customers")

### JOIN

In [0]:
# Join: order_items, orders e customers
df_fato = df_order_items \
    .join(df_orders, on="order_id", how="inner") \
    .join(df_dim_customers, on="customer_id", how="inner")

### Preço total item

In [0]:
# Seleção sugerida no desafio com coluna com métrica sugerida de preço total do item. 
df_fato_final = df_fato.select(
    "order_id",
    "order_item_id",
    "product_id",
    "seller_id",
    "customer_id",
    "order_status",
    "order_purchase_timestamp",
    "shipping_limit_date",
    "price",
    "freight_value"
).withColumn("preco_total_item", col("price") + col("freight_value"))


### Preço médio unitário

In [0]:
# Calcula o preço médio por item no pedido, preco_unitario_medio = preco_total_item / order_item_id

df_fato_final = df_fato_final.withColumn(
    "preco_medio_unitario", col("preco_total_item") / col("order_item_id")
)

### Verificações

#### Campos Nulos

In [0]:
# Verificação de campos nulos em colunas essenciais
null_check = df_fato_final.select([
    count(when(col(c).isNull(), c)).alias(c) for c in [
        "order_id", "order_item_id", "product_id", "customer_id",
        "price", "freight_value", "preco_total_item"
    ]
])

display(null_check)

# Se houver nulos, interrompe o notebook
null_counts = null_check.collect()[0].asDict()
if any(v > 0 for v in null_counts.values()):
    raise Exception(f"Falha na verificação de qualidade: campos nulos encontrados: {null_counts}")


order_id,order_item_id,product_id,customer_id,price,freight_value,preco_total_item
0,0,0,0,0,0,0


#### Valores Negativos

In [0]:
# Verifica se há preços negativos
valores_invalidos = df_fato_final.filter((col("price") < 0) | (col("freight_value") < 0))

display(valores_invalidos)

if valores_invalidos.count() > 0:
    raise Exception("Falha na verificação de qualidade: valores negativos encontrados em price ou freight_value.")

order_id,order_item_id,product_id,seller_id,customer_id,order_status,order_purchase_timestamp,shipping_limit_date,price,freight_value,preco_total_item,preco_medio_unitario


In [0]:
df_fato_final.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver.fct_order_items")

In [0]:
%sql
SELECT * FROM silver.dim_customers LIMIT 10;

customer_id,unique_id,zip_prefix,city,state
00050bf6e01e69d5c0fd612f1bcfb69c,e3cf594a99e810f58af53ed4820f25e5,98700,IJUI,RS
000598caf2ef4117407665ac33275130,7e0516b486e92ed3f3afdd6d1276cfbd,35540,OLIVEIRA,MG
0013cd8e350a7cc76873441e431dd5ee,334fed5abcee3aa96c13f1432703e1fd,3585,SAO PAULO,SP
0015bc9fd2d5395446143e8b215d7c75,490c854539b21598cfbbac518ca25788,12233,SAO JOSE DOS CAMPOS,SP
001df1ee5c36767aa607001ab1a13a06,46b44ab325f78e5bb3dc0bbef1082082,1030,SAO PAULO,SP
001f150aebb5d897f2059b0460c38449,0f88eb431888ffb9d726252b7ac8cefe,79031,CAMPO GRANDE,MS
001f35d9f262c558fd065346fbf5801d,ed5340f0e2a52fffa065298aeb875e60,21011,RIO DE JANEIRO,RJ
0026955706fd4e2fa997f3f4c18d485a,47b6bc410befb9fa30a4c029dba944e5,2926,SAO PAULO,SP
0028ff36263a86bf679df7c863a0a0ba,0659763dba9821af49728668716e1084,28895,RIO DAS OSTRAS,RJ
0035b30e58c620fa2bd30275ddd4f7ef,422ce018570adf9fa031a7838717be81,24342,NITEROI,RJ
