In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
df_base_pd_bronze = spark.table("puc.bronze.base_pd")
display(df_base_pd_bronze)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
import unicodedata
import re

# -----------------------------
# 1) Normalizador (Python)
# -----------------------------
def _normalize_text(s: str) -> str:
    if s is None:
        return None

    s = s.strip().lower()

    # remove acentos (NFKD)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    
    # remove hífen no início ou fim
    s = re.sub(r"(^-|-$)", " ", s)

    # remove hífen que não está entre letras/números
    s = re.sub(r"(?<![a-z0-9])-|-(?![a-z0-9])", " ", s)

    # remove hífens duplicados
    s = re.sub(r"-{2,}", " ", s)

    # normaliza espaços
    s = re.sub(r"\s+", " ", s).strip()

    # colapsa múltiplos espaços
    s = re.sub(r"\s+", " ", s).strip()

    return s if s != "" else None

normalize_udf = F.udf(_normalize_text, T.StringType())


# -----------------------------
# 2) Função principal
# -----------------------------
def normalize_categorical_columns(
    df,
    max_distinct_per_col: int = 200_000,
    keep_nulls: bool = True,
    canonical_strategy: str = "most_frequent",  # "most_frequent" ou "normalized"
):
    """
    - Detecta colunas string.
    - Para cada coluna: (valor_original -> chave_normalizada).
    - Escolhe um valor canônico por chave_normalizada e aplica no DF.
    """

    # Colunas categóricas (string)
    str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, T.StringType)]

    if not str_cols:
        return df, {}

    mapping_by_col = {}

    df_out = df

    for c in str_cols:
        # valores distintos + contagem
        stats = (
            df.select(F.col(c).alias("orig"))
              .where(F.col("orig").isNotNull() if keep_nulls else F.lit(True))
              .groupBy("orig")
              .count()
        )

        # (opcional) proteção contra colunas com cardinalidade absurda
        distinct_cnt = stats.count()
        if distinct_cnt > max_distinct_per_col:
            print(f"[SKIP] Coluna '{c}' tem {distinct_cnt:,} distintos (limite {max_distinct_per_col:,}).")
            continue

        stats = stats.withColumn("norm", normalize_udf(F.col("orig")))

        # define o canônico por norm
        # - most_frequent: usa o valor original mais frequente (bom quando você quer preservar formato humano)
        # - normalized: usa a própria string normalizada (bom quando você quer padronização total)
        if canonical_strategy == "most_frequent":
            w = F.window  # só pra não confundir; não vamos usar window function aqui
            canon = (
                stats.where(F.col("norm").isNotNull())
                     .groupBy("norm")
                     .agg(
                         F.max(F.struct(F.col("count"), F.col("orig"))).alias("mx")  # max por count
                     )
                     .select(
                         F.col("norm"),
                         F.col("mx.orig").alias("canonical")
                     )
            )
        elif canonical_strategy == "normalized":
            canon = (
                stats.select("norm")
                     .where(F.col("norm").isNotNull())
                     .distinct()
                     .withColumn("canonical", F.col("norm"))
            )
        else:
            raise ValueError("canonical_strategy deve ser 'most_frequent' ou 'normalized'")

        # junta pra criar mapa orig -> canonical
        map_df = (
            stats.select("orig", "norm")
                 .join(canon, on="norm", how="left")
                 .select("orig", "canonical")
                 .where(F.col("orig").isNotNull())
        )

        # traz o mapeamento para o driver (ok se distinct não for gigantesco)
        pairs = map_df.collect()
        mapping = {row["orig"]: row["canonical"] for row in pairs if row["canonical"] is not None}
        mapping_by_col[c] = mapping

        # aplica no DF via create_map (mais rápido do que UDF pra substituir)
        # cria um map literal: { "Avião" -> "aviao", "aviao" -> "aviao", ... }
        if mapping:
            kv = []
            for k, v in mapping.items():
                kv.extend([F.lit(k), F.lit(v)])
            m = F.create_map(*kv)

            df_out = df_out.withColumn(
                c,
                F.coalesce(m[F.col(c)], F.col(c))
            )

        print(f"[OK] Coluna '{c}': {distinct_cnt:,} distintos, {len(mapping):,} mapeados/normalizados.")

    return df_out, mapping_by_col


# -----------------------------
# 3) Uso
# -----------------------------
# df = spark.table("silver.base_flat")  # exemplo
df_norm, mapping = normalize_categorical_columns(
    df_base_pd_bronze,
    max_distinct_per_col=200000,
    canonical_strategy="normalized"  # deixa tudo padronizado (Avião/aviao -> aviao)
)

# Ex.: ver o mapeamento gerado para uma coluna
# mapping["setor"]  # dict {original: canonical}


In [0]:
schema_silver = StructType([
    StructField("id_cliente", StringType(), False),
    StructField("data_solicitacao", DateType(), False),
    StructField("uf", StringType(), True),
    StructField("regiao", StringType(), True),
    StructField("idade", IntegerType(), True),
    StructField("tempo_conta_anos", IntegerType(), True),
    StructField("canal", StringType(), True),
    StructField("produto", StringType(), True),
    StructField("escolaridade", StringType(), True),
    StructField("estado_civil", StringType(), True),
    StructField("vinculo_emprego", StringType(), True),
    StructField("setor", StringType(), True),
    StructField("usa_internet_banking", BooleanType(), True),
    StructField("possui_cartao_credito", BooleanType(), True),
    StructField("possui_investimentos", BooleanType(), True),
    StructField("possui_seguro", BooleanType(), True),
    StructField("qtd_produtos_bancarios", IntegerType(), True),
    StructField("renda_mensal_atual", DoubleType(), True),
    StructField("renda_mensal_anterior", DoubleType(), True),
    StructField("atrasos_passados", IntegerType(), True),
    StructField("score_credito", DoubleType(), True),
    StructField("parcelas", IntegerType(), True),
    StructField("valor_emprestimo", DoubleType(), True),
    StructField("taxa_juros_mensal", DoubleType(), True),
    StructField("valor_parcela", DoubleType(), True),
    StructField("dti", DoubleType(), True),
    StructField("frequencia_transacoes", IntegerType(), True),
    StructField("valor_emprestimos_anteriores", DoubleType(), True),
    StructField("pd_true", DoubleType(), True),
    StructField("inadimplente", IntegerType(), True),
    StructField("ead", DoubleType(), True),
    StructField("lgd", DoubleType(), True),
    StructField("valor_recuperado", DoubleType(), True),
    StructField("pd_model", DoubleType(), True),
    StructField("tempo_emprego_anos", DoubleType(), True)
])


In [0]:
df_processado_1 = df_norm \
    .withColumn("data_solicitacao", to_date("data_solicitacao")) \
    .select([col(c).cast(schema_silver[c].dataType) for c in df_norm.columns])

In [0]:
str_cols = [c for c in df_processado_1.columns if isinstance(schema_silver[c].dataType, StringType)]

for c in str_cols:
    df_processado_1 = df_processado_1.withColumn(c, trim(regexp_replace(col(c), "[^\\p{L}\\p{N}\\s_./@-]", "")))

In [0]:
display(df_processado_1)

In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType,
    FloatType, DoubleType, DecimalType, DateType
)
from pyspark.sql.functions import col, min as spark_min, max as spark_max

df = df_processado_1

resultados = []

for c in df.columns:
    dtype = df.schema[c].dataType

    # --- TRATAR id_cliente COMO NUMÉRICO ---
    if c == "id_cliente":
        df_num = df.withColumn(c, col(c).cast("long"))
        stats = df_num.agg(
            spark_min(col(c)).alias("min"),
            spark_max(col(c)).alias("max")
        ).collect()[0]

        resultados.append((c, "intervalo_num_forcado", str(stats["min"]), str(stats["max"])))
        continue

    # --- COLUNAS NUMÉRICAS ---
    if isinstance(dtype, (IntegerType, LongType, FloatType, DoubleType, DecimalType)):
        stats = df.agg(
            spark_min(col(c)).alias("min"),
            spark_max(col(c)).alias("max")
        ).collect()[0]
        resultados.append((c, "intervalo_num", str(stats["min"]), str(stats["max"])))

    # --- COLUNAS DE DATA ---
    elif isinstance(dtype, DateType):
        stats = df.agg(
            spark_min(col(c)).alias("min"),
            spark_max(col(c)).alias("max")
        ).collect()[0]
        resultados.append((c, "intervalo_data", str(stats["min"]), str(stats["max"])))

    # --- STRINGS / CATEGÓRICAS ---
    elif isinstance(dtype, StringType):
        valores = df.select(col(c)).distinct().limit(500).collect()
        valores = [row[c] for row in valores]
        resultados.append((c, "categorico", str(valores), None))

    # --- QUALQUER OUTRO TIPO ---
    else:
        resultados.append((c, f"tipo_{dtype}", None, None))

# Criar schema final
schema = StructType([
    StructField("coluna", StringType(), True),
    StructField("tipo", StringType(), True),
    StructField("valor_1", StringType(), True),
    StructField("valor_2", StringType(), True)
])

df_resultado = spark.createDataFrame(resultados, schema)
display(df_resultado)

In [0]:
from pyspark.sql import functions as F

# domínio fechado para UF (Brasil)
UF_DOMAIN = [
    "ac","al","ap","am","ba","ce","df","es","go","ma","mt","ms","mg",
    "pa","pb","pr","pe","pi","rj","rn","rs","ro","rr","sc","sp","se","to"
]

DOMAINS = {
    # ----- CATEGÓRICOS (lista fechada) -----
    "uf": {"type": "set", "allowed": UF_DOMAIN},
    "regiao": {"type": "set", "allowed": ["norte","nordeste","centro-oeste","sudeste","sul"]},

    # se você preferir fechar produto/canal com base no que existe hoje:
    # (pode trocar por "set" com lista fixa depois)
    "canal": {"type": "set", "allowed": ['app', 'internet banking', 'agencia', 'correspondente']},
    "produto": {
        "type": "set",
        "allowed": ['consignado', 'veiculo', 'pessoal', 'home_equity', 'cartao'],
        "allow_null": True
    },
    "escolaridade": {"type": "set", "allowed": ['medio', 'fundamental', 'pos', 'superior']},
    "estado_civil": {"type": "set", "allowed": ['solteiro', 'divorciado', 'casado', 'viuvo']},
    "vinculo_emprego": {"type": "set", "allowed": ['clt', 'autonomo', 'servidor', 'desempregado', 'empresario', 'estudante']},
    "setor": {"type": "set", "allowed": ['educacao', 'saude', 'comercio', 'industria', 'ti', 'construcao', 'servicos', 'outro']},

    # ----- BOOLEANOS / FLAGS -----
    "usa_internet_banking": {"type": "boolean"},
    "possui_cartao_credito": {"type": "boolean"},
    "possui_investimentos": {"type": "boolean"},
    "possui_seguro": {"type": "boolean"},
    "inadimplente": {"type": "set", "allowed": [0, 1]},

    # ----- DATAS -----
    "data_solicitacao": {"type": "date_range", "min": "1900-01-01", "max": "today"},

    # ----- NUMÉRICOS: intervalos (incluindo 0..+inf) -----
    "idade": {"type": "range", "min": 0, "max": 120},
    "tempo_conta_anos": {"type": "range", "min": 0, "max": None},          # 0..+inf
    "tempo_emprego_anos": {
        "type": "range", "min": 0, "max": None,        # 0..+inf
        "allow_null": True
    },

    "qtd_produtos_bancarios": {"type": "range_int", "min": 0, "max": None},# inteiro 0..+inf
    "renda_mensal_atual": {
        "type": "range", "min": 0, "max": None,        # 0..+inf
        "allow_null": True
    },
    "renda_mensal_anterior": {
        "type": "range", "min": 0, "max": None,        # 0..+inf
        "allow_null": True
    },

    "atrasos_passados": {"type": "range_int", "min": 0, "max": None},
    "score_credito": {"type": "range", "min": 0, "max": 1000},

    "parcelas": {"type": "range_int", "min": 1, "max": None},
    "valor_emprestimo": {"type": "range", "min": 0, "max": None},
    "taxa_juros_mensal": {"type": "range", "min": 0, "max": None},
    "valor_parcela": {"type": "range", "min": 0, "max": None},
    "dti": {"type": "range", "min": 0, "max": None},                       # normalmente >=0 (às vezes <=1 ou <=100)

    "frequencia_transacoes": {"type": "range", "min": 0, "max": None},
    "valor_emprestimos_anteriores": {
        "type": "range", "min": 0, "max": None,
        "allow_null": True
    },

    # Risco: probabilidades e parâmetros
    "pd_true": {"type": "range", "min": 0, "max": 1},
    "pd_model": {"type": "range", "min": 0, "max": 1},
    "lgd": {"type": "range", "min": 0, "max": 1},

    # Exposição e recuperações
    "ead": {"type": "range", "min": 0, "max": None},
    "valor_recuperado": {"type": "range", "min": 0, "max": None},
}

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def _today_literal():
    return F.current_date()

def build_rule_expr(colname: str, rule: dict):
    c = F.col(colname)

    # se for nulo, você decide: considerar válido ou inválido
    # aqui vou considerar NULO como inválido por padrão
    not_null = c.isNotNull()

    t = rule["type"]

    if t == "set":
        allowed = rule["allowed"]
        allow_null = rule.get("allow_null", False)

        expr = c.isin(allowed)
        if allow_null:
            expr = expr | c.isNull()
        else:
            expr = expr & c.isNotNull()

        return expr

    if t == "non_empty_string":
        return not_null & (F.length(F.trim(c)) > 0)

    if t in ("range", "range_int"):
        mn = rule.get("min", None)
        mx = rule.get("max", None)
        allow_null = rule.get("allow_null", False)

        # valida o intervalo (sem forçar not_null ainda)
        expr = F.lit(True)
        if mn is not None:
            expr = expr & (c >= F.lit(mn))
        if mx is not None:
            expr = expr & (c <= F.lit(mx))

        if t == "range_int":
            expr = expr & (c == F.floor(c))

        # trata NULL conforme allow_null
        if allow_null:
            expr = expr | c.isNull()
        else:
            expr = expr & c.isNotNull()

        return expr

    if t == "date_range":
        mn = rule.get("min", None)
        mx = rule.get("max", None)

        expr = not_null
        if mn is not None:
            expr = expr & (c >= F.to_date(F.lit(mn)))
        if mx == "today":
            expr = expr & (c <= _today_literal())
        elif mx is not None:
            expr = expr & (c <= F.to_date(F.lit(mx)))
        return expr
    
    if t == "boolean":
        # aceita True/False; por padrão considera NULL como inválido
        return c.isNotNull() & (c.isin(True, False))

    raise ValueError(f"Tipo de regra desconhecido: {t}")

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def validate_domains(df: DataFrame, domains: dict):
    # só valida colunas que existem no DF
    domains = {k: v for k, v in domains.items() if k in df.columns}

    validations = {colname: build_rule_expr(colname, rule) for colname, rule in domains.items()}

    # cria uma lista de arrays, cada um contendo o nome da coluna com erro ou array vazio
    error_arrays = [
        F.when(~ok_expr, F.array(F.lit(colname))).otherwise(F.array())
        for colname, ok_expr in validations.items()
    ]

    # concatena todos os arrays em um único array de erros (garantido não-nulo)
    erros = F.flatten(F.array(*error_arrays))  # sempre retorna array (pode ser vazio)

    df_valid = (
        df
        .withColumn("__erros", erros)
        .withColumn("__qtd_erros", F.size(F.col("__erros")))
        .withColumn("__is_valid", F.col("__qtd_erros") == 0)
    )

    # resumo por coluna (% inválido) - mantém sua lógica
    total = df.count()
    summary_rows = []
    for colname, ok_expr in validations.items():
        invalid = df.where(~ok_expr).count()
        summary_rows.append((colname, invalid, total, invalid / total if total else 0.0))

    summary = spark.createDataFrame(summary_rows, ["coluna", "qtd_invalidos", "total", "pct_invalido"])
    return df_valid, summary


In [0]:
df_validado, resumo = validate_domains(df_processado_1, DOMAINS)

# resumo (% inválido por coluna)
display(resumo.orderBy(F.desc("pct_invalido")))

In [0]:
from pyspark.sql import functions as F

# 2) separa em válidos e inválidos
df_ok = df_validado.filter(F.col("__is_valid") == True)
df_bad = df_validado.filter(F.col("__is_valid") == False)

display(df_ok)

# (opcional) remover colunas de auditoria dos válidos
df_ok_clean = df_ok.drop("__erros", "__qtd_erros", "__is_valid")

# (recomendado) manter auditoria nos inválidos
# df_bad mantém __erros, __qtd_erros, __is_valid

# 3) salvar como tabelas Delta
# ajuste os nomes/schema conforme seu projeto
df_ok_clean.write.mode("overwrite").format("delta").saveAsTable("puc.silver.base_pd")
df_bad.write.mode("overwrite").format("delta").saveAsTable("puc.silver.base_pd_invalidos")


In [0]:
df_base_pd_silver = spark.table("puc.silver.base_pd")
display(df_base_pd_silver)
print(df_base_pd_silver.count())

In [0]:
df_base_pd_silver_invalidos = spark.table("puc.silver.base_pd_invalidos")
display(df_base_pd_silver_invalidos)
print(df_base_pd_silver_invalidos.count())