In [0]:
import dlt
from pyspark.sql.functions import col, to_date, current_timestamp, lit, max

Stream Data To Bronze Layer

In [0]:
# Schema names for the three pipeline layers; each is used below as the
# schema prefix of the tables declared for that layer.
bronze_schema = "bronze"
silver_schema = "silver"
gold_schema = "gold"

In [0]:
# Directory the bronze ingestion stream loads its customer CSV files from.
VOLUME_PATH = "/Volumes/workspace/dev/customers/"

In [0]:
@dlt.table(
    name=f"{bronze_schema}.customers_bronze",
    comment="Raw customer CDC data ingested from volume, append-only."
)
def customers_raw_bronze():
    """
    Define the bronze customers table as an Auto Loader stream over VOLUME_PATH.

    The CSV files under VOLUME_PATH are read incrementally with the
    ``cloudFiles`` source and every row is stamped with an
    ``ingestion_timestamp`` column.

    Returns:
        A streaming DataFrame holding the file columns plus
        ``ingestion_timestamp``.
    """
    auto_loader = (
        spark.readStream
        .format("cloudFiles")  # Auto Loader source
        .option("cloudFiles.format", "csv")  # Input files are CSV
        .option("cloudFiles.inferColumnTypes", "true")  # Infer column types instead of reading all as strings
        # `header` is a CSV reader option, not an Auto Loader setting, so it is
        # passed without the `cloudFiles.` prefix. With validation disabled
        # (below), a misspelled/unknown `cloudFiles.*` key is not reported.
        .option("header", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # Accept new columns as they appear
        # NOTE(review): validation is switched off, so unknown options are not
        # rejected. Consider removing this once the option set is confirmed.
        .option("cloudFiles.validateOptions", "false")
    )

    raw_customers = auto_loader.load(VOLUME_PATH)

    # Record when each row was picked up by the pipeline.
    return raw_customers.withColumn("ingestion_timestamp", current_timestamp())

Silver Layer: apply SCD Type 1 (only the latest record per customer is kept; history is not maintained)

In [0]:
def customers_silver_scd1():
    """
    Declare the silver customers table and the CDC flow that populates it.

    Creates the streaming table ``{silver_schema}.customers_silver`` and
    registers an auto-CDC flow from ``{bronze_schema}.customers_bronze`` into
    it. Rows are matched on ``customer_id`` and ordered by
    ``update_timestamp``; the flow is stored as SCD Type 1.

    Returns:
        None. The function is called only for the pipeline declarations it
        registers.
    """
    # Build each table name once so the table declaration and the flow target
    # cannot drift apart.
    silver_table = f"{silver_schema}.customers_silver"
    bronze_table = f"{bronze_schema}.customers_bronze"

    # The flow's target table must be declared before the flow is attached.
    dlt.create_streaming_table(
        name=silver_table,
        comment="Customer data with SCD Type 1 applied, showing current state."
    )

    dlt.create_auto_cdc_flow(
        target=silver_table,  # Streaming table declared just above
        source=bronze_table,  # Bronze table the change records come from
        keys=["customer_id"],  # Columns that identify a customer row
        sequence_by=col("update_timestamp"),  # Ordering column for changes to the same key
        stored_as_scd_type="1"  # "1" = SCD Type 1
    )


# Register the silver table and its CDC flow when this cell runs.
customers_silver_scd1()

Gold Layer reading from silver

In [0]:
@dlt.table(
    name=f"{gold_schema}.customers_gold",
    comment="Customers gold layer table"
)
def customers_gold():
    """
    Define the gold customers table from the silver customers table.

    Keeps only rows whose ``status`` equals ``"Active"`` and projects the
    columns customer_id, customer_name, city, status and update_timestamp.

    Returns:
        A DataFrame with the filtered rows and the five selected columns.
    """
    # Read of the silver table declared in the silver layer.
    silver_customers = dlt.read(f"{silver_schema}.customers_silver")

    # Restrict to active customers before narrowing the column set.
    active_customers = silver_customers.where(col("status") == "Active")

    return active_customers.select(
        "customer_id",
        "customer_name",
        "city",
        "status",
        "update_timestamp",
    )