In [0]:
from pyspark.sql.functions import (
    col, upper, when, datediff, to_timestamp, to_date, current_timestamp, try_to_timestamp,
    last, expr, min as spark_min, max as spark_max, sum as spark_sum)
    
from pyspark.sql.window import Window
from pyspark.sql import DataFrame

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS silver;

In [0]:
df = spark.table("bronze.ft_consumidores")

# Português e letras maiúsculas em cidade e estado
df_silver = (
    df.select(
        col("customer_id").alias("id_consumidor"),
        col("customer_zip_code_prefix").alias("prefixo_cep"),
        upper(col("customer_city")).alias("cidade"),
        upper(col("customer_state")).alias("estado"),
        col("ingestion_timestamp")
    )
)

# Removendo duplicatas
df_silver = df_silver.dropDuplicates(["id_consumidor"])

df_silver.write.mode("overwrite").format("delta").saveAsTable("silver.ft_consumidores")


In [0]:
df_pedidos_bronze = spark.table("bronze.ft_pedidos")

# Conversão para timestamp
df_pedidos_bronze = ( df_pedidos_bronze
                     .withColumn("order_purchase_timestamp", to_timestamp("order_purchase_timestamp"))
                     .withColumn("order_approved_at", to_timestamp("order_approved_at"))
                     .withColumn("order_delivered_carrier_date", to_timestamp("order_delivered_carrier_date"))
                     .withColumn("order_delivered_customer_date", to_timestamp("order_delivered_customer_date"))
                     .withColumn("order_estimated_delivery_date", to_timestamp("order_estimated_delivery_date"))
)

# Mapeamento da tradução
mapa_status = { "delivered": "entregue",
               "invoiced": "faturado",
               "shipped": "enviado",
               "processing": "em processamento",
               "unavailable": "indisponível",
               "canceled": "cancelado",
               "created": "criado",
               "approved": "aprovado"
}

df_pedidos_bronze = df_pedidos_bronze.withColumn("status", when(col("order_status").isNull(), None))

# Aplica o mapeamento
for status_original, status_traduzido in mapa_status.items():
    df_pedidos_bronze = df_pedidos_bronze.withColumn(
        "status",
        when(col("order_status") == status_original, status_traduzido).otherwise(col("status"))
    )

df_pedidos_bronze = df_pedidos_bronze.withColumn(
    "tempo_entrega_dias",
    datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp"))
)

df_pedidos_bronze = df_pedidos_bronze.withColumn(
    "tempo_entrega_estimado_dias",
    datediff(col("order_estimated_delivery_date"), col("order_purchase_timestamp"))
)

df_pedidos_bronze = df_pedidos_bronze.withColumn(
    "diferenca_entrega_dias",
    col("tempo_entrega_dias") - col("tempo_entrega_estimado_dias")
)

df_pedidos_bronze = df_pedidos_bronze.withColumn(
    "entrega_no_prazo", when(col("order_delivered_customer_date").isNull(), "NÃO ENTREGUE")
    .when(col("diferenca_entrega_dias") <= 0, "SIM")
    .otherwise("NÃO")
)

# Renomeação as colunas para português
df_pedidos_silver = df_pedidos_bronze.select(
    col("order_id").alias("id_pedido"),
    col("customer_id").alias("id_consumidor"),
    col("status"),
    col("order_purchase_timestamp").alias("pedido_compra_timestamp"),
    col("order_approved_at").alias("pedido_aprovado_timestamp"),
    col("order_delivered_carrier_date").alias("pedido_carregado_timestamp"),
    col("order_delivered_customer_date").alias("pedido_entregue_timestamp"),
    col("order_estimated_delivery_date").alias("pedido_estimativa_entrega_timestamp"),
    col("tempo_entrega_dias"),
    col("tempo_entrega_estimado_dias"),
    col("diferenca_entrega_dias"),
    col("entrega_no_prazo"),
    col("ingestion_timestamp")
)

df_pedidos_silver.write.mode("overwrite").format("delta").saveAsTable("silver.ft_pedidos")


In [0]:
df = spark.table("bronze.ft_itens_pedidos")

# Renomeação para português
df_silver = df.select(
    col("order_id").alias("id_pedido"),
    col("order_item_id").alias("id_item"),
    col("product_id").alias("id_produto"),
    col("seller_id").alias("id_vendedor"),
    col("price").alias("preço_brl"),
    col("freight_value").alias("preço_frete"),
    col("ingestion_timestamp")
)

df_silver.write.mode("overwrite").format("delta").saveAsTable("silver.ft_itens_pedidos")


In [0]:
df = spark.table("bronze.ft_pagamentos_pedidos")

# Renomeação para português
df_silver = (
    df.select(
        col("order_id").alias("id_pedido"),
        col("payment_sequential").alias("codigo_pagamento"),
        col("payment_type").alias("forma_pagamento_raw"),
        col("payment_installments").alias("parcelas"),
        col("payment_value").alias("valor_pagamento"),
        col("ingestion_timestamp")
    )
)

# Mapeamento da tradução
df_silver = df_silver.withColumn("forma_pagamento", 
                                 when(col("forma_pagamento_raw") == "credit_card", "Cartão de Crédito")
                                 .when(col("forma_pagamento_raw") == "boleto", "Boleto")
                                 .when(col("forma_pagamento_raw") == "voucher", "Voucher")
                                 .when(col("forma_pagamento_raw") == "debit_card", "Cartão de Débito")
                                 .otherwise("Outro")
).drop("forma_pagamento_raw")

df_silver.write.mode("overwrite").format("delta").saveAsTable("silver.ft_pagamentos")


In [0]:
df_avaliacoes_bronze = spark.table("bronze.ft_avaliacoes_pedidos")
df_pedidos_silver = spark.table("silver.ft_pedidos").select("id_pedido")

# Renomeação para português
df_avaliacoes = df_avaliacoes_bronze.select(
    col("review_id").alias("id_avaliacao"),
    col("order_id").alias("id_pedido"),
    col("review_score").alias("avaliacao"),
    col("review_comment_title").alias("titulo_comentario"),
    col("review_comment_message").alias("comentario"),
    try_to_timestamp("review_creation_date").alias("data_comentario"),
    try_to_timestamp("review_answer_timestamp").alias("data_resposta"),
    col("ingestion_timestamp")
)

total_original = df_avaliacoes.count()

# Remoção de registros com id_pedido nulo ou inexistente na tabela de pedidos
df_avaliacoes_validas = df_avaliacoes.join(df_pedidos_silver, on="id_pedido", how="inner")

# Remoção de registros com datas incorretas:
df_avaliacoes_validas = (df_avaliacoes.join(df_pedidos_silver, on="id_pedido", how="inner")
                         .filter(
                             (col("data_comentario").isNotNull()) & (col("data_comentario") <= current_timestamp()) & (col("data_resposta").isNull() | (col("data_resposta") <= current_timestamp()) )
                            )
                        )

total_final = df_avaliacoes_validas.count()
print("Linhas removidas em ft_avaliacoes_pedidos:", total_original - total_final)

df_avaliacoes_validas.write.mode("overwrite").format("delta").saveAsTable("silver.ft_avaliacoes_pedidos")


Linhas removidas em ft_avaliacoes_pedidos: 8855


In [0]:
df = spark.table("bronze.ft_produtos")

# Renomeção para portugu~es
df_silver = df.select(
    col("product_id").alias("id_produto"),
    col("product_category_name").alias("categoria_produto"),
    col("product_weight_g").alias("peso_produto_g"),
    col("product_length_cm").alias("comprimento_cm"),
    col("product_height_cm").alias("altura_cm"),
    col("product_width_cm").alias("largura_cm"),
    col("ingestion_timestamp")
)

df_silver.write.mode("overwrite").format("delta").saveAsTable("silver.ft_produtos")


In [0]:
df = spark.table("bronze.ft_vendedores")

# Renomeção para português
df_silver = df.select(
    col("seller_id").alias("id_vendedor"),
    col("seller_zip_code_prefix").alias("prefixo_cep"),
    upper(col("seller_city")).alias("cidade"),
    upper(col("seller_state")).alias("estado"),
    col("ingestion_timestamp")
)

df_silver.write.mode("overwrite").format("delta").saveAsTable("silver.ft_vendedores")


In [0]:
df = spark.table("bronze.dm_categoria_produtos_traducao")

# Renomeção para português
df_silver = df.select(
    col("product_category_name").alias("nome_produto_categoria_pt"),
    col("product_category_name_english").alias("nome_produto_categoria_en"),
    col("ingestion_timestamp")
)

df_silver.write.mode("overwrite").format("delta").saveAsTable("silver.dm_categoria_produtos_traducao")


In [0]:
df_dolar_bronze = spark.table("bronze.dm_cotacao_dolar")

# Converção para timestamp
df_dolar = (
    df_dolar_bronze
        .withColumn("data_hora_cotacao", to_timestamp("dataHoraCotacao"))
        .withColumn("data", to_date("data_hora_cotacao"))
        .select("data", col("cotacaoCompra"))
        .dropna(subset=["data"])
)

min_data, max_data = df_dolar.select(
    spark_min("data").alias("min_data"),
    spark_max("data").alias("max_data")
).first()

# Calendário contínuo entre as datas
datas_continuas = spark.sql(f"""
    SELECT explode(sequence(to_date('{min_data}'), to_date('{max_data}'), interval 1 day)) AS data
""")

df_dolar_completo = datas_continuas.join(df_dolar, on="data", how="left")

# Preenche as datas sem cotação com a última acotação resgistrada
janela = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, 0)

df_dolar_preenchido = (df_dolar_completo
                       .withColumn("cotacao_dolar", last("cotacaoCompra", ignorenulls=True).over(janela))
                       .select("data", "cotacao_dolar")
)

df_dolar_preenchido.write.mode("overwrite").format("delta").saveAsTable("silver.dm_cotacao_dolar")




In [0]:
# Todos os pospedidos possuam um consumidor válido:
ft_pedidos = spark.table("silver.ft_pedidos")
ft_consumidores = spark.table("silver.ft_consumidores")

# Pedidos sem consumidor
pedidos_orfaos = ft_pedidos.join(ft_consumidores, on="id_consumidor", how="left_anti")
print("Pedidos com id_consumidor inválido:", pedidos_orfaos.count())

# Mantendo apenas pedidos com consumidor válido
ft_pedidos_validos = ( ft_pedidos.join(ft_consumidores.select("id_consumidor"), on="id_consumidor", how="inner").select(ft_pedidos.columns))
ft_pedidos_validos.write.mode("overwrite").format("delta").saveAsTable("silver.ft_pedidos")

# Todos os itens de pedidos estejam associados a um pedido existente:
ft_itens = spark.table("silver.ft_itens_pedidos")
ft_pedidos = spark.table("silver.ft_pedidos")

# Itens sem pedido
itens_orfaos = ft_itens.join(ft_pedidos, on="id_pedido", how="left_anti")
print("Itens com id_pedido inválido:", itens_orfaos.count())

# Mantendo apenas itens com pedido válido
ft_itens_validos = (ft_itens.join(ft_pedidos.select("id_pedido"), on="id_pedido", how="inner").select(ft_itens.columns))
ft_itens_validos.write.mode("overwrite").format("delta").saveAsTable("silver.ft_itens_pedidos")


Pedidos com id_consumidor inválido: 0
Itens com id_pedido inválido: 0


In [0]:
ft_pedidos = spark.table("silver.ft_pedidos")
ft_pagamentos = spark.table("silver.ft_pagamentos")
dm_cotacao = spark.table("bronze.dm_cotacao_dolar")

# Somatório dos pagamentos por pedido (BRL)
pagamentos_por_pedido = (
    ft_pagamentos
        .groupBy("id_pedido")
        .agg(spark_sum("valor_pagamento").alias("valor_total_pago_brl"))
)

# Junção dos pedidos com total pago
df_pedido_total = (
    ft_pedidos
        .join(pagamentos_por_pedido, on="id_pedido", how="left")
        .withColumn("data_pedido", to_date(col("pedido_compra_timestamp")))
)

dm_cotacao_tratada = (dm_cotacao
                      .withColumn("data_cotacao", to_date(col("dataHoraCotacao")))
                      .select(
                          col("data_cotacao"),
                          col("cotacaoCompra").alias("cotacao_dolar")
        )
)

# Junção da cotação com a data do pedido
df_pedido_total = df_pedido_total.join(dm_cotacao_tratada,df_pedido_total["data_pedido"] == dm_cotacao_tratada["data_cotacao"], how="left")

# Calcula valor em USD
df_pedido_total = df_pedido_total.withColumn("valor_total_pago_usd", col("valor_total_pago_brl") / col("cotacao_dolar"))

# Seleciona colunas finais
df_pedido_total_final = df_pedido_total.select(
    "id_pedido",
    "id_consumidor",
    "status",
    "valor_total_pago_brl",
    "valor_total_pago_usd",
    "data_pedido"
)

df_pedido_total_final.write.mode("overwrite").format("delta").saveAsTable("silver.ft_pedido_total")
