In [0]:
from datetime import datetime, timedelta

# Number of days before today whose dated folder is read:
# 1 = yesterday's file (files are processed the day after they land), 0 = today's file.
days_back = 1
process_date = (datetime.today() - timedelta(days=days_back)).strftime("%Y/%m/%d")

path = f"abfss://input@adlssource0001.dfs.core.windows.net/sftp/{process_date}/*.parquet"

# Parquet files carry their own schema, so the CSV-only reader options
# ("header", "inferSchema") are not passed here.
raw_df = spark.read.format("parquet").load(path)

display(raw_df)

Delta Table: write the raw data and register it

In [0]:
# Destination of the curated Delta table.
# NOTE: this is the same location the SCD2 cell further down uses as `delta_path`,
# so re-running this cell replaces whatever is stored there.
target_path = "abfss://output@adlstarget0001.dfs.core.windows.net/curated_sftp/order_customers_delta"

# Overwrite mode with overwriteSchema: both the data and the table schema are replaced.
delta_writer = (
    raw_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
delta_writer.save(target_path)

In [0]:
%sql
-- Register a table named indiametastore.default.order_customers_delta over the
-- Delta files stored at LOCATION. IF NOT EXISTS makes this a no-op when a table
-- with that name is already registered.
-- The LOCATION is the same path the preceding cell writes to (target_path).
CREATE TABLE IF NOT EXISTS indiametastore.default.order_customers_delta
USING DELTA
LOCATION "abfss://output@adlstarget0001.dfs.core.windows.net/curated_sftp/order_customers_delta"

SCD Type 2 (Slowly Changing Dimension) load

In [0]:
from pyspark.sql.functions import current_date, current_timestamp, lit, expr
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# -----------------------------------------
# Configuration
# -----------------------------------------
# Root of the landing area (ADF writes the CSV→Parquet bulk output here).
sftp_base = "abfss://input@adlssource0001.dfs.core.windows.net/sftp"

# Location of the Delta table that is maintained as SCD2.
delta_path = "abfss://output@adlstarget0001.dfs.core.windows.net/curated_sftp/order_customers_delta"

# Columns that together form the SCD2 business key.
key_cols = [
    "customer_id",
    "customer_unique_id",
]


# =========================================
# STEP 1: Find latest partition folder (YYYY/MM/DD)
# =========================================
def _latest_numeric_subdir(parent_path, not_found_message):
    """Return the name of the numerically largest all-digit sub-folder of parent_path.

    Only directories whose name consists solely of digits are considered, so
    stray folders (e.g. "_tmp", "archive") cannot be picked as the "latest"
    one. Names are compared as integers, so "10" ranks above "9" even when
    the folders are not zero-padded.

    Args:
        parent_path: Folder to list with dbutils.fs.ls.
        not_found_message: Text of the ValueError raised when no matching
            sub-folder exists.

    Returns:
        The folder name exactly as listed, without the trailing slash.

    Raises:
        ValueError: If parent_path contains no all-digit sub-folder.
    """
    numeric_dirs = [
        entry.name.rstrip('/')
        for entry in dbutils.fs.ls(parent_path)
        if entry.isDir() and entry.name.rstrip('/').isdecimal()
    ]
    if not numeric_dirs:
        raise ValueError(not_found_message)
    return max(numeric_dirs, key=int)


latest_year = _latest_numeric_subdir(
    sftp_base + "/",
    f"No year folders found under {sftp_base}.",
)
latest_month = _latest_numeric_subdir(
    f"{sftp_base}/{latest_year}/",
    f"No month folders found under year {latest_year}.",
)
latest_day = _latest_numeric_subdir(
    f"{sftp_base}/{latest_year}/{latest_month}/",
    f"No day folders found under {latest_year}/{latest_month}.",
)

latest_path = f"{sftp_base}/{latest_year}/{latest_month}/{latest_day}/*.parquet"
print(f"Reading from: {latest_path}")

# Partition date of the file being loaded; stored as the `file_date` audit column.
file_date = f"{latest_year}-{latest_month}-{latest_day}"


# =========================================
# STEP 2: Read raw data and add SCD2/audit columns
# =========================================
raw_df = spark.read.parquet(latest_path)

# (column name, expression) pairs, applied in this order on top of the raw columns.
audit_and_scd2_columns = [
    # Audit columns: when the row was ingested and which file it came from.
    ("ingest_date", current_date()),
    ("file_date", lit(file_date)),
    ("updated_at", current_timestamp()),
    # SCD2 columns: surrogate key, active flag and validity window.
    ("customer_dim_id", expr("uuid()")),
    ("is_active", lit(1)),
    ("start_date", current_date()),
    ("end_date", lit(None).cast("date")),
]

new_df = raw_df
for column_name, column_expr in audit_and_scd2_columns:
    new_df = new_df.withColumn(column_name, column_expr)


# =========================================
# STEP 3: Initial load or align schema
# =========================================
if not DeltaTable.isDeltaTable(spark, delta_path):
    # Nothing at delta_path yet: every incoming row becomes the first, active version.
    print("Delta table path does not exist yet. Performing initial load.")
    (new_df.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .save(delta_path))
else:
    print("Delta table exists. Aligning schema if needed, then performing SCD2 merge.")
    deltaTable = DeltaTable.forPath(spark, delta_path)
    df = deltaTable.toDF()
    existing_columns = set(df.columns)

    # SCD2 and audit columns the merge relies on, with the value used to
    # back-fill existing rows when a column is absent from the table.
    column_defaults = [
        ("customer_dim_id", lit(None).cast("string")),
        ("is_active", lit(1)),
        ("start_date", current_date()),
        ("end_date", lit(None).cast("date")),
        ("file_date", lit(None).cast("string")),
        ("ingest_date", lit(None).cast("date")),
        ("updated_at", lit(None).cast("timestamp")),
    ]
    missing_columns = [
        (column_name, default_value)
        for column_name, default_value in column_defaults
        if column_name not in existing_columns
    ]

    # Rewrite the table only when a column actually has to be added; an
    # already aligned table is left untouched instead of being fully
    # overwritten on every run.
    if missing_columns:
        for column_name, default_value in missing_columns:
            df = df.withColumn(column_name, default_value)

        (df.write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .save(delta_path))

        # Reload as DeltaTable for merge
        deltaTable = DeltaTable.forPath(spark, delta_path)

    # =========================================
    # STEP 4: SCD2 merge
    # =========================================
    # Attributes whose change starts a new version of a customer.
    tracked_cols = ["customer_zip_code_prefix", "customer_city", "customer_state"]

    key_match = " AND ".join([f"t.{c} = s.{c}" for c in key_cols])
    # `<=>` is NULL-safe equality, so a change to or from NULL is detected too
    # (a plain `<>` evaluates to NULL in that case and the change is missed).
    changed = " OR ".join([f"NOT (t.{c} <=> s.{c})" for c in tracked_cols])

    # Incoming rows whose key has an active row in the table with different
    # tracked attributes. left_semi keeps each incoming row at most once.
    active_target = deltaTable.toDF().where("is_active = 1")
    changed_rows = new_df.alias("s").join(
        active_target.alias("t"),
        expr(f"({key_match}) AND ({changed})"),
        "left_semi",
    )

    # A single MERGE clause cannot both expire the old version and insert the
    # new one for the same source row. The source is therefore staged twice:
    #   * every incoming row, carrying its real key in the _merge_* columns:
    #     it expires the active version when attributes changed, or is
    #     inserted when the key has no active row in the table;
    #   * a copy of each changed row with NULL _merge_* columns: it can never
    #     match, so it always reaches the insert clause and becomes the new
    #     active version.
    keyed_rows = new_df.selectExpr(
        *[f"{c} AS _merge_{c}" for c in key_cols], "*"
    )
    new_version_rows = changed_rows.selectExpr(
        *[f"NULL AS _merge_{c}" for c in key_cols], "*"
    )
    staged_updates = new_version_rows.union(keyed_rows)

    # Only active rows take part in the match: expired versions are history
    # and must neither be updated again nor block the insert of a new version.
    join_cond = " AND ".join(
        [f"t.{c} = s._merge_{c}" for c in key_cols] + ["t.is_active = 1"]
    )

    (deltaTable.alias("t")
        .merge(
            staged_updates.alias("s"),
            join_cond
        )
        .whenMatchedUpdate(
            # Expire the currently active version when a tracked attribute changed.
            condition=changed,
            set={
                "is_active": "0",
                "end_date": "current_date()"
            }
        )
        .whenNotMatchedInsert(values={
            "customer_dim_id": "s.customer_dim_id",
            "customer_id": "s.customer_id",
            "customer_unique_id": "s.customer_unique_id",
            "customer_zip_code_prefix": "s.customer_zip_code_prefix",
            "customer_city": "s.customer_city",
            "customer_state": "s.customer_state",
            "ingest_date": "s.ingest_date",
            "file_date": "s.file_date",
            "updated_at": "s.updated_at",
            "is_active": "1",
            "start_date": "current_date()",
            "end_date": "null"
        })
        .execute())


# =========================================
# STEP 5: Check results
# =========================================
# Sample of the source rows that were fed into the load.
print("Before Merge (raw_df):")
raw_df.show(n=10, truncate=False)

# Sample of the Delta table as it is stored at delta_path now.
print("After Merge (Delta SCD2 table):")
updated_df = spark.read.load(delta_path, format="delta")
updated_df.show(n=20, truncate=False)
