In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DecimalType, TimestampType, DateType
from pyspark.sql.window import Window

In [0]:
#ft_consumidores
print("Processando silver.ft_consumidores...")

#ler a tabela bronze
df_consumidores_bronze = spark.table("bronze.ft_consumidores")

#aplicar as transformações
df_consumidores_silver = df_consumidores_bronze \
    .select(
        F.col("customer_id").alias("id_consumidor"),
        F.col("customer_zip_code_prefix").alias("prefixo_cep"),
        F.upper(F.col("customer_city")).alias("cidade"),  
        F.upper(F.col("customer_state")).alias("estado")  
    ) \
    .dropDuplicates(["id_consumidor"]) #regra: id_consumidor não deve conter valores duplicados

#salvar na camada silver
df_consumidores_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_consumidores")


In [0]:
#silver.ft_pedidos
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
from itertools import chain 
print("Processando silver.ft_pedidos...")

#ler a tabela bronze
df_pedidos_bronze = spark.table("bronze.ft_pedidos")

#mapeamento do Status
mapeamento_status = {
    'delivered': 'entregue', 'invoiced': 'faturado', 'shipped': 'enviado',
    'processing': 'em processamento', 'unavailable': 'indisponível',
    'canceled': 'cancelado', 'created': 'criado', 'approved': 'aprovado'
}

map_expr = F.create_map([F.lit(x) for x in chain(*mapeamento_status.items())])

#aplicar as transformações
df_pedidos_silver = df_pedidos_bronze \
    .withColumn("status", map_expr[F.col("order_status")]) \
    .withColumn("pedido_compra_timestamp", F.col("order_purchase_timestamp").cast(TimestampType())) \
    .withColumn("pedido_entregue_timestamp", F.col("order_delivered_customer_date").cast(TimestampType())) \
    .withColumn("pedido_estimativa_entrega_timestamp", F.col("order_estimated_delivery_date").cast(TimestampType())) \
    .withColumn("tempo_entrega_dias", F.datediff(F.col("pedido_entregue_timestamp"), F.col("pedido_compra_timestamp"))) \
    .withColumn("tempo_entrega_estimado_dias", F.datediff(F.col("pedido_estimativa_entrega_timestamp"), F.col("pedido_compra_timestamp"))) \
    .withColumn("diferenca_entrega_dias", 
        F.col("tempo_entrega_dias") - F.col("tempo_entrega_estimado_dias")
    ) \
    .withColumn("entrega_no_prazo",
        F.when(F.col("pedido_entregue_timestamp").isNull(), "Não Entregue")
         .when(F.col("diferenca_entrega_dias") <= 0, "Sim")
         .otherwise("Não")
    ) \
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        "status",
        "pedido_compra_timestamp",
        F.col("order_approved_at").alias("pedido_aprovado_timestamp").cast(TimestampType()),
        F.col("order_delivered_carrier_date").alias("pedido_carregado_timestamp").cast(TimestampType()),
        "pedido_entregue_timestamp",
        "pedido_estimativa_entrega_timestamp",
        "tempo_entrega_dias",
        "tempo_entrega_estimado_dias",
        "diferenca_entrega_dias",
        "entrega_no_prazo"
    )

#salvar na camada silver
df_pedidos_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_pedidos")

In [0]:
#ft_itenspedidos
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, IntegerType

print("Processando silver.ft_itens_pedidos...")

df_itens_bronze = spark.table("bronze.ft_itens_pedidos")

df_itens_silver = df_itens_bronze.select(
    F.col("order_id").alias("id_pedido"),
    F.col("order_item_id").alias("id_item").cast(IntegerType()),
    F.col("product_id").alias("id_produto"),
    F.col("seller_id").alias("id_vendedor"),
    F.col("price").alias("preco_BRL").cast(DecimalType(12, 2)),
    F.col("freight_value").alias("preco_frete").cast(DecimalType(12, 2))
)

df_itens_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_itens_pedidos")

In [0]:
#ft_pagamentos pedido
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, IntegerType
from itertools import chain

print("Processando silver.ft_pagamentos_pedidos...")

df_pagamentos_bronze = spark.table("bronze.ft_pagamentos_pedidos")

mapeamento_pagamento = {
    'credit_card': 'Cartão de Crédito',
    'boleto': 'Boleto',
    'voucher': 'Voucher',
    'debit_card': 'Cartão de Débito'
}
map_expr = F.create_map([F.lit(x) for x in chain(*mapeamento_pagamento.items())])

df_pagamentos_silver = df_pagamentos_bronze.select(
    F.col("order_id").alias("id_pedido"),
    F.col("payment_sequential").alias("codigo_pagamento").cast(IntegerType()),
    
    F.when(map_expr.isNull(), "Outro")
     .otherwise(map_expr[F.col("payment_type")])
     .alias("forma_pagamento"),
     
    F.col("payment_installments").alias("parcelas").cast(IntegerType()),
    F.col("payment_value").alias("valor_pagamento").cast(DecimalType(12, 2))
)

df_pagamentos_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_pagamentos_pedidos")

In [0]:
#ft_avaliacoes_pedidos

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

print("Iniciando 'solucao_avaliacoes'...")

#ler tabela Bronze
df_avaliacoes_bronze = spark.table("bronze.ft_avaliacoes_pedidos")

#contagem inicial
count_antes = df_avaliacoes_bronze.count()

#conversão de datas com tolerância a erro (NULL se inválida)
df_com_datas_tratadas = df_avaliacoes_bronze \
    .withColumn("data_comentario_cast", F.expr("try_to_timestamp(review_creation_date)")) \
    .withColumn("data_resposta_cast", F.expr("try_to_timestamp(review_answer_timestamp)"))


df_avaliacoes_limpo = df_com_datas_tratadas \
    .withColumn("avaliacao_cast", F.expr("try_cast(review_score as int)")) \
    .filter(
        (F.col("order_id").isNotNull()) &
        (F.col("data_comentario_cast").isNotNull()) &
        (F.col("data_resposta_cast").isNotNull()) &
        (F.col("avaliacao_cast").between(1, 5)) &
        (F.col("data_comentario_cast") <= F.current_timestamp()) &
        (F.col("data_comentario_cast") >= F.lit("2010-01-01"))
    )

#contagem final e relatório
count_depois = df_avaliacoes_limpo.count()
linhas_removidas = count_antes - count_depois

print("--- Relatório de Limpeza ---")
print(f"Linhas antes: {count_antes}")
print(f"Linhas removidas (IDs, datas ou scores inválidos): {linhas_removidas}")
print(f"Linhas depois: {count_depois}")

#selecionar e renomear colunas conforme mapeamento
df_avaliacoes_silver = df_avaliacoes_limpo.select(
    F.col("review_id").alias("id_avaliacao"),
    F.col("order_id").alias("id_pedido"),
    F.col("avaliacao_cast").alias("avaliacao"),
    F.col("review_comment_title").alias("titulo_comentario"),
    F.col("review_comment_message").alias("comentario"),
    F.col("data_comentario_cast").alias("data_comentario"),
    F.col("data_resposta_cast").alias("data_resposta")
)

#escreve em silver
df_avaliacoes_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_avaliacoes_pedidos")

In [0]:
#ft_produtos
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

print("Processando silver.ft_produtos...")

df_produtos_bronze = spark.table("bronze.ft_produtos")

df_produtos_silver = df_produtos_bronze.select(
    F.col("product_id").alias("id_produto"),
    F.col("product_category_name").alias("categoria_produto"),
    F.col("product_weight_g").alias("peso_produto_gramas").cast(IntegerType()),
    F.col("product_length_cm").alias("comprimento_centimetros").cast(IntegerType()),
    F.col("product_height_cm").alias("altura_centimetros").cast(IntegerType()),
    F.col("product_width_cm").alias("largura_centimetros").cast(IntegerType())
)

df_produtos_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_produtos")


In [0]:
#ft_vendedores
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

print("Processando silver.ft_vendedores...")

df_vendedores_bronze = spark.table("bronze.ft_vendedores")

df_vendedores_silver = df_vendedores_bronze.select(
    F.col("seller_id").alias("id_vendedor"),
    F.col("seller_zip_code_prefix").alias("prefixo_cep").cast(IntegerType()),
    F.upper(F.col("seller_city")).alias("cidade"),
    F.upper(F.col("seller_state")).alias("estado")
)

df_vendedores_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.ft_vendedores")


In [0]:
#ft_categoria_produto_traducao
from pyspark.sql import functions as F

print("Processando silver.dm_categoria_produtos_traducao...")

df_traducao_bronze = spark.table("bronze.dm_categoria_produtos_traducao")

df_traducao_silver = df_traducao_bronze.select(
    F.col("product_category_name").alias("nome_produto_pt"),
    F.col("product_category_name_english").alias("nome_produto_en")
)

df_traducao_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.dm_categoria_produtos_traducao")

In [0]:
#ft_cotacao
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DateType
from pyspark.sql.window import Window

print("Processando silver.dm_cotacao_dolar...")

df_cotacao_bronze = spark.table("bronze.dm_cotacao_dolar") \
    .select(
        F.to_date(F.col("dataHoraCotacao")).alias("data"),
        F.col("cotacaoCompra").cast(DecimalType(12, 2)).alias("cotacao_dolar")
    ) \
    .dropDuplicates(["data"]) \
    .orderBy("data")

min_max_data = df_cotacao_bronze.select(
    F.min("data").alias("data_min"),
    F.max("data").alias("data_max")
).first()

df_calendario = spark.sql(f"SELECT explode(sequence(to_date('{min_max_data.data_min}'), to_date('{min_max_data.data_max}'), interval 1 day)) AS data")

df_cotacao_com_nulos = df_calendario.join(
    df_cotacao_bronze,
    on="data",
    how="left"
)

window_spec = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, 0)

df_cotacao_preenchida = df_cotacao_com_nulos.withColumn(
    "cotacao_preenchida",
    F.last(F.col("cotacao_dolar"), ignorenulls=True).over(window_spec)
)

df_cotacao_silver = df_cotacao_preenchida.select(
    F.col("data"),
    F.col("cotacao_preenchida").alias("cotacao_dolar")
) \
.filter(F.col("cotacao_dolar").isNotNull()) 

df_cotacao_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.dm_cotacao_dolar")

In [0]:
from pyspark.sql import functions as F

print("Iniciando Validações de Integridade (Remoção de Órfãos)...")

#carregar as tabelas silver que acabamos de criar ---
df_pedidos = spark.table("silver.ft_pedidos")
df_consumidores = spark.table("silver.ft_consumidores")
df_itens = spark.table("silver.ft_itens_pedidos")

#verificação de Pedidos Órfãos (sem consumidor) ---

#encontra pedidos cujo 'id_consumidor' NÃO existe na tabela de consumidores
df_pedidos_orfãos = df_pedidos.join(
    df_consumidores,
    on="id_consumidor",
    how="left_anti" # 'left_anti' retorna apenas linhas de 'df_pedidos' que NÃO têm correspondência
)

count_pedidos_orfãos = df_pedidos_orfãos.count()
print(f"Relatório de Validação 1: Pedidos Órfãos (sem consumidor) encontrados: {count_pedidos_orfãos}")


df_itens_orfãos = df_itens.join(
    df_pedidos,
    on="id_pedido",
    how="left_anti"
)

count_itens_orfãos = df_itens_orfãos.count()
print(f"Relatório de Validação 2: Itens Órfãos (sem pedido) encontrados: {count_itens_orfãos}")

#remover os registros órfãos e sobrescrever as tabelas

#se encontramos órfãos, limpamos a tabela de pedidos
if count_pedidos_orfãos > 0:
    print(f"Limpando {count_pedidos_orfãos} pedidos órfãos da tabela silver.ft_pedidos...")
    # lef semi faz o oposto: mantém apenas os pedidos QUE TÊM correspondência
    df_pedidos_limpos = df_pedidos.join(
        df_consumidores,
        on="id_consumidor",
        how="left_semi" 
    )
    #sobrescreve a tabela SÓ COM OS PEDIDOS VÁLIDOS
    df_pedidos_limpos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.ft_pedidos")
else:
    print("Nenhum pedido órfão para limpar.")


#se encontramos órfãos limpamos a tabela de itens
if count_itens_orfãos > 0:
    print(f"Limpando {count_itens_orfãos} itens órfãos da tabela silver.ft_itens_pedidos...")
    #mantém apenas os itens QUE TÊM um pedido válido
    df_itens_limpos = df_itens.join(
        df_pedidos,
        on="id_pedido",
        how="left_semi"
    )
    #sobrescreve a tabela SÓ COM OS ITENS VÁLIDOS
    df_itens_limpos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.ft_itens_pedidos")
else:
    print("Nenhum item órfão para limpar.")


In [0]:
#pedido_total
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

print("Iniciando a criação da tabela final silver.pedido_total...")

#ler as table silver
df_pedidos = spark.table("silver.ft_pedidos")
df_pagamentos = spark.table("silver.ft_pagamentos_pedidos")
df_cotacao = spark.table("silver.dm_cotacao_dolar")

df_pagamentos_total = df_pagamentos \
    .groupBy("id_pedido") \
    .agg(
        F.sum("valor_pagamento").alias("valor_total_pago_brl")
    )

#juntar_pedidos_e_pagamentos
df_pedidos_com_pagamento = df_pedidos.join(
    df_pagamentos_total,
    on="id_pedido",
    how="left" #manter_pedidos_mesmo_sem_pagamento
)

#join pela data
df_pedidos_pronto = df_pedidos_com_pagamento.withColumn(
    "data_pedido", F.to_date(F.col("pedido_compra_timestamp"))
)

#juntar com cotacao do dolar
df_final = df_pedidos_pronto.join(
    df_cotacao,
    df_pedidos_pronto.data_pedido == df_cotacao.data, #join_pela_data
    how="left"
)

#calcular valor usd 
df_pedido_total_silver = df_final.select(
    F.col("id_pedido"),
    F.col("id_consumidor"),
    F.col("status"),
    F.col("valor_total_pago_brl").cast(DecimalType(12, 2)),
    
 
    (F.col("valor_total_pago_brl") / F.col("cotacao_dolar"))
        .alias("valor_total_pago_usd").cast(DecimalType(12, 2)),
        
    F.col("data_pedido")
) \
.filter(F.col("valor_total_pago_brl").isNotNull()) 

#salvar em silver
df_pedido_total_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.pedido_total")