In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import (
    col, upper, when, datediff, lit, last, 
    to_date, sum as _sum, round as _round
)

In [0]:
spark = SparkSession.builder.appName("TransformacaoSilver").getOrCreate()
spark.sql("USE silver")

In [0]:
df_consumidores = spark.read.table("bronze.ft_consumidores") \
    .select(
        col("customer_id").alias("id_consumidor"),
        col("customer_zip_code_prefix").alias("prefixo_cep"),
        col("customer_city").alias("cidade"),
        col("customer_state").alias("estado")
    ) \
    .withColumn("cidade", upper(col("cidade"))) \
    .withColumn("estado", upper(col("estado"))) \
    .dropDuplicates(["id_consumidor"])

df_consumidores.write.mode("overwrite").saveAsTable("silver.ft_consumidores")
print("Tabela silver.ft_consumidores criada.")

In [0]:
df_pedidos = spark.read.table("bronze.ft_pedidos") \
    .withColumn("status_traduzido", 
        when(col("order_status") == "delivered", "entregue")
        .when(col("order_status") == "invoiced", "faturado")
        .when(col("order_status") == "shipped", "enviado")
        .when(col("order_status") == "processing", "em processamento")
        .when(col("order_status") == "unavailable", "indisponível")
        .when(col("order_status") == "canceled", "cancelado")
        .when(col("order_status") == "created", "criado")
        .when(col("order_status") == "approved", "aprovado")
        .otherwise(col("order_status"))
    )

df_pedidos_silver = df_pedidos.select(
    col("order_id").alias("id_pedido"),
    col("customer_id").alias("id_consumidor"),
    col("status_traduzido").alias("status"),
    col("order_purchase_timestamp").cast("timestamp").alias("pedido_compra_timestamp"),
    col("order_approved_at").cast("timestamp").alias("pedido_aprovado_timestamp"),
    col("order_delivered_carrier_date").cast("timestamp").alias("pedido_carregado_timestamp"),
    col("order_delivered_customer_date").cast("timestamp").alias("pedido_entregue_timestamp"),
    col("order_estimated_delivery_date").cast("timestamp").alias("pedido_estimativa_entrega_timestamp")
)

df_pedidos_silver = df_pedidos_silver \
    .withColumn("tempo_entrega_dias", datediff(col("pedido_entregue_timestamp"), col("pedido_compra_timestamp"))) \
    .withColumn("tempo_entrega_estimado_dias", datediff(col("pedido_estimativa_entrega_timestamp"), col("pedido_compra_timestamp")))

df_pedidos_silver = df_pedidos_silver \
    .withColumn("diferenca_entrega_dias", col("tempo_entrega_dias") - col("tempo_entrega_estimado_dias"))

df_pedidos_silver = df_pedidos_silver \
    .withColumn("entrega_no_prazo",
        when(col("pedido_entregue_timestamp").isNull(), "Não Entregue")
        .when(col("diferenca_entrega_dias") <= 0, "Sim")
        .otherwise("Não")
    )

df_pedidos_silver.write.mode("overwrite").saveAsTable("silver.ft_pedidos")
print("Tabela silver.ft_pedidos criada.")

In [0]:
df_itens_pedidos = spark.read.table("bronze.ft_itens_pedidos") \
    .select(
        col("order_id").alias("id_pedido"),
        col("order_item_id").alias("id_item").cast("int"),
        col("product_id").alias("id_produto"),
        col("seller_id").alias("id_vendedor"),
        col("price").alias("preco_BRL").cast("decimal(12,2)"),
        col("freight_value").alias("preco_frete").cast("decimal(12,2)")
    )

df_itens_pedidos.write.mode("overwrite").saveAsTable("silver.ft_itens_pedidos")
print("Tabela silver.ft_itens_pedidos criada.")

In [0]:
df_pagamentos = spark.read.table("bronze.ft_pagamentos_pedidos") \
    .withColumn("forma_pagamento",
        when(col("payment_type") == "credit_card", "Cartão de Crédito")
        .when(col("payment_type") == "boleto", "Boleto")
        .when(col("payment_type") == "voucher", "Voucher")
        .when(col("payment_type") == "debit_card", "Cartão de Débito")
        .otherwise("Outro")
    ) \
    .select(
        col("order_id").alias("id_pedido"),
        col("payment_sequential").alias("codigo_pagamento").cast("int"),
        col("forma_pagamento"),
        col("payment_installments").alias("parcelas").cast("int"),
        col("payment_value").alias("valor_pagamento").cast("decimal(12,2)")
    )

df_pagamentos.write.mode("overwrite").saveAsTable("silver.ft_pagamentos")
print("Tabela silver.ft_pagamentos criada.")

In [0]:
df_avaliacoes = spark.table("bronze.ft_avaliacoes_pedidos")

from pyspark.sql.functions import col, expr

df_avaliacoes_silver = df_avaliacoes.select(
    col("review_id").alias("id_avaliacao"),
    col("order_id").alias("id_pedido"),
    expr("try_cast(review_score as int)").alias("avaliacao"),
    col("review_comment_title").alias("titulo_comentario"),
    col("review_comment_message").alias("comentario"),
    expr("try_cast(review_creation_date as timestamp)").alias("data_comentario"),
    expr("try_cast(review_answer_timestamp as timestamp)").alias("data_resposta")
)

df_avaliacoes_silver.write.mode("overwrite").saveAsTable("silver.ft_avaliacoes_pedidos")
display(df_avaliacoes_silver)

In [0]:
df_produtos = spark.read.table("bronze.ft_produtos") \
    .select(
        col("product_id").alias("id_produto"),
        col("product_category_name").alias("categoria_produto"),
        col("product_weight_g").alias("peso_produto_gramas"),
        col("product_length_cm").alias("comprimento_centimetros"),
        col("product_height_cm").alias("altura_centimetros"),
        col("product_width_cm").alias("largura_centimetros")
    )
df_produtos.write.mode("overwrite").saveAsTable("silver.ft_produtos")
print("Tabela silver.ft_produtos criada.")

In [0]:
df_vendedores = spark.read.table("bronze.ft_vendedores") \
    .select(
        col("seller_id").alias("id_vendedor"),
        col("seller_zip_code_prefix").alias("prefixo_cep"),
        col("seller_city").alias("cidade"),
        col("seller_state").alias("estado")
    ) \
    .withColumn("cidade", upper(col("cidade"))) \
    .withColumn("estado", upper(col("estado")))
    
df_vendedores.write.mode("overwrite").saveAsTable("silver.ft_vendedores")
print("Tabela silver.ft_vendedores criada.")

In [0]:
df_categoria = spark.read.table("bronze.dm_categoria_produtos_traducao") \
    .select(
        col("product_category_name").alias("nome_produto_pt"),
        col("product_category_name_english").alias("nome_produto_en")
    )
df_categoria.write.mode("overwrite").saveAsTable("silver.dm_categoria_produtos_traducao")
print("Tabela silver.dm_categoria_produtos_traducao criada.")

In [0]:
df_cotacao = spark.read.table("bronze.dm_cotacao_dolar") \
    .withColumn("data", to_date(col("dataHoraCotacao"))) \
    .select("data", col("cotacaoCompra").cast("float"))

window_spec = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_cotacao_filled = df_cotacao \
    .withColumn("cotacao_dolar", last(col("cotacaoCompra"), ignorenulls=True).over(window_spec)) \
    .select("data", "cotacao_dolar") \
    .dropDuplicates(["data"])
    
df_cotacao_filled.write.mode("overwrite").saveAsTable("silver.dm_cotacao_dolar")
print("Tabela silver.dm_cotacao_dolar criada.")workspace.silver.ft_pedido_total

In [0]:
print("\n---Validações")

pedidos = spark.read.table("silver.ft_pedidos")
consumidores = spark.read.table("silver.ft_consumidores")
itens = spark.read.table("silver.ft_itens_pedidos")

pedidos_orfos = pedidos.join(consumidores, "id_consumidor", "left_anti")
count_pedidos_orfos = pedidos_orfos.count()
print(f"Pedidos órfãos encontrados (sem consumidor): {count_pedidos_orfos}")

if count_pedidos_orfos > 0:
    pedidos_validos = pedidos.join(consumidores, "id_consumidor", "left_semi")
    pedidos_validos.write.mode("overwrite").saveAsTable("silver.ft_pedidos")
    print(f"Registros órfãos removidos de silver.ft_pedidos.")
    pedidos = pedidos_validos

itens_orfos = itens.join(pedidos, "id_pedido", "left_anti")
count_itens_orfos = itens_orfos.count()
print(f"Itens órfãos encontrados (sem pedido válido): {count_itens_orfos}")

if count_itens_orfos > 0:
    itens_validos = itens.join(pedidos, "id_pedido", "left_semi")
    itens_validos.write.mode("overwrite").saveAsTable("silver.ft_itens_pedidos")
    print(f"Registros órfãos removidos de silver.ft_itens_pedidos.")

In [0]:
print("\n--- Criando Tabela Agregada: silver.pedido_total ---")

pagamentos_agg = spark.read.table("silver.ft_pagamentos") \
    .groupBy("id_pedido") \
    .agg(_sum("valor_pagamento").alias("valor_total_pago_brl"))

pedidos_base = spark.read.table("silver.ft_pedidos") \
    .select(
        "id_pedido", 
        "id_consumidor", 
        "status", 
        to_date(col("pedido_compra_timestamp")).alias("data_pedido")
    )

cotacao_diaria = spark.read.table("silver.dm_cotacao_dolar") \
    .select(col("data").alias("data_pedido"), "cotacao_dolar")

df_pedido_total = pedidos_base \
    .join(pagamentos_agg, "id_pedido", "inner") \
    .join(cotacao_diaria, "data_pedido", "left") \
    .withColumn("valor_total_pago_usd", 
        _round(col("valor_total_pago_brl") / col("cotacao_dolar"), 2)
    ) \
    .select(
        "id_pedido",
        "id_consumidor",
        "status",
        "valor_total_pago_brl",
        "valor_total_pago_usd",
        "data_pedido"
    )

df_pedido_total.write.mode("overwrite").saveAsTable("silver.ft_pedido_total")
print("Tabela silver.ft_pedido_total criada com sucesso.")