In [0]:
import dlt
from pyspark.sql.functions import md5, concat_ws, col, expr, to_date, to_timestamp
from pyspark.sql.types import (
    StructType, StructField, IntegerType,
    StringType, TimestampType, DoubleType, LongType, DoubleType
)

In [0]:
customer_schema = ('''
    row_key STRING PRIMARY KEY NOT NULL COMMENT "Unique key for each record",
    customer_id INT NOT NULL,
    customer_name STRING,
    country STRING,
    md5_key STRING,
    __START_AT STRING,
    __END_AT STRING
  ''')

@dlt.view
def raw_customer():
  df = spark.readStream.table("raw_schema.raw_customer")
  return df.withColumn("row_key", md5(concat_ws("|", col("customer_id").cast("string")))) \
          .withColumn('customer_id', col("customer_id").cast("int")) \
          .withColumn('customer_name', col("name").cast("string")) \
          .withColumn("md5_key", md5(concat_ws("|", col("customer_name"), col("country")))) \
          .select("row_key", "customer_id", "customer_name", "country", "md5_key","db_timestamp", "marker")

dlt.create_streaming_table(
    name="lake_schema.gmsgq_customers",
    comment="Customer table with SCD2",
    table_properties={"quality": "bronze"},
    schema=customer_schema,
)

dlt.create_auto_cdc_flow(
  target = "lake_schema.gmsgq_customers",
  source = "raw_customer",
  keys = ["row_key"],
  sequence_by = col("db_timestamp"),
  apply_as_deletes = expr("marker = 'D'"),
  except_column_list = ["marker", "db_timestamp"],
  stored_as_scd_type = 2
)

In [0]:
# Define DDL schema for the sales table
sales_schema = ('''
  row_key STRING PRIMARY KEY NOT NULL COMMENT "Unique key per row for SCD1 overwrite",
  customer_id INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  sale_date DATE,
  amount DOUBLE,
  md5_key STRING
''')

# Create a DLT view from the raw source with required transformations
@dlt.view
def raw_sales():
  df = spark.readStream.table("raw_schema.raw_sales")
  return df \
    .withColumn("customer_id", col("customer_id").cast("int")) \
    .withColumn("product_id", col("product_id").cast("int")) \
    .withColumn("sale_date", to_date(col("date"), "yyyy-MM-dd")) \
    .withColumn("amount", col("amount").cast("double")) \
    .withColumn("db_timestamp", to_timestamp(col("db_timestamp"))) \
    .withColumn("row_key", md5(concat_ws("|", col("customer_id").cast("string"), col("product_id").cast("string")))) \
    .withColumn("md5_key", md5(concat_ws("|", col("sale_date").cast("string"), col("amount").cast("string")))) \
    .select("row_key", "customer_id", "product_id", "sale_date", "amount", "md5_key", "db_timestamp", "marker")

# Register a streaming DLT table with schema
dlt.create_streaming_table(
    name="lake_schema.gmsgq_sales",
    comment="Sales table with SCD1 (latest row per key)",
    table_properties={"quality": "bronze"},
    schema=sales_schema,
)

# Define the SCD1 merge logic using auto_cdc_flow
dlt.create_auto_cdc_flow(
  target = "lake_schema.gmsgq_sales",
  source = "raw_sales",
  keys = ["row_key"],
  sequence_by = col("db_timestamp"),
  apply_as_deletes = expr("marker = 'D'"),
  except_column_list = ["marker","db_timestamp"],
  stored_as_scd_type = 1
)


In [0]:
product_schema = ('''
  row_key STRING PRIMARY KEY NOT NULL COMMENT "Deterministic row key for SCD1 deduplication",
  product_id INT NOT NULL,
  product_name STRING,
  product_category STRING,
  md5_key STRING
''')

@dlt.view
def raw_product():
  df = spark.readStream.table("raw_schema.raw_product")
  return df \
    .withColumn("product_id", col("product_id").cast("int")) \
    .withColumn("product_category", col("category").cast("string")) \
    .withColumn("db_timestamp", to_timestamp(col("db_timestamp"))) \
    .withColumn("row_key", md5(col("product_id").cast("string"))) \
    .withColumn("md5_key", md5(concat_ws("|", col("product_name"), col("category")))) \
    .select("row_key", "product_id", "product_name", "product_category", "md5_key", "db_timestamp", "marker")

dlt.create_streaming_table(
    name="lake_schema.gmsgq_products",
    comment="Product table with SCD1 overwrite (latest version per product_id)",
    table_properties={"quality": "bronze"},
    schema=product_schema,
)

dlt.create_auto_cdc_flow(
  target = "lake_schema.gmsgq_products",
  source = "raw_product",
  keys = ["row_key"],
  sequence_by = col("db_timestamp"),
  apply_as_deletes = expr("marker = 'D'"),
  except_column_list = ["marker","db_timestamp"],
  stored_as_scd_type = 1
)
