In [0]:
from pyspark.sql import functions as F, Window as W, types as T
from delta.tables import DeltaTable
from pyspark.sql.functions import countDistinct

In [0]:
# Pin the session time zone to UTC for this Spark session.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Select the working catalog first, then the schema; the second statement is
# resolved inside the catalog chosen by the first, so the order matters.
for use_stmt in (
    "USE CATALOG `fomacao_microsoft_power_bi_profisional`",
    "USE `pessoas`",
):
    spark.sql(use_stmt)

# Names of the bronze table (read in the next cell) and the silver table.
bronze_tbl, silver_tbl = "bronze_pessoas", "silver_pessoas"

In [0]:
# Load the bronze table as a DataFrame; nothing is materialised until an action runs.
df = spark.read.table(bronze_tbl)

In [0]:
# Names of all top-level string-typed columns; only these are normalised.
str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, T.StringType)]

# Collapse every run of whitespace to a single space and trim the ends.
# Built as ONE projection: chaining withColumn() in a loop adds a projection per
# column, which bloats the query plan on wide tables. Column order is preserved
# and non-string columns pass through untouched.
_str_col_set = set(str_cols)
df_clean = df.select(
    *[
        F.trim(F.regexp_replace(F.col(c), r"\s+", " ")).alias(c)
        if c in _str_col_set
        else F.col(c)
        for c in df.columns
    ]
)

In [0]:
# Technical columns treated as bronze-layer metadata rather than business attributes.
bronze_tech = {
    "_ingest_ts_utc",
    "_ingest_date",
    "_source_path",
    "_source_file",
    "_batch_id",
    "_row_hash",
}
# Every remaining column of the cleaned frame, in its original order.
canon_cols = list(filter(lambda name: name not in bronze_tech, df_clean.columns))

In [0]:
def check_primary_key(df, candidates):
    """Print every column of ``df`` whose distinct-value count equals the row count.

    Such columns are unique across all rows and therefore primary-key candidates.
    Because ``countDistinct`` does not count NULLs, a column containing NULLs
    cannot match the total and is never reported.

    Args:
        df: Spark DataFrame to profile. Every column is tested.
        candidates: Despite its name, this value is only used as a label that
            prefixes each printed line (e.g. a table name).

    Returns:
        None. Findings are written to stdout.
    """
    # The row count is identical for every column, so compute it once instead
    # of launching an extra Spark job per column.
    total = df.count()

    if total == 0:
        # With no rows, 0 distinct == 0 total would flag every column as a key,
        # which says nothing about the data.
        print(f"{candidates} - Tabela vazia: nenhuma coluna pode ser avaliada como chave primária")
        return

    for c in df.columns:
        # Backtick-quote the name (doubling embedded backticks) so names with
        # dots, spaces or backticks resolve as a single column.
        quoted = "`" + c.replace("`", "``") + "`"
        distintos = df.select(countDistinct(quoted)).collect()[0][0]
        if distintos == total:
            print(f"{candidates} - Coluna `{c}` pode ser uma chave primária: {distintos}/{total}")

# Profile the cleaned frame for unique columns; "PESSOAS" is only the label
# printed in front of each finding.
check_primary_key(df=df_clean, candidates="PESSOAS")

In [0]:
# Preferred key columns, in priority order.
candidates = ["Codigo", "email", "_row_hash"]
# Lower-cased column names of the cleaned frame, for a case-insensitive lookup.
cols_lower = list(map(str.lower, df_clean.columns))
# First candidate (lower-cased) that exists among the columns, or None when none do.
natural_key = next(
    filter(lambda key: key in cols_lower, map(str.lower, candidates)),
    None,
)

In [0]:
if natural_key:
    # Keep one row per natural key: the most recently ingested one. NULL
    # ingestion timestamps sort last, so they only win when the key has no
    # timestamped row at all.
    dedup_order = [F.col("_ingest_ts_utc").desc_nulls_last()]
    # row_number() over rows that tie on the timestamp picks an arbitrary
    # winner, so a re-run could keep a different row. Break ties on the row
    # hash when that column is available to make the choice repeatable.
    if "_row_hash" in df_clean.columns:
        dedup_order.append(F.col("_row_hash").asc_nulls_last())
    # NOTE(review): rows whose key is NULL all fall into the same partition, so
    # only one of them survives — confirm this is intended.
    w = W.partitionBy(natural_key).orderBy(*dedup_order)
    df_stage = (
        df_clean.withColumn("_rn", F.row_number().over(w))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )
else:
    # No usable key column was found: pass the cleaned data through unchanged.
    df_stage = df_clean