### 1. Configuração e Carregamento

Define as variáveis de catálogo e carrega todas as 10 tabelas da camada Bronze em DataFrames Spark para iniciar o processo de transformação e limpeza.


In [0]:
# Importação de bibliotecas
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Definição do caminho da pasta Bronze
catalogo = "medalhao"
silver_db_name = "silver"

# Leitura das tabelas Bronze
categoria_produtos_bronze_df = spark.table("medalhao.bronze.dm_categoria_produtos_traducao")
cotacao_dolar_bronze_df = spark.table("medalhao.bronze.dm_cotacao_dolar")
avaliacoes_pedidos_bronze_df = spark.table("medalhao.bronze.ft_avaliacoes_pedidos")
consumidores_bronze_df = spark.table("medalhao.bronze.ft_consumidores")
geolocalizacao_bronze_df = spark.table("medalhao.bronze.ft_geolocalizacao")
itens_pedidos_bronze_df = spark.table("medalhao.bronze.ft_itens_pedidos")
pagamentos_pedidos_bronze_df = spark.table("medalhao.bronze.ft_pagamentos_pedidos")
pedidos_bronze_df = spark.table("medalhao.bronze.ft_pedidos")
produtos_bronze_df = spark.table("medalhao.bronze.ft_produtos")
vendedores_bronze_df = spark.table("medalhao.bronze.ft_vendedores")

lista_df = [categoria_produtos_bronze_df, cotacao_dolar_bronze_df, avaliacoes_pedidos_bronze_df, consumidores_bronze_df, geolocalizacao_bronze_df, itens_pedidos_bronze_df, pagamentos_pedidos_bronze_df, pedidos_bronze_df, produtos_bronze_df, vendedores_bronze_df]


### 2. Preparação do Ambiente Silver (Reset)

Para garantir a **idempotência** do notebook (capacidade de ser executado várias vezes e gerar o mesmo resultado), esta célula remove (`DROP TABLE IF EXISTS`) todas as tabelas Silver de destino antes de recriá-las.


In [0]:
spark.sql("DROP TABLE IF EXISTS medalhao.silver.categoria_produtos_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.avaliacoes_pedidos_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.consumidores_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.consumidores_silver_df")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.itens_pedidos_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.pagamentos_pedidos_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.pedidos_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.produtos_silver")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.vendedores_silver")



### 3. Transformação: `ft_consumidores` -> `consumidores_silver`

Processamento e limpeza da tabela de consumidores:
* **Renomeação:** Colunas são renomeadas para o padrão português (ex: `customer_id` -> `id_consumidor`).
* **Padronização:** Colunas de localização (`cidade`, `estado`) são convertidas para maiúsculas (`F.upper()`) para consistência.
* **Garantia de Unicidade:** `dropDuplicates(["id_consumidor"])` é aplicado para garantir uma única linha por consumidor.


In [0]:
consumidores_silver_df = (
    consumidores_bronze_df
    .select(
        F.col("customer_id").alias("id_consumidor"),
        F.col("customer_zip_code_prefix").alias("prefixo_cep"),
        F.upper(F.col("customer_city")).alias("cidade"),
        F.upper(F.col("customer_state")).alias("estado"),
    )
    .dropDuplicates(["id_consumidor"])
    .withColumn("data_ingestao", F.current_timestamp())
)
consumidores_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.consumidores_silver")
consumidores_silver_df.display()
print("✅ Tabela silver.ft_consumidores criada com sucesso!\\n")

### 4. Transformação: `ft_pedidos` -> `pedidos_silver`

Processamento e enriquecimento da tabela de pedidos:
* **Renomeação:** Colunas são renomeadas para o padrão português.
* **Conversão de Tipo:** Timestamps de eventos (compra, aprovação, entrega) são convertidos para `DateType` (`F.to_date()`) para otimizar joins e análises.
* **Mapeamento (Tradução):** O `order_status` é mapeado de inglês para português (ex: 'delivered' -> 'entregue') usando `F.when()`.
* **Enriquecimento:** Criação de novas colunas calculadas para análise de SLA de entrega (`tempo_entrega_dias`, `tempo_entrega_estimado_dias`, `diferenca_entrega_dias`, `entrega_no_prazo`).


In [0]:
pedidos_silver_df = (
    pedidos_bronze_df
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        F.col("order_status").alias("status"),
        F.to_date("order_purchase_timestamp").alias("pedido_compra_timestamp"),
        F.to_date("order_approved_at").alias("pedido_aprovado_timestamp"),
        F.to_date("order_delivered_carrier_date").alias("pedido_carregado_timestamp"),
        F.to_date("order_delivered_customer_date").alias("pedido_entregue_timestamp"),
        F.to_date("order_estimated_delivery_date").alias("pedido_estimativa_entrega_timestamp"),
    )
    .withColumn("data_ingestao", F.current_timestamp())
    .withColumn(
        "status",
        F.when(F.col("status") == "delivered", "entregue")
        .when(F.col("status") == "canceled", "cancelado")
        .when(F.col("status") == "invoiced", "faturado")
        .when(F.col("status") == "processing", "em processamento")
        .when(F.col("status") == "shipped", "enviado")
        .when(F.col("status") == "unavailable", "indisponível")
        .when(F.col("status") == "created", "criado")
        .when(F.col("status") == "approved", "aprovado")
    )
    .withColumn(
        "tempo_entrega_dias",
        F.datediff(F.col("pedido_entregue_timestamp"), F.col("pedido_compra_timestamp"))
    )
    .withColumn(
        "tempo_entrega_estimado_dias",
        F.datediff(F.col("pedido_estimativa_entrega_timestamp"), F.col("pedido_compra_timestamp"))
    )
    .withColumn(
        "diferenca_entrega_dias",
        F.datediff(F.col("pedido_entregue_timestamp"), F.col("pedido_estimativa_entrega_timestamp"))
    )
    .withColumn(
        "entrega_no_prazo",
        F.when(F.col("pedido_entregue_timestamp").isNull(), "Não Entregue")  
        .when(F.col("diferenca_entrega_dias") <= 0, "Sim")                 
        .when(F.col("diferenca_entrega_dias") > 0, "Não") 
    )
)
pedidos_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.pedidos_silver")
pedidos_silver_df.display()
print("✅ Tabela silver.pedidos_silver criada com sucesso!\n")
#PRECISO TRATAR OS VALORES NULO
#fazer um groupby count para ver quantos tem entregues, em processamento e etc...

### 5. Transformação: `ft_itens_pedidos` -> `itens_pedidos_silver`

Processamento dos itens de pedido:
* **Renomeação:** Colunas são renomeadas para o padrão português.
* **Enriquecimento:** Criação da coluna `preço_total` (`preco_BRL + preco_frete`) para facilitar análises financeiras.


In [0]:
itens_pedidos_silver_df = (
    itens_pedidos_bronze_df.select(
        F.col("order_id").alias("id_pedido"),
        F.col("order_item_id").alias("id_item"),
        F.col("product_id").alias("id_produto"),
        F.col("seller_id").alias("id_vendedor"),
        F.col("price").alias("preco_BRL"),
        F.col("freight_value").alias("preco_frete"),
    )
    .withColumn("data_ingestao", F.current_timestamp())
    .withColumn("preço_total", F.round((F.col("preco_BRL") + F.col("preco_frete")), 2))
)
itens_pedidos_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.itens_pedidos_silver")
itens_pedidos_silver_df.display()
print("✅ Tabela silver.itens_pedidos_silver criada com sucesso!\n")

### 6. Transformação: `ft_pagamentos_pedidos` -> `pagamentos_pedidos_silver`

Processamento dos pagamentos:
* **Renomeação:** Colunas são renomeadas.
* **Mapeamento (Tradução):** A coluna `payment_type` é mapeada para valores em português (ex: 'credit_card' -> 'Cartão de Crédito').


In [0]:
pagamentos_pedidos_silver_df = (
    pagamentos_pedidos_bronze_df.select(
        F.col("order_id").alias("id_pedido"),
        F.col("payment_sequential").alias("codigo_pagamento"),
        F.col("payment_type").alias("forma_pagamento"),
        F.col("payment_installments").alias("parcelas"),
        F.col("payment_value").alias("valor_pagamento")
    )
    .withColumn("forma_pagamento",
    F.when(F.col("forma_pagamento") == "credit_card","Cartão de Crédito")
    .when(F.col("forma_pagamento") == "boleto","Boleto")
    .when(F.col("forma_pagamento") == "voucher","Voucher")
    .when(F.col("forma_pagamento") == "debit_card","Cartão de Débito")
    .otherwise("Outros")
    )
    .withColumn("data_ingestao", F.current_timestamp())

)
pagamentos_pedidos_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.pagamentos_pedidos_silver")
pagamentos_pedidos_silver_df.display()

### 7. Transformação e Validação (Data Quality): `ft_avaliacoes_pedidos`

Etapa crítica de limpeza e aplicação de regras de qualidade de dados (Data Quality):
* **Renomeação:** Colunas são renomeadas e datas convertidas.
* **Filtros de Validação:**
    * `id_pedido` não pode ser nulo.
    * `id_avaliacao` e `id_pedido` devem ter exatamente 32 caracteres.
    * `id_avaliacao` e `id_pedido` devem ser alfanuméricos (filtro `rlike` para remover caracteres especiais).
    * `avaliacao` deve conter apenas valores de 0 a 5.
    * `data_comentario` não pode ser nula e não pode ser uma data futura.


In [0]:
avaliacoes_pedidos_silver_df = (
    avaliacoes_pedidos_bronze_df.select(
        F.col("review_id").alias("id_avaliacao"),
        F.col("order_id").alias("id_pedido"),
        F.col("review_score").alias("avaliacao"),
        F.col("review_comment_title").alias("titulo_comentario"),
        F.col("review_comment_message").alias("comentario"),
        F.to_date("review_creation_date").alias("data_comentario"),
        F.to_date("review_answer_timestamp").alias("data_resposta")
    )
    .withColumn("data_ingestao", F.current_timestamp())
    .filter(F.col("id_pedido").isNotNull())
    .filter(F.length(F.col("id_avaliacao")) == 32)
    .filter(F.length(F.col("id_pedido")) == 32)
    .filter(~F.col("id_avaliacao").rlike("[^a-zA-Z0-9]"))
    .filter(~F.col("id_pedido").rlike("[^a-zA-Z0-9]"))
    .filter(~F.col("avaliacao").rlike("[^0-5]"))
    .filter(F.col("data_comentario").isNotNull())
    .filter(F.datediff("data_comentario", "data_ingestao")<0)

)
avaliacoes_pedidos_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.avaliacoes_pedidos_silver")

avaliacoes_pedidos_silver_df.display()

In [0]:
avaliacoes_pedidos_silver_df.groupBy("avaliacao").count().display()


### 8. Transformação (Null Handling): `ft_produtos` -> `produtos_silver`

Processamento dos produtos:
* **Renomeação:** Colunas são renomeadas.
* **Tratamento de Nulos:** `F.coalesce()` é usado para preencher valores nulos em colunas de categoria (para 'outros') e dimensões do produto (para '0'), garantindo que não haja nulos nessas colunas.


In [0]:
produtos_silver_df = (
    produtos_bronze_df.select(
        F.col("product_id").alias("id_produto"),
        F.coalesce("product_category_name", F.lit("outros")).alias("categoria_produto"),
        F.coalesce("product_weight_g", F.lit("0")).alias("peso_produto_gramas"),
        F.coalesce("product_length_cm", F.lit("0")).alias("comprimento_centimetros"),
        F.coalesce("product_height_cm", F.lit("0")).alias("altura_centimetros"),
        F.coalesce("product_width_cm", F.lit("0")).alias("largura_centimetros")

    )
    .withColumn("data_ingestao", F.current_timestamp())

)
produtos_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.produtos_silver")
produtos_silver_df.display()
print("✅ Tabela silver.produtos_silver criada com sucesso!\n")

In [0]:
vendedores_silver_df = (
    vendedores_bronze_df.select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("cep_vendedor"),
        F.upper(F.col("seller_city")).alias("cidade"),
        F.upper(F.col("seller_state")).alias("estado")
    )
    .withColumn("data_ingestao", F.current_timestamp())

)
vendedores_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.vendedores_silver")
vendedores_silver_df.display()
print("✅ Tabela silver.vendedores_silver criada com sucesso!\n")

In [0]:
categoria_produtos_silver_df = (
    categoria_produtos_bronze_df.select(
        F.col("product_category_name").alias("nome_produto_pt"),
        F.col("product_category_name_english").alias("nome_produto_en")
    )
    .withColumn("data_ingestao", F.current_timestamp())
    #lembrar dropduplicates
    #lembrar groupby por produto para checar se tem duplicadas
)
categoria_produtos_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.categoria_produtos_silver")
categoria_produtos_silver_df.display()
print("✅ Tabela silver.categoria_produtos_silver criada com sucesso!\n")


### 9. Transformação (Enriquecimento): `dm_cotacao_dolar` -> `cotacao_dolar_silver`

Processamento avançado para preenchimento de dados da cotação do dólar (Forward-Fill):
1.  **Criação de Calendário:** É gerado um DataFrame (`all_dates_df`) contendo todos os dias do período.
2.  **Left Join:** O calendário é unido à tabela de cotação (`bronze_prepped_df`). Isso cria linhas para fins de semana com valores `null`.
3.  **Preenchimento (Window Function):** Uma `Window` function é usada com `F.last(..., ignorenulls=True)` para preencher os `null`s dos fins de semana com a última cotação válida (a da sexta-feira).


In [0]:
from pyspark.sql.window import Window
from datetime import timedelta

cotacao_dolar_temp = (
    cotacao_dolar_bronze_df
    .select(
        F.to_date("dataHoraCotacao").alias("data"),
        F.col("cotacaoCompra").cast("decimal(10,4)").alias("cotacao_dolar")
    )
)

min_max = cotacao_dolar_temp.agg(
    F.min("data").alias("data_min"),
    F.max("data").alias("data_max")
).collect()[0]

todas_datas = []
data_atual = min_max["data_min"]
while data_atual <= min_max["data_max"]:
    todas_datas.append((data_atual,))
    data_atual += timedelta(days=1)

df_todas_datas = spark.createDataFrame(todas_datas, ["data"])

cotacao_completa = df_todas_datas.join(cotacao_dolar_temp, "data", "left")

window_spec = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, Window.currentRow)

cotacao_dolar_silver_df = cotacao_completa.withColumn(
    "cotacao_dolar",
    F.last("cotacao_dolar", ignorenulls=True).over(window_spec)
)

cotacao_dolar_silver_df = cotacao_dolar_silver_df.withColumn("ingestion_timestamp", F.current_timestamp())

cotacao_dolar_silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.dm_cotacao_dolar")

print("✅ Tabela silver.dm_cotacao_dolar criada com sucesso!\n")
cotacao_dolar_silver_df.display()

### 10. Validação de Integridade Referencial (Chaves Estrangeiras)

Verificação da consistência entre as tabelas Silver recém-criadas:
1.  **Pedidos Órfãos:** Um `left_anti` join é usado para encontrar pedidos em `pedidos_silver` que não têm um consumidor correspondente em `clientes_silver`.
2.  **Itens Órfãos:** Um `left_anti` join é usado para encontrar itens em `itens_pedidos_silver` sem um pedido correspondente em `pedidos_silver`.
3.  **Remoção:** Se registros órfãos são encontrados, eles são removidos da tabela Silver usando um `left_semi` join (que mantém apenas os registros com correspondência), e a tabela é sobrescrita.


In [0]:
from pyspark.sql import functions as F

#Validação de integridade referencial

pedidos_silver_df = spark.table(f"{catalogo}.{silver_db_name}.pedidos_silver")
consumidores_silver_df = spark.table(f"{catalogo}.{silver_db_name}.consumidores_silver_df")
itens_pedidos_silver_df = spark.table(f"{catalogo}.{silver_db_name}.itens_pedidos_silver")

pedidos_orfaos_df = pedidos_silver_df.join(
    consumidores_silver_df,
    on=["id_consumidor"], 
    how="left_anti"        
)

count_pedidos_orfaos = pedidos_orfaos_df.count()
print(f"Total de PEDIDOS órfãos encontrados (sem consumidor): {count_pedidos_orfaos}")

itens_orfaos_df = itens_pedidos_silver_df.join(
    pedidos_silver_df,
    on=["id_pedido"],   
    how="left_anti"      
)

count_itens_orfaos = itens_orfaos_df.count()
print(f"Total de ITENS órfãos encontrados (sem pedido): {count_itens_orfaos}")

print("Não há pedidos orfãos")

### 11. Criação da Tabela Agregada Final: `pedido_total`

Construção da tabela analítica principal:
1.  **Agregação de Pagamentos:** A tabela `pagamentos_pedidos_silver` é agrupada por `id_pedido` para somar o `valor_total_pago_brl`.
2.  **Joins:** As tabelas limpas (`pedidos_silver`, `clientes_silver`, `pagamentos_agg_df`) são unidas.
3.  **Join da Cotação:** A tabela de cotação (`cotacao_limpo`) é unida usando `F.to_date(pedido_compra_timestamp) == cotacao_limpo.data` para buscar a cotação do dia exato da compra.
4.  **Cálculo Final:** O `valor_total_pago_usd` é calculado dividindo o valor em BRL pela cotação do dólar.


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType



pedidos_df = spark.table(f"{catalogo}.{silver_db_name}.pedidos_silver")
consumidores_df = spark.table(f"{catalogo}.{silver_db_name}.consumidores_silver") 
pagamentos_df = spark.table(f"{catalogo}.{silver_db_name}.pagamentos_pedidos_silver")
cotacao_df = spark.table(f"{catalogo}.{silver_db_name}.dm_cotacao_dolar") 

pagamentos_agg_df = pagamentos_df.groupBy("id_pedido").agg(
    F.round(F.sum("valor_pagamento"), 2).alias("valor_total_pago_brl")
)

pedidos_limpo = pedidos_df.select(
    F.col("id_pedido"),
    F.col("id_consumidor"),
    F.col("status"),
    F.col("pedido_compra_timestamp")
)

consumidores_limpo = consumidores_df.select(
    F.col("id_consumidor")
).distinct()

cotacao_limpo = cotacao_df.select(
    F.col("data"), 
    F.col("cotacao_dolar")
)

base_df = pedidos_limpo.join(
    consumidores_limpo,
    on="id_consumidor",
    how="inner"
)

base_com_pagamento = base_df.join(
    pagamentos_agg_df,
    on="id_pedido",
    how="inner"
)

base_com_cotacao = base_com_pagamento.join(
    cotacao_limpo,
    F.to_date(base_com_pagamento.pedido_compra_timestamp) == cotacao_limpo.data,
    how="left"
)

pedido_total_final_df = base_com_cotacao.select(
    F.col("pedido_compra_timestamp").alias("data_pedido"),
    F.col("id_pedido"),
    F.col("id_consumidor"),
    F.col("status"),
    F.col("valor_total_pago_brl"),
    F.round((F.col("valor_total_pago_brl") / F.col("cotacao_dolar")), 2).alias("valor_total_pago_usd")
) 

pedido_total_final_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.pedido_total")

print("\n✅ Tabela final silver.pedido_total criada com sucesso!\n")

pedido_total_final_df.display()