![./ImageLab.png](./Images/ImageLab.png "./ImageLab.png")

# Data Engineering with Lakeflow, Jobs, Auto Loader, and More

In [0]:
# Root folder for Structured Streaming checkpoints, derived from the "param_location" widget.
CHECKPOINT_LOCATION = f"{dbutils.widgets.get('param_location')}/checkpoint"
from pyspark.sql import functions as F

## Load Bronze Data into the Silver Layer

In [0]:
# Reading table: open the bronze dim_customer table as a streaming source.
df_bronze = (
    spark.readStream
    .table("medallion_autoloader.bronze.dim_customer")
)

# Preview the incoming stream in the notebook.
# NOTE(review): display() on a streaming DataFrame runs its own streaming query;
# confirm this preview is wanted when the notebook runs as a scheduled job.
display(df_bronze)

In [0]:
# Treating data: standardize each bronze attribute before the silver SCD2 merge.
from pyspark.sql import functions as F

# Model serving endpoint used by the AI-based normalizations below.
_AI_ENDPOINT = "databricks-meta-llama-3-3-70b-instruct"


def _ai_normalize(column, prompt):
    """Return a Column asking the AI endpoint to standardize `column`.

    NULL inputs stay NULL without calling the model (no paid request for a
    value that cannot be normalized). The model's answer is trimmed so stray
    leading/trailing whitespace or newlines are not treated as a different
    value by the downstream change detection.

    Assumes `prompt` contains no single quote, because it is embedded in a
    SQL string literal.
    """
    return F.expr(
        f"CASE WHEN {column} IS NULL THEN NULL "
        f"ELSE trim(ai_query('{_AI_ENDPOINT}', concat('{prompt}', {column}))) END"
    )


# Treating customer_bk: cast to string and keep only its digits.
df_bronze_treated = (
    df_bronze
    .withColumn("customer_bk", F.col("customer_bk").cast("string"))
    .withColumn("customer_bk", F.regexp_replace(F.col("customer_bk"), "[^0-9]", ""))
)

# Treating customer_name: strip surrounding whitespace.
df_bronze_treated = df_bronze_treated.withColumn("customer_name", F.trim(F.col("customer_name")))

# Treating birth_date
# Alternative 1 with coalesce (deterministic, no model calls):
# formats = ["dd/MM/yyyy", "yyyy-MM-dd", "MM/dd/yyyy", "dd-MM-yyyy"]
# df_bronze_treated = df_bronze_treated.withColumn("birth_date",F.coalesce(*[F.try_to_date("birth_date", f) for f in formats]))

# Alternative 2 using an AI function; the result is the model's text answer.
df_bronze_treated = df_bronze_treated.withColumn(
    "birth_date",
    _ai_normalize(
        "birth_date",
        "Padronize as datas para o formato 2025-01-01. Apenas me dê o resulado final e nada mais: ",
    ),
)

# Treating segment: title-case and strip whitespace.
df_bronze_treated = df_bronze_treated.withColumn("segment", F.trim(F.initcap(F.col("segment"))))

# Treating region: ask the AI endpoint for the Brazilian state abbreviation.
df_bronze_treated = df_bronze_treated.withColumn(
    "region",
    _ai_normalize(
        "region",
        "Padronize as regiões para o formato de sigla do estado do Brasil, como SP, MG, RJ, etc. Apenas me dê o resulado final e nada mais: ",
    ),
)

# Treating effective_ts: accept ISO-8601 ("...T...Z" or an explicit +HH:MM offset)
# and "yyyy-MM-dd HH:mm:ss"; values matching neither become NULL.
# XXX (instead of a quoted 'Z') parses the trailing Z as a real UTC offset, so
# ISO values do not depend on the session time zone. fmt_space carries no zone
# and is still interpreted in the session time zone.
fmt_iso = "yyyy-MM-dd'T'HH:mm:ssXXX"
fmt_space = "yyyy-MM-dd HH:mm:ss"

df_bronze_treated = df_bronze_treated.withColumn(
    "effective_ts",
    F.coalesce(
        F.try_to_timestamp(F.col("effective_ts"), F.lit(fmt_iso)),
        F.try_to_timestamp(F.col("effective_ts"), F.lit(fmt_space)),
    ),
)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Alias for the treated bronze stream; this is the source the silver writeStream
# (foreachBatch -> upsert_batch) consumes.
df_bronze_stream = df_bronze_treated

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

TARGET_TABLE = "medallion_autoloader.silver.dim_customer"

# Business attributes whose change opens a new SCD Type 2 version of a customer.
_TRACKED_COLUMNS = ("customer_name", "birth_date", "segment", "region")


def _try_create_target_table(df_bronze_dedup):
    """Create TARGET_TABLE from a deduplicated batch, every row as version 1.

    Returns:
        True if this call created the table; False if the write failed because
        the table already exists (e.g. created concurrently), in which case the
        caller continues with the regular MERGE path.

    Raises:
        Any other exception raised by the write.
    """
    df_init = (
        df_bronze_dedup
        .withColumn("valid_from", F.col("effective_ts"))
        .withColumn("valid_to",   F.lit(None).cast("timestamp"))
        .withColumn("is_current", F.lit(True))
        .withColumn("version",    F.lit(1).cast("int"))
    )

    # Surrogate keys 1..N for the initial load, in a fixed order.
    w_sk_init = Window.orderBy("customer_bk", "valid_from", "customer_name", "birth_date")
    df_init = df_init.withColumn(
        "customer_sk",
        F.row_number().over(w_sk_init).cast("bigint"),
    ).select(
        "customer_bk",
        "customer_name",
        "birth_date",
        "segment",
        "region",
        "effective_ts",
        "valid_from",
        "valid_to",
        "is_current",
        "version",
        "customer_sk",
        "file_path",
        "file_mod_time",
    )

    try:
        # mode("error") fails instead of overwriting if the table appeared meanwhile.
        df_init.write.format("delta").mode("error").saveAsTable(TARGET_TABLE)
    except Exception as e:
        msg = str(e)
        if "DELTA_TABLE_ALREADY_EXISTS" in msg or "already exists" in msg:
            return False
        raise
    return True


def upsert_batch(df_bronze_batch, batch_id):
    """Merge one micro-batch of bronze customers into the silver SCD Type 2 dimension.

    Intended as the ``foreachBatch`` callback of the bronze -> silver stream.

    Steps:
      1. Keep the latest record per ``customer_bk`` in the batch.
      2. If TARGET_TABLE does not exist, create it from the batch (all rows
         version 1) and return. If another writer created it first, continue.
      3. Close the current row of every customer whose tracked attributes
         differ from the batch: set ``is_current = false`` and
         ``valid_to = effective_ts``. NULLs compare as values.
      4. Insert a new current row, with the next ``version`` and
         ``customer_sk``, for every batch customer that has no current row.
         These are brand-new keys and keys whose row was closed in step 3.

    Args:
        df_bronze_batch: micro-batch DataFrame containing the columns selected below.
        batch_id: micro-batch id supplied by Structured Streaming (unused).
    """
    # NOTE(review): df_bronze_batch is evaluated again by every action below
    # (both MERGEs, isEmpty, collect). Upstream, birth_date/region come from
    # ai_query, so each evaluation may call the model again and could return
    # different text; consider persisting the batch where the compute supports it.

    # Latest record per business key. file_mod_time / file_path break ties on
    # effective_ts so the surviving row does not depend on row order.
    w_last = Window.partitionBy("customer_bk").orderBy(
        F.col("effective_ts").desc(),
        F.col("file_mod_time").desc(),
        F.col("file_path").desc(),
    )
    df_bronze_dedup = (
        df_bronze_batch
        .select("customer_bk", "customer_name", "birth_date", "segment", "region", "effective_ts", "file_path", "file_mod_time")
        .withColumn("rn", F.row_number().over(w_last))
        .filter(F.col("rn") == 1)
        .drop("rn")
    )

    if not spark.catalog.tableExists(TARGET_TABLE) and _try_create_target_table(df_bronze_dedup):
        return

    # NOT (a <=> b) is a NULL-safe "differs". A change to or from NULL also
    # counts, whereas a plain <> evaluates to NULL there and would miss it.
    changed = " OR ".join(f"NOT (delta.{c} <=> df.{c})" for c in _TRACKED_COLUMNS)

    (
        DeltaTable.forName(spark, TARGET_TABLE).alias("delta")
        .merge(
            source=df_bronze_dedup.alias("df"),
            condition="df.customer_bk = delta.customer_bk",
        )
        .whenMatchedUpdate(
            condition=f"delta.is_current = true AND ({changed})",
            set={
                "is_current": F.lit(False),
                "valid_to":   F.col("df.effective_ts"),
            },
        )
        .execute()
    )

    # Read the dimension after the closing MERGE.
    dim_customer = spark.read.table(TARGET_TABLE)
    dim_current = dim_customer.filter(F.col("is_current"))

    # After the MERGE, a batch customer still has a current row only when none
    # of its tracked attributes changed. Every other batch customer needs a new
    # current version: brand-new keys and keys whose row was just closed.
    df_remaining = df_bronze_dedup.join(
        dim_current.select("customer_bk"), on="customer_bk", how="left_anti"
    )
    if df_remaining.isEmpty():
        return

    df_remaining = (
        df_remaining
        .withColumn("valid_from", F.col("effective_ts"))
        .withColumn("valid_to",   F.lit(None).cast("timestamp"))
        .withColumn("is_current", F.lit(True))
    )

    # Next version per business key (1 for keys not yet in the dimension).
    max_ver_per_bk = (
        dim_customer
        .groupBy("customer_bk")
        .agg(F.max("version").alias("max_version"))
    )
    df_remaining = (
        df_remaining
        .join(max_ver_per_bk, on="customer_bk", how="left")
        .withColumn("max_version", F.coalesce(F.col("max_version"), F.lit(0)))
        .withColumn("version", (F.col("max_version") + F.lit(1)).cast("int"))
        .drop("max_version")
    )

    # New surrogate keys continue after the largest one already assigned.
    max_sk = dim_customer.agg(F.max("customer_sk").alias("max_sk")).collect()[0]["max_sk"]
    if max_sk is None:
        max_sk = 0

    w_sk = Window.orderBy("valid_from", "customer_bk", "customer_name", "birth_date")
    df_remaining = df_remaining.withColumn(
        "customer_sk",
        (F.row_number().over(w_sk) + F.lit(max_sk)).cast("bigint"),
    )

    (
        DeltaTable.forName(spark, TARGET_TABLE).alias("delta")
        .merge(
            source=df_remaining.alias("df"),
            condition="delta.customer_sk = df.customer_sk",
        )
        .whenNotMatchedInsert(
            values={
                "customer_sk":   F.col("df.customer_sk"),
                "customer_bk":   F.col("df.customer_bk"),
                "customer_name": F.col("df.customer_name"),
                "birth_date":    F.col("df.birth_date"),
                "segment":       F.col("df.segment"),
                "region":        F.col("df.region"),
                # Previously omitted, which left effective_ts NULL on every
                # row inserted after the initial load.
                "effective_ts":  F.col("df.effective_ts"),
                "valid_from":    F.col("df.valid_from"),
                "valid_to":      F.col("df.valid_to"),
                "is_current":    F.col("df.is_current"),
                "version":       F.col("df.version"),
                "file_path":     F.col("df.file_path"),
                "file_mod_time": F.col("df.file_mod_time"),
            }
        )
        .execute()
    )


In [0]:
# Process all bronze data available now through upsert_batch, then stop.
query = (
    df_bronze_stream
    .writeStream
    .option("checkpointLocation", CHECKPOINT_LOCATION + "/silver/dim_customer")
    .foreachBatch(upsert_batch)
    .trigger(availableNow=True)
    .start()
)

# Block until the availableNow run finishes. A failure inside upsert_batch is
# then raised here (failing the cell / job task) instead of only surfacing in
# the background query.
query.awaitTermination()
