## 🥈 Camada Silver – Tratamento e Enriquecimento

#### 🧪 Passo 1 : Tratamentos das Tabelas + Join

In [0]:
# Importação
from pyspark.sql.functions import trim, col, when, lit, countDistinct
from pyspark.sql.types import StringType

# Leitura das tabelas bronze
df_transacoes = spark.table("adb_cliente_savana_prd.daniel_bronze.transacoes_teste").alias("transacoes")
df_clientes = spark.table("adb_cliente_savana_prd.daniel_bronze.clientes_teste").alias("clientes")

# Join
df_silver = df_transacoes.join(
    df_clientes,
    [col("transacoes.ID_Cliente") == col("clientes.ID")]
).drop("ID")  # Não precisamos mais do "ID" ja que ele virou ""

# Limpeza de espaços extras
for campo in ["Nome", "Cidade", "Meio_de_Pgmt"]:
    if campo in df_silver.columns:
        df_silver = df_silver.withColumn(campo, trim(col(campo)))

# Correções manuais de erros de Nome conhecidos
df_silver = df_silver.withColumn("Nome",
    when(col("Nome") == "Andr Barros", "André Barros")
    .when(col("Nome") == "Joo Souza", "João Souza")
    .when(col("Nome") == "Lcia Azevedo", "Leticia Azevedo")
    .when(col("Nome") == "Patrcia Gomes", "Patricia Gomes")
    .otherwise(col("Nome"))
)

# Correções manuais de erros de Cidades conhecidas
df_silver = df_silver.withColumn("Cidade",
    when(col("Cidade") == "Braslia", "Brasília")
    .when(col("Cidade") == "Florianpolis", "Florianopolis")
    .when(col("Cidade") == "Joo Pessoa", "João Pessoa")
    .when(col("Cidade") == "Macap", "Macapá")
    .when(col("Cidade") == "So Lus", "São Luis")
    .when(col("Cidade") == "So Paulo", "São Paulo")
    .when(col("Cidade") == "Vitria", "Vitória")
    .otherwise(col("Cidade"))
)

# Padronizar nulos
def padronizar_nulls(df):
    for campo in df.schema.names:
        if df.schema[campo].dataType == StringType():
            df = df.withColumn(campo, when(trim(col(campo)) == "", None).otherwise(col(campo)))
    return df

# Aplicar ao df_silver
df_silver = padronizar_nulls(df_silver)

# Validar se a coluna "CPF" pertence a mais de um "ID_Cliente"
df_cpf_validacao = df_silver.groupBy("CPF").agg(countDistinct("ID_Cliente").alias("distinct_ID_Clientes"))
df_cpf_incorreto = df_cpf_validacao.filter(col("distinct_ID_Clientes") > 1)

# Exibir
display(df_cpf_incorreto)

#### 🧪 Passo 2 : Subir a Tabela Silver no Banco

In [0]:
# Remover a Tabela antiga visando Evitar Conflitos
spark.sql("DROP TABLE IF EXISTS adb_cliente_savana_prd.daniel_silver.BaseCliente_Transacoes_Teste")

# Salvar na camada Silver
df_silver.write.format("delta").mode("overwrite") \
    .saveAsTable("adb_cliente_savana_prd.daniel_silver.BaseClientes_Transacoes_Teste")

# Exibir
display(df_silver)