# Notebook2 - Bronze -> Silver

## 1 - Configuração Inicial e Imports:

In [0]:
# Importação de bibliotecas
from pyspark.sql import functions as F
from pyspark.sql.window import Window

print("Entrando na camada silver")

## 2 - Leitura das Tabelas da Camada Bronze:

In [0]:
# Leitura de todas as tabelas Bronze
ft_consumidores_bronze = spark.table("medalhao.bronze.ft_consumidores")
ft_geolocalizacao_bronze = spark.table("medalhao.bronze.ft_geolocalizacao")
ft_itens_pedidos_bronze = spark.table("medalhao.bronze.ft_itens_pedidos")
ft_pagamentos_pedidos_bronze = spark.table("medalhao.bronze.ft_pagamentos_pedidos")
ft_avaliacoes_pedidos_bronze = spark.table("medalhao.bronze.ft_avaliacoes_pedidos")
ft_pedidos_bronze = spark.table("medalhao.bronze.ft_pedidos")
ft_produtos_bronze = spark.table("medalhao.bronze.ft_produtos")
ft_vendedores_bronze = spark.table("medalhao.bronze.ft_vendedores")
dm_categoria_produtos_traducao_bronze = spark.table("medalhao.bronze.dm_categoria_produtos_traducao")
dm_cotacao_dolar_bronze = spark.table("medalhao.bronze.dm_cotacao_dolar")

## 3 - Função para Analise da Qualidade das Tabelas:

In [0]:
def analisar_qualidade_tabela(df, nome_tabela):
    # Contagem total
    total_registros = df.count()
    print(f"Total de registros: {total_registros:,}")
    
    # Schema
    print(f"Colunas ({len(df.columns)}): {', '.join(df.columns)}")
    
    # Análise de valores nulos
    print("Valores nulos por coluna:")
    nulos_df = df.select([
        F.count(F.when(F.col(c).isNull(), c)).alias(c) 
        for c in df.columns
    ])
    
    # Mostra colunas com problemas
    nulos_data = nulos_df.collect()[0]
    for coluna in df.columns:
        nulos_count = nulos_data[coluna]
        if nulos_count > 0:
            percentual = (nulos_count / total_registros) * 100
            print(f"{nome_tabela}: {coluna} possui {nulos_count} nulos ({percentual:.2f}%)")
    
    # Análise de duplicidades (busca chave natural)
    if "customer_id" in df.columns:
        duplicados = total_registros - df.dropDuplicates(["customer_id"]).count()
        if duplicados > 0:
            print(f"Duplicados em customer_id: {duplicados}")
    
    if "order_id" in df.columns:
        duplicados = total_registros - df.dropDuplicates(["order_id"]).count()
        if duplicados > 0:
            print(f"Duplicados em order_id: {duplicados}")
    
    if "product_id" in df.columns:
        duplicados = total_registros - df.dropDuplicates(["product_id"]).count()
        if duplicados > 0:
            print(f"Duplicados em product_id: {duplicados}")
    
    # Amostra de dados
    print("Amostra de dados (5 primeiros registros):")
    display(df.limit(5))
    df.printSchema()

- Contagem total de registros e colunas
- Verificação de valores nulos, duplicidade e colunas problemáticas
- Amostra dos 5 primeiros registros da tabela passada como parâmetro

## 4 - Uso da Função em cada Tabela de Bronze:

In [0]:
# Análise para cada tabela
tabelas_bronze = [
    (ft_consumidores_bronze, "ft_consumidores"),
    (ft_pedidos_bronze, "ft_pedidos"),
    (ft_itens_pedidos_bronze, "ft_itens_pedidos"),
    (ft_pagamentos_pedidos_bronze, "ft_pagamentos_pedidos"),
    (ft_avaliacoes_pedidos_bronze, "ft_avaliacoes_pedidos"),
    (ft_produtos_bronze, "ft_produtos"),
    (ft_vendedores_bronze, "ft_vendedores"),
    (dm_categoria_produtos_traducao_bronze, "dm_categoria_produtos_traducao"),
    (dm_cotacao_dolar_bronze, "dm_cotacao_dolar"),
    (ft_geolocalizacao_bronze, "ft_geolocalizacao")
]

for df, nome in tabelas_bronze:
    analisar_qualidade_tabela(df, nome)

# Resumo estatístico
for df, nome in tabelas_bronze:
    total = df.count()
    colunas = len(df.columns)
    print(f"{nome}: {total:,} registros, {colunas} colunas")

- Análise para confirmar filtros de qualidade e possíveis erros de cada tabela de bronze.

## 5 - Mudanças -> ft_consumidores

In [0]:
ft_consumidores_silver = (
    ft_consumidores_bronze
    .select(
        F.col("customer_id").alias("id_consumidor"),
        F.col("customer_zip_code_prefix").alias("prefixo_cep"),
        F.upper(F.trim(F.col("customer_city"))).alias("cidade"),
        F.upper(F.col("customer_state")).alias("estado"),
        F.col("ingestion_timestamp")
    )
    .dropDuplicates(["id_consumidor"])
    .filter(F.col("id_consumidor").isNotNull())
    # Adiciona timestamp de processamento Silver
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

print(f"Registros após transformação: {ft_consumidores_silver.count():,}")

# Salva tabela Silver
spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_consumidores_silver")
ft_consumidores_silver.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_consumidores")

print("Amostra de dados:")
display(ft_consumidores_silver)

## 6 - Mudanças -> ft_pedidos

In [0]:
# Dicionário de tradução do status
status_traducao = {
    "delivered": "entregue",
    "invoiced": "faturado", 
    "shipped": "enviado",
    "processing": "em processamento",
    "unavailable": "indisponível",
    "canceled": "cancelado",
    "created": "criado",
    "approved": "aprovado"
}

# Cria expressão CASE para tradução
status_expr = F.coalesce(
    *[F.when(F.col("order_status") == en, F.lit(pt)) for en, pt in status_traducao.items()],
    F.col("order_status")
)

ft_pedidos_silver = (
    ft_pedidos_bronze
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        status_expr.alias("status"),
        F.col("order_purchase_timestamp").alias("pedido_compra_timestamp"),
        F.col("order_approved_at").alias("pedido_aprovado_timestamp"),
        F.col("order_delivered_carrier_date").alias("pedido_carregado_timestamp"),
        F.col("order_delivered_customer_date").alias("pedido_entregue_timestamp"),
        F.col("order_estimated_delivery_date").alias("pedido_estimativa_entrega_timestamp"),
        F.col("ingestion_timestamp")
    )
    # Colunas calculadas
    .withColumn(
        "tempo_entrega_dias",
        F.datediff(
            F.col("pedido_entregue_timestamp"), 
            F.col("pedido_compra_timestamp")
        )
    )
    .withColumn(
        "tempo_entrega_estimado_dias", 
        F.datediff(
            F.col("pedido_estimativa_entrega_timestamp"),
            F.col("pedido_compra_timestamp")
        )
    )
    .withColumn(
        "diferenca_entrega_dias",
        F.coalesce(F.col("tempo_entrega_dias"), F.lit(0)) - F.col("tempo_entrega_estimado_dias")
    )
    .withColumn(
        "entrega_no_prazo",
        F.when(F.col("pedido_entregue_timestamp").isNull(), F.lit("Não Entregue"))
        .when(F.col("diferenca_entrega_dias") >= 0, F.lit("Sim"))
        .otherwise(F.lit("Não"))
    )
    # Adiciona timestamp Silver
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

print(f"Registros processados: {ft_pedidos_silver.count():,}")

spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_pedidos_silver")
ft_pedidos_silver.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_pedidos")

print("Amostra de dados:")
display(ft_pedidos_silver)

## 7 - Mudanças -> ft_itens_pedidos

In [0]:
ft_itens_pedidos_silver = (
    ft_itens_pedidos_bronze
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("order_item_id").alias("id_item"),
        F.col("product_id").alias("id_produto"),
        F.col("seller_id").alias("id_vendedor"),
        F.col("price").cast("decimal(12,2)").alias("preco_BRL"),
        F.col("freight_value").cast("decimal(12,2)").alias("preco_frete"),
        F.col("ingestion_timestamp")
    )
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

print(f"Itens de pedidos processados: {ft_itens_pedidos_silver.count():,}")

spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_itens_pedidos_silver")
ft_itens_pedidos_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true")\
    .saveAsTable("medalhao.silver.ft_itens_pedidos")

print("Amostra de dados:")
display(ft_itens_pedidos_silver)

## 8 - Mudanças -> ft_pagamentos_pedidos

In [0]:
mapeamento_pagamento = {
    "credit_card": "Cartão de Crédito",
    "boleto": "Boleto", 
    "voucher": "Voucher",
    "debit_card": "Cartão de Débito"
}

pagamento_expr = F.coalesce(
    *[F.when(F.col("payment_type") == en, F.lit(pt)) for en, pt in mapeamento_pagamento.items()],
    F.lit("Outro")
)

ft_pagamentos_silver = (
    ft_pagamentos_pedidos_bronze
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("payment_sequential").alias("codigo_pagamento"),
        pagamento_expr.alias("forma_pagamento"),
        F.col("payment_installments").alias("parcelas"),
        F.col("payment_value").cast("decimal(12,2)").alias("valor_pagamento"),
        F.col("ingestion_timestamp")
    )
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

print(f"Pagamentos processados: {ft_pagamentos_silver.count():,}")
spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_pagamentos_silver")
ft_itens_pedidos_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true")\
    .saveAsTable("medalhao.silver.ft_pagamentos_pedidos")

print("Amostra de dados:")
display(ft_pagamentos_silver)

## 9 - Mudanças -> ft_avaliacoes_pedidos

In [0]:
registros_antes = ft_avaliacoes_pedidos_bronze.count()

ft_avaliacoes_silver = (
    ft_avaliacoes_pedidos_bronze
    .select(
        F.col("review_id").alias("id_avaliacao"),
        F.col("order_id").alias("id_pedido"),
        F.col("review_score").alias("avaliacao_temp"),
        F.col("review_comment_title").alias("titulo_comentario"),
        F.col("review_comment_message").alias("comentario"),
        F.col("review_creation_date").alias("data_comentario"),
        F.col("review_answer_timestamp").alias("data_resposta"),
        F.col("ingestion_timestamp")
    )
    # Remove id_pedido inválido
    .filter(
        F.col("id_pedido").isNotNull() &
        (F.trim(F.col("id_pedido")) != "") &
        (F.length(F.col("id_pedido")) == 32)
    )
    # Remove datas incorretas
    .filter(
        F.col("data_comentario").isNotNull() &
        (F.length(F.col("data_comentario")) == 19) &
        F.col("data_comentario").rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")
    )
    # Remove avaliações inválidas
    .filter(
        F.col("avaliacao_temp").isNotNull() &
        (F.length(F.col("avaliacao_temp")) == 1) &
        F.col("avaliacao_temp").between(1, 5)
    )
    .withColumn("avaliacao", F.col("avaliacao_temp").cast("int"))
    .drop("avaliacao_temp")
    .withColumn("data_comentario", F.to_date(F.col("data_comentario")))
    .withColumn("data_resposta", F.to_date(F.col("data_resposta")))
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())

    # Ordem original
    .select(
        "id_avaliacao",
        "id_pedido", 
        "avaliacao",          
        "titulo_comentario",
        "comentario",
        "data_comentario",
        "data_resposta",
        "ingestion_timestamp",
        "silver_ingestion_timestamp"
    )
)

registros_depois = ft_avaliacoes_silver.count()
registros_removidos = registros_antes - registros_depois

print(f"Registros antes das validações: {registros_antes:,}")
print(f"Registros após validações: {registros_depois:,}")
print(f"Registros removidos: {registros_removidos:,}")

# Documenta as regras (conforme especificação)
print("\nRegras para validação aplicadas:")
print("id_pedido inválido: nulo, string vazia, tamanho dos ids diferente de 32 caracteres")
print("data_comentario incorreta: nula, sempre que tiver caracteres diferentes de 19 e formato inconsistente")

spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_avaliacoes_silver")
ft_avaliacoes_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true")\
.saveAsTable("medalhao.silver.ft_avaliacoes_pedidos")

print("Amostra de dados:")
display(ft_avaliacoes_silver)

- Na coluna de 'id_pedido' foram removidos os dados nulos, vazios e que continham número de caracteres diferntes de 32
- Já na coluna de 'data_comentario' foram removidos os dados nulos, com formato diferente da data e que continham número de caracteres diferentes de 19
- Na coluna de 'avaliacao' tive uma dificuldade para transformar em int direto com o cast, então filtrei os dados de apenas 1 caracter, diferentes de null e que fossem de 1 até 5
- No total foram removidas 8.855 registros inválidos e ficaram 95.307 válidos
- Precisei dar um novo select pois como havia removido a coluna de 'avaliacao_temp', a coluna de 'avaliacao' com os dados em formato int tinha ido para o final da tabela

## 10 - Mudanças -> ft_produtos

In [0]:
ft_produtos_silver = (
    ft_produtos_bronze
    .select(
        F.col("product_id").alias("id_produto"),
        F.col("product_category_name").alias("categoria_produto"),
        F.col("product_weight_g").alias("peso_produto_gramas"),
        F.col("product_length_cm").alias("comprimento_centimetros"),
        F.col("product_height_cm").alias("altura_centimetros"), 
        F.col("product_width_cm").alias("largura_centimetros"),
        F.col("ingestion_timestamp")
    )
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_produtos_silver")
ft_produtos_silver.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_produtos")

print("Amostra de dados:")
display(ft_produtos_silver)

## 11 - Mudanças -> ft_vendedores

In [0]:
ft_vendedores_silver = (
    ft_vendedores_bronze
    .select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("prefixo_cep"),
        F.upper(F.trim(F.col("seller_city"))).alias("cidade"),
        F.upper(F.col("seller_state")).alias("estado"),
        F.col("ingestion_timestamp")
    )
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_vendedores_silver")
ft_vendedores_silver.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_vendedores")

print("Amostra de dados:")
display(ft_vendedores_silver)

## 12 - Mudanças -> dm_categoria_produtos_traducao

In [0]:
dm_categoria_silver = (
    dm_categoria_produtos_traducao_bronze
    .select(
        F.col("product_category_name").alias("nome_produto_pt"),
        F.col("product_category_name_english").alias("nome_produto_en"),
        F.col("ingestion_timestamp")
    )
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

spark.sql("DROP TABLE IF EXISTS medalhao.silver.dm_categoria_silver")
dm_categoria_silver.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.dm_categoria_produtos_traducao")

print("Amostra de dados:")
display(dm_categoria_silver)

## 13 - Mudanças -> dm_cotacao_dolar

In [0]:
dm_cotacao_base = (
    dm_cotacao_dolar_bronze
    .select(
        F.col("cotacaoCompra").alias("cotacao_dolar"),
        F.col("dataHoraCotacao").alias("data"),
        F.col("ingestion_timestamp")
    )
    .withColumn("cotacao_dolar", F.col("cotacao_dolar").cast("decimal(8,4)"))
    .withColumn("data", F.to_date(F.col("data")))
    .withColumn("dia_semana", F.dayofweek("data"))
    .withColumn("silver_ingestion_timestamp", F.current_timestamp())
)

# Identifica sextas-feiras que precisam de finais de semana
sextas_feiras = dm_cotacao_base.filter(F.col("dia_semana") == 6)

# Cria dados para sábados e domingos
sabados = (
    sextas_feiras
    .withColumn("data", F.date_add(F.col("data"), 1))
    .withColumn("dia_semana", F.lit(7))
)

domingos = (
    sextas_feiras
    .withColumn("data", F.date_add(F.col("data"), 2))
    .withColumn("dia_semana", F.lit(1))
)

dm_cotacao_completa = (
    dm_cotacao_base
    .union(sabados.select(dm_cotacao_base.columns))
    .union(domingos.select(dm_cotacao_base.columns))
    .dropDuplicates(["data"])
    .orderBy("data")
)

spark.sql("DROP TABLE IF EXISTS medalhao.silver.dm_cotacao_dolar")
dm_cotacao_completa.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.dm_cotacao_dolar")

print("Amostra de dados:")
display(dm_cotacao_completa)

- Usei a lógica de identificar uma sequencia de linhas 2,3,4,5,6 na coluna de 'dia_semana', ele adicionar duas linhas logo após, e colocar seus dias como 7 e 1, respectivamente onde a data da primeira linha adicionada seria yyyy-MM-(dd+1) e a segunda seria yyyy-MM-(dd+2), com relação a data do 'dia_semana' 6 mais próximo e pega a mesma cotação da linha que possui 6 como 'dia_semana' mais próximo.

## 14 - Validações Finais da Camada Silver:

In [0]:

ft_pedidos_silver = spark.table("medalhao.silver.ft_pedidos")
ft_consumidores_silver = spark.table("medalhao.silver.ft_consumidores")
ft_itens_pedidos_silver = spark.table("medalhao.silver.ft_itens_pedidos")

# Verifica pedidos sem consumidor válido
pedidos_orfãos = (
    ft_pedidos_silver
    .join(ft_consumidores_silver, "id_consumidor", "left_anti")
)

count_pedidos_orfãos = pedidos_orfãos.count()
print(f"Pedidos órfãos encontrados: {count_pedidos_orfãos:,}")

# Verifica itens sem pedido válido
itens_orfãos = (
    ft_itens_pedidos_silver
    .join(ft_pedidos_silver, "id_pedido", "left_anti")
)

count_itens_orfãos = itens_orfãos.count()
print(f"Itens órfãos encontrados: {count_itens_orfãos:,}")

# Remove registros órfãos das tabelas Silver
if count_pedidos_orfãos > 0:
    ft_pedidos_validos = (
        ft_pedidos_silver
        .join(ft_consumidores_silver, "id_consumidor")
    )
    # Sobrescreve a tabela ft_pedidos_silver
    spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_pedidos")
    ft_pedidos_validos.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_pedidos")

    ft_pedidos_silver = spark.table("medalhao.silver.ft_pedidos")

if count_itens_orfãos > 0:
    ft_itens_validos = (
        ft_itens_pedidos_silver
        .join(ft_pedidos_silver, "id_pedido")
    )
    # Sobrescreve a tabela ft_itens_pedidos_silver
    spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_itens_pedidos")
    ft_itens_validos.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_itens_pedidos")
    
    ft_itens_pedidos_silver = spark.table("medalhao.silver.ft_itens_pedidos")

print("Amostra de dados:")
display(ft_itens_pedidos_silver)

## 15 - Criação da Tabela 'ft_pedido_total' na Camada Silver:

In [0]:

ft_pedidos_silver = spark.table("medalhao.silver.ft_pedidos")
ft_consumidores_silver = spark.table("medalhao.silver.ft_consumidores")
ft_pagamentos_silver = spark.table("medalhao.silver.ft_pagamentos_pedidos")
dm_cotacao_silver = spark.table("medalhao.silver.dm_cotacao_dolar")

# Soma o valor total por pedido
pagamentos_agregados = (
    ft_pagamentos_silver
    .groupBy("id_pedido")
    .agg(
        F.sum("preco_frete").cast("decimal(12,2)").alias("valor_total_pago_brl")
    )
)

# União de todas as fontes necessárias
ft_pedido_total = (
    ft_pedidos_silver
    .join(ft_consumidores_silver.select("id_consumidor"), "id_consumidor")
    .join(pagamentos_agregados, "id_pedido", "left")
    .join(
        dm_cotacao_silver.select("data", "cotacao_dolar"),
        F.to_date(ft_pedidos_silver["pedido_compra_timestamp"]) == dm_cotacao_silver["data"],
        "left"
    )
    .select(
        # Ordem fornecida de exemplo no pdf
        F.to_date("pedido_compra_timestamp").alias("data_pedido"),
        "id_pedido",
        "id_consumidor", 
        "status",
        F.coalesce("valor_total_pago_brl", F.lit(0)).alias("valor_total_pago_brl"),
        F.when(
            F.col("cotacao_dolar").isNotNull() & (F.col("cotacao_dolar") > 0),
            F.col("valor_total_pago_brl") / F.col("cotacao_dolar").cast("decimal(12,2)")
        ).otherwise(F.lit(0)).cast("decimal(12,2)").alias("valor_total_pago_usd")
    )
)

print("Amostra de dados:")
display(ft_pedido_total.limit(10))

spark.sql("DROP TABLE IF EXISTS medalhao.silver.ft_pedido_total")
ft_pedido_total.write.format("delta").mode("overwrite").saveAsTable("medalhao.silver.ft_pedido_total")

- No pdf explicando a atividade é pedido para juntarmos as fontes a partir da camada bronze, mas julguei sendo mais adequado juntar a partir das tabelas já filtradas na camada silver, não sei se é o indicado. Por isso o status está em português e não em inglês como no exemplo da linha fornecido no pdf.