In [0]:
from pyspark.sql import functions as F

print("Iniciando a transformação para silver.ft_consumidores...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_consumidores")

    # 2. Remover duplicatas com base no 'customer_id' 
    # Usamos dropDuplicates para manter apenas um registro por id_consumidor
    df_deduplicated = df_bronze.dropDuplicates(["customer_id"])

    # 3. Aplicar transformações
    df_silver = df_deduplicated.select(
        # Renomear colunas 
        F.col("customer_id").alias("id_consumidor"),
        F.col("customer_zip_code_prefix").alias("prefixo_cep"),
        
        # Converter para maiúsculas (UPPER CASE) 
        F.upper(F.col("customer_city")).alias("cidade"),
        F.upper(F.col("customer_state")).alias("estado")
    )

    # 4. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_consumidores")
    
    print("SUCESSO: Tabela medalhao.silver.ft_consumidores criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_consumidores: {e}")

In [0]:
from pyspark.sql import functions as F

print("Iniciando a transformação para silver.ft_pedidos...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_pedidos")

    # 2. Criar o dicionário de mapeamento para tradução de status
    status_map = {
        "delivered": "entregue",
        "invoiced": "faturado",
        "shipped": "enviado",
        "processing": "em processamento",
        "unavailable": "indisponível",
        "canceled": "cancelado",
        "created": "criado",
        "approved": "aprovado"
    }
    
    # --- CORREÇÃO AQUI ---
    # A forma correta de criar um mapa de tradução no Spark
    # é "achatar" (flatten) o dicionário em uma lista de literais
    
    # 1. Achatar o dicionário para a lista: [key1, val1, key2, val2, ...]
    map_args = []
    for key, value in status_map.items():
        map_args.append(F.lit(key))
        map_args.append(F.lit(value))
    
    # 2. Criar a coluna do tipo "mapa"
    map_expr = F.create_map(*map_args)
    # --- FIM DA CORREÇÃO ---

    # 3. Enriquecimento: Criar colunas de tempo (código original, está correto)
    df_enriched = df_bronze.withColumn(
        "tempo_entrega_dias", 
        F.datediff(F.col("order_delivered_customer_date"), F.col("order_purchase_timestamp"))
    ).withColumn(
        "tempo_entrega_estimado_dias",
        F.datediff(F.col("order_estimated_delivery_date"), F.col("order_purchase_timestamp"))
    )
    
    # Calcular a diferença de entrega
    df_enriched = df_enriched.withColumn(
        "diferenca_entrega_dias", 
        F.col("tempo_entrega_dias") - F.col("tempo_entrega_estimado_dias")
    )

    # 4. Enriquecimento: Lógica da coluna 'entrega_no_prazo' (código original, está correto)
    entrega_prazo_logic = (
        F.when(F.col("order_delivered_customer_date").isNull(), "Não Entregue")
         .when(F.col("diferenca_entrega_dias") <= 0, "Sim")
         .otherwise("Não")
    )

    # 5. Aplicar transformações, renomear e selecionar colunas finais
    df_silver = df_enriched.select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        
        # --- CORREÇÃO AQUI ---
        # Usar a coluna do mapa para "procurar" o valor traduzido
        # F.coalesce() garante que se a tradução não for encontrada (nulo), 
        # o valor original (em inglês) seja mantido.
        F.coalesce(map_expr[F.col("order_status")], F.col("order_status")).alias("status"),
        # --- FIM DA CORREÇÃO ---
        
        F.col("order_purchase_timestamp").alias("pedido_compra_timestamp"),
        F.col("order_approved_at").alias("pedido_aprovado_timestamp"),
        F.col("order_delivered_carrier_date").alias("pedido_carregado_timestamp"),
        F.col("order_delivered_customer_date").alias("pedido_entregue_timestamp"),
        F.col("order_estimated_delivery_date").alias("pedido_estimativa_entrega_timestamp"),
        
        # Novas colunas
        F.col("tempo_entrega_dias"),
        F.col("tempo_entrega_estimado_dias"),
        F.col("diferenca_entrega_dias"),
        entrega_prazo_logic.alias("entrega_no_prazo")
    )

    # 6. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_pedidos")
    
    print("SUCESSO: Tabela medalhao.silver.ft_pedidos criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_pedidos: {e}")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

print("Iniciando a transformação para silver.ft_itens_pedidos...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_itens_pedidos")

    # 2. Aplicar transformações: Renomear e ajustar tipos
    df_silver = df_bronze.select(
        F.col("order_id").alias("id_pedido"),
        F.col("order_item_id").alias("id_item"),
        F.col("product_id").alias("id_produto"),
        F.col("seller_id").alias("id_vendedor"),
        
        # Ajustar tipo para monetário
        F.col("price").cast(DecimalType(12, 2)).alias("preco_BRL"),
        F.col("freight_value").cast(DecimalType(12, 2)).alias("preco_frete")
    )

    # 3. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_itens_pedidos")
    
    print("SUCESSO: Tabela medalhao.silver.ft_itens_pedidos criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_itens_pedidos: {e}")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

print("Iniciando a transformação para silver.ft_pagamentos...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_pagamentos_pedidos")

    # 2. Criar o dicionário de mapeamento para tradução da forma de pagamento
    pagamento_map = {
        "credit_card": "Cartão de Crédito",
        "boleto": "Boleto",
        "voucher": "Voucher",
        "debit_card": "Cartão de Débito"
    }
    
    # Achatar o dicionário para a lista de literais do Spark
    map_args = []
    for key, value in pagamento_map.items():
        map_args.append(F.lit(key))
        map_args.append(F.lit(value))
    
    # Criar a coluna do tipo "mapa"
    map_expr = F.create_map(*map_args)

    # 3. Aplicar transformações, renomear e selecionar colunas
    df_silver = df_bronze.select(
        F.col("order_id").alias("id_pedido"),
        F.col("payment_sequential").alias("codigo_pagamento"),
        
        # Usar o mapa para traduzir.
        # Se não encontrar no mapa, F.coalesce() usa o 'otherwise' ("Outro")
        F.coalesce(map_expr[F.col("payment_type")], F.lit("Outro")).alias("forma_pagamento"),
        
        F.col("payment_installments").alias("parcelas"),
        F.col("payment_value").cast(DecimalType(12, 2)).alias("valor_pagamento")
    )

    # 4. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_pagamentos")
    
    print("SUCESSO: Tabela medalhao.silver.ft_pagamentos criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_pagamentos: {e}")

In [0]:
from pyspark.sql import functions as F

print("Iniciando a transformação para silver.ft_avaliacoes_pedidos...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_avaliacoes_pedidos")
    
    # 2. Contar as linhas antes da limpeza
    linhas_antes = df_bronze.count()
    print(f"Linhas na Bronze: {linhas_antes}")

    # 3. --- CORREÇÃO AQUI: Usar F.expr('try_cast(...)') ---
    # Esta é a forma SQL de chamar 'try_cast', que funciona em todas as versões.
    
    df_com_datas_limpas = df_bronze.withColumn(
        "data_comentario_limpa",
        F.expr("try_cast(review_creation_date AS timestamp)")
    ).withColumn(
        "data_resposta_limpa",
        F.expr("try_cast(review_answer_timestamp AS timestamp)")
    )

    # 4. Regras de validação (Agora usando as colunas limpas)
    df_limpo = df_com_datas_limpas.filter(
        (F.col("order_id").isNotNull()) &
        (F.col("data_comentario_limpa").isNotNull()) &  # Filtra datas corrompidas
        (F.col("data_resposta_limpa").isNotNull()) &    # Filtra datas corrompidas
        (F.col("data_comentario_limpa") <= F.current_timestamp())
    )

    # 5. Contar as linhas depois e documentar
    linhas_depois = df_limpo.count()
    linhas_removidas = linhas_antes - linhas_depois
    print(f"Linhas após limpeza: {linhas_depois}")
    print(f"DOCUMENTAÇÃO: {linhas_removidas} linhas removidas por ID nulo, datas nulas/corrompidas ou datas futuras.")

    # 6. Aplicar transformações: Renomear colunas
    df_silver = df_limpo.select(
        F.col("review_id").alias("id_avaliacao"),
        F.col("order_id").alias("id_pedido"),
        F.col("review_score").alias("avaliacao"),
        F.col("review_comment_title").alias("titulo_comentario"),
        F.col("review_comment_message").alias("comentario"),
        
        # Usar as novas colunas limpas
        F.col("data_comentario_limpa").alias("data_comentario"),
        F.col("data_resposta_limpa").alias("data_resposta")
    )

    # 7. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_avaliacoes_pedidos")
    
    print("SUCESSO: Tabela medalhao.silver.ft_avaliacoes_pedidos criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_avaliacoes_pedidos: {e}")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

print("Iniciando a transformação para silver.ft_produtos...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_produtos")

    # 2. Aplicar transformações: Renomear e ajustar tipos
    df_silver = df_bronze.select(
        F.col("product_id").alias("id_produto"),
        F.col("product_category_name").alias("categoria_produto"),
        
        # Garantir que as colunas numéricas sejam Integer
        F.col("product_weight_g").cast(IntegerType()).alias("peso_produto_gramas"),
        F.col("product_length_cm").cast(IntegerType()).alias("comprimento_centimetros"),
        F.col("product_height_cm").cast(IntegerType()).alias("altura_centimetros"),
        F.col("product_width_cm").cast(IntegerType()).alias("largura_centimetros")
    )

    # 3. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_produtos")
    
    print("SUCESSO: Tabela medalhao.silver.ft_produtos criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_produtos: {e}")

In [0]:
from pyspark.sql import functions as F

print("Iniciando a transformação para silver.ft_vendedores...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.ft_vendedores")

    # 2. Aplicar transformações: Renomear e converter para maiúsculas
    df_silver = df_bronze.select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("prefixo_cep"),
        
        # Converter para maiúsculas (UPPER CASE)
        F.upper(F.col("seller_city")).alias("cidade"),
        F.upper(F.col("seller_state")).alias("estado")
    )

    # 3. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.ft_vendedores")
    
    print("SUCESSO: Tabela medalhao.silver.ft_vendedores criada.")

except Exception as e:
    print(f"ERRO ao processar silver.ft_vendedores: {e}")

In [0]:
from pyspark.sql import functions as F

print("Iniciando a transformação para silver.dm_categoria_produtos_traducao...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.dm_categoria_produtos_traducao")

    # 2. Aplicar transformações: Renomear colunas
    df_silver = df_bronze.select(
        F.col("product_category_name").alias("nome_produto_pt"),
        F.col("product_category_name_english").alias("nome_produto_en")
    )

    # 3. Salvar a tabela limpa na camada Silver
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.dm_categoria_produtos_traducao")
    
    print("SUCESSO: Tabela medalhao.silver.dm_categoria_produtos_traducao criada.")

except Exception as e:
    print(f"ERRO ao processar silver.dm_categoria_produtos_traducao: {e}")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

print("Iniciando a transformação para silver.dm_cotacao_dolar...")

try:
    # 1. Ler a tabela da camada Bronze
    df_bronze = spark.table("medalhao.bronze.dm_cotacao_dolar")

    # 2. Limpar os dados da Bronze: Converter para data e pegar apenas a cotação e a data
    df_cotacao_dias_uteis = df_bronze.select(
        F.to_date(F.col("dataHoraCotacao")).alias("data"),
        F.col("cotacaoCompra")
    ).distinct() # Garantir que temos apenas um valor por dia

    # 3. Encontrar a data mínima e máxima para criar o calendário
    min_max_datas = df_cotacao_dias_uteis.agg(
        F.min("data").alias("data_min"),
        F.max("data").alias("data_max")
    ).first()
    
    data_min = min_max_datas["data_min"]
    data_max = min_max_datas["data_max"]

    # 4. Criar o DataFrame "calendário" com todos os dias
    df_calendario = spark.sql(f"SELECT explode(sequence(to_date('{data_min}'), to_date('{data_max}'), interval 1 day)) AS data")

    # 5. Fazer o left_join para criar os 'null's nos fins de semana
    df_com_nulos = df_calendario.join(
        df_cotacao_dias_uteis,
        on="data",
        how="left"
    )

    # 6. Usar Window Function para preencher os 'null's (Last Observation Carried Forward)
    # "Leve adiante" a última cotação válida (da sexta-feira) para os dias 'null' (sábado, domingo)
    window_spec = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    df_preenchido = df_com_nulos.withColumn(
        "cotacao_preenchida",
        F.last(F.col("cotacaoCompra"), ignorenulls=True).over(window_spec)
    )

    # 7. Selecionar colunas finais e salvar na Silver
    df_silver = df_preenchido.select(
        F.col("cotacao_preenchida").alias("cotacao_dolar"), # [cite: 211, 212]
        F.col("data") # [cite: 213]
    )
    
    df_silver.write.mode("overwrite").saveAsTable("medalhao.silver.dm_cotacao_dolar")
    
    print("SUCESSO: Tabela medalhao.silver.dm_cotacao_dolar criada e preenchida.")

except Exception as e:
    print(f"ERRO ao processar silver.dm_cotacao_dolar: {e}")

In [0]:
from pyspark.sql import functions as F

print("Iniciando validações de integridade referencial (Anti-Joins)...")

try:
    # 1. Carregar as tabelas Silver necessárias
    df_pedidos = spark.table("medalhao.silver.ft_pedidos")
    df_consumidores = spark.table("medalhao.silver.ft_consumidores")
    df_itens_pedidos = spark.table("medalhao.silver.ft_itens_pedidos")

    # --- Verificação 1: Pedidos órfãos (sem consumidor) ---
    
    # Usar 'left_anti' para encontrar pedidos que NÃO TÊM correspondência em consumidores
    pedidos_orfos = df_pedidos.join(
        df_consumidores,
        on="id_consumidor",
        how="left_anti" 
    )
    
    count_pedidos_orfos = pedidos_orfos.count()
    print(f"Verificação 1: {count_pedidos_orfos} pedidos órfãos (sem consumidor) encontrados.")

    # --- Verificação 2: Itens órfãos (sem pedido) ---
    
    # Usar 'left_anti' para encontrar itens que NÃO TÊM correspondência em pedidos
    itens_orfos = df_itens_pedidos.join(
        df_pedidos,
        on="id_pedido",
        how="left_anti"
    )
    
    count_itens_orfos = itens_orfos.count()
    print(f"Verificação 2: {count_itens_orfos} itens de pedidos órfãos (sem pedido) encontrados.")

    # --- Remoção de Registros Órfãos (se existirem) ---
    
    if count_pedidos_orfos > 0:
        print(f"Removendo {count_pedidos_orfos} pedidos órfãos...")
        # 'left_semi' é o oposto do 'left_anti'. Ele MANTÉM apenas os registros que TÊM correspondência.
        df_pedidos_limpos = df_pedidos.join(df_consumidores, on="id_consumidor", how="left_semi")
        
        # Sobrescrever a tabela silver.ft_pedidos apenas com os registros válidos
        df_pedidos_limpos.write.mode("overwrite").saveAsTable("medalhao.silver.ft_pedidos")
        print("Tabela medalhao.silver.ft_pedidos atualizada.")

    if count_itens_orfos > 0:
        print(f"Removendo {count_itens_orfos} itens órfãos...")
        # Manter apenas os itens que TÊM correspondência em pedidos
        df_itens_limpos = df_itens_pedidos.join(df_pedidos, on="id_pedido", how="left_semi")
        
        # Sobrescrever a tabela silver.ft_itens_pedidos apenas com os registros válidos
        df_itens_limpos.write.mode("overwrite").saveAsTable("medalhao.silver.ft_itens_pedidos")
        print("Tabela medalhao.silver.ft_itens_pedidos atualizada.")

    if count_pedidos_orfos == 0 and count_itens_orfos == 0:
        print("Nenhum registro órfão encontrado. Nenhuma remoção necessária.")

    print("SUCESSO: Validações de integridade concluídas.")

except Exception as e:
    print(f"ERRO durante a validação: {e}")

In [0]:
from pyspark.sql import functions as F

print("Iniciando a criação da tabela final silver.pedido_total...")

try:
    # 1. Carregar as tabelas Silver necessárias
    # (O PDF sugere bronze[cite: 224], mas usaremos as tabelas Silver limpas)
    pedidos = spark.table("medalhao.silver.ft_pedidos")
    consumidores = spark.table("medalhao.silver.ft_consumidores")
    pagamentos = spark.table("medalhao.silver.ft_pagamentos")
    cotacao = spark.table("medalhao.silver.dm_cotacao_dolar")

    # 2. Agregar o valor total pago por pedido (BRL)
    pagamentos_total_por_pedido = pagamentos.groupBy("id_pedido").agg(
        F.sum("valor_pagamento").alias("valor_total_pago_brl")
    )

    # 3. Preparar a tabela de pedidos para o join com cotação
    # (Precisamos de uma coluna 'date' limpa)
    pedidos_com_data = pedidos.withColumn(
        "data_pedido", 
        F.to_date(F.col("pedido_compra_timestamp"))
    )

    # 4. Juntar as tabelas
    df_joined = pedidos_com_data.join(
        consumidores,
        on="id_consumidor",
        how="left"
    ).join(
        pagamentos_total_por_pedido,
        on="id_pedido",
        how="left"
    ).join(
        cotacao,
        on=pedidos_com_data.data_pedido == cotacao.data, # Join pela data do pedido
        how="left"
    )

    # 5. Criar colunas finais e calcular o valor em USD
    df_silver_final = df_joined.select(
        F.col("data_pedido"),
        F.col("id_pedido"),
        F.col("id_consumidor"),
        F.col("status"),
        F.col("valor_total_pago_brl"),
        
        # Calcular valor em USD e arredondar para 2 casas decimais
        (F.round(F.col("valor_total_pago_brl") / F.col("cotacao_dolar"), 2)).alias("valor_total_pago_usd")
    )

    # 6. Salvar a tabela final na camada Silver
    df_silver_final.write.mode("overwrite").saveAsTable("medalhao.silver.pedido_total")
    
    print("SUCESSO: Tabela medalhao.silver.pedido_total criada.")
    
    # 7. (Opcional) Mostrar um exemplo dos dados
    print("Exemplo de dados da tabela final:")
    df_silver_final.show(5)

except Exception as e:
    print(f"ERRO ao criar silver.pedido_total: {e}")