In [0]:
# Selecionar catálogo
spark.sql("USE CATALOG adb_cliente_savana_prd")
meu_nome = "elenir"


In [0]:
# Carregar tabelas da Bronze
bronze_transacoes = spark.table(f"{meu_nome}_bronze.transacoes")
bronze_clientes = spark.table(f"{meu_nome}_bronze.clientes")


# Visualizar dados
display(bronze_transacoes)
display(bronze_clientes)
bronze_clientes.printSchema()

In [0]:
from pyspark.sql.functions import col, lower, trim, regexp_replace, when

# ✔️ Tratamento e normalização
silver_transacoes = bronze_transacoes \
    .withColumn("id", regexp_replace(col("_id.oid"), "[-_:\\s'\\/().$!@]", "")) \
    .withColumn("oid", col("_id.oid")) \
    .withColumn("Meio_de_Pgmto", when(col("Meio_de_Pgmto").isNull() | (trim(col("Meio_de_Pgmto")) == ""), "não informado")
                .otherwise(lower(trim(col("Meio_de_Pgmto"))))) \
    .withColumn("Tipo", when(col("Tipo").isNull() | (trim(col("Tipo")) == ""), "não informado")
                .otherwise(lower(trim(col("Tipo"))))) \
    .drop("_id") \
    .dropna(subset=["ID_Cliente", "ID_Transacao", "Valor", "Meio_de_Pgmto", "Tipo", "id"]) \
    .dropDuplicates(["ID_Transacao", "ID_Cliente", "Valor", "Meio_de_Pgmto", "Tipo"])

# ✔️ Normalização dos meios de pagamento
silver_transacoes = silver_transacoes.replace({
    "credito": "cartao de crédito",
    "crédito": "cartao de crédito",
    "debito": "cartao de débito",
    "débito": "cartao de débito",
    "pix": "pix"
}, subset=["Meio_de_Pgmto"])

# ✔️ Escrita na Silver
silver_transacoes.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{meu_nome}_silver.transacoes_tratadas")

# ✔️ Conferindo dados
display(silver_transacoes)



In [0]:
from pyspark.sql.functions import (
    col, trim, initcap, regexp_replace, when, length
)

# ✔️ Dicionário de correção de cidades
cidade_correcoes = {
    "So Paulo": "São Paulo",
    "Joo Pessoa": "João Pessoa",
    "Macap": "Macapá",
    "Braslia": "Brasília",
    "So Luz": "São Luís",
    "Vitria": "Vitória"
}

# ✔️ Dicionário de correção de nomes
nome_correcoes = {
    "Joo Souza": "João Souza",
    "Lcia Azevedo": "Lúcia Azevedo",
    "Julina Lima": "Juliana Lima",
    "Carla Per": "Carla Pereira",
    "Sandar Carvalho": "Sandra Carvalho",
    "Andr Barros": "André Barros",
    "Patrcia Gomes": "Patrícia Gomes"
}

# ✔️ Pipeline de tratamento
silver_clientes = bronze_clientes \
    .withColumn("Nome", when(col("Nome").isNull() | (trim(col("Nome")) == ""), "Não informado")
                .otherwise(initcap(trim(col("Nome"))))) \
    .withColumn("Cidade", when(col("Cidade").isNull() | (trim(col("Cidade")) == ""), "Não informado")
                .otherwise(initcap(trim(col("Cidade"))))) \
    .withColumn("CPF", regexp_replace(trim(when(col("CPF").isNull(), "00000000000")
                .otherwise(col("CPF"))), "[^0-9]", "")) \
    .withColumn("Agencia", regexp_replace(trim(when(col("Agencia").isNull(), "0000")
                .otherwise(col("Agencia"))), "[^0-9A-Za-z]", "")) \
    .withColumn("CPF_valido", when(length(col("CPF")) == 11, True).otherwise(False)) \
    .dropDuplicates(["ID", "CPF"])

# ✔️ Aplicando as correções
silver_clientes = silver_clientes \
    .replace(to_replace=cidade_correcoes, subset=["Cidade"]) \
    .replace(to_replace=nome_correcoes, subset=["Nome"])

# ✔️ Preencher valores nulos restantes
silver_clientes = silver_clientes.fillna({
    "Nome": "Não informado",
    "Cidade": "Não informado",
    "CPF": "00000000000",
    "Agencia": "0000"
})

# ✔️ Escrita no Silver
silver_clientes.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{meu_nome}_silver.clientes_tratados")

# ✔️ Conferindo os dados tratados
display(silver_clientes)


In [0]:
from pyspark.sql.functions import year, month, dayofmonth, col, when, current_date

# ✔️ Carregar tabelas tratadas da Silver
clientes_silver = spark.table("adb_cliente_savana_prd.elenir_silver.clientes_tratados")
transacoes_silver = spark.table("adb_cliente_savana_prd.elenir_silver.transacoes_tratadas")


# ✔️ Realizar o JOIN para enriquecimento
transacoes_enriquecidas = transacoes_silver.join(
    clientes_silver,
    transacoes_silver["ID_Cliente"] == clientes_silver["ID"],
    "left"
).drop(clientes_silver["ID"])


# ✔️ Adicionar Ano, Mês e Dia da Data de Criação do Cliente
transacoes_enriquecidas = transacoes_enriquecidas.withColumn(
    "Ano", year(clientes_silver["DataCriacao"])
).withColumn(
    "Mes", month(clientes_silver["DataCriacao"])
).withColumn(
    "Dia", dayofmonth(clientes_silver["DataCriacao"])
)

# ✔️ Adicionar status de data (Futura ou Realizada) com base na data da transação
transacoes_enriquecidas = transacoes_enriquecidas.withColumn(
    "status_data",
    when(col("DataCriacao") > current_date(), "Futura").otherwise("Realizada")
)

# ✔️ Salvar na camada Silver enriquecida
transacoes_enriquecidas.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("adb_cliente_savana_prd.elenir_silver.transacoes_enriquecidas")

# ✔️ Visualizar para conferência
display(transacoes_enriquecidas)


In [0]:
transacoes_enriquecidas.printSchema()