### Step 1: Setting Up the Foundation

In [0]:
# # Setup script for generating sample CDC data
# catalog = "data_university"
# schema = dbName = db = "lakeflow"
# volume_name = "raw_data"

# # Create catalog, schema, and volume
# spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
# spark.sql(f'USE CATALOG `{catalog}`')
# spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
# spark.sql(f'USE SCHEMA `{schema}`')
# spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')

# volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"

# # Generate sample customer data with CDC operations
# from pyspark.sql import functions as F
# from faker import Faker
# from collections import OrderedDict
# import uuid
# import random

# fake = Faker()
# fake_firstname = F.udf(fake.first_name)
# fake_lastname = F.udf(fake.last_name)
# fake_email = F.udf(fake.ascii_company_email)
# fake_date = F.udf(lambda: fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
# fake_address = F.udf(fake.address)

# # Define CDC operations with realistic distribution
# operations = OrderedDict([("APPEND", 0.5), ("DELETE", 0.1), ("UPDATE", 0.3), (None, 0.01)])
# fake_operation = F.udf(lambda: fake.random_elements(elements=operations, length=1)[0])
# fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

# # Generate sample dataset
# df = spark.range(0, 100000).repartition(100)
# df = df.withColumn("id", fake_id())
# df = df.withColumn("firstname", fake_firstname())
# df = df.withColumn("lastname", fake_lastname())
# df = df.withColumn("email", fake_email())
# df = df.withColumn("address", fake_address())
# df = df.withColumn("operation", fake_operation())
# df_customers = df.withColumn("operation_date", fake_date())

# # Save the sample data
# df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder + "/customers")

### Step 2: Bronze Layer - Raw Data Ingestion with Auto Loader

In [0]:
from dlt import *
from pyspark.sql.functions import *

# Create the target bronze table
dlt.create_streaming_table(
    "customers_cdc_bronze",
    comment="New customer data incrementally ingested from cloud object storage landing zone"
)

# Create an Append Flow to ingest the raw data into the bronze table
@append_flow(
    target="customers_cdc_bronze",
    name="customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("/Volumes/data_university/lakeflow/raw_data/customers/")
    )


### Step 3: Silver Layer - Data Quality and Cleansing


In [0]:
# Create streaming table with data quality expectations
dlt.create_streaming_table(
    name="customers_cdc_clean",
    expect_all_or_drop={
        "no_rescued_data": "_rescued_data IS NULL",
        "valid_id": "id IS NOT NULL", 
        "valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
    }
)

@append_flow(
    target="customers_cdc_clean",
    name="customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
    return (
        dlt.read_stream("customers_cdc_bronze")
            .select("address", "email", "id", "firstname", "lastname", 
                   "operation", "operation_date", "_rescued_data")
    )


### Step 4: Materialized Customer Table with AUTO CDC

In [0]:
# Create the target customer table
dlt.create_streaming_table(
    name="customers", 
    comment="Clean, materialized customers"
)

# Create AUTO CDC flow to process changes
dlt.create_auto_cdc_flow(
    target="customers",                    # The customer table being materialized
    source="customers_cdc_clean",          # The incoming CDC stream
    keys=["id"],                          # Primary key for matching rows
    sequence_by=col("operation_date"),     # Order operations by timestamp
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),  # Handle DELETE operations
    except_column_list=["operation", "operation_date", "_rescued_data"]
)


### Step 5: Slowly Changing Dimension Type 2 (SCD2)

In [0]:
# Create the SCD2 history table
dlt.create_streaming_table(
    name="customers_history", 
    comment="Slowly Changing Dimension Type 2 for customers"
)

# Create AUTO CDC flow with SCD2 enabled
dlt.create_auto_cdc_flow(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "operation_date", "_rescued_data"],
    stored_as_scd_type="2"  # Enable SCD2 to store individual updates
)


### Step 6: Gold Layer - Business Analytics

In [0]:
@dlt.table(
    name="customers_history_agg",
    comment="Aggregated customer history for analytics"
)
def customers_history_agg():
    return (
        dlt.read("customers_history")
            .groupBy("id")
            .agg(
                count("address").alias("address_count"),
                count("email").alias("email_count"), 
                count("firstname").alias("firstname_count"),
                count("lastname").alias("lastname_count")
            )
    )

In [0]:
%sql

SELECT count(*) FROM data_university.lakeflow.customers_cdc_clean where operation = 'DELETE'

In [0]:
%sql

SELECT count(*) FROM data_university.lakeflow.customers_cdc_clean where operation NOT LIKE 'DELETE'