## 1. ft_consumidores


In [0]:
ft_consumidores = spark.table("medalhao.bronze.ft_consumidores")
display(ft_consumidores)

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window

catalogo = "medalhao"
bronze_db_name = "bronze"
silver_db_name = "silver"

# Funcao generica para criar tabela na camada silver
def salvar_tabela_silver(df, nome_tabela_destino):

    table_full_name = f"{catalogo}.{silver_db_name}.{nome_tabela_destino}"

    try:
        # Decidi colocar um ingestion_timestamp para na silver para facilitar o controle de versao se fosse preciso em um caso real
        df = df.withColumn("silver_ingestion_timestamp", F.current_timestamp())
        (df.write
           .format("delta")
           .mode("overwrite")
           .option("overwriteSchema", "true") 
           .saveAsTable(table_full_name)
        )
        print(f"✅ Tabela Silver '{table_full_name}' salva com sucesso.")

    except Exception as e:
        print(f"❌ Erro ao salvar tabela '{table_full_name}': {e}")


In [0]:
# Tratamento de dados para a tabela ft_consumidores
df_bronze_consumidores = spark.table(f"{catalogo}.{bronze_db_name}.ft_consumidores")

# Colocando o nome das tabelas em portugues e colocando o nome de cidades e estados em Upper case 
# Trim adicionadao para carantir consistencia no nome das cidades e estados 
df_silver_consumidores = df_bronze_consumidores.select(
    F.col("customer_id").alias("id_consumidor"),
    F.col("customer_zip_code_prefix").alias("prefixo_cep"),
    F.upper(F.trim(F.col("customer_city"))).alias("cidade"),
    F.upper(F.trim(F.col("customer_state"))).alias("estado")
)

# Removendo os duplicados de id_consumidor
df_deduplicado = df_silver_consumidores.dropDuplicates(["id_consumidor"])

# Chamada da funcao para salvar na camada silver 
salvar_tabela_silver(df_deduplicado, "ft_consumidores")

In [0]:
#Vizualizando como ficou na camada silver 
ft_consumidores_silver = spark.table("medalhao.silver.ft_consumidores")
display(ft_consumidores_silver)

##2. ft_pedidos


In [0]:
df_pedidos = spark.table("medalhao.bronze.ft_pedidos")
display(df_pedidos)

In [0]:
df_bronze_pedidos = spark.table(f"{catalogo}.{bronze_db_name}.ft_pedidos")

# Traduzindo os resultados do status do pedido de order_status para Portugues
status_traducao = (
    F.when(F.col("order_status") == "delivered", "entregue")
        .when(F.col("order_status") == "invoiced", "faturado")
        .when(F.col("order_status") == "shipped", "enviado")
        .when(F.col("order_status") == "processing", "em processamento")
        .when(F.col("order_status") == "unavailable", "indisponível")
        .when(F.col("order_status") == "canceled", "cancelado")
        .when(F.col("order_status") == "created", "criado")
        .when(F.col("order_status") == "approved", "aprovado")
        .otherwise(F.col("order_status"))
)

# Renomeando colunas para Potugues 
df_silver_pedidos = df_bronze_pedidos.select(
    F.col("order_id").alias("id_pedido"),
    F.col("customer_id").alias("id_consumidor"),
    status_traducao.alias("status"),
    
    # Usando o cast para garantir que as colunas serao do tipo correto data/hora
    F.col("order_purchase_timestamp").cast("timestamp").alias("pedido_compra_timestamp"),
    F.col("order_approved_at").cast("timestamp").alias("pedido_aprovado_timestamp"),
    F.col("order_delivered_carrier_date").cast("timestamp").alias("pedido_carregado_timestamp"),
    F.col("order_delivered_customer_date").cast("timestamp").alias("pedido_entregue_timestamp"),
    F.col("order_estimated_delivery_date").cast("timestamp").alias("pedido_estimativa_entrega_timestamp"),
    
    # tempo_entrega_dias
    F.datediff(
        F.col("order_delivered_customer_date"), 
        F.col("order_purchase_timestamp")
    ).alias("tempo_entrega_dias"),
    
    # tempo_entrega_estimado_dias
    F.datediff(
        F.col("order_estimated_delivery_date"), 
        F.col("order_purchase_timestamp")
    ).alias("tempo_entrega_estimado_dias")
)

# diferenca_entrega_dias
df_com_diferenca = df_silver_pedidos.withColumn(
    "diferenca_entrega_dias",
    F.col("tempo_entrega_dias") - F.col("tempo_entrega_estimado_dias")
)

# entrega_no_prazo
logica_entrega_prazo = (
    F.when(F.col("status") != "entregue", "Não Entregue")
        .when(F.col("diferenca_entrega_dias") <= 0, "Sim")
        .otherwise("Não")
)
    
df_completo = df_com_diferenca.withColumn("entrega_no_prazo", logica_entrega_prazo)

df_final_pedidos = df_completo.select(
    "id_pedido",
    "id_consumidor",
    "status",
    "pedido_compra_timestamp",
    "pedido_aprovado_timestamp",
    "pedido_carregado_timestamp",
    "pedido_entregue_timestamp",
    "pedido_estimativa_entrega_timestamp",
    "tempo_entrega_dias",
    "tempo_entrega_estimado_dias",
    "diferenca_entrega_dias",
    "entrega_no_prazo",
)

salvar_tabela_silver(df_final_pedidos, "ft_pedidos")

In [0]:
#Vizualizando como ficou na camada silver 
ft_pedidos_silver = spark.table("medalhao.silver.ft_pedidos")
display(ft_pedidos_silver)

## 3. ft_itens_pedidos


In [0]:
df_itens_pedidos = spark.table("medalhao.bronze.ft_itens_pedidos")
display(df_itens_pedidos)

In [0]:
 df_bronze_itens = spark.table(f"{catalogo}.{bronze_db_name}.ft_itens_pedidos")

# Renomeando e fiiltrando colunas 
df_silver_itens = df_bronze_itens.select(
    F.col("order_id").alias("id_pedido"),
    F.col("order_item_id").alias("id_item"),
    F.col("product_id").alias("id_produto"),
    F.col("seller_id").alias("id_vendedor"),
    
    # Garantindo a tipagem correta para valores monetarios com o cast
    F.col("price").cast("double").alias("preco_BRL"),
    F.col("freight_value").cast("double").alias("preco_frete"),
)

salvar_tabela_silver(df_silver_itens, "ft_itens_pedidos")

In [0]:
#Vizualizando como ficou na camada silver 
ft_itens_pedidos_silver = spark.table("medalhao.silver.ft_itens_pedidos")
display(ft_itens_pedidos_silver)

## 4. ft_pagamentos


In [0]:
ft_pagamentos = spark.table("medalhao.bronze.ft_pagamentos_pedidos")
display(ft_pagamentos)

In [0]:
df_bronze_pagamentos = spark.table(f"{catalogo}.{bronze_db_name}.ft_pagamentos_pedidos")

forma_pagamento = (
    F.when(F.col("payment_type") == "credit_card", "Cartão de Crédito")
        .when(F.col("payment_type") == "boleto", "Boleto")
        .when(F.col("payment_type") == "voucher", "Voucher")
        .when(F.col("payment_type") == "debit_card", "Cartão de Débito")
        .otherwise("Outro") 
)

# Renomeando colunas
df_silver_pagamentos = df_bronze_pagamentos.select(
    F.col("order_id").alias("id_pedido"),
    F.col("payment_sequential").cast("integer").alias("codigo_pagamento"),
    forma_pagamento.alias("forma_pagamento"),
    
    # Usando o cast para garantir a tipagem correta
    F.col("payment_installments").cast("integer").alias("parcelas"),
    F.col("payment_value").cast("double").alias("valor_pagamento"),
)


salvar_tabela_silver(df_silver_pagamentos, "ft_pagamentos_pedidos")

In [0]:
#Vizualizando como ficou na camada silver 
ft_itens_pagamentos_silver = spark.table("medalhao.silver.ft_pagamentos_pedidos")
display(ft_itens_pagamentos_silver)

## 5. ft_avaliacoes_pedidos


In [0]:
ft_avaliacoes_pedidos = spark.table("medalhao.bronze.ft_avaliacoes_pedidos")
display(ft_avaliacoes_pedidos)

In [0]:
df_bronze_avaliacoes = spark.table(f"{catalogo}.{bronze_db_name}.ft_avaliacoes_pedidos")

# Contegem inicial de linhas 
count_inicial = df_bronze_avaliacoes.count()
print(f"Registros na Bronze: {count_inicial}\n")

# Renomeando colunas  
df_silver_avaliacoes = df_bronze_avaliacoes.select(
    F.col("review_id").alias("id_avaliacao"),
    F.col("order_id").alias("id_pedido"),
     
    # Usando o cast para ultilizar o score como numero inteiro
    F.col("review_score").cast("integer").alias("avaliacao"),
    F.col("review_comment_title").alias("titulo_comentario"),
    F.col("review_comment_message").alias("comentario"),
    
    # A conversão para 'timestamp' retornará NULL se o formato for inconsistente
    # CORREÇÃO DO ERRO DE CAST: Usando to_timestamp, que transforma erro em NULL
    F.try_to_timestamp(F.col("review_creation_date")).alias("data_comentario"),
    F.try_to_timestamp(F.col("review_answer_timestamp")).alias("data_resposta"), 
)


print("---------------- LOGICA APLICADA PARA FILTRAR DADOS ----------------")
print("1. 'id_pedido' é NULO + Logica do regex.")
print("2. 'data_comentario' é NULA ou tem formato inválido (falhou na conversão).")
print("3. 'data_resposta' é NULA ou tem formato inválido (falhou na conversão).")
print("4. 'data_comentario' é uma data futura (maior que hoje).")
print("5. 'data_resposta' é anterior à 'data_comentario' (inconsistência lógica).")
print("--------------------------------------------------")

print("extra: Não consigo ter 100 por cento de certeza sobre isso pois nao tenho a informação de como os id's de pedido foram gerados.")  
print("Mas se o que identifiquei esta correto, os id's estao em haxadecimal (so possuem letras entre a e f do alfabeto) e sao todas minusculas, entao e isso que o regex esta buscando\n")
# Regex para validar o 'id_pedido' varia de tamanho 21 a 35 pois nao consigo confirmar se esta 100% correto entao coloco uma margem para variarcao no tamanho, mas ja deve eliminar a maior parte do id's errados dessa forma 
id_pedido_regex_valido = "^[a-f0-9]{21,35}$"
    
# Aplicando regras de qualidade e filtrando dados inválidos
df_validos = df_silver_avaliacoes.filter(
    (F.col("id_pedido").rlike(id_pedido_regex_valido)) &
    (F.col("data_comentario").isNotNull()) &
    (F.col("data_resposta").isNotNull()) &
    (F.col("data_comentario") <= F.current_timestamp()) &
    (F.col("data_resposta") >= F.col("data_comentario"))
)

count_final = df_validos.count()
count_removidos = count_inicial - count_final

print(f"Registros após limpeza (Silver): {count_final}")
print(f"Total de registros removidos (inválidos): {count_removidos}")


salvar_tabela_silver(df_validos, "ft_avaliacoes_pedidos")

In [0]:
#Vizualizando como ficou na camada silver 
ft_avaliacoes_pedidos_silver = spark.table("medalhao.silver.ft_avaliacoes_pedidos")
display(ft_avaliacoes_pedidos_silver)

## 6. ft_produtos


In [0]:
ft_produtos = spark.table("medalhao.bronze.ft_produtos")
display(ft_produtos)

In [0]:
df_bronze_produtos = spark.table(f"{catalogo}.{bronze_db_name}.ft_produtos")

# Renomeando e Tipando
df_silver_produtos = df_bronze_produtos.select(
    F.col("product_id").alias("id_produto"),
    F.col("product_category_name").alias("categoria_produto"),
    F.col("product_weight_g").cast("integer").alias("peso_produto_gramas"),
    F.col("product_length_cm").cast("integer").alias("comprimento_centimetros"),
    F.col("product_height_cm").cast("integer").alias("altura_centimetros"),
    F.col("product_width_cm").cast("integer").alias("largura_centimetros"),
)

salvar_tabela_silver(df_silver_produtos, "ft_produtos")

In [0]:
#Vizualizando como ficou na camada silver 
ft_produtos_silver = spark.table("medalhao.silver.ft_produtos")
display(ft_produtos_silver)

## 7. ft_vendedores

In [0]:
ft_vendedores = spark.table("medalhao.bronze.ft_vendedores")
display(ft_vendedores)

In [0]:
    df_bronze_vendedores = spark.table(f"{catalogo}.{bronze_db_name}.ft_vendedores")

    # Renomeando, Limpando (trim) e Padronizando (upper)
    df_silver_vendedores = df_bronze_vendedores.select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("prefixo_cep"),

        F.upper(F.trim(F.col("seller_city"))).alias("cidade"),
        F.upper(F.trim(F.col("seller_state"))).alias("estado"),
    )

    salvar_tabela_silver(df_silver_vendedores, "ft_vendedores")

In [0]:
ft_vendedores_silver = spark.table("medalhao.silver.ft_vendedores")
display(ft_vendedores_silver)   

## 8. dm_categoria_produtos_traducao


In [0]:
dm_categoria_produtos_traducao = spark.table("medalhao.bronze.dm_categoria_produtos_traducao")
display(dm_categoria_produtos_traducao)

In [0]:
df_bronze_traducao = spark.table(f"{catalogo}.{bronze_db_name}.dm_categoria_produtos_traducao")

# Renomeando colunas
df_silver_traducao = df_bronze_traducao.select(
    F.col("product_category_name").alias("nome_produto_pt"),
    F.col("product_category_name_english").alias("nome_produto_en"),
)

salvar_tabela_silver(df_silver_traducao, "dm_categoria_produtos_traducao")

In [0]:
dm_categoria_produtos_traducao_silver = spark.table("medalhao.silver.dm_categoria_produtos_traducao")
display(dm_categoria_produtos_traducao_silver)

## 9. dm_cotacao_dolar

In [0]:
dm_cotacao_dolar = spark.table("medalhao.bronze.dm_cotacao_dolar")
display(dm_cotacao_dolar)

In [0]:
# Convertemos 'dataHoraCotacao' para data para garantir a tipagem correta
df_bronze_cotacao = spark.table(f"{catalogo}.{bronze_db_name}.dm_cotacao_dolar").select(
    F.col("cotacaoCompra").alias("cotacao_dolar"),
    F.to_date(F.col("dataHoraCotacao")).alias("data"),
).distinct()
# Distinct para evitar duplicatas caso existam 

# Encontrando a data minima e maxima presente na tabela (mais antigo e mais recente), o perido pode ser alterado gerando diferente resultados
df_min_max = df_bronze_cotacao.agg(
    F.min("data").alias("data_min"),
    F.max("data").alias("data_max")
).first()

data_inicio = df_min_max["data_min"]
data_fim = df_min_max["data_max"]

# Criando um DataFrame com todos os dias do período da cotacao
df_calendario = spark.sql(f"SELECT explode(sequence(to_date('{data_inicio}'), to_date('{data_fim}'), interval 1 day)) AS data")

# Dando join nos dois dataframes para descobrir os dias que nao tem cotacao (recebem NULL)
df_joined = df_calendario.join(
    df_bronze_cotacao,
    "data",
    "left"
)

# Ordenar por data 
window = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Preenchendo os NULLs com o último valor válido (a sexta-feira)
df_filled = df_joined.withColumn(
    "cotacao_dolar_preenchida",
    F.last("cotacao_dolar", ignorenulls=True).over(window)
)

df_silver_cotacao = df_filled.select(
    F.col("data"),
    F.col("cotacao_dolar_preenchida").alias("cotacao_dolar"),
)

salvar_tabela_silver(df_silver_cotacao, "dm_cotacao_dolar")

In [0]:
dm_cotacao_dolar_silver = spark.table("medalhao.silver.dm_cotacao_dolar")
display(dm_cotacao_dolar_silver)

## VALIDAÇÕES

In [0]:
ft_pedidos_silver = spark.table("medalhao.silver.ft_pedidos")
ft_consumidores_silver = spark.table("medalhao.silver.ft_consumidores")
ft_itens_pedidos_silver = spark.table("medalhao.silver.ft_itens_pedidos")

# Encontrar e Contar Orfaos
df_pedidos_orfao = ft_pedidos_silver.join(
    df_consumidores,
    "id_consumidor",
    "left_anti"
)
count_pedidos_orfao = df_pedidos_orfao.count()
print(f"Resultado: {count_pedidos_orfao} Pedidos Órfãos encontrados.")
display(df_pedidos_orfao)

if count_pedidos_orfos > 0:
    print("Removendo registros órfãos de ft_pedidos...")
    
    # Manter apenas registros validos
    df_pedidos_validos = df_pedidos.join(
        df_consumidores,
        "id_consumidor",
        "left_semi" 
    )
    
    # Sobrescrever a tabela Silver apenas com os validos
    (df_pedidos_validos.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true") 
        .saveAsTable(f"{catalogo}.{silver_db_name}.ft_pedidos")
    )
    print(f"✅ Tabela ft_pedidos atualizada (órfãos removidos).")


# Encontrar e Contar Orfaos
df_itens_orfao = ft_itens_pedidos_silver.join(
    df_pedidos_validos,
    "id_pedido",
    "left_anti"
)
count_itens_orfao = df_itens_orfao.count()
print(f"Resultado: {count_itens_orfao} Itens Órfãos encontrados.")
display(df_item_orfao)

if count_itens_orfos > 0:
    print("Removendo registros órfãos de ft_itens_pedidos...")
    
    # Manter apenas registros validos 
    df_itens_validos = df_itens.join(
        df_pedidos_validos,
        "id_pedido",
        "left_semi"
    )
    
    # Sobrescrever a tabela Silver apenas com os validos
    (df_itens_validos.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true") 
        .saveAsTable(f"{catalogo}.{silver_db_name}.ft_itens_pedidos")
    )
    print(f"✅ Tabela ft_itens_pedidos atualizada (órfãos removidos).")


In [0]:
df_pedidos = spark.table(f"{catalogo}.{bronze_db_name}.ft_pedidos")
df_consumidores = spark.table(f"{catalogo}.{bronze_db_name}.ft_consumidores")
df_pagamentos_pedidos = spark.table(f"{catalogo}.{bronze_db_name}.ft_pagamentos_pedidos")
df_cotacao_dolar = spark.table(f"{catalogo}.{silver_db_name}.dm_cotacao_dolar")


# Somando todos os pagamentos de um mesmo pedido
df_pagamentos_agg = df_pagamentos_pedidos.groupBy("order_id").agg(
    F.sum("payment_value").alias("valor_total_pago_brl")
)

# Selecionar colunas e converter a data do pedido para 'date' para fazer o JOIN com a cotacao.
df_pedidos_base = df_pedidos.select(
    F.col("order_id"),
    F.col("customer_id"),
    F.col("order_status"),
    F.to_date(F.col("order_purchase_timestamp")).alias("data_pedido")
)

# Juntar Pedidos com Pagamentos Agregados
df_pedido_e_pagamento = df_pedidos_base.join(
    df_pagamentos_agg,
    "order_id",
    "inner" 
)

# A partir de 'data_pedido' buscamos a 'cotacao_dolar' daquele dia
df_joined_total = df_pedido_e_pagamento.join(
    df_cotacao_dolar,
    df_pedido_e_pagamento["data_pedido"] == df_cotacao_dolar["data"],
    "left" 
)

# Calcular dolar e Selecionar Colunas Finais
df_final = df_joined_total.select(
    F.col("order_id").alias("id_pedido"),
    F.col("customer_id").alias("id_consumidor"),
    F.col("order_status").alias("status"),
    F.col("data_pedido"),
    F.col("valor_total_pago_brl"),
    
    # Calculo do dolar: (valor_brl / cotacao_dolar) com duas casas decimais
    F.round(
        (F.col("valor_total_pago_brl") / F.col("cotacao_dolar")), 2
    ).alias("valor_total_pago_usd")
)

salvar_tabela_silver(df_final, "ft_pedido_total")
display(df_final)