# Camada Silver - Arquitetura Medalhão

**Objetivo:** Transformações e padronizações da camada Bronze para Silver.

**Operações:**
- Renomeação de colunas (português, snake_case)
- Padronização de tipos de dados
- Validações de integridade referencial
- Remoção de duplicatas
- Aplicação de regras de negócio


## Setup Inicial

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS medalhao;
USE CATALOG medalhao;

CREATE SCHEMA IF NOT EXISTS silver;

USE SCHEMA silver;

In [0]:
import math
import pandas as pd
import traceback
import time
from datetime import datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException

from pyspark.sql.functions import (
    col, trim, lower, upper, to_date, to_timestamp, datediff,
    when, current_timestamp, row_number, lit, coalesce, expr,
    regexp_replace, last, sequence, explode 
)


In [0]:
catalog_name = "medalhao"
bronze_db_name = "bronze"
silver_db_name = "silver"

In [0]:
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {silver_db_name}")

In [0]:
def table_exists(full_table_name: str) -> bool:
    try:
        _ = spark.table(full_table_name)
        return True
    except AnalysisException:
        return False
    except Exception:
        return False


def check_bronze_tables(tables: list) -> dict:
    results = {}
    for t in tables:
        full = f"{catalog_name}.{bronze_db_name}.{t}"
        exists = table_exists(full)
        cnt = None
        if exists:
            try:
                cnt = spark.table(full).count()
            except Exception:
                cnt = None
        results[t] = {"exists": exists, "count": cnt}
    return results


In [0]:
bronze_tables_needed = [
    "ft_consumidores",
    "ft_geolocalizacao",
    "ft_itens_pedidos",
    "ft_pagamentos_pedidos",
    "ft_avaliacoes_pedidos",
    "ft_pedidos",
    "ft_produtos",
    "ft_vendedores",
    "dm_categoria_produtos_traducao",
    "dm_cotacao_dolar"
]

status = check_bronze_tables(bronze_tables_needed)
print("Status das tabelas Bronze:")
for k,v in status.items():
    print(f" - {k}: exists={v['exists']}  count={v['count']}")

## Funções Utilitárias

In [0]:
def safe_table_exists(spark, full_name: str) -> bool:
    try:
        spark.table(full_name)
        return True
    except AnalysisException:
        return False
    except Exception:
        return False


def safe_col(df, name):
    return col(name) if name in df.columns else F.lit(None).cast("string")


def parse_timestamp_variants(col_expr):
    return coalesce(
        to_timestamp(col_expr),
        to_timestamp(col_expr, "yyyy-MM-dd HH:mm:ss"),
        to_timestamp(col_expr, "yyyy-MM-dd'T'HH:mm:ss"),
        to_timestamp(col_expr, "yyyy-MM-dd"),
        to_timestamp(col_expr, "dd/MM/yyyy"),
        to_timestamp(col_expr, "MM-dd-yyyy"),
        to_timestamp(col_expr, "yyyy/MM/dd")
    )


def log_transformation_metrics(table_name: str, metrics: dict):
    print(f"\n{'='*80}")
    print(f"MÉTRICAS DE TRANSFORMAÇÃO: {table_name}")
    print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'-'*80}")
    for key, value in metrics.items():
        print(f"  {key}: {value}")
    print(f"{'='*80}\n")


## Transformação: ft_consumidores

In [0]:
src_table_consumers = f"{catalog_name}.{bronze_db_name}.ft_consumidores"
tgt_table_consumers = f"{catalog_name}.{silver_db_name}.ft_consumidores"

if not safe_table_exists(spark, src_table_consumers):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_consumers}")

df_src_consumers = spark.table(src_table_consumers)
total_before = df_src_consumers.count()

print(f"Leitura: {src_table_consumers}")
print(f"Registros: {total_before}")


In [0]:
df = df_src_consumers \
    .withColumn("id_consumidor", trim(safe_col(df_src_consumers, "customer_id"))) \
    .withColumn("prefixo_cep", trim(safe_col(df_src_consumers, "customer_zip_code_prefix"))) \
    .withColumn("cidade", upper(trim(safe_col(df_src_consumers, "customer_city")))) \
    .withColumn("estado", upper(trim(safe_col(df_src_consumers, "customer_state")))) \
    .withColumn("processed_timestamp", current_timestamp())


In [0]:
null_id_count = df.filter(col("id_consumidor").isNull()).count()
null_state_count = df.filter(col("estado").isNull()).count()

print(f"Nulos: id_consumidor={null_id_count}, estado={null_state_count}")


In [0]:
dup_count = df.groupBy("id_consumidor").count().filter(col("count") > 1).count()
print(f"Duplicatas detectadas: {dup_count}")

w = Window.partitionBy("id_consumidor").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns
    else col("processed_timestamp").desc_nulls_last()
)

df_dedup = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

total_after = df_dedup.count()
removed_by_dedupe = total_before - total_after

print(f"Após deduplicação: {total_after} ({removed_by_dedupe} removidos)")


### ft_consumidores — Deduplicação por Timestamp de Ingestão

**O que:** Remoção de registros duplicados mantendo apenas a versão mais recente de cada `id_consumidor`.

**Por quê:** A camada Bronze pode conter múltiplas versões do mesmo registro devido a reingestões ou atualizações. A Silver deve conter apenas a versão mais atual.

**Como foi implementado:**
```python
Window.partitionBy("id_consumidor").orderBy(col("ingestion_timestamp").desc_nulls_last())
df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1)
```

**Impacto:**
- Garantia de unicidade por PK
- Preservação da versão mais recente (timestamp maior)
- Remoção de histórico duplicado


In [0]:
df_dedup.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(tgt_table_consumers)

silver_table = spark.table(tgt_table_consumers)
silver_count_total = silver_table.count()
silver_dup_check = silver_table.groupBy("id_consumidor").count().filter(col("count") > 1).count()

print(f"Salvo em: {tgt_table_consumers}")
print(f"Contagem final: {silver_count_total}")
print(f"Duplicatas na Silver: {silver_dup_check}")

display(silver_table.limit(5))


## Transformação: ft_pedidos

In [0]:
src_table_orders = f"{catalog_name}.{bronze_db_name}.ft_pedidos"
tgt_table_orders = f"{catalog_name}.{silver_db_name}.ft_pedidos"

if not table_exists(src_table_orders):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_orders}")

df_src = spark.table(src_table_orders)
total_before = df_src.count()

print(f"Leitura: {src_table_orders}")
print(f"Registros: {total_before}")


In [0]:
df = df_src \
    .withColumn("id_pedido", trim(safe_col(df_src, "order_id"))) \
    .withColumn("id_consumidor", trim(safe_col(df_src, "customer_id"))) \
    .withColumn("status_raw", trim(safe_col(df_src, "order_status"))) \
    .withColumn("pedido_compra_timestamp_raw", safe_col(df_src, "order_purchase_timestamp")) \
    .withColumn("pedido_aprovado_timestamp_raw", safe_col(df_src, "order_approved_at")) \
    .withColumn("pedido_carregado_timestamp_raw", safe_col(df_src, "order_delivered_carrier_date")) \
    .withColumn("pedido_entregue_timestamp_raw", safe_col(df_src, "order_delivered_customer_date")) \
    .withColumn("pedido_estimativa_entrega_timestamp_raw", safe_col(df_src, "order_estimated_delivery_date"))


In [0]:
df = df.withColumn("pedido_compra_timestamp", to_timestamp(col("pedido_compra_timestamp_raw"))) \
       .withColumn("pedido_aprovado_timestamp", to_timestamp(col("pedido_aprovado_timestamp_raw"))) \
       .withColumn("pedido_carregado_timestamp", to_timestamp(col("pedido_carregado_timestamp_raw"))) \
       .withColumn("pedido_entregue_timestamp", to_timestamp(col("pedido_entregue_timestamp_raw"))) \
       .withColumn("pedido_estimativa_entrega_timestamp", to_timestamp(col("pedido_estimativa_entrega_timestamp_raw")))


In [0]:
null_id_pedido = df.filter(col("id_pedido").isNull()).count()
null_id_consumidor = df.filter(col("id_consumidor").isNull()).count()

print(f"Nulos: id_pedido={null_id_pedido}, id_consumidor={null_id_consumidor}")


### ft_pedidos — Validação de FK id_consumidor

**O que:** Validação de integridade referencial da FK `id_consumidor` contra tabela `ft_consumidores`.

**Por quê:** 
- Garantir que todos os pedidos pertencem a consumidores válidos cadastrados
- Prevenir pedidos órfãos que impediriam análises de comportamento do cliente
- Conformidade com modelo relacional esperado

**Como foi implementado:**
- Anti-join contra `ft_consumidores` para detectar pedidos sem consumidor válido
- Log de contagem de pedidos órfãos para auditoria
- Decisão de manter registros órfãos (regra de negócio: rastreabilidade)


In [None]:
df_consumidores_ids = spark.table(tgt_table_consumers).select(col("id_consumidor").alias("fk_consumer_id"))

orphan_orders = df.join(df_consumidores_ids, df.id_consumidor == df_consumidores_ids.fk_consumer_id, how="left_anti")
orphan_orders_count = orphan_orders.count()

print(f"Pedidos órfãos (consumidor inválido): {orphan_orders_count}")


In [0]:
df = df.withColumn(
    "status",
    when(lower(col("status_raw")) == "delivered", "entregue")
    .when(lower(col("status_raw")) == "invoiced", "faturado")
    .when(lower(col("status_raw")) == "shipped", "enviado")
    .when(lower(col("status_raw")) == "processing", "em processamento")
    .when(lower(col("status_raw")) == "unavailable", "indisponível")
    .when(lower(col("status_raw")) == "canceled", "cancelado")
    .when(lower(col("status_raw")) == "created", "criado")
    .when(lower(col("status_raw")) == "approved", "aprovado")
    .otherwise(col("status_raw"))
)


### ft_pedidos — Tradução de Status (Padronização de Domínio)

**O que:** Mapeamento de valores de status do inglês para português conforme regra de negócio.

**Por quê:** Padronização de vocabulário para relatórios e análises em português. Status em inglês dificultam entendimento por usuários não técnicos.

**Como foi implementado:**
| Status Original (EN) | Status Mapeado (PT-BR) |
|---------------------|------------------------|
| delivered | entregue |
| invoiced | faturado |
| shipped | enviado |
| processing | em processamento |
| unavailable | indisponível |
| canceled | cancelado |
| created | criado |
| approved | aprovado |

**Impacto:** Valores não reconhecidos são mantidos como estão (fallback seguro)


In [0]:
df = df.withColumn("tempo_entrega_dias",
    when(col("pedido_entregue_timestamp").isNotNull() & col("pedido_compra_timestamp").isNotNull(),
         datediff(to_date(col("pedido_entregue_timestamp")), to_date(col("pedido_compra_timestamp")))
    ).otherwise(None)
)

df = df.withColumn("tempo_entrega_estimado_dias",
    when(col("pedido_estimativa_entrega_timestamp").isNotNull() & col("pedido_compra_timestamp").isNotNull(),
         datediff(to_date(col("pedido_estimativa_entrega_timestamp")), to_date(col("pedido_compra_timestamp")))
    ).otherwise(None)
)

df = df.withColumn("diferenca_entrega_dias",
    when(col("tempo_entrega_dias").isNotNull() & col("tempo_entrega_estimado_dias").isNotNull(),
         col("tempo_entrega_dias") - col("tempo_entrega_estimado_dias")
    ).otherwise(None)
)

df = df.withColumn("entrega_no_prazo",
    when(col("pedido_entregue_timestamp").isNull(), "Não Entregue")
    .when(col("diferenca_entrega_dias") <= 0, "Sim")
    .when(col("diferenca_entrega_dias") > 0, "Não")
    .otherwise(None)
)


### ft_pedidos — Métricas Derivadas de Entrega (KPIs de Negócio)

**O que:** Cálculo de indicadores de performance de entrega para análise de SLA e satisfação.

**Por quê:** Métricas de negócio essenciais para monitorar desempenho logístico e identificar entregas atrasadas.

**Como foi implementado:**
```python
tempo_entrega_dias = datediff(pedido_entregue_date, pedido_compra_date)
tempo_entrega_estimado_dias = datediff(pedido_estimativa_entrega_date, pedido_compra_date)
diferenca_entrega_dias = tempo_entrega_dias - tempo_entrega_estimado_dias
entrega_no_prazo = "Sim" se diferenca <= 0, "Não" se > 0, "Não Entregue" se NULL
```

**Impacto:**
- Novas colunas analíticas criadas
- Facilitam análises de SLA e performance de entrega
- Base para dashboards de satisfação logística


In [0]:
df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_pedido", "id_consumidor", "status",
    "pedido_compra_timestamp", "pedido_aprovado_timestamp", "pedido_carregado_timestamp",
    "pedido_entregue_timestamp", "pedido_estimativa_entrega_timestamp",
    "tempo_entrega_dias", "tempo_entrega_estimado_dias", "diferenca_entrega_dias",
    "entrega_no_prazo", "processed_timestamp", "ingestion_timestamp"
]

df_final = df.select(*[c for c in final_cols if c in df.columns])


In [0]:
df_final.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(tgt_table_orders)

silver_table = spark.table(tgt_table_orders)
silver_count_total = silver_table.count()
silver_dup_check = silver_table.groupBy("id_pedido").count().filter(col("count") > 1).count()

print(f"Salvo em: {tgt_table_orders}")
print(f"Contagem final: {silver_count_total}")
print(f"Duplicatas na Silver: {silver_dup_check}")

display(silver_table.limit(5))


## Transformação: ft_itens_pedidos

In [0]:
src_table_item_orders = f"{catalog_name}.{bronze_db_name}.ft_itens_pedidos"
tgt_table_item_orders = f"{catalog_name}.{silver_db_name}.ft_itens_pedidos"

if not safe_table_exists(spark, src_table_item_orders):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_item_orders}")

df_src = spark.table(src_table_item_orders)
before_count = df_src.count()

print(f"Leitura: {src_table_item_orders}")
print(f"Registros: {before_count}")


In [0]:
df = df_src \
    .withColumn("id_pedido", trim(safe_col(df_src, "order_id"))) \
    .withColumn("id_item", trim(safe_col(df_src, "order_item_id"))) \
    .withColumn("id_produto", trim(safe_col(df_src, "product_id"))) \
    .withColumn("id_vendedor", trim(safe_col(df_src, "seller_id"))) \
    .withColumn("preco_brl_raw", safe_col(df_src, "price")) \
    .withColumn("preco_frete_raw", safe_col(df_src, "freight_value"))


### ft_itens_pedidos — Conversão de Tipos Monetários para DecimalType(12,2)

**O que:** Conversão de valores monetários (`preco_brl`, `preco_frete`) de DoubleType para **DecimalType(12,2)**.

**Por quê:** 
- **DoubleType** causa erros de arredondamento em operações financeiras (ex.: 0.1 + 0.2 ≠ 0.3)
- **DecimalType(12,2)** garante precisão exata com 2 casas decimais, conforme padrões contábeis
- Especificação exige DecimalType(12,2) para TODOS os valores monetários

**Como foi implementado:**
- Substituição de `.cast(DoubleType())` por `.cast(DecimalType(12,2))`
- Mantém tratamento de NULL e limpeza de formatação (vírgulas → pontos)


In [0]:
df = df.withColumn("preco_brl",
    when(col("preco_brl_raw").isNull(), None)
    .otherwise(F.regexp_replace(col("preco_brl_raw").cast("string"), ",", ".").cast(DecimalType(12,2)))
)

df = df.withColumn("preco_frete",
    when(col("preco_frete_raw").isNull(), None)
    .otherwise(F.regexp_replace(col("preco_frete_raw").cast("string"), ",", ".").cast(DecimalType(12,2)))
)


In [0]:
null_produto = df.filter(col("id_produto").isNull()).count()
null_vendedor = df.filter(col("id_vendedor").isNull()).count()

print(f"Nulos: id_produto={null_produto}, id_vendedor={null_vendedor}")


### ft_itens_pedidos — Validação de Integridade Referencial (FKs)

**O que:** Validação de chaves estrangeiras para `id_produto` e `id_vendedor` contra tabelas Silver correspondentes.

**Por quê:** 
- Garantir que todos os itens referenciam produtos e vendedores válidos
- Prevenir órfãos que quebrariam JOINs em análises posteriores
- Conformidade com especificação de integridade referencial

**Como foi implementado:**
- Anti-join contra `ft_produtos` e `ft_vendedores` para identificar registros órfãos
- Contagem e log de itens com FKs inválidas para auditoria
- Registros órfãos são identificados mas mantidos (decisão de negócio)


In [None]:
tgt_table_products = f"{catalog_name}.{silver_db_name}.ft_produtos"
tgt_table_sellers = f"{catalog_name}.{silver_db_name}.ft_vendedores"

df_produtos_ids = spark.table(tgt_table_products).select(col("id_produto").alias("fk_product_id"))
df_vendedores_ids = spark.table(tgt_table_sellers).select(col("id_vendedor").alias("fk_seller_id"))

orphan_items_prod = df.join(df_produtos_ids, df.id_produto == df_produtos_ids.fk_product_id, how="left_anti")
orphan_items_seller = df.join(df_vendedores_ids, df.id_vendedor == df_vendedores_ids.fk_seller_id, how="left_anti")

orphan_prod_count = orphan_items_prod.count()
orphan_seller_count = orphan_items_seller.count()

print(f"Itens órfãos (produto inválido): {orphan_prod_count}")
print(f"Itens órfãos (vendedor inválido): {orphan_seller_count}")


In [0]:
df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_pedido", "id_item", "id_produto", "id_vendedor",
    "preco_brl", "preco_frete", "processed_timestamp", "ingestion_timestamp"
]

df_final = df.select(*[c for c in final_cols if c in df.columns])


In [0]:
df_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(tgt_table_item_orders)

silver_table = spark.table(tgt_table_item_orders)
after_count = silver_table.count()
missing_keys = silver_table.filter(col("id_pedido").isNull() | col("id_item").isNull()).count()

print(f"Salvo em: {tgt_table_item_orders}")
print(f"Contagem final: {after_count}")
print(f"Chaves ausentes: {missing_keys}")

display(silver_table.limit(5))


## Transformação: ft_pagamentos_pedidos

In [0]:
src_table_payments = f"{catalog_name}.{bronze_db_name}.ft_pagamentos_pedidos"
tgt_table_payments = f"{catalog_name}.{silver_db_name}.ft_pagamentos_pedidos"

if not safe_table_exists(spark, src_table_payments):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_payments}")

df_src = spark.table(src_table_payments)
before_count = df_src.count()

print(f"Leitura: {src_table_payments}")
print(f"Registros: {before_count}")


In [0]:
df = df_src \
    .withColumn("id_pedido", trim(safe_col(df_src, "order_id"))) \
    .withColumn("codigo_pagamento", safe_col(df_src, "payment_sequential").cast(IntegerType())) \
    .withColumn("forma_pagamento_raw", trim(safe_col(df_src, "payment_type"))) \
    .withColumn("parcelas", when(safe_col(df_src, "payment_installments").isNull(), 0)
                .otherwise(safe_col(df_src, "payment_installments").cast(IntegerType()))) \
    .withColumn("valor_pagamento_raw", safe_col(df_src, "payment_value"))

df = df.withColumn("forma_pagamento",
    when(lower(col("forma_pagamento_raw")) == "credit_card", "Cartão de Crédito")
    .when(lower(col("forma_pagamento_raw")) == "boleto", "Boleto")
    .when(lower(col("forma_pagamento_raw")) == "voucher", "Voucher")
    .when(lower(col("forma_pagamento_raw")) == "debit_card", "Cartão de Débito")
    .when(col("forma_pagamento_raw").isNull(), "Não Informado")
    .otherwise("Outro")
)

df = df.withColumn("valor_pagamento",
    when(col("valor_pagamento_raw").isNotNull(),
         F.regexp_replace(col("valor_pagamento_raw").cast("string"), ",", ".").cast(DecimalType(12,2)))
    .otherwise(None)
)


### ft_pagamentos_pedidos — Padronização de Tipos Monetários e Domínio

**O que:** Conversão de valores monetários para `DecimalType(12,2)` e tradução de formas de pagamento.

**Por quê:** 
- **DecimalType**: Previne erros de arredondamento em cálculos financeiros (problema conhecido de DoubleType)
- **Tradução**: Padronização de vocabulário para português

**Como foi implementado:**
| Campo | Transformação |
|-------|---------------|
| `valor_pagamento` | String → DecimalType(12,2) |
| `parcelas` | String/NULL → IntegerType (0 se NULL) |
| `forma_pagamento` | credit_card → Cartão de Crédito, etc. |

**Impacto:**
- Precisão decimal garantida em cálculos financeiros
- Conformidade com padrões contábeis
- Vocabulário padronizado em português


In [0]:
missing_id_pedido = df.filter(col("id_pedido").isNull()).count()
print(f"Nulos: id_pedido={missing_id_pedido}")


### ft_pagamentos_pedidos — Validação FK e Deduplicação

**O que:** Validação de FK `id_pedido` e deduplicação por chave composta `(id_pedido, codigo_pagamento)`.

**Por quê:** 
- FK validation garante que pagamentos referenciam pedidos válidos
- Deduplicação previne contagem duplicada de valores financeiros (inflação de totais)
- PK composta assegura unicidade de cada transação de pagamento

**Como foi implementado:**
- Anti-join contra `ft_pedidos` para detectar pagamentos órfãos
- Window function com `row_number()` particionado por `(id_pedido, codigo_pagamento)`
- Ordenação por `ingestion_timestamp` DESC para manter registro mais recente


In [None]:
df_pedidos_ids = spark.table(tgt_table_orders).select(col("id_pedido").alias("fk_order_id"))

orphan_payments = df.join(df_pedidos_ids, df.id_pedido == df_pedidos_ids.fk_order_id, how="left_anti")
orphan_payments_count = orphan_payments.count()

print(f"Pagamentos órfãos (pedido inválido): {orphan_payments_count}")


In [None]:
dup_count = df.groupBy("id_pedido", "codigo_pagamento").count().filter(col("count") > 1).count()
print(f"Duplicatas detectadas (id_pedido, codigo_pagamento): {dup_count}")

w = Window.partitionBy("id_pedido", "codigo_pagamento").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns
    else col("processed_timestamp").desc_nulls_last()
)

df = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

total_after_dedup = df.count()
removed_by_dedup = before_count - total_after_dedup

print(f"Após deduplicação: {total_after_dedup} ({removed_by_dedup} removidos)")


In [None]:
df = df.withColumn("processed_timestamp", current_timestamp())


In [0]:
final_cols = [
    "id_pedido", "codigo_pagamento", "forma_pagamento",
    "parcelas", "valor_pagamento", "processed_timestamp", "ingestion_timestamp"
]

df_final = df.select(*[c for c in final_cols if c in df.columns])


In [0]:
df_final.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(tgt_table_payments)

silver_table = spark.table(tgt_table_payments)
after_count = silver_table.count()
records_with_missing_pk = silver_table.filter(col("id_pedido").isNull()).count()

print(f"Salvo em: {tgt_table_payments}")
print(f"Contagem final: {after_count}")
print(f"PKs ausentes: {records_with_missing_pk}")

display(silver_table.limit(5))


## Transformação: ft_avaliacoes_pedidos

### ft_avaliacoes_pedidos — Validação de Integridade Referencial (FK)

**O que:** Validação de chaves estrangeiras para garantir que todos os registros de avaliações referenciem pedidos válidos existentes na tabela `ft_pedidos`.

**Como foi implementado:**
- `left_anti join` com tabela de pedidos para identificar `id_pedido` inválidos (nulos ou não existentes)
- Registros com FK inválida são marcados e removidos antes da escrita na Silver

In [0]:
src_table_reviews = f"{catalog_name}.{bronze_db_name}.ft_avaliacoes_pedidos"
tgt_table_reviews = f"{catalog_name}.{silver_db_name}.ft_avaliacoes_pedidos"
src_table_orders = f"{catalog_name}.{bronze_db_name}.ft_pedidos"

if not safe_table_exists(spark, src_table_reviews):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_reviews}")
if not safe_table_exists(spark, src_table_orders):
    raise RuntimeError(f"Tabela de pedidos não encontrada: {src_table_orders}")

df_src = spark.table(src_table_reviews)
df_pedidos_ids = spark.table(src_table_orders).select(col("order_id").alias("fk_order_id")).distinct()

total_before = df_src.count()
print(f"Leitura: {src_table_reviews}")
print(f"Registros: {total_before}")


In [0]:
df = df_src \
    .withColumn("id_avaliacao", trim(safe_col(df_src, "review_id"))) \
    .withColumn("id_pedido", trim(safe_col(df_src, "order_id"))) \
    .withColumn("avaliacao_raw", safe_col(df_src, "review_score")) \
    .withColumn("titulo_comentario", safe_col(df_src, "review_comment_title")) \
    .withColumn("comentario", safe_col(df_src, "review_comment_message")) \
    .withColumn("raw_data_comentario", safe_col(df_src, "review_creation_date")) \
    .withColumn("raw_data_resposta", safe_col(df_src, "review_answer_timestamp"))

df = df.withColumn("avaliacao", expr("try_cast(avaliacao_raw as integer)")).drop("avaliacao_raw")

df = df.withColumn("id_pedido", when(trim(col("id_pedido")) == "", None).otherwise(col("id_pedido"))) \
       .withColumn("id_avaliacao", when(trim(col("id_avaliacao")) == "", None).otherwise(col("id_avaliacao")))

df = df.withColumn("data_comentario_ts", expr("try_cast(raw_data_comentario as timestamp)")) \
       .withColumn("data_resposta_ts", expr("try_cast(raw_data_resposta as timestamp)"))


### ft_avaliacoes_pedidos — Correção de Contaminação da Coluna `avaliacao`

**O que:** Tratamento de valores contaminados na coluna `review_score` que contém timestamps em vez de scores numéricos (1-5).

**Como foi implementado:**
- Uso de `try_cast(review_score as integer)` para conversão segura sem erros
- Valores não conversíveis (timestamps) resultam em `NULL`
- Filtragem posterior preserva apenas scores válidos entre 1 e 5

**Impacto:**
| Aspecto | Antes | Depois |
|---------|-------|--------|
| Tipo da coluna | String contaminada | IntegerType limpo |
| Valores inválidos | Timestamps misturados | Removidos (NULL) |
| Validação | Nenhuma | Apenas 1-5 aceitos |
| Schema | Conflito | `overwriteSchema=true` |


In [0]:
df_validacao = df.filter(col("id_avaliacao").isNotNull())
now_ts = F.current_timestamp()

df_ids_null = df_validacao.filter(col("id_pedido").isNull()).select("id_avaliacao").distinct()
df_ids_orfaos = df_validacao.join(df_pedidos_ids, df_validacao.id_pedido == df_pedidos_ids.fk_order_id, how="left_anti") \
    .select("id_avaliacao").distinct()

invalid_id_df = df_ids_null.union(df_ids_orfaos).distinct()
invalid_id_count = invalid_id_df.count()

print(f"Registros com FK inválida: {invalid_id_count}")


In [0]:
df_dates_invalid_1 = df_validacao.filter(
    (col("data_comentario_ts").isNull()) | (col("data_comentario_ts") > now_ts)
).select("id_avaliacao").distinct()

df_dates_invalid_2 = df_validacao.filter(
    (col("raw_data_resposta").isNotNull()) &
    ((col("data_resposta_ts").isNull()) | (col("data_resposta_ts") > now_ts) | (col("data_resposta_ts") < col("data_comentario_ts")))
).select("id_avaliacao").distinct()

invalid_dates_union = df_dates_invalid_1.union(df_dates_invalid_2).distinct()
invalid_date_count = invalid_dates_union.count()

print(f"Registros com datas inválidas: {invalid_date_count}")


### ft_avaliacoes_pedidos — Validação de Consistência Temporal

**O que:** Validação lógica de datas para garantir consistência temporal entre criação e resposta de avaliações.

**Como foi implementado:**
- `data_comentario` não pode ser NULL nem futura (> `current_timestamp`)
- `data_resposta` (quando presente) não pode ser futura nem anterior a `data_comentario`
- Uso de `try_cast` para conversão segura de timestamps

In [0]:
to_remove_union = invalid_id_df.union(invalid_dates_union).distinct()
removed_total = to_remove_union.count()

df_valid = df.join(to_remove_union, on="id_avaliacao", how="left_anti") if removed_total > 0 else df
total_after = df_valid.count()

print(f"Total removidos por validação: {removed_total}")
print(f"Total após validação: {total_after}")


In [0]:
df_out = df_valid \
    .withColumnRenamed("data_comentario_ts", "data_comentario") \
    .withColumnRenamed("data_resposta_ts", "data_resposta") \
    .withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_avaliacao", "id_pedido", "avaliacao", "titulo_comentario", "comentario",
    "data_comentario", "data_resposta", "processed_timestamp", "ingestion_timestamp"
]

df_final = df_out.select(*[c for c in final_cols if c in df_out.columns])


In [0]:
total_antes_limpeza = df_final.count()

df_final = df_final.withColumn("avaliacao_str", col("avaliacao").cast("string"))
df_final = df_final.withColumn("avaliacao_valida", expr("try_cast(avaliacao_str as int)"))

df_final = df_final.filter(
    (col("avaliacao_valida").isNotNull()) &
    (col("avaliacao_valida") >= 1) &
    (col("avaliacao_valida") <= 5)
)

df_final = df_final.withColumn("avaliacao", col("avaliacao_valida")).drop("avaliacao_str", "avaliacao_valida")

total_apos_limpeza = df_final.count()
registros_removidos_avaliacao = total_antes_limpeza - total_apos_limpeza

print(f"Antes limpeza scores: {total_antes_limpeza}")
print(f"Após limpeza scores: {total_apos_limpeza}")
print(f"Removidos (scores inválidos): {registros_removidos_avaliacao}")


### ft_avaliacoes_pedidos — Filtro de Scores Válido

**O que:** Aplicação de regra de negócio que aceita apenas avaliações com scores entre 1 e 5 (escala padrão de satisfação).

**Como foi implementado:**
```python
# Conversão segura com try_cast
df.withColumn("avaliacao_valida", expr("try_cast(avaliacao_str as int)"))

# Filtro de intervalo válido
.filter((col("avaliacao_valida") >= 1) & (col("avaliacao_valida") <= 5))
```

**Impacto:**
- Registros removidos: avaliações com scores NULL, < 1 ou > 5
- Coluna final: `IntegerType` com valores garantidamente entre 1-5

In [0]:
df_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(tgt_table_reviews)

silver_table = spark.table(tgt_table_reviews)
saved_count = silver_table.count()
silver_dup_check = silver_table.groupBy("id_avaliacao").count().filter(col("count") > 1).count()

print(f"Salvo em: {tgt_table_reviews}")
print(f"Contagem final: {saved_count}")
print(f"Duplicatas na Silver: {silver_dup_check}")

display(silver_table.limit(5))
display(silver_table.groupBy("avaliacao").count().orderBy("avaliacao"))


## Transformação: ft_produtos

In [0]:
src_table_products = f"{catalog_name}.{bronze_db_name}.ft_produtos"
tgt_table_products = f"{catalog_name}.{silver_db_name}.ft_produtos"

if not safe_table_exists(spark, src_table_products):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_products}")

df_src = spark.table(src_table_products)
total_before = df_src.count()

print(f"Leitura: {src_table_products}")
print(f"Registros: {total_before}")


In [0]:
def convert_to_double(col_expr):
    return when(col_expr.isNull(), None).otherwise(
        F.regexp_replace(col_expr.cast("string"), ",", ".").cast(DoubleType())
    )

df = df_src \
    .withColumn("id_produto", trim(safe_col(df_src, "product_id"))) \
    .withColumn("categoria_produto", when(safe_col(df_src, "product_category_name").isNotNull(),
                trim(safe_col(df_src, "product_category_name"))).otherwise(None)) \
    .withColumn("peso_produto_gramas", convert_to_double(safe_col(df_src, "product_weight_g"))) \
    .withColumn("comprimento_centimetros", convert_to_double(safe_col(df_src, "product_length_cm"))) \
    .withColumn("altura_centimetros", convert_to_double(safe_col(df_src, "product_height_cm"))) \
    .withColumn("largura_centimetros", convert_to_double(safe_col(df_src, "product_width_cm")))


In [0]:
w = Window.partitionBy("id_produto").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns
    else col("processed_timestamp").desc_nulls_last()
)

df_dedup = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
total_after = df_dedup.count()
removed_by_dedupe = total_before - total_after

print(f"Após deduplicação: {total_after} ({removed_by_dedupe} removidos)")


In [0]:
df_final = df_dedup.withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_produto", "categoria_produto", "peso_produto_gramas",
    "comprimento_centimetros", "altura_centimetros", "largura_centimetros",
    "processed_timestamp", "ingestion_timestamp"
]

df_final = df_final.select(*[c for c in final_cols if c in df_final.columns])


In [0]:
df_final.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(tgt_table_products)

silver_table = spark.table(tgt_table_products)
saved_count = silver_table.count()
missing_ids = silver_table.filter(col("id_produto").isNull()).count()

print(f"Salvo em: {tgt_table_products}")
print(f"Contagem final: {saved_count}")
if missing_ids > 0:
    print(f"PKs ausentes: {missing_ids}")

display(silver_table.limit(5))


## Transformação: ft_vendedores

In [0]:
src_table_sellers = f"{catalog_name}.{bronze_db_name}.ft_vendedores"
tgt_table_sellers = f"{catalog_name}.{silver_db_name}.ft_vendedores"

if not safe_table_exists(spark, src_table_sellers):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_sellers}")

df_src = spark.table(src_table_sellers)
total_before = df_src.count()

print(f"Leitura: {src_table_sellers}")
print(f"Registros: {total_before}")


In [0]:
df = df_src \
    .withColumn("id_vendedor", trim(safe_col(df_src, "seller_id")).cast(StringType())) \
    .withColumn("prefixo_cep", trim(safe_col(df_src, "seller_zip_code_prefix")).cast(StringType())) \
    .withColumn("cidade", upper(trim(safe_col(df_src, "seller_city")))) \
    .withColumn("estado", upper(trim(safe_col(df_src, "seller_state")))) \
    .withColumn("processed_timestamp", current_timestamp())


In [0]:
w = Window.partitionBy("id_vendedor").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns
    else col("processed_timestamp").desc_nulls_last()
)

df_dedup = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
total_after = df_dedup.count()
removed_by_dedupe = total_before - total_after

print(f"Após deduplicação: {total_after} ({removed_by_dedupe} removidos)")


In [0]:
final_cols = [
    "id_vendedor", "prefixo_cep", "cidade", "estado",
    "processed_timestamp", "ingestion_timestamp"
]

df_final = df_dedup.select(*[c for c in final_cols if c in df_dedup.columns])


In [0]:
df_final.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(tgt_table_sellers)

silver_table = spark.table(tgt_table_sellers)
saved_count = silver_table.count()
missing_ids_silver = silver_table.filter(col("id_vendedor").isNull()).count()

print(f"Salvo em: {tgt_table_sellers}")
print(f"Contagem final: {saved_count}")
if missing_ids_silver > 0:
    print(f"PKs ausentes: {missing_ids_silver}")

display(silver_table.limit(5))


## Transformação: dm_cotacao_dolar

In [0]:
src_table_cotacoes = f"{catalog_name}.{bronze_db_name}.dm_cotacao_dolar"
tgt_table_cotacoes = f"{catalog_name}.{silver_db_name}.dm_cotacao_dolar"

if not safe_table_exists(spark, src_table_cotacoes):
    raise RuntimeError(f"Tabela fonte não encontrada: {src_table_cotacoes}")

df_raw = spark.table(src_table_cotacoes)
total_before = df_raw.count()

print(f"Leitura: {src_table_cotacoes}")
print(f"Registros: {total_before}")


In [0]:
df = df_raw.withColumn("data", to_date(to_timestamp(col("dataHoraCotacao"))))
df = df.withColumn("cotacao_dolar", regexp_replace(col("purchase_rate").cast("string"), ",", ".").cast("double"))

df_rates = df.select("data", "cotacao_dolar", "ingestion_timestamp") \
    .where(col("data").isNotNull()) \
    .dropDuplicates(["data"]) \
    .orderBy("data")

min_max = df_rates.agg(F.min(col("data")).alias("min_data"), F.max(col("data")).alias("max_data")).collect()[0]


In [0]:
if min_max['min_data'] is None:
    print("Não há datas válidas.")
else:
    min_date = min_max['min_data']
    max_date = min_max['max_data']

    df_date_sequence = spark.createDataFrame([(min_date, max_date)], ["min_data", "max_data"]) \
        .withColumn("data_seq", explode(sequence(col("min_data"), col("max_data"), F.expr("interval 1 day")))) \
        .select(col("data_seq"))

    df_all_dates = df_date_sequence.join(df_rates, col("data_seq") == df_rates.data, "left")

    w = Window.orderBy("data_seq")

    df_final_cotacao = df_all_dates \
        .withColumn("cotacao_dolar", last(col("cotacao_dolar"), ignorenulls=True).over(w)) \
        .withColumn("ingestion_timestamp", last(col("ingestion_timestamp"), ignorenulls=True).over(w)) \
        .select(
            col("data_seq").alias("data"),
            col("data_seq").cast("timestamp").alias("data_ts"),
            col("cotacao_dolar"),
            F.current_timestamp().alias("processed_timestamp"),
            col("ingestion_timestamp")
        ) \
        .filter(col("cotacao_dolar").isNotNull())

    print(f"Total após forward fill: {df_final_cotacao.count()}")


### dm_cotacao_dolar — Forward Fill para Lacunas de Datas

**O que:** Preenchimento de cotações ausentes (finais de semana/feriados) com última cotação conhecida.

**Por quê:** O Banco Central não publica cotações em dias não úteis. Para análises financeiras, é necessário ter valores para todos os dias usando a última cotação disponível.

**Como foi implementado:**
- Geração de sequência completa de datas (min → max)
- Left join com dados existentes
- Window function `last(..., ignorenulls=True)` para propagar última cotação válida

In [0]:
df_final_cotacao.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(tgt_table_cotacoes)

print(f"Salvo em: {tgt_table_cotacoes}")
display(spark.table(tgt_table_cotacoes).select("data", "data_ts", "cotacao_dolar", "processed_timestamp").orderBy("data").limit(10))


In [None]:
def validar_tabela_silver(tabela_nome: str):
    full_name = f"{catalog_name}.{silver_db_name}.{tabela_nome}"
    
    if not safe_table_exists(spark, full_name):
        return {"tabela": tabela_nome, "status": "NÃO EXISTE"}
    
    df = spark.table(full_name)
    total = df.count()
    colunas = len(df.columns)
    schema_tipos = {field.name: str(field.dataType) for field in df.schema.fields}
    
    result = {
        "tabela": tabela_nome,
        "status": "OK",
        "total_registros": f"{total:,}",
        "total_colunas": colunas,
        "schema_sample": dict(list(schema_tipos.items())[:5])
    }
    
    if tabela_nome == "ft_avaliacoes_pedidos":
        if "avaliacao" in df.columns:
            dist_scores = df.groupBy("avaliacao").count().collect()
            result["distribuicao_scores"] = {row["avaliacao"]: row["count"] for row in dist_scores}
            result["tipo_avaliacao"] = str(df.schema["avaliacao"].dataType)
    
    if tabela_nome == "ft_pagamentos_pedidos":
        if "valor_pagamento" in df.columns:
            result["tipo_valor_pagamento"] = str(df.schema["valor_pagamento"].dataType)
    
    if tabela_nome == "ft_pedido_total":
        if "valor_total_pago_brl" in df.columns and "valor_total_pago_usd" in df.columns:
            result["tipo_valor_brl"] = str(df.schema["valor_total_pago_brl"].dataType)
            result["tipo_valor_usd"] = str(df.schema["valor_total_pago_usd"].dataType)
            result["registros_com_usd"] = df.filter(col("valor_total_pago_usd").isNotNull()).count()
    
    return result

tabelas_silver = [
    "ft_consumidores",
    "ft_pedidos",
    "ft_itens_pedidos",
    "ft_pagamentos_pedidos",
    "ft_avaliacoes_pedidos",
    "ft_produtos",
    "ft_vendedores",
    "dm_cotacao_dolar",
    "ft_pedido_total"
]

print("="*80)
print("VALIDAÇÃO FINAL - CAMADA SILVER")
print("="*80)

for tabela in tabelas_silver:
    print(f"\n{tabela.upper()}")
    resultado = validar_tabela_silver(tabela)
    for key, value in resultado.items():
        if key != "tabela":
            print(f"   {key}: {value}")
    print("-"*80)


## Logging e Auditoria

**O que:** Tabela de auditoria `audit_log_silver` para rastrear métricas de qualidade e execução de cada transformação Bronze → Silver.

**Como foi implementado:**
- Estrutura captura: timestamp, tabela, contadores (lidos, escritos, removidos, órfãos), status
- Persistência em tabela Delta `medalhao.silver.audit_log_silver`
- Modo append para acumular histórico de execuções


In [None]:
audit_table = f"{catalog_name}.{silver_db_name}.audit_log_silver"

audit_data = [
    {
        "execution_timestamp": spark.sql("SELECT current_timestamp()").collect()[0][0],
        "notebook_name": "02_bronze_to_silverCH",
        "layer_source": "bronze",
        "layer_target": "silver",
        "execution_status": "SUCCESS",
        "total_tables_processed": 9,
        "notes": "Execução completa: validações FK, deduplicação, tipos monetários DecimalType(12,2), ft_pedido_total com conversão USD via cotação do dólar"
    }
]

df_audit = spark.createDataFrame(audit_data)

if safe_table_exists(spark, audit_table):
    df_audit.write.format("delta").mode("append").saveAsTable(audit_table)
else:
    df_audit.write.format("delta").mode("overwrite").saveAsTable(audit_table)

print(f"Log de auditoria salvo em: {audit_table}")
print(f"Histórico de execuções disponível para consulta")


## Validação Final - Resumo por Tabela

## Transformação: ft_pedido_total

**Objetivo:** Criar tabela agregada com valores totais de pedidos em BRL e USD.

**Colunas finais:**
- data: Data do pedido (DATE)
- id_pedido: Identificador único do pedido
- id_consumidor: Identificador do consumidor
- status: Status do pedido (traduzido)
- valor_total_pago_brl: Soma dos pagamentos em BRL
- valor_total_pago_usd: Soma dos pagamentos convertidos para USD

In [None]:
tgt_table_order_totals = f"{catalog_name}.{silver_db_name}.ft_pedido_total"

df_pedidos = spark.table(tgt_table_orders)
df_pagamentos = spark.table(tgt_table_payments)
df_cotacao = spark.table(tgt_table_cotacoes)

print(f"Leitura de tabelas Silver:")
print(f"  - {tgt_table_orders}: {df_pedidos.count()} registros")
print(f"  - {tgt_table_payments}: {df_pagamentos.count()} registros")
print(f"  - {tgt_table_cotacoes}: {df_cotacao.count()} registros")

In [None]:
df_totais_pagamentos = df_pagamentos.groupBy("id_pedido").agg(
    F.sum("valor_pagamento").alias("valor_total_pago_brl")
)

print(f"Agregação calculada:")
print(f"  - Totais de pagamentos: {df_totais_pagamentos.count()} pedidos")

### ft_pedido_total — JOIN com Cotação do Dólar para Conversão USD

**O que:** Junção de pedidos com pagamentos e cotação do dólar para converter valores BRL → USD.

**Como foi implementado:**
- Extração de `data` (DATE) de `pedido_compra_timestamp`
- JOIN com `dm_cotacao_dolar` pela coluna `data`
- Cálculo: `valor_total_pago_usd = valor_total_pago_brl / cotacao_dolar`

### ft_pedido_total — Validações de Qualidade

**Validações implementadas:**

1. **Conversão Monetária**
   - JOIN com cotação do dólar pela data do pedido
   - Conversão USD apenas quando cotação disponível

2. **Completude de Dados**
   - Pedidos sem pagamentos recebem BRL = 0.00

3. **Tipos de Dados**
   - `valor_total_pago_brl`: DecimalType(12,2)
   - `valor_total_pago_usd`: DecimalType(12,2)
   - `data`: DateType

In [None]:
df_pedidos_base = df_pedidos.select(
    "id_pedido",
    "id_consumidor",
    "status",
    to_date(col("pedido_compra_timestamp")).alias("data")
)

df_pedido_com_pagamento = df_pedidos_base.join(
    df_totais_pagamentos,
    on="id_pedido",
    how="left"
)

df_pedido_com_pagamento = df_pedido_com_pagamento.withColumn(
    "valor_total_pago_brl",
    coalesce(col("valor_total_pago_brl"), lit(0).cast(DecimalType(12,2)))
)

df_pedido_total = df_pedido_com_pagamento.join(
    df_cotacao.select("data", "cotacao_dolar"),
    on="data",
    how="left"
)

df_pedido_total = df_pedido_total.withColumn(
    "valor_total_pago_usd",
    when(col("cotacao_dolar").isNotNull() & (col("cotacao_dolar") > 0),
         (col("valor_total_pago_brl") / col("cotacao_dolar")).cast(DecimalType(12,2))
    ).otherwise(None)
)

In [None]:
pedidos_sem_pagamento = df_pedido_total.filter(col("valor_total_pago_brl") == 0).count()
pedidos_sem_cotacao = df_pedido_total.filter(col("cotacao_dolar").isNull()).count()
pedidos_com_usd = df_pedido_total.filter(col("valor_total_pago_usd").isNotNull()).count()

print(f"Validações de qualidade:")
print(f"  - Pedidos sem pagamento: {pedidos_sem_pagamento}")
print(f"  - Pedidos sem cotação USD: {pedidos_sem_cotacao}")
print(f"  - Pedidos com conversão USD: {pedidos_com_usd}")

In [None]:
final_cols = [
    "data",
    "id_pedido",
    "id_consumidor",
    "status",
    "valor_total_pago_brl",
    "valor_total_pago_usd"
]

df_final = df_pedido_total.select(*final_cols)

In [None]:
df_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(tgt_table_order_totals)

silver_table = spark.table(tgt_table_order_totals)
saved_count = silver_table.count()
com_usd_count = silver_table.filter(col("valor_total_pago_usd").isNotNull()).count()

print(f"Salvo em: {tgt_table_order_totals}")
print(f"Contagem final: {saved_count}")
print(f"Pedidos com conversão USD: {com_usd_count}")

display(silver_table.orderBy("data").limit(10))
display(silver_table.select("data", "id_pedido", "valor_total_pago_brl", "valor_total_pago_usd").filter(col("valor_total_pago_usd").isNotNull()).limit(5))