# Configuração do Ambiente

#### Configura o catálogo e o esquema, conectando o notebook às camadas bronze (leitura) e silver (escrita).

In [0]:
from pyspark.sql import SparkSession

# Variáveis de ambiente
CATALOG_NAME = "`workspace`" 
SCHEMA_NAME = "olist_ecommerce"

# Configuração da sessão no Unity Catalog
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"USE SCHEMA {SCHEMA_NAME}")

print(f"✅ Configuração OK. Lendo de bronze e escrevendo em silver.")

✅ Configuração OK. Lendo de bronze e escrevendo em silver.


# silver.ft_consumidores

#### Implementa a renomeação, padronização para Upper Case na cidade/estado, e a remoção de duplicatas no id_consumidor .

In [0]:
from pyspark.sql.functions import col, upper

source_table = "bronze.ft_consumidores"
target_table = "silver.ft_consumidores"

df_bronze = spark.read.table(source_table)

# Seleção, Renomeação e Upper Case
df_silver = (df_bronze
    .select(
        col("customer_id").alias("id_consumidor"), 
        col("customer_zip_code_prefix").alias("prefixo_cep"), 
        upper(col("customer_city")).alias("cidade"), 
        upper(col("customer_state")).alias("estado"), 
        col("ingestion_timestamp")
    )
)

# Verificação e Remoção de Duplicados em 'id_consumidor' 
initial_count = df_silver.count()
df_final = df_silver.dropDuplicates(["id_consumidor"]) 
removed_count = initial_count - df_final.count()

if removed_count > 0:
    print(f"⚠️ Alerta: {removed_count} IDs de consumidores duplicados foram removidos.")

# Salvar na Camada Silver
(df_final.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(target_table))

print(f"✅ Tabela Silver '{target_table}' criada/atualizada com sucesso.")

✅ Tabela Silver 'silver.ft_consumidores' criada/atualizada com sucesso.


# silver.ft_pedidos

#### Implementa a tradução do order_status e a criação das 4 colunas derivadas de tempo/entrega .

In [0]:
from pyspark.sql.functions import col, datediff, lit, when, to_timestamp

source_table = "bronze.ft_pedidos"
target_table = "silver.ft_pedidos"

df_bronze = spark.read.table(source_table)

# Mapeamento de Status (Inglês -> Português) 
status_map = {
    "delivered": "entregue", "invoiced": "faturado", "shipped": "enviado",
    "processing": "em processamento", "unavailable": "indisponível", "canceled": "cancelado",
    "created": "criado", "approved": "aprovado"
} 

# Cria a expressão CASE WHEN encadeada
status_expr = col("order_status")
for k, v in status_map.items():
    status_expr = when(col("order_status") == k, lit(v)).otherwise(status_expr)

# Renomeação e Tipagem
df_temp = (df_bronze
    .select(
        col("order_id").alias("id_pedido"), col("customer_id").alias("id_consumidor"),
        status_expr.alias("status"),
        to_timestamp("order_purchase_timestamp").alias("pedido_compra_timestamp"),
        to_timestamp("order_delivered_customer_date").alias("pedido_entregue_timestamp"),
        to_timestamp("order_estimated_delivery_date").alias("pedido_estimativa_entrega_timestamp"),
        to_timestamp("order_approved_at").alias("pedido_aprovado_timestamp"),
        to_timestamp("order_delivered_carrier_date").alias("pedido_carregado_timestamp"),
    )
)

# Criação das Colunas Derivadas (Tempo em Dias)
df_silver = (df_temp
    .withColumn("tempo_entrega_dias", 
        datediff(col("pedido_entregue_timestamp"), col("pedido_compra_timestamp")) 
    )
    .withColumn("tempo_entrega_estimado_dias", 
        datediff(col("pedido_estimativa_entrega_timestamp"), col("pedido_compra_timestamp")) 
    )
    .withColumn("diferenca_entrega_dias", 
        col("tempo_entrega_dias") - col("tempo_entrega_estimado_dias") 
    )
    .withColumn("entrega_no_prazo",
        when(col("status") != "entregue", lit("Não Entregue")) 
        .when(col("diferenca_entrega_dias") <= 0, lit("Sim")) 
        .otherwise(lit("Não")) 
    )
)

# Salvar na Camada Silver
(df_silver.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(target_table))

print(f"✅ Tabela Silver '{target_table}' criada/atualizada com sucesso.")

✅ Tabela Silver 'silver.ft_pedidos' criada/atualizada com sucesso.


# silver.ft_pagamentos

#### Implementa a tradução do tipo de pagamento e o casting para DECIMAL(12,2).

In [0]:
from pyspark.sql.functions import col, when, lit

source_table = "bronze.ft_pagamentos_pedidos"
target_table = "silver.ft_pagamentos"

df_bronze = spark.read.table(source_table)

# Mapeamento de Forma de Pagamento (Inglês -> Português) 
payment_expr = when(col("payment_type") == "credit_card", lit("Cartão de Crédito"))
payment_expr = payment_expr.when(col("payment_type") == "boleto", lit("Boleto"))
payment_expr = payment_expr.when(col("payment_type") == "voucher", lit("Voucher"))
payment_expr = payment_expr.when(col("payment_type") == "debit_card", lit("Cartão de Débito"))
payment_expr = payment_expr.otherwise(lit("Outro")) 

# Seleção, Renomeação e Conversão de Tipos
df_silver = (df_bronze
    .select(
        col("order_id").alias("id_pedido"), 
        col("payment_sequential").alias("codigo_pagamento"), 
        payment_expr.alias("forma_pagamento"),
        col("payment_installments").alias("parcelas"), 
        col("payment_value").cast("DECIMAL(12,2)").alias("valor_pagamento"), 
        col("ingestion_timestamp")
    )
)

# Salvar na Camada Silver
(df_silver.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(target_table))

print(f"✅ Tabela Silver '{target_table}' criada/atualizada com sucesso.")

✅ Tabela Silver 'silver.ft_pagamentos' criada/atualizada com sucesso.


# Transformações Simples

#### Usa uma função genérica para aplicar renomeação, casting e Upper Case a múltiplas tabelas, incluindo a tabela ft_itens_pedidos que estava faltando.

In [0]:
from pyspark.sql.functions import col, upper, lit
from pyspark.sql.types import DecimalType


# Função genérica de funcionamento
def transform_and_save_simple_final(bronze_table_name, target_map):
    """Lê do Bronze, aplica mapeamento, tipagem e Upper Case, e salva no Silver."""
    
    source_table = f"bronze.{bronze_table_name}"
    target_table = f"silver.{bronze_table_name}"
    
    df_bronze = spark.read.table(source_table)
    select_cols = []
    
    for col_b, col_s, dtype, is_upper in target_map:
        expr = col(col_b)
        
        if is_upper:
            expr = upper(col(col_b))

        expr = expr.cast(dtype)
            
        select_cols.append(expr.alias(col_s))
        
    select_cols.append(col("ingestion_timestamp"))
    
    df_silver = df_bronze.select(*select_cols)
    
    # Salva como Delta Lake
    (df_silver.write
     .mode("overwrite")
     .format("delta")
     .option("mergeSchema", "true") 
     .saveAsTable(target_table))

    print(f"  > ✅ Tabela Silver '{target_table}' criada/atualizada com sucesso.")

# Mapa de configuração centralizado
TRANSFORM_MAP = {
    # ft_itens_pedidos (Etapa 3)
    "ft_itens_pedidos": [
        ("order_id", "id_pedido", "STRING", False),
        ("order_item_id", "id_item", "INT", False),
        ("product_id", "id_produto", "STRING", False),
        ("seller_id", "id_vendedor", "STRING", False),
        ("price", "preco_BRL", DecimalType(12, 2), False), 
        ("freight_value", "preco_frete", DecimalType(12, 2), False), 
    ],
    # ft_vendedores (Etapa 7) 
    "ft_vendedores": [
        ("seller_id", "id_vendedor", "STRING", False),
        ("seller_zip_code_prefix", "prefixo_cep", "INT", False), 
        ("seller_city", "cidade", "STRING", True), 
        ("seller_state", "estado", "STRING", True), 
    ],
    # ft_produtos (Etapa 6) 
    "ft_produtos": [
        ("product_id", "id_produto", "STRING", False),
        ("product_category_name", "categoria_produto", "STRING", False),
        ("product_weight_g", "peso_produto_gramas", "INT", False),
        ("product_length_cm", "comprimento_centimetros", "INT", False),
        ("product_height_cm", "altura_centimetros", "INT", False),
        ("product_width_cm", "largura_centimetros", "INT", False),
    ],
    # dm_categoria_produtos_traducao (Etapa 8) 
    "dm_categoria_produtos_traducao": [
        ("product_category_name", "nome_produto_pt", "STRING", False),
        ("product_category_name_english", "nome_produto_en", "STRING", False),
    ]
}

# Execução
print("Iniciando transformações simples (ft_itens_pedidos, ft_vendedores, ft_produtos, dm_categoria_produtos_traducao)...")
for table_name, mapping in TRANSFORM_MAP.items():
    transform_and_save_simple_final(table_name, mapping)
print("✅ Transformações simples concluídas.")

Iniciando transformações simples (ft_itens_pedidos, ft_vendedores, ft_produtos, dm_categoria_produtos_traducao)...
  > ✅ Tabela Silver 'silver.ft_itens_pedidos' criada/atualizada com sucesso.
  > ✅ Tabela Silver 'silver.ft_vendedores' criada/atualizada com sucesso.
  > ✅ Tabela Silver 'silver.ft_produtos' criada/atualizada com sucesso.
  > ✅ Tabela Silver 'silver.dm_categoria_produtos_traducao' criada/atualizada com sucesso.
✅ Transformações simples concluídas.


# silver.ft_avaliacoes_pedidos

#### Implementa a limpeza de dados (IDs inválidos e datas inconsistentes/futuras) e documenta a contagem de linhas removidas.

In [0]:
from pyspark.sql.functions import (
    col, try_to_timestamp, lit, length, current_timestamp, expr, trim
)

# Definição das tabelas
source_table = "bronze.ft_avaliacoes_pedidos"
target_table = "silver.ft_avaliacoes_pedidos"

# Lendo a tabela Bronze
df_bronze = spark.read.table(source_table)
initial_count = df_bronze.count()

# Limpeza de dados
today_ts_literal = current_timestamp()

# Regras de validação:
# ID incorreto: nulo OU tamanho diferente de 32 (
# Data incorreta: nula, formato inconsistente OU data futura
df_valid = (
    df_bronze
    .withColumn("review_creation_date_ts",
        try_to_timestamp(col("review_creation_date"), lit("yyyy-MM-dd HH:mm:ss"))
    )
    .withColumn("review_answer_timestamp_ts",
        try_to_timestamp(col("review_answer_timestamp"), lit("yyyy-MM-dd HH:mm:ss"))
    )
    .withColumn("review_score", trim(col("review_score")))

    .filter(
        # 1. Validação de ID [cite: 134]
        (col("order_id").isNotNull()) & 
        (length(col("order_id")) == 32) & 
        (col("review_creation_date_ts").isNotNull()) & 
        (col("review_creation_date_ts") < today_ts_literal) 
    )
)

# Conversão do score
df_valid = df_valid.withColumn("review_score_int", expr("try_cast(review_score as int)"))

# Contagem e documentação
final_count = df_valid.count()
removed_count = initial_count - final_count

print("--- Documentação da Limpeza de Avaliações ---")
print(f"Total de linhas em Bronze: {initial_count}")
print(f"Linhas removidas por ID/Data inválida: {removed_count}")
print(f"Total final na Silver: {final_count}")
print(f"Regras de Validação Aplicadas: order_id não nulo e com 32 caracteres; datas válidas e não futuras.")
print("---------------------------------------------")

# Seleção e renomeação
df_silver = (
    df_valid.select(
        col("review_id").alias("id_avaliacao"),
        col("order_id").alias("id_pedido"),
        col("review_score_int").alias("avaliacao"),
        col("review_comment_title").alias("titulo_comentario"),
        col("review_comment_message").alias("comentario"),
        col("review_creation_date_ts").alias("data_comentario"),
        col("review_answer_timestamp_ts").alias("data_resposta"),
        col("ingestion_timestamp")
    )
)

# Salvar na camada Silver
df_silver.write.mode("overwrite").format("delta").saveAsTable(target_table)

print(f"✅ Tabela Silver '{target_table}' criada/atualizada com sucesso.")

--- Documentação da Limpeza de Avaliações ---
Total de linhas em Bronze: 104162
Linhas removidas por ID/Data inválida: 8854
Total final na Silver: 95308
Regras de Validação Aplicadas: order_id não nulo e com 32 caracteres; datas válidas e não futuras.
---------------------------------------------
✅ Tabela Silver 'silver.ft_avaliacoes_pedidos' criada/atualizada com sucesso.


# silver.dm_cotacao_dolar

#### Utiliza Window Functions do Spark para preencher os valores de cotação nulos de fins de semana com o último valor disponível (da sexta-feira).

In [0]:
from pyspark.sql.functions import col, last, asc, to_timestamp, round
from pyspark.sql.window import Window
from pyspark.sql.types import DateType, DecimalType

source_table = "bronze.dm_cotacao_dolar"
target_table = "silver.dm_cotacao_dolar"

# Pré-processamento e Padronização
df_bronze = (spark.read.table(source_table)
             .select(
                 col("cotacaoCompra").cast(DecimalType(5, 4)).alias("cotacao_dolar"),
                 to_timestamp(col("dataHoraCotacao"), "yyyy-MM-dd HH:mm:ss.SSS").cast(DateType()).alias("data"), 
                 col("ingestion_timestamp")
             )
             .dropDuplicates(["data"])
            )

# 2. Enriquecimento
window_spec = Window.partitionBy(lit(1)).orderBy(asc("data")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_filled = (df_bronze
    .withColumn("cotacao_dolar_filled", 
        last(col("cotacao_dolar"), ignorenulls=True).over(window_spec)
    )
    .drop("cotacao_dolar")
    .withColumnRenamed("cotacao_dolar_filled", "cotacao_dolar")
    .withColumn("cotacao_dolar", round(col("cotacao_dolar"), 4)) # Arredonda para 4 casas
)

# 3. Salvar na Camada Silver
(df_filled.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(target_table))

print(f"✅ Tabela Silver '{target_table}' criada/atualizada com sucesso (com preenchimento de fins de semana).")

✅ Tabela Silver 'silver.dm_cotacao_dolar' criada/atualizada com sucesso (com preenchimento de fins de semana).


# Validação de Integridade e Remoção de Órfãos

#### Realiza o controle de integridade referencial entre as três tabelas principais, utilizando o left_anti join para identificar e remover os registros órfãos.

In [0]:
from pyspark.sql.functions import col

# 1. Pedidos Órfãos (Pedidos sem Consumidor válido)
df_pedidos = spark.read.table("silver.ft_pedidos")
df_consumidores = spark.read.table("silver.ft_consumidores")
target_pedidos = "silver.ft_pedidos"

# Identificar pedidos órfãos (Pedidos em ft_pedidos QUE NÃO EXISTEM em ft_consumidores)
df_pedidos_orphans = (df_pedidos
    .join(df_consumidores, ["id_consumidor"], "left_anti") 
)

num_pedidos_orphans = df_pedidos_orphans.count()
print(f"\n🔢 Total de Pedidos Órfãos (sem Consumidor válido): {num_pedidos_orphans}")

if num_pedidos_orphans > 0:
    df_pedidos_valid = (df_pedidos
        .join(df_pedidos_orphans.select("id_pedido"), ["id_pedido"], "left_anti")
    )
    df_pedidos_valid.write.mode("overwrite").format("delta").saveAsTable(target_pedidos)
    print(f"  > ✅ Pedidos órfãos removidos de '{target_pedidos}'.")

# 2. Itens Órfãos (Itens sem Pedido válido)
df_itens_pedidos = spark.read.table("silver.ft_itens_pedidos")
target_itens = "silver.ft_itens_pedidos"

# Recarrega a tabela de pedidos
df_pedidos_reloaded = spark.read.table("silver.ft_pedidos") 

# Identificar itens órfãos (Itens em ft_itens_pedidos que não existem em ft_pedidos_reloaded)
df_itens_orphans = (df_itens_pedidos
    .join(df_pedidos_reloaded, ["id_pedido"], "left_anti")
)

num_itens_orphans = df_itens_orphans.count()
print(f"🔢 Total de Itens Órfãos (sem Pedido válido): {num_itens_orphans}")

if num_itens_orphans > 0:
    # Remover itens órfãos
    df_itens_valid = (df_itens_pedidos
        .join(df_itens_orphans.select("id_item"), ["id_item"], "left_anti")
    )
    # Sobrescreve a tabela original com apenas os dados válidos
    df_itens_valid.write.mode("overwrite").format("delta").saveAsTable(target_itens)
    print(f"  > ✅ Itens órfãos removidos de '{target_itens}'.")

print("\n✅ Verificações de Integridade Concluídas.")


🔢 Total de Pedidos Órfãos (sem Consumidor válido): 0
🔢 Total de Itens Órfãos (sem Pedido válido): 0

✅ Verificações de Integridade Concluídas.


# Criação da Tabela Final - silver.pedido_total

#### Agrega os pagamentos, une os dados de pedido e aplica a conversão de BRL para USD usando a cotação diária .

In [0]:
from pyspark.sql.functions import sum, date_format, round, max, col

target_table = "silver.pedido_total"

# Carregar as fontes (todas as tabelas são SILVER, exceto ft_pagamentos, que pode ter sido usada antes de silver.ft_pedidos ter órfãos removidos)
df_pedidos = spark.read.table("silver.ft_pedidos")
df_pagamentos = spark.read.table("silver.ft_pagamentos")
df_cotacao = spark.read.table("silver.dm_cotacao_dolar")

# Agregação de pagamentos (Valor Total BRL)
df_total_pagamento = (df_pagamentos
    .groupBy("id_pedido")
    .agg(sum(col("valor_pagamento")).alias("valor_total_pago_brl"))
)

# Preparação da cotação (Uma cotação por dia)
df_cotacao_diaria = (df_cotacao
    .withColumn("data_pedido_str", date_format(col("data"), "yyyy-MM-dd"))
    .select("data_pedido_str", "cotacao_dolar")
    .dropDuplicates(["data_pedido_str"]) 
)

# Join e enriquecimento
df_final = (df_pedidos
    .join(df_total_pagamento, ["id_pedido"], "inner")
    .withColumn("data_pedido_str", date_format(col("pedido_compra_timestamp"), "yyyy-MM-dd"))
    .join(df_cotacao_diaria, ["data_pedido_str"], "left")
    .withColumn("valor_total_pago_usd", 
        round(col("valor_total_pago_brl") / col("cotacao_dolar"), 2) 
    )
    .select(
        date_format(col("pedido_compra_timestamp"), "yyyy-MM-dd").alias("data_pedido"),
        col("id_pedido"), 
        col("id_consumidor"), 
        col("status"), 
        col("valor_total_pago_brl"), 
        col("valor_total_pago_usd")
    )
)

# Salvar Tabela Final
(df_final.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(target_table))

print(f"✅ Tabela Silver FINAL '{target_table}' criada com sucesso. Processo de ETL concluído.")

✅ Tabela Silver FINAL 'silver.pedido_total' criada com sucesso. Processo de ETL concluído.


# Verificação

In [0]:
df_final = spark.read.table("silver.pedido_total")

# Exibe as primeiras 10 linhas
df_final.show(10, truncate=False)

# Exibe o schema (para confirmar os tipos de dados)
df_final.printSchema()

# Para conferir o exemplo de 2017-10-02
df_final.filter(col("data_pedido") == "2017-10-02").show(5, truncate=False)

+-----------+--------------------------------+--------------------------------+--------+--------------------+--------------------+
|data_pedido|id_pedido                       |id_consumidor                   |status  |valor_total_pago_brl|valor_total_pago_usd|
+-----------+--------------------------------+--------------------------------+--------+--------------------+--------------------+
|2017-10-02 |e481f51cbdc54678b7cc49136f2d6af7|9ef432eb6251297304e76186b10a928d|entregue|38.71               |NULL                |
|2018-07-24 |53cdb2fc8bc7dce0b6741e2150273451|b0830fb4747a6c6d20dea0b8c802d7ef|entregue|141.46              |NULL                |
|2018-08-08 |47770eb9100c2d0c44946d9cf07ec65d|41ce2a54c0b03bf3443c3d931a367089|entregue|179.12              |NULL                |
|2017-11-18 |949d5b44dbf5de918fe9c16f97b45f8a|f88197465ea7920adcdbec7375364d82|entregue|72.20               |NULL                |
|2018-02-13 |ad21c59c0840e6cb83a9ceb5573f8159|8ab97904e6daea8866dbdbc4fb7aad2c|entr