<h1>Importa√ß√£o</h1>

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, DecimalType, DoubleType ,TimestampType , LongType, IntegerType,DateType
from datetime import datetime



In [0]:
catalogo = "medalhao"
bronze_db_name = "bronze"
silver_db_name = "silver"


<h1>FUN√á√ÉO AUXILIAR PARA ESCRITA NA CAMADA SILVER‚úçÔ∏èü•à</h1>

In [0]:
def write_to_silver(df, table_name, partition_by=None):
  
    full_table_name = f"{catalogo}.{silver_db_name}.{table_name}"
    
    writer = df.write.format("delta").mode("overwrite").option("overwriteSchema", "true")
    
    if partition_by:
        writer = writer.partitionBy(*partition_by)
        
    writer.saveAsTable(full_table_name)
    
    print(f"Tabela Silver {full_table_name} criada/atualizada com sucesso.")

<h1>Transforma√ß√µesüí£</h1>

In [0]:
COLUMNS_FT_CONSUMIDORES = {
    "customer_id": "id_consumidor",
    "customer_zip_code_prefix": "prefixo_cep",
    "customer_city": "cidade",
    "customer_state": "estado"
}

try:
    df_consumidores_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_consumidores")
    
    # 1. Sele√ß√£o e Padroniza√ß√£o de Nomes
    df_consumidores_silver = df_consumidores_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_FT_CONSUMIDORES.items()]
    )
    
    # 2. Upper Case para Cidade/Estado
    df_consumidores_silver = df_consumidores_silver.withColumn(
        "cidade", F.upper(F.col("cidade"))
    ).withColumn(
        "estado", F.upper(F.col("estado"))
    )
    
    # 3. Remo√ß√£o de Duplicatas em id_consumidor
    num_registros_originais = df_consumidores_silver.count()
    df_consumidores_silver = df_consumidores_silver.dropDuplicates(["id_consumidor"])
    num_registros_apos_distinct = df_consumidores_silver.count()
    
    print(f" Registros originais: {num_registros_originais}")
    print(f" Registros ap√≥s remo√ß√£o de duplicatas (id_consumidor): {num_registros_apos_distinct}")
    
    # 4. Escrita na Camada Silver
    write_to_silver(df_consumidores_silver, "ft_consumidores")

except Exception as e:
    print(f"Erro ao processar ft_consumidores: {str(e)}")

In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: ft_pedidos ---")

try:
    df_pedidos_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_pedidos")
    
    STATUS_MAPPING = {
        "delivered": "entregue", "invoiced": "faturado", "shipped": "enviado",
        "processing": "em processamento", "unavailable": "indispon√≠vel",
        "canceled": "cancelado", "created": "criado", "approved": "aprovado"
    }
    
    df_pedidos_silver = df_pedidos_bronze.select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        F.col("order_purchase_timestamp").cast(TimestampType()).alias("pedido_compra_timestamp"),
        F.col("order_approved_at").cast(TimestampType()).alias("pedido_aprovado_timestamp"),
        F.col("order_delivered_carrier_date").cast(TimestampType()).alias("pedido_carregado_timestamp"),
        F.col("order_delivered_customer_date").cast(TimestampType()).alias("pedido_entregue_timestamp"),
        F.col("order_estimated_delivery_date").cast(TimestampType()).alias("pedido_estimativa_entrega_timestamp"),
        F.col("order_status").alias("status_pedido_en")
    )
    
    case_expression_status = F.col("status_pedido_en")
    for en, pt in STATUS_MAPPING.items():
        case_expression_status = F.when(case_expression_status == en, F.lit(pt)).otherwise(case_expression_status)
    
    df_pedidos_silver = df_pedidos_silver.withColumn(
        "status", case_expression_status
    ).drop("status_pedido_en")
    
    SEGUNDOS_POR_DIA = 60 * 60 * 24
    
    df_pedidos_silver = df_pedidos_silver.withColumn(
        "tempo_entrega_dias",
        F.round(
            (F.col("pedido_entregue_timestamp").cast(LongType()) - F.col("pedido_compra_timestamp").cast(LongType())) / SEGUNDOS_POR_DIA,
            2
        ).cast(DoubleType())
    )
    
    df_pedidos_silver = df_pedidos_silver.withColumn(
        "tempo_entrega_estimado_dias",
        F.round(
            (F.col("pedido_estimativa_entrega_timestamp").cast(LongType()) - F.col("pedido_compra_timestamp").cast(LongType())) / SEGUNDOS_POR_DIA,
            2
        ).cast(DoubleType())
    )
    
    df_pedidos_silver = df_pedidos_silver.withColumn(
        "diferenca_entrega_dias",
        F.round(F.col("tempo_entrega_dias") - F.col("tempo_entrega_estimado_dias"), 2)
    )
    
    df_pedidos_silver = df_pedidos_silver.withColumn(
        "entrega_no_prazo",
        F.when(F.col("status") != F.lit("entregue"), F.lit("N√£o Entregue"))
        .when(F.col("diferenca_entrega_dias") <= 0, F.lit("Sim"))
        .otherwise(F.lit("N√£o"))
    )

    df_pedidos_silver = df_pedidos_silver.select(
        "id_pedido",
        "id_consumidor",
        "status",
        "pedido_compra_timestamp",
        "pedido_aprovado_timestamp",
        "pedido_carregado_timestamp",
        "pedido_entregue_timestamp",
        "pedido_estimativa_entrega_timestamp",
        "tempo_entrega_dias",
        "tempo_entrega_estimado_dias",
        "diferenca_entrega_dias",
        "entrega_no_prazo"
    )
    
    write_to_silver(df_pedidos_silver, "ft_pedidos")

except Exception as e:
    print(f"Erro ao processar ft_pedidos: {str(e)}")

In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: ft_itens_pedidos ---")

COLUMNS_FT_ITENS = {
    "order_id": "id_pedido",
    "order_item_id": "id_item",
    "product_id": "id_produto",
    "seller_id": "id_vendedor",
    "price": "preco_BRL",
    "freight_value": "preco_frete",
}

try:
    df_itens_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_itens_pedidos")
    
    # 1. Sele√ß√£o, Padroniza√ß√£o de Nomes e Tipagem
    df_itens_silver = df_itens_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_FT_ITENS.items()],
    ).withColumn(
        "preco_BRL", F.col("preco_BRL").cast(DecimalType(12, 2))
    ).withColumn(
        "preco_frete", F.col("preco_frete").cast(DecimalType(12, 2))
    )  
    # 2. Escrita na Camada Silver
    write_to_silver(df_itens_silver, "ft_itens_pedidos")

except Exception as e:
    print(f"Erro ao processar ft_itens_pedidos: {str(e)}")

In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: ft_pagamentos_pedidos ---")

PAYMENT_TYPE_MAPPING = {
    "credit_card": "Cart√£o de Cr√©dito",
    "boleto": "Boleto",
    "voucher": "Voucher",
    "debit_card": "Cart√£o de D√©bito",
}

try:
    df_pagamentos_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_pagamentos_pedidos")
    
    df_pagamentos_silver = df_pagamentos_bronze.select(
        F.col("order_id").alias("id_pedido"),
        F.col("payment_sequential").cast(IntegerType()).alias("codigo_pagamento"),
        F.col("payment_type").alias("forma_pagamento_en"),
        F.col("payment_installments").cast(IntegerType()).alias("parcelas"),
        F.col("payment_value").cast(DecimalType(12, 2)).alias("valor_pagamento"),
        F.col("ingestion_timestamp").cast(TimestampType()).alias("data_ingestao")
    )
    
    case_expression_payment = F.when(F.col("forma_pagamento_en") == F.lit("credit_card"), F.lit("Cart√£o de Cr√©dito"))

    for en, pt in PAYMENT_TYPE_MAPPING.items():
        if en != "credit_card": 
            case_expression_payment = case_expression_payment.when(F.col("forma_pagamento_en") == F.lit(en), F.lit(pt))
    
    df_pagamentos_silver = df_pagamentos_silver.withColumn(
        "forma_pagamento", case_expression_payment.otherwise(F.lit("Outro"))
    ).drop("forma_pagamento_en")
    
    df_pagamentos_silver = df_pagamentos_silver.select(
        "id_pedido",
        "codigo_pagamento",
        "forma_pagamento",
        "parcelas",
        "valor_pagamento",
    )
    
    write_to_silver(df_pagamentos_silver, "ft_pagamentos_pedidos")

except Exception as e:
    print(f"Erro ao processar ft_pagamentos_pedidos: {str(e)}")

### Transforma√ß√£o: `ft_avaliacoes_pedidos` (Camada Silver)


####Regras de Valida√ß√£o Aplicadas

| Regra | Campo | Condi√ß√£o | Motivo / A√ß√£o |
|:------|:------|:----------|:---------------|
| **R1** | `id_pedido` | `IS NULL` | ID de pedido ausente ou inv√°lido. Registro removido. |
| **R2** | `data_comentario` | `IS NULL` | Data de cria√ß√£o do coment√°rio ausente. Registro removido. |
| **R3** | `data_comentario` | `> current_timestamp()` | Data de coment√°rio no futuro. Registro removido. |
| **R4** | `avaliacao` | `IS NULL` (ap√≥s `try_cast`) | Valor de avalia√ß√£o inv√°lido (ex.: texto onde deveria haver n√∫mero). Registro removido. |

---

####Tratamento de Tipos

Para garantir consist√™ncia e evitar erros durante o processamento, as convers√µes foram feitas de forma **segura**, utilizando fun√ß√µes SQL do Spark que convertem valores malformados em `NULL`:

- `try_cast(avaliacao AS INT)` ‚Üí evita erro se o valor n√£o for num√©rico.  
- `try_to_timestamp(data_comentario)` e `try_to_timestamp(data_resposta)` ‚Üí convertem strings de data malformadas para `NULL`.

Esses `NULL` resultantes s√£o tratados pelas regras **R2** e **R4** acima.

---

####M√©tricas de Limpeza

Durante a execu√ß√£o do processo, s√£o exibidos no console:

- `Registros originais:` n√∫mero total antes da filtragem  
- `Registros removidos:` quantidade de linhas inv√°lidas descartadas  

Esses indicadores permitem validar a integridade e o impacto da limpeza aplicada.

---

#### ‚úÖ Resultado Esperado
Ap√≥s a transforma√ß√£o, a tabela `ft_avaliacoes_pedidos` na camada **Silver** deve conter apenas registros v√°lidos, com:
- IDs de pedido v√°lidos,
- Datas coerentes,
- E avalia√ß√µes num√©ricas consistentes.


In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: ft_avaliacoes_pedidos ---")

COLUMNS_FT_AVALIACOES = {
    "review_id": "id_avaliacao",
    "order_id": "id_pedido",
    "review_score": "avaliacao",
    "review_comment_title": "titulo_comentario",
    "review_comment_message": "comentario",
    "review_creation_date": "data_comentario",
    "review_answer_timestamp": "data_resposta",
}

try:
    df_avaliacoes_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_avaliacoes_pedidos")
    
    # 1. Sele√ß√£o e Padroniza√ß√£o de Nomes
    df_avaliacoes_silver = df_avaliacoes_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_FT_AVALIACOES.items()],
        
    )
    
    # 2. Tipagem (Data e Score)
    # Usa F.expr com try_cast e try_to_timestamp 
    df_avaliacoes_silver = (
        df_avaliacoes_silver
        .withColumn("avaliacao", F.expr("try_cast(avaliacao as int)"))
        .withColumn("data_comentario", F.expr("try_to_timestamp(data_comentario)"))
        .withColumn("data_resposta", F.expr("try_to_timestamp(data_resposta)"))
    )
    
    # 3. Regras de Valida√ß√£o e Remo√ß√£o de Registros Inv√°lidos
    current_timestamp_spark = F.current_timestamp()

    filtro_invalido = (
        F.col("id_pedido").isNull() | 
        F.col("data_comentario").isNull() |
        (F.col("data_comentario") > current_timestamp_spark) |
        F.col("avaliacao").isNull()
    )
    
    num_registros_originais = df_avaliacoes_silver.count()
    
    df_avaliacoes_silver = df_avaliacoes_silver.filter(~filtro_invalido)
    
    num_registros_removidos = num_registros_originais - df_avaliacoes_silver.count()
    
    print(f"\n--- DETALHES DA VALIDA√á√ÉO (ft_avaliacoes_pedidos) ---")
    print(f" Registros originais: {num_registros_originais}")
    print(f" Registros removidos: {num_registros_removidos}")
    
    # 4. Escrita na Camada Silver
    write_to_silver(df_avaliacoes_silver, "ft_avaliacoes_pedidos")

except Exception as e:
    print(f"Erro ao processar ft_avaliacoes_pedidos: {str(e)}")


In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: ft_produtos ---")

COLUMNS_FT_PRODUTOS = {
    "product_id": "id_produto",
    "product_category_name": "categoria_produto",
    "product_weight_g": "peso_produto_gramas",
    "product_length_cm": "comprimento_centimetros",
    "product_height_cm": "altura_centimetros",
    "product_width_cm": "largura_centimetros",
}

try:
    df_produtos_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_produtos")
    
    # 1. Sele√ß√£o, Padroniza√ß√£o de Nomes e Tipagem
    df_produtos_silver = df_produtos_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_FT_PRODUTOS.items()],
    ).withColumn(
        "peso_produto_gramas", F.col("peso_produto_gramas").cast(IntegerType())
    ).withColumn(
        "comprimento_centimetros", F.col("comprimento_centimetros").cast(IntegerType())
    ).withColumn(
        "altura_centimetros", F.col("altura_centimetros").cast(IntegerType())
    ).withColumn(
        "largura_centimetros", F.col("largura_centimetros").cast(IntegerType())
    )
    
    # 2. Escrita na Camada Silver
    write_to_silver(df_produtos_silver, "ft_produtos")

except Exception as e:
    print(f"Erro ao processar ft_produtos: {str(e)}")


In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: ft_vendedores ---")

COLUMNS_FT_VENDEDORES = {
    "seller_id": "id_vendedor",
    "seller_zip_code_prefix": "prefixo_cep",
    "seller_city": "cidade",
    "seller_state": "estado"
}

try:
    df_vendedores_bronze = spark.table(f"{catalogo}.{bronze_db_name}.ft_vendedores")
    
    # 1. Sele√ß√£o e Padroniza√ß√£o de Nomes
    df_vendedores_silver = df_vendedores_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_FT_VENDEDORES.items()],
    )
    
    # 2. Upper Case para Cidade/Estado
    df_vendedores_silver = df_vendedores_silver.withColumn(
        "cidade", F.upper(F.col("cidade"))
    ).withColumn(
        "estado", F.upper(F.col("estado"))
    )
    
    # 3. Escrita na Camada Silver
    write_to_silver(df_vendedores_silver, "ft_vendedores")

except Exception as e:
    print(f"Erro ao processar ft_vendedores: {str(e)}")


In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: dm_categoria_produtos_traducao ---")

COLUMNS_DM_CATEGORIA = {
    "product_category_name": "nome_produto_pt",
    "product_category_name_english": "nome_produto_en",
}

try:
    df_categoria_bronze = spark.table(f"{catalogo}.{bronze_db_name}.dm_categoria_produtos_traducao")
    
    # 1. Sele√ß√£o e Padroniza√ß√£o de Nomes
    df_categoria_silver = df_categoria_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_DM_CATEGORIA.items()],
    )
    
    # 2. Escrita na Camada Silver
    write_to_silver(df_categoria_silver, "dm_categoria_produtos_traducao")

except Exception as e:
    print(f"Erro ao processar dm_categoria_produtos_traducao: {str(e)}")
COLUMNS_DM_CATEGORIA = {
    "product_category_name": "nome_produto_pt",
    "product_category_name_english": "nome_produto_en",
}

try:
    df_categoria_bronze = spark.table(f"{catalogo}.{bronze_db_name}.dm_categoria_produtos_traducao")
    
    # 1. Sele√ß√£o e Padroniza√ß√£o de Nomes
    df_categoria_silver = df_categoria_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_DM_CATEGORIA.items()],
    )
    
    # 2. Escrita na Camada Silver
    write_to_silver(df_categoria_silver, "dm_categoria_produtos_traducao")

except Exception as e:
    print(f"Erro ao processar dm_categoria_produtos_traducao: {str(e)}")

In [0]:
print("\n--- INICIANDO TRANSFORMA√á√ÉO: dm_cotacao_dolar ---")

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DateType, DecimalType, TimestampType
from pyspark.sql.window import Window
from datetime import datetime

COLUMNS_DM_COTACAO = {
    "dataHoraCotacao": "data_cotacao_raw",
    "cotacaoCompra": "cotacao_dolar",
}

try:
   
    df_cotacao_bronze = spark.table(f"{catalogo}.{bronze_db_name}.dm_cotacao_dolar")

    # Renomear colunas relevantes
    df_cotacao_silver = df_cotacao_bronze.select(
        *[F.col(col_orig).alias(col_dest) for col_orig, col_dest in COLUMNS_DM_COTACAO.items()]
    )


    df_cotacao_silver = (
        df_cotacao_silver
        .withColumn("data", F.to_date(F.col("data_cotacao_raw"), "yyyy-MM-dd HH:mm:ss"))
        .withColumn("cotacao_dolar", F.col("cotacao_dolar").cast(DecimalType(12, 4)))
        .drop("data_cotacao_raw")
    )

    # Preencher cota√ß√£o faltante pela √∫ltima conhecida
    window_spec = Window.partitionBy(F.lit(1)).orderBy("data")
    df_cotacao_silver = df_cotacao_silver.withColumn(
        "cotacao_dolar_preenchida",
        F.last(F.col("cotacao_dolar"), True).over(window_spec)
    )

    data_inicio = "2016-01-01"
    data_fim = "2018-12-31"
    dias_total = (datetime.strptime(data_fim, "%Y-%m-%d") - datetime.strptime(data_inicio, "%Y-%m-%d")).days

    df_datas = (
        spark.range(0, dias_total + 1)
        .withColumn("data", F.date_add(F.lit(data_inicio), F.col("id").cast("int")))
        .select("data")
    )


    df_cotacao_completa = df_datas.join(
        df_cotacao_silver.select("data", "cotacao_dolar_preenchida"),
        on="data",
        how="left"
    )

    # Preencher lacunas restantes com √∫ltimo valor conhecido
    window_spec_full = Window.partitionBy(F.lit(1)).orderBy("data")
    df_cotacao_final = (
        df_cotacao_completa
        .withColumn("cotacao_dolar_final", F.last(F.col("cotacao_dolar_preenchida"), True).over(window_spec_full))
        .select(
            F.col("cotacao_dolar_final").alias("cotacao_dolar"),
            F.col("data")  
        )
    )

 
    write_to_silver(df_cotacao_final, "dm_cotacao_dolar")

    print("‚úÖ Transforma√ß√£o dm_cotacao_dolar conclu√≠da e salva na Silver.")
    display(df_cotacao_final.orderBy(F.desc("data")).limit(10))


except Exception as e:
    if "is not a table or view" in str(e) or "Cannot resolve table" in str(e):
        print("‚ö†Ô∏è A tabela dm_cotacao_dolar da Bronze n√£o existe ou est√° inacess√≠vel. Criando tabela Silver vazia.")
        empty_schema = StructType([
            StructField("data", DateType(), True),
            StructField("cotacao_dolar", DecimalType(12, 4), True),
            StructField("data_ingestao", TimestampType(), True)
        ])
        df_empty = spark.createDataFrame(spark.sparkContext.emptyRDD(), empty_schema)
        write_to_silver(df_empty, "dm_cotacao_dolar")
    else:
        print(f"‚ùå Erro ao processar dm_cotacao_dolar: {str(e)}")


<h1> Verifica√ß√£o de orf√£os</h1>

In [0]:
print("\n--- VERIFICA√á√ÉO E REMO√á√ÉO DE REGISTROS √ìRF√ÉOS ---")

try:
    df_pedidos = spark.table(f"{catalogo}.{silver_db_name}.ft_pedidos")
    df_consumidores = spark.table(f"{catalogo}.{silver_db_name}.ft_consumidores")
    df_itens_pedidos = spark.table(f"{catalogo}.{silver_db_name}.ft_itens_pedidos")

except Exception as e:

    print(f"Erro ao ler tabelas Silver: {str(e)}. Pule a valida√ß√£o de integridade.")
    df_pedidos = None

if df_pedidos is not None:
  
    pedidos_sem_consumidor = df_pedidos.join(
        df_consumidores.select("id_consumidor"),
        on="id_consumidor",
        how="left_anti" 
    )
    
    count_pedidos_orfaos = pedidos_sem_consumidor.count()
    print(f"‚ö†Ô∏è Pedidos √≥rf√£os (sem 'id_consumidor' correspondente): {count_pedidos_orfaos}")
    
    if count_pedidos_orfaos > 0:

        # Remove os registros √≥rf√£os da tabela ft_pedidos

        df_pedidos_limpos = df_pedidos.exceptAll(pedidos_sem_consumidor)
        write_to_silver(df_pedidos_limpos, "ft_pedidos")

        df_pedidos = df_pedidos_limpos # Atualiza o DF de pedidos para a pr√≥xima verifica√ß√£o
        print(f"Registros √≥rf√£os removidos de ft_pedidos. Nova contagem: {df_pedidos.count()}")
    

    itens_sem_pedido = df_itens_pedidos.join(
        df_pedidos.select("id_pedido"), 
        on="id_pedido",
        how="left_anti"
    )
    
    count_itens_orfaos = itens_sem_pedido.count()
    print(f"‚ö†Ô∏è Itens √≥rf√£os (sem 'id_pedido' correspondente): {count_itens_orfaos}")
    
    if count_itens_orfaos > 0:
        # Remove os registros √≥rf√£os da tabela ft_itens_pedidos
        df_itens_pedidos_limpos = df_itens_pedidos.exceptAll(itens_sem_pedido)
        write_to_silver(df_itens_pedidos_limpos, "ft_itens_pedidos")
        df_itens_pedidos = df_itens_pedidos_limpos # Atualiza o DF de itens para o pr√≥ximo passo
        print(f"‚úÖ Registros √≥rf√£os removidos de ft_itens_pedidos. Nova contagem: {df_itens_pedidos.count()}")

<h1>Tabela Finalüìã</h1>

In [0]:
print("\n--- CRIA√á√ÉO DA TABELA FINAL: ft_pedido_total ---")

try:
    df_pedidos = spark.table(f"{catalogo}.{silver_db_name}.ft_pedidos")
    df_consumidores = spark.table(f"{catalogo}.{silver_db_name}.ft_consumidores")
    df_pagamentos = spark.table(f"{catalogo}.{silver_db_name}.ft_pagamentos_pedidos")
    df_cotacao = spark.table(f"{catalogo}.{silver_db_name}.dm_cotacao_dolar")

    df_pagamentos_agregados = (
        df_pagamentos.groupBy("id_pedido")
        .agg(F.sum("valor_pagamento").alias("valor_total_pago_brl"))
    )

    df_final = (
        df_pedidos
        .join(df_consumidores.select("id_consumidor", "cidade", "estado"), on="id_consumidor", how="inner")
        .join(df_pagamentos_agregados, on="id_pedido", how="inner")
    )

    df_final = df_final.withColumn(
        "data_pedido",
        F.to_date(F.col("pedido_compra_timestamp")).cast(DateType())
    )

    df_final_com_cotacao = df_final.join(
        df_cotacao.select("data", "cotacao_dolar"),
        df_final["data_pedido"] == df_cotacao["data"],
        "left"
    )

    df_pedido_total = df_final_com_cotacao.select(
        F.col("data_pedido").alias("data"),
        F.col("id_pedido"),
        F.col("id_consumidor"),
        F.col("status"),
        F.col("valor_total_pago_brl"),
        (F.col("valor_total_pago_brl") / F.coalesce(F.col("cotacao_dolar"), F.lit(1))).alias("valor_total_pago_usd")
    )

    df_pedido_total = df_pedido_total.withColumn(
        "valor_total_pago_usd",
        F.col("valor_total_pago_usd").cast(DecimalType(12, 2))
    )

    write_to_silver(df_pedido_total, "ft_pedido_total")

    print("‚úÖ Tabela ft_pedido_total criada com sucesso (com convers√£o para USD).")
    display(df_pedido_total.limit(10))

except Exception as e:
    # Captura o erro e imprime a mensagem
    print(f"‚ùå Erro ao criar ft_pedido_total: {str(e)}")

print("\n--- TODAS AS TRANSFORMA√á√ïES SILVER CONCLU√çDAS ---")