# 1. Setup inicial

In [0]:
%sql

CREATE CATALOG IF NOT EXISTS medalhao;

In [0]:
%sql
USE CATALOG medalhao;

CREATE SCHEMA IF NOT EXISTS silver;

In [0]:
%sql
USE SCHEMA silver;

In [0]:
from pyspark.sql import functions as F

from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType, DateType, DecimalType

In [0]:
catalog_name = "medalhao"
bronze_schema_name = "bronze"
silver_schema_name = "silver" 

spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {silver_schema_name}")

## 2. Processamento Tabela a Tabela

### Tabela: ft_consumidores

In [0]:
#Processamento da tabela ft_consumidores

print("Processando: ft_consumidores")

bronze_table = f"{bronze_schema_name}.ft_consumidores"
silver_table = f"{silver_schema_name}.ft_consumidores"

#EXTRAIR
df_bronze = spark.read.table(bronze_table)

#TRANSFORMAR
df_silver = (
    df_bronze
    .select(
        F.col("customer_id").alias("id_consumidor"), 
        F.col("customer_zip_code_prefix").cast(IntegerType()).alias("prefixo_cep"),
         
        F.upper(F.col("customer_city")).alias("cidade"),
        F.upper(F.col("customer_state")).alias("estado")
    )
    .dropDuplicates(["id_consumidor"]) 
)

#CARREGAR
(
    df_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table)
)

print(f"SUCESSO: Tabela {silver_table} criada.")

#VERIFICAR
print("Exibindo 5 linhas da tabela Silver criada:")
display(df_silver.limit(5))

### Tabela: ft_pedidos

In [0]:
#Processamento da tabela ft_pedidos

bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.ft_pedidos"
silver_table_name = "ft_pedidos"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

#EXTRAIR
df_pedidos_bronze = spark.read.table(bronze_table_path)

print("Schema e amostra dos dados da Bronze:")
df_pedidos_bronze.printSchema()
display(df_pedidos_bronze.limit(5))

In [0]:
print("Aplicando transformações...")

status_map = {
    'delivered': 'entregue',
    'invoiced': 'faturado',
    'shipped': 'enviado',
    'processing': 'em processamento',
    'unavailable': 'indisponível',
    'canceled': 'cancelado',
    'created': 'criado',
    'approved': 'aprovado'
}

map_status_lit = F.create_map(
    *[F.lit(item) for par in status_map.items() for item in par]
)

df_pedidos_silver = (
    df_pedidos_bronze
    
    .withColumn("pedido_compra_ts", F.col("order_purchase_timestamp").cast(TimestampType()))
    .withColumn("pedido_aprovado_ts", F.col("order_approved_at").cast(TimestampType()))
    .withColumn("pedido_carregado_ts", F.col("order_delivered_carrier_date").cast(TimestampType()))
    .withColumn("pedido_entregue_ts", F.col("order_delivered_customer_date").cast(TimestampType()))
    .withColumn("pedido_estimativa_ts", F.col("order_estimated_delivery_date").cast(TimestampType()))
    
    .withColumn(
        "tempo_entrega_dias", 
        F.datediff(F.col("pedido_entregue_ts"), F.col("pedido_compra_ts"))
    )
    .withColumn(
        "tempo_entrega_estimado_dias", 
        F.datediff(F.col("pedido_estimativa_ts"), F.col("pedido_compra_ts"))
    )
    .withColumn(
        "diferenca_entrega_dias", 
        F.col("tempo_entrega_dias") - F.col("tempo_entrega_estimado_dias")
    )
    
    .withColumn("entrega_no_prazo",
        F.when(F.col("pedido_entregue_ts").isNull(), "Não Entregue")
         .when(F.col("diferenca_entrega_dias") <= 0, "Sim")
         .otherwise("Não")
    )
    
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        
        map_status_lit[F.col("order_status")].alias("status"),
        
        F.col("pedido_compra_ts").alias("pedido_compra_timestamp"),
        F.col("pedido_aprovado_ts").alias("pedido_aprovado_timestamp"),
        F.col("pedido_carregado_ts").alias("pedido_carregado_timestamp"),
        F.col("pedido_entregue_ts").alias("pedido_entregue_timestamp"),
        F.col("pedido_estimativa_ts").alias("pedido_estimativa_entrega_timestamp"),
        
        F.col("tempo_entrega_dias"),
        F.col("tempo_entrega_estimado_dias"),
        F.col("diferenca_entrega_dias"),
        F.col("entrega_no_prazo")
    )
)

print("Transformações concluídas.")

#amostra do resultado final
print("Schema e amostra dos dados da Silver:")
df_pedidos_silver.printSchema()
display(df_pedidos_silver.limit(5))

In [0]:
#CARREGAR
(
    df_pedidos_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: ft_itens_pedidos

In [0]:
#tabela ft_itens_pedidos

bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.ft_itens_pedidos"
silver_table_name = "ft_itens_pedidos"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

#EXTRAIR
df_itens_bronze = spark.read.table(bronze_table_path)

print("Schema e amostra dos dados da Bronze:")
df_itens_bronze.printSchema()
display(df_itens_bronze.limit(5))

In [0]:
print("Aplicando transformações...")

df_itens_silver = (
    df_itens_bronze
    .select(
        F.col("order_id").alias("id_pedido"),
        
        F.col("order_item_id").cast(IntegerType()).alias("id_item"),
        
        F.col("product_id").alias("id_produto"),
        F.col("seller_id").alias("id_vendedor"),
        
        F.col("shipping_limit_date").cast(TimestampType()).alias("data_limite_envio_timestamp"),
        
        F.col("price").cast(DecimalType(10, 2)).alias("preco_BRL"),
        F.col("freight_value").cast(DecimalType(10, 2)).alias("frete_BRL")
    )
    #Removendo duplicatas baseado na chave composta
    .dropDuplicates(["id_pedido", "id_item"])
)

print("Transformações concluídas.")

#amostra do resultado final
print("Schema e amostra dos dados da Silver:")
df_itens_silver.printSchema()
display(df_itens_silver.limit(5))

In [0]:
#CARREGAR
(
    df_itens_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: ft_pagamentos

In [0]:
#tabela ft_pagamentos

bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.ft_pagamentos_pedidos"
silver_table_name = "ft_pagamentos"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

#EXTRAIR
df_pagamentos_bronze = spark.read.table(bronze_table_path)

print("Schema e amostra dos dados da Bronze:")
df_pagamentos_bronze.printSchema()
display(df_pagamentos_bronze.limit(5))

In [0]:
print("Aplicando transformações...")

#lógica de mapeamento (CASE WHEN) para o tipo de pagamento
mapeamento_tipo_pagamento = (
    F.when(F.col("payment_type") == "credit_card", F.lit("Cartão de Crédito"))
     .when(F.col("payment_type") == "boleto", F.lit("Boleto"))
     .when(F.col("payment_type") == "voucher", F.lit("Voucher"))
     .when(F.col("payment_type") == "debit_card", F.lit("Cartão de Débito"))
     .otherwise(F.lit("Outro"))
)

df_pagamentos_silver = (
    df_pagamentos_bronze
    .select(
        F.col("order_id").alias("id_pedido"),
        
        F.col("payment_sequential").cast(IntegerType()).alias("codigo_pagamento"),
        
        mapeamento_tipo_pagamento.alias("forma_pagamento"),
        
        F.col("payment_installments").cast(IntegerType()).alias("parcelas"),
        
        F.col("payment_value").cast(DecimalType(10, 2)).alias("valor_pagamento")
    )
    .dropDuplicates(["id_pedido", "codigo_pagamento"])
)

print("Transformações concluídas.")

print("Schema e amostra dos dados da Silver:")
df_pagamentos_silver.printSchema()
display(df_pagamentos_silver.limit(5))

In [0]:
#CARREGAR
(
    df_pagamentos_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: ft_avaliacoes_pedidos

In [0]:
bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.ft_avaliacoes_pedidos"
silver_table_name = "ft_avaliacoes_pedidos"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

df_avaliacoes_bronze = spark.read.table(bronze_table_path)

df_avaliacoes_bronze.printSchema()
display(df_avaliacoes_bronze.limit(5))

In [0]:
df_avaliacoes_silver = (
    df_avaliacoes_bronze
    .select(
        F.col("review_id").alias("id_avaliacao"),
        F.col("order_id").alias("id_pedido"),
        
        # Usa try_cast para o caso de dados sujos (como '2018-04-01...')
        F.col("review_score").try_cast(IntegerType()).alias("avaliacao"),
        
        F.col("review_comment_title").alias("titulo_comentario"),
        F.col("review_comment_message").alias("comentario"),
        
        # Usa try_cast para o caso de textos misturados com datas
        F.col("review_creation_date").try_cast(TimestampType()).alias("data_comentario"),
        
        # Usa try_cast para o caso de textos misturados com datas (o que causou o erro)
        F.col("review_answer_timestamp").try_cast(TimestampType()).alias("data_resposta")
    )
    .dropDuplicates(["id_avaliacao", "id_pedido"])
)

df_avaliacoes_silver.printSchema()
display(df_avaliacoes_silver.limit(5))

In [0]:
(
    df_avaliacoes_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: ft_produtos

In [0]:
#tabela ft_produtos

bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.ft_produtos"
silver_table_name = "ft_produtos"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

#EXTRAIR
df_produtos_bronze = spark.read.table(bronze_table_path)

print("Schema e amostra dos dados da Bronze:")
df_produtos_bronze.printSchema()
display(df_produtos_bronze.limit(5))

In [0]:
print("Aplicando transformações...")

df_produtos_silver = (
    df_produtos_bronze
    .select(
        F.col("product_id").alias("id_produto"),
        F.col("product_category_name").alias("categoria_produto"),
        
        F.col("product_weight_g").cast(IntegerType()).alias("peso_produto_gramas"),
        F.col("product_length_cm").cast(IntegerType()).alias("comprimento_centimetros"),
        F.col("product_height_cm").cast(IntegerType()).alias("altura_centimetros"),
        F.col("product_width_cm").cast(IntegerType()).alias("largura_centimetros")
    )
    .dropDuplicates(["id_produto"])
)

print("Transformações concluídas.")

print("Schema e amostra dos dados da Silver:")
df_produtos_silver.printSchema()
display(df_produtos_silver.limit(5))

In [0]:
#CARREGAR
(
    df_produtos_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: ft_vendedores

In [0]:
#tabela ft_vendedores

bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.ft_vendedores"
silver_table_name = "ft_vendedores"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

#EXTRAIR
df_vendedores_bronze = spark.read.table(bronze_table_path)

print("Schema e amostra dos dados da Bronze:")
df_vendedores_bronze.printSchema()
display(df_vendedores_bronze.limit(5))

In [0]:
print("Aplicando transformações...")

df_vendedores_silver = (
    df_vendedores_bronze
    .select(
        F.col("seller_id").alias("id_vendedor"),
        
        F.col("seller_zip_code_prefix").cast(IntegerType()).alias("prefixo_cep"),
        
        F.upper(F.col("seller_city")).alias("cidade"),
        
        F.upper(F.col("seller_state")).alias("estado")
    )
    .dropDuplicates(["id_vendedor"])
)

print("Transformações concluídas.")

print("Schema e amostra dos dados da Silver:")
df_vendedores_silver.printSchema()
display(df_vendedores_silver.limit(5))

In [0]:
#CARREGAR
(
    df_vendedores_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: dm_categoria_produtos_traducao

In [0]:
#tabela dm_categoria_produtos_traducao

bronze_schema_name = "bronze"
bronze_table_path = f"{bronze_schema_name}.dm_categoria_produtos_traducao"
silver_table_name = "dm_categoria_produtos_traducao"

print(f"Processando: {bronze_table_path} -> silver.{silver_table_name}")

#EXTRAIR
df_traducao_bronze = spark.read.table(bronze_table_path)

print("Schema e amostra dos dados da Bronze:")
df_traducao_bronze.printSchema()
display(df_traducao_bronze.limit(5))

In [0]:
print("Aplicando transformações...")

df_traducao_silver = (
    df_traducao_bronze
    .select(
        F.col("product_category_name").alias("nome_produto_pt"),
        
        F.col("product_category_name_english").alias("nome_produto_en")
    )
    .dropDuplicates(["nome_produto_pt"])
)

print("Transformações concluídas.")

print("Schema e amostra dos dados da Silver:")
df_traducao_silver.printSchema()
display(df_traducao_silver.limit(5))

In [0]:
#CARREGAR
(
    df_traducao_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada/atualizada.")

### Tabela: dm_cotacao_dolar

In [0]:
#Verificação de Qualidade (Pedidos Órfãos)

print("Iniciando verificação de pedidos órfãos (ft_pedidos vs ft_consumidores)...")

pedidos_table = "silver.ft_pedidos"
consumidores_table = "silver.ft_consumidores"

df_pedidos = spark.read.table(pedidos_table)
df_consumidores = spark.read.table(consumidores_table)

#Encontrando e contando os órfãos
df_orfãos_pedidos = df_pedidos.join(
    df_consumidores,
    on="id_consumidor",
    how="left_anti" #left_anti retorna linhas da esquerda que nn tem match na direita
)

contagem_orfãos_pedidos = df_orfãos_pedidos.count()
print(f"Encontrados {contagem_orfãos_pedidos} pedidos órfãos (sem consumidor).")

if contagem_orfãos_pedidos > 0:
    print(f"Removendo {contagem_orfãos_pedidos} registros órfãos e sobrescrevendo a tabela {pedidos_table}...")
    
    #Salvando apenas os registros válidos
    df_pedidos_validos = df_pedidos.join(
        df_consumidores,
        on="id_consumidor",
        how="left_semi"
    )
    
    (
        df_pedidos_validos.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true") 
        .saveAsTable(pedidos_table)
    )
    print(f"Tabela {pedidos_table} atualizada (órfãos removidos).")
else:
    print("Nenhum pedido órfão encontrado. Tabela intacta.")

In [0]:
#Verificação de Qualidade (Itens Órfãos)

print("\nIniciando verificação de itens órfãos (ft_itens_pedidos vs ft_pedidos)...")

itens_table = "silver.ft_itens_pedidos"
pedidos_table_limpa = "silver.ft_pedidos" 

df_itens = spark.read.table(itens_table)
df_pedidos_limpa = spark.read.table(pedidos_table_limpa) 

#Encontrando e contando os órfãos
df_orfãos_itens = df_itens.join(
    df_pedidos_limpa,
    on="id_pedido",
    how="left_anti"
)

contagem_orfãos_itens = df_orfãos_itens.count()
print(f"Encontrados {contagem_orfãos_itens} itens órfãos (sem pedido).")

if contagem_orfãos_itens > 0:
    print(f"Removendo {contagem_orfãos_itens} registros órfãos e sobrescrevendo a tabela {itens_table}...")
    
    #Salvando apenas os registros válidos
    df_itens_validos = df_itens.join(
        df_pedidos_limpa,
        on="id_pedido",
        how="left_semi" 
    )
    
    (
        df_itens_validos.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .saveAsTable(itens_table)
    )
    print(f"Tabela {itens_table} atualizada (órfãos removidos).")
else:
    print("Nenhum item órfão encontrado. Tabela intacta.")

In [0]:
#Criação da Tabela silver.ft_pedido_total

print("Iniciando a criação da tabela silver.ft_pedido_total...")

bronze_schema_name = "bronze" 

#EXTRAIR
df_pedidos_limpa = spark.read.table("silver.ft_pedidos")
df_pagamentos = spark.read.table("silver.ft_pagamentos_pedidos")
df_cotacao = spark.read.table(f"{bronze_schema_name}.dm_cotacao_dolar") 

print("Fontes lidas: silver.ft_pedidos, silver.ft_pagamentos_pedidos, bronze.dm_cotacao_dolar")

#TRANSFORMAR
df_pagamentos_agg = (
    df_pagamentos
    .groupBy("id_pedido")
    .agg(
        F.sum("valor_pagamento").alias("valor_total_pago_brl")
    )
)

#TRANSFORMAR
df_cotacao_prep = (
    df_cotacao
    
    .withColumn("data_cotacao", F.to_date(F.col("dataHoraCotacao"))) 
    .select(
        "data_cotacao", 
        F.col("cotacaoCompra").alias("taxa_usd")
    )
    .dropDuplicates(["data_cotacao"]) #cotação por dia
)

#TRANSFORMAR
df_pedidos_prep = (
    df_pedidos_limpa
    .withColumn("data_pedido", F.to_date(F.col("pedido_compra_timestamp")))
)

print("Pré-agregação de pagamentos e preparação da cotação concluídas.")

In [0]:
silver_table_name = "ft_pedido_total" 

print(f"Executando joins para criar silver.{silver_table_name}...")

df_final = (
    df_pedidos_prep
    
    .join(
        df_pagamentos_agg, 
        on="id_pedido", 
        how="left"
    )
    
    .join(
        df_cotacao_prep, 
        df_pedidos_prep["data_pedido"] == df_cotacao_prep["data_cotacao"], 
        "left" 
    )
    
    .withColumn(
        "valor_total_pago_usd", 
        F.when(
            (F.col("taxa_usd").isNotNull()) & (F.col("taxa_usd") > 0), #Evitando divisão por nulo ou zero
            F.round(F.col("valor_total_pago_brl") / F.col("taxa_usd"), 2)
        ).otherwise(None) #nulo se não houver cotação
    )
    .select(
        F.col("id_pedido"),
        F.col("id_consumidor"),
        F.col("status"),
        F.col("valor_total_pago_brl"),
        F.col("valor_total_pago_usd"),
        F.col("data_pedido")
    )
)

#CARREGAR
(
    df_final.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"SUCESSO: Tabela 'silver.{silver_table_name}' criada.")

#VERIFICAR
print("Exibindo 10 linhas da tabela final criada:")
display(df_final.limit(10))