In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -------------------------------
# Config
# -------------------------------
# Location of the bronze source table, spelled out as catalog / schema / table.
CATALOG = "dev_aoc_catalog"
BRONZE_SCHEMA = "bronze_google_analytics"
BRONZE_TABLE = "events_raw"

# Width passed to lpad() for the numeric part of every surrogate key.
SK_PADDING = 10

# NOTE(review): the table functions below interpolate SILVER_SCHEMA into their
# max-key lookup queries, but no such constant is defined in this block -
# confirm where it is supposed to come from.

# Fully qualified three-part name of the bronze table.
bronze_table_full = ".".join((CATALOG, BRONZE_SCHEMA, BRONZE_TABLE))

# -------------------------------
# Silver Table - Sessions (Batch Incremental)
# -------------------------------
@dlt.table(
    name="ga_silver_web_sessions",
    comment="Silver GA web sessions - Incremental batch processing",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
def silver_sessions():
    """
    Build the silver web-sessions dataset from the bronze GA events table.

    Reads the bronze table with ``dlt.read``, derives session, device and
    traffic-source attributes for every row, assigns a prefixed, zero-padded
    surrogate key (``session_sk``) and projects the final column set.

    NOTE(review): no grouping or de-duplication is applied, so the output has
    one row per bronze event rather than one row per session - confirm this is
    intended. Likewise, nothing here restricts the read to new rows even
    though the table comment advertises incremental processing.

    Returns:
        DataFrame with columns session_sk, session_date, user_id, session_id,
        traffic_source, traffic_medium, traffic_campaign, device_category, os,
        browser, session_sequence, session_start_time, is_engaged_session,
        upsert_dttm, ingestion_id.
    """
    SK_PREFIX = "SES"

    events = dlt.read(bronze_table_full)

    # event_timestamp is scaled by 1e6 before the cast - presumably epoch
    # microseconds (TODO confirm against the bronze schema).
    event_ts = (F.col("event_timestamp") / 1000000).cast("timestamp")

    df_transformed = (
        events
        .withColumn("session_date", F.to_date(event_ts))
        .withColumn("session_start_time", event_ts)
        .withColumn("device_category", F.col("device.category"))
        .withColumn("os", F.col("device.operating_system"))
        .withColumn("browser", F.col("device.web_info.browser"))
        .withColumn("session_id",
                    F.expr("filter(event_params, x -> x.key = 'ga_session_id')[0].value.int_value").cast("string"))
        .withColumn("session_sequence", F.col("event_bundle_sequence_id"))
        .withColumn("is_engaged_session",
                    F.expr("filter(event_params, x -> x.key = 'session_engaged')[0].value.int_value").cast("int"))
        .withColumn("upsert_dttm", F.current_timestamp())
        .withColumn("ingestion_id", F.monotonically_increasing_id())
    )

    # Continue numbering after the highest key already stored in the target.
    # NOTE(review): SILVER_SCHEMA has no visible definition in this file; if
    # it is undefined, the resulting NameError is caught below and numbering
    # restarts from 1 on every run.
    try:
        max_sk = (
        spark.sql(f"""
            SELECT COALESCE(MAX(CAST(REGEXP_REPLACE(session_sk, '[^0-9]', '') AS BIGINT)), 0) AS max_num
            FROM {CATALOG}.{SILVER_SCHEMA}.ga_silver_web_sessions
        """).collect()[0]["max_num"]
        )
    except Exception:
        # Best-effort lookup (e.g. target table not created yet): start at 0.
        max_sk = 0

    # Order primarily by start time; the extra columns only break ties so that
    # equal timestamps do not receive keys in an arbitrary order.
    row_window = Window.orderBy(
        F.col("session_start_time"),
        F.col("user_pseudo_id"),
        F.col("session_id"),
        F.col("session_sequence"),
    )

    df_with_sk = (
        df_transformed
        .withColumn("temp_row_id", F.row_number().over(row_window) + max_sk)
        .withColumn("session_sk",
                    F.concat(
                        F.lit(SK_PREFIX),
                        F.lpad(F.col("temp_row_id").cast("string"), SK_PADDING, "0")
                    ))
        .drop("temp_row_id")
    )

    # Aliasing inside the projection avoids any clash with pre-existing
    # user_id / traffic_* columns, because only the listed columns survive.
    final_df = df_with_sk.select(
        "session_sk",
        "session_date",
        F.col("user_pseudo_id").alias("user_id"),
        "session_id",
        F.col("traffic_source.source").alias("traffic_source"),
        F.col("traffic_source.medium").alias("traffic_medium"),
        F.col("traffic_source.name").alias("traffic_campaign"),
        "device_category",
        "os",
        "browser",
        "session_sequence",
        "session_start_time",
        "is_engaged_session",
        "upsert_dttm",
        "ingestion_id"
    )

    return final_df

# -------------------------------
# Silver Table - Form Fills (Batch Incremental)
# -------------------------------
@dlt.table(
    name="ga_silver_form_fill",
    comment="Silver GA form submissions - Incremental batch processing",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
@dlt.expect_or_drop("valid_form_event", "event_name = 'form_submit'")
def silver_form_fill():
    """
    Build the silver form-submission dataset from the bronze GA events table.

    Keeps only rows whose ``event_name`` is ``form_submit``, pulls the form
    attributes out of ``event_params``, assigns a prefixed, zero-padded
    surrogate key (``form_fill_sk``) and projects the final column set.

    NOTE(review): nothing here restricts the read to new rows even though the
    table comment advertises incremental processing - confirm.

    Returns:
        DataFrame with columns form_fill_sk, form_date, user_id, session_id,
        form_submit_time, event_name, form_id, form_name, form_page_url,
        form_type, insert_dttm, ingestion_id.
    """
    SK_PREFIX = "FF"

    events = dlt.read(bronze_table_full)

    # Same predicate as the expectation on the decorator, applied up front.
    df_forms = events.filter(F.col("event_name") == "form_submit")

    # event_timestamp is scaled by 1e6 before the cast - presumably epoch
    # microseconds (TODO confirm against the bronze schema).
    event_ts = (F.col("event_timestamp") / 1000000).cast("timestamp")

    df_transformed = (
        df_forms
        .withColumn("form_date", F.to_date(event_ts))
        .withColumn("form_submit_time", event_ts)
        .withColumn("session_id",
                    F.expr("filter(event_params, x -> x.key = 'ga_session_id')[0].value.int_value").cast("string"))
        .withColumn("form_id",
                    F.expr("filter(event_params, x -> x.key = 'form_id')[0].value.string_value"))
        .withColumn("form_name",
                    F.expr("filter(event_params, x -> x.key = 'form_name')[0].value.string_value"))
        .withColumn("form_page_url",
                    F.expr("filter(event_params, x -> x.key = 'page_location')[0].value.string_value"))
        .withColumn("form_type",
                    F.expr("filter(event_params, x -> x.key = 'form_type')[0].value.string_value"))
        .withColumn("insert_dttm", F.current_timestamp())
        .withColumn("ingestion_id", F.monotonically_increasing_id())
    )

    # Continue numbering after the highest key already stored in the target.
    # NOTE(review): SILVER_SCHEMA has no visible definition in this file; if
    # it is undefined, the resulting NameError is caught below and numbering
    # restarts from 1 on every run.
    try:
        max_sk = (
        spark.sql(f"""
            SELECT COALESCE(MAX(CAST(REGEXP_REPLACE(form_fill_sk, '[^0-9]', '') AS BIGINT)), 0) AS max_num
            FROM {CATALOG}.{SILVER_SCHEMA}.ga_silver_form_fill
        """).collect()[0]["max_num"]
        )
    except Exception:
        # Best-effort lookup (e.g. target table not created yet): start at 0.
        max_sk = 0

    # Order primarily by submit time; the extra columns only break ties so
    # that equal timestamps do not receive keys in an arbitrary order.
    row_window = Window.orderBy(
        F.col("form_submit_time"),
        F.col("user_pseudo_id"),
        F.col("session_id"),
        F.col("form_id"),
    )

    df_with_sk = (
        df_transformed
        .withColumn("temp_row_id", F.row_number().over(row_window) + max_sk)
        .withColumn("form_fill_sk",
                    F.concat(
                        F.lit(SK_PREFIX),
                        F.lpad(F.col("temp_row_id").cast("string"), SK_PADDING, "0")
                    ))
        .drop("temp_row_id")
    )

    final_df = df_with_sk.select(
        "form_fill_sk",
        "form_date",
        F.col("user_pseudo_id").alias("user_id"),
        "session_id",
        "form_submit_time",
        "event_name",
        "form_id",
        "form_name",
        "form_page_url",
        "form_type",
        "insert_dttm",
        "ingestion_id"
    )

    return final_df

# -------------------------------
# Silver Table - Web Events (Batch Incremental)
# -------------------------------
@dlt.table(
    name="ga_silver_web_event",
    comment="Silver GA web events - All event types - Incremental batch processing",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
def silver_web_event():
    """
    Build the silver web-event dataset (all event types) from bronze GA events.

    Extracts the common session/engagement parameters from ``event_params``,
    assigns a prefixed, zero-padded surrogate key (``web_event_sk``), derives
    a business key (``event_id``) and projects the final column set.

    NOTE(review): nothing here restricts the read to new rows even though the
    table comment advertises incremental processing - confirm.

    Returns:
        DataFrame with columns web_event_sk, event_id, event_date, event_name,
        event_bundle_sequence_id, user_id, session_id, ga_session_number,
        engagement_time_msec, insert_dttm, ingestion_id.
    """
    SK_PREFIX = "WE"

    events = dlt.read(bronze_table_full)

    df_transformed = (
        events
        # event_timestamp is scaled by 1e6 before the cast - presumably epoch
        # microseconds (TODO confirm against the bronze schema).
        .withColumn("event_date", F.to_date((F.col("event_timestamp") / 1000000).cast("timestamp")))
        .withColumn("session_id",
                    F.expr("filter(event_params, x -> x.key = 'ga_session_id')[0].value.int_value").cast("string"))
        .withColumn("ga_session_number",
                    F.expr("filter(event_params, x -> x.key = 'ga_session_number')[0].value.int_value").cast("int"))
        .withColumn("engagement_time_msec",
                    F.expr("filter(event_params, x -> x.key = 'engagement_time_msec')[0].value.int_value").cast("bigint"))
        .withColumn("insert_dttm", F.current_timestamp())
        .withColumn("ingestion_id", F.monotonically_increasing_id())
    )

    # Continue numbering after the highest key already stored in the target.
    # NOTE(review): SILVER_SCHEMA has no visible definition in this file; if
    # it is undefined, the resulting NameError is caught below and numbering
    # restarts from 1 on every run.
    try:
        max_sk = (
        spark.sql(f"""
            SELECT COALESCE(MAX(CAST(REGEXP_REPLACE(web_event_sk, '[^0-9]', '') AS BIGINT)), 0) AS max_num
            FROM {CATALOG}.{SILVER_SCHEMA}.ga_silver_web_event
        """).collect()[0]["max_num"]
        )
    except Exception:
        # Best-effort lookup (e.g. target table not created yet): start at 0.
        max_sk = 0

    # engagement_time_msec stays the primary sort key, but it is null whenever
    # the parameter is absent from event_params, so on its own it leaves the
    # key order arbitrary. The business-key components break those ties.
    row_window = Window.orderBy(
        F.col("engagement_time_msec"),
        F.col("event_timestamp"),
        F.col("user_pseudo_id"),
        F.col("event_name"),
    )

    df_with_sk = (
        df_transformed
        .withColumn("temp_row_id", F.row_number().over(row_window) + max_sk)
        .withColumn("web_event_sk",
                    F.concat(
                        F.lit(SK_PREFIX),
                        F.lpad(F.col("temp_row_id").cast("string"), SK_PADDING, "0")
                    ))
        .drop("temp_row_id")
    )

    # Business key: pseudo user id + raw event timestamp + event name,
    # joined with underscores (concat_ws skips null parts).
    df_with_bk = df_with_sk.withColumn(
        "event_id",
        F.concat_ws("_",
            F.col("user_pseudo_id"),
            F.col("event_timestamp").cast("string"),
            F.col("event_name")
        )
    )

    final_df = df_with_bk.select(
        "web_event_sk",
        "event_id",
        "event_date",
        "event_name",
        "event_bundle_sequence_id",
        "user_id",  # CRM/login ID if available
        "session_id",
        "ga_session_number",
        "engagement_time_msec",
        "insert_dttm",
        "ingestion_id"
    )

    return final_df

# -------------------------------
# Silver Table - Purchases (Batch Incremental)
# -------------------------------
@dlt.table(
    name="ga_silver_purchase",
    comment="Silver GA purchases - Incremental batch processing",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
# NOTE(review): the data-quality expectations below are currently disabled.
# @dlt.expect_or_drop("valid_transaction_id", "transaction_id IS NOT NULL")
# @dlt.expect_or_drop("valid_product", "product_id IS NOT NULL")
# @dlt.expect_or_drop("valid_quantity", "quantity > 0")
# @dlt.expect_or_drop("valid_price", "price IS NOT NULL AND price >= 0")
# @dlt.expect_or_drop("valid_currency", "currency IS NOT NULL")
def silver_purchase():
    """
    Build the silver purchase dataset from the bronze GA events table.

    Explodes the ``items`` array so each item becomes its own row, extracts
    transaction and item attributes, assigns a prefixed, zero-padded surrogate
    key (``purchase_sk``) and projects the final column set.

    NOTE(review): the ``event_name == 'purchase'`` filter is commented out, so
    every bronze event is processed; because ``explode_outer`` is used, events
    with no items still produce a row with null item fields. Confirm whether
    the filter and the expectations should be re-enabled. Nothing here
    restricts the read to new rows either, despite the table comment.

    Returns:
        DataFrame with columns purchase_sk, transaction_id, user_id,
        session_id, product_id, purchase_date, currency, product_name,
        item_variant, price, quantity, insert_dttm, ingestion_id.
    """
    SK_PREFIX = "PUR"

    events = dlt.read(bronze_table_full)

    # Filter only purchase events (disabled - see NOTE in the docstring)
    # df_purchases = events.filter(F.col("event_name") == "purchase")

    # One output row per element of items; rows with a null/empty array are
    # kept with item = null.
    df_exploded = events.withColumn("item", F.explode_outer(F.col("items")))

    df_transformed = (
        df_exploded
        # event_timestamp is scaled by 1e6 before the cast - presumably epoch
        # microseconds (TODO confirm against the bronze schema).
        .withColumn("purchase_date", F.to_date((F.col("event_timestamp") / 1000000).cast("timestamp")))
        .withColumn("session_id",
                    F.expr("filter(event_params, x -> x.key = 'ga_session_id')[0].value.int_value").cast("string"))
        .withColumn("transaction_id",
                    F.expr("filter(event_params, x -> x.key = 'transaction_id')[0].value.string_value"))
        .withColumn("currency",
                    F.expr("filter(event_params, x -> x.key = 'currency')[0].value.string_value"))
        .withColumn("product_id", F.col("item.item_id"))
        .withColumn("product_name", F.col("item.item_name"))
        .withColumn("item_variant", F.col("item.item_variant"))
        .withColumn("price", F.col("item.price"))
        .withColumn("quantity", F.col("item.quantity"))
        .withColumn("insert_dttm", F.current_timestamp())
        .withColumn("ingestion_id", F.monotonically_increasing_id())
    )

    # Continue numbering after the highest key already stored in the target.
    # NOTE(review): SILVER_SCHEMA has no visible definition in this file; if
    # it is undefined, the resulting NameError is caught below and numbering
    # restarts from 1 on every run.
    try:
        max_sk = (
        spark.sql(f"""
            SELECT COALESCE(MAX(CAST(REGEXP_REPLACE(purchase_sk, '[^0-9]', '') AS BIGINT)), 0) AS max_num
            FROM {CATALOG}.{SILVER_SCHEMA}.ga_silver_purchase
        """).collect()[0]["max_num"]
        )
    except Exception:
        # Best-effort lookup (e.g. target table not created yet): start at 0.
        max_sk = 0

    # purchase_date has day granularity, so on its own it leaves the key order
    # within a day arbitrary; the remaining columns break those ties.
    row_window = Window.orderBy(
        F.col("purchase_date"),
        F.col("event_timestamp"),
        F.col("user_pseudo_id"),
        F.col("transaction_id"),
        F.col("product_id"),
    )

    df_with_sk = (
        df_transformed
        .withColumn("temp_row_id", F.row_number().over(row_window) + max_sk)
        .withColumn("purchase_sk",
                    F.concat(
                        F.lit(SK_PREFIX),
                        F.lpad(F.col("temp_row_id").cast("string"), SK_PADDING, "0")
                    ))
        .drop("temp_row_id")
    )

    final_df = df_with_sk.select(
        "purchase_sk",
        "transaction_id",
        F.col("user_pseudo_id").alias("user_id"),
        "session_id",
        "product_id",
        "purchase_date",
        "currency",
        "product_name",
        "item_variant",
        "price",
        "quantity",
        "insert_dttm",
        "ingestion_id"
    )

    return final_df

# -------------------------------
# Silver Table - Page Views (Batch Incremental)
# -------------------------------
@dlt.table(
    name="ga_silver_page_views",
    comment="Silver GA page views - Incremental batch processing",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
@dlt.expect_or_drop("valid_page_view_event", "event_name = 'page_view'")
def silver_page_views():
    """
    Build the silver page-view dataset from the bronze GA events table.

    Keeps only rows whose ``event_name`` is ``page_view``, extracts page
    attributes from ``event_params``, assigns a prefixed, zero-padded
    surrogate key (``page_view_sk``) and projects the final column set.

    NOTE(review): nothing here restricts the read to new rows even though the
    table comment advertises incremental processing - confirm.

    Returns:
        DataFrame with columns page_view_sk, view_date, user_id, session_id,
        view_timestamp, event_name, page_url, page_title, referrer_url,
        engagement_time, insert_dttm, ingestion_id.
    """
    SK_PREFIX = "PV"

    events = dlt.read(bronze_table_full)

    # Same predicate as the expectation on the decorator, applied up front.
    df_page_views = events.filter(F.col("event_name") == "page_view")

    # event_timestamp is scaled by 1e6 before the cast - presumably epoch
    # microseconds (TODO confirm against the bronze schema).
    event_ts = (F.col("event_timestamp") / 1000000).cast("timestamp")

    df_transformed = (
        df_page_views
        .withColumn("view_date", F.to_date(event_ts))
        .withColumn("view_timestamp", event_ts)
        .withColumn("session_id",
                    F.expr("filter(event_params, x -> x.key = 'ga_session_id')[0].value.int_value").cast("string"))
        .withColumn("page_url",
                    F.expr("filter(event_params, x -> x.key = 'page_location')[0].value.string_value"))
        .withColumn("page_title",
                    F.expr("filter(event_params, x -> x.key = 'page_title')[0].value.string_value"))
        .withColumn("referrer_url",
                    F.expr("filter(event_params, x -> x.key = 'page_referrer')[0].value.string_value"))
        .withColumn("engagement_time",
                    F.expr("filter(event_params, x -> x.key = 'engagement_time_msec')[0].value.int_value").cast("bigint"))
        .withColumn("insert_dttm", F.current_timestamp())
        .withColumn("ingestion_id", F.monotonically_increasing_id())
    )

    # Continue numbering after the highest key already stored in the target.
    # NOTE(review): SILVER_SCHEMA has no visible definition in this file; if
    # it is undefined, the resulting NameError is caught below and numbering
    # restarts from 1 on every run.
    try:
        max_sk = (
        spark.sql(f"""
            SELECT COALESCE(MAX(CAST(REGEXP_REPLACE(page_view_sk, '[^0-9]', '') AS BIGINT)), 0) AS max_num
            FROM {CATALOG}.{SILVER_SCHEMA}.ga_silver_page_views
        """).collect()[0]["max_num"]
        )
    except Exception:
        # Best-effort lookup (e.g. target table not created yet): start at 0.
        max_sk = 0

    # Order primarily by view time; the extra columns only break ties so that
    # equal timestamps do not receive keys in an arbitrary order.
    row_window = Window.orderBy(
        F.col("view_timestamp"),
        F.col("user_pseudo_id"),
        F.col("session_id"),
        F.col("page_url"),
    )

    df_with_sk = (
        df_transformed
        .withColumn("temp_row_id", F.row_number().over(row_window) + max_sk)
        .withColumn("page_view_sk",
                    F.concat(
                        F.lit(SK_PREFIX),
                        F.lpad(F.col("temp_row_id").cast("string"), SK_PADDING, "0")
                    ))
        .drop("temp_row_id")
    )

    final_df = df_with_sk.select(
        "page_view_sk",
        "view_date",
        F.col("user_pseudo_id").alias("user_id"),
        "session_id",
        "view_timestamp",
        "event_name",
        "page_url",
        "page_title",
        "referrer_url",
        "engagement_time",
        "insert_dttm",
        "ingestion_id"
    )

    return final_df

# -------------------------------
# Silver Table - Users (Batch Incremental)
# -------------------------------
@dlt.table(
    name="ga_silver_web_users",
    comment="Silver GA web users - Incremental batch processing",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
def silver_users():
    """
    Build the silver web-users dataset (one row per ``user_pseudo_id``).

    Explodes ``user_properties``, computes first-touch traffic attributes per
    pseudo user, aggregates to a single row per ``user_pseudo_id``, assigns a
    prefixed, zero-padded surrogate key (``user_sk``) and projects the final
    column set.

    NOTE(review): apart from ``user_id`` and the first-touch columns, the
    ``F.first`` aggregates pick whichever row the engine sees first, so
    preferred_device, home_country, home_city and the single retained
    user_property_* pair are not guaranteed stable between runs. Nothing here
    restricts the read to new rows either, despite the table comment.

    Returns:
        DataFrame with columns user_sk, user_id, pseudo_id, first_seen_time,
        preferred_device, home_country, home_city, first_touch_medium,
        first_touch_source, first_touch_campaign, user_property_name,
        user_property_value, identified_user_flg, start_dttm, end_dttm,
        ingestion_id.
    """
    SK_PREFIX = "WU"

    events = dlt.read(bronze_table_full)

    # One row per user property; the value is taken from whichever typed
    # field is populated, rendered as a string.
    df_exploded = events.withColumn("user_prop", F.explode_outer(F.col("user_properties"))) \
                        .withColumn("user_property_name", F.col("user_prop.key")) \
                        .withColumn("user_property_value", F.coalesce(
                            F.col("user_prop.value.string_value"),
                            F.col("user_prop.value.int_value").cast("string"),
                            F.col("user_prop.value.float_value").cast("string"),
                            F.col("user_prop.value.double_value").cast("string")
                        ))

    # Earliest event per pseudo user defines the first-touch attributes.
    window_first = Window.partitionBy("user_pseudo_id").orderBy(F.col("event_timestamp").asc())

    df_users = (
        df_exploded
        .withColumn("first_touch_medium", F.first(F.col("traffic_source.medium")).over(window_first))
        .withColumn("first_touch_source", F.first(F.col("traffic_source.source")).over(window_first))
        .withColumn("first_touch_campaign", F.first(F.col("traffic_source.name")).over(window_first))
        .groupBy("user_pseudo_id")
        .agg(
            # Skip nulls so a user who has a user_id on any event is flagged
            # "Known" even if other events of theirs carry no id.
            F.first("user_id", ignorenulls=True).alias("user_id"),
            F.min("user_first_touch_timestamp").alias("first_seen_time"),
            F.first("device.category").alias("preferred_device"),
            F.first("geo.country").alias("home_country"),
            F.first("geo.city").alias("home_city"),
            F.first("first_touch_medium").alias("first_touch_medium"),
            F.first("first_touch_source").alias("first_touch_source"),
            F.first("first_touch_campaign").alias("first_touch_campaign"),
            F.first("user_property_name").alias("user_property_name"),
            F.first("user_property_value").alias("user_property_value")
        )
        .withColumn("identified_user_flg",
                    F.when(F.col("user_id").isNotNull(), F.lit("Known"))
                     .otherwise(F.lit("Unknown")))
        .withColumn("start_dttm", F.current_timestamp())
        .withColumn("end_dttm", F.lit(None).cast("timestamp"))
        .withColumn("ingestion_id", F.monotonically_increasing_id())
    )

    # Continue numbering after the highest key already stored in the target.
    # NOTE(review): SILVER_SCHEMA has no visible definition in this file; if
    # it is undefined, the resulting NameError is caught below and numbering
    # restarts from 1 on every run.
    try:
        max_sk = (
        spark.sql(f"""
            SELECT COALESCE(MAX(CAST(REGEXP_REPLACE(user_sk, '[^0-9]', '') AS BIGINT)), 0) AS max_num
            FROM {CATALOG}.{SILVER_SCHEMA}.ga_silver_web_users
        """).collect()[0]["max_num"]
        )
    except Exception:
        # Best-effort lookup (e.g. target table not created yet): start at 0.
        max_sk = 0

    # start_dttm is current_timestamp(), i.e. the same value on every row, so
    # ordering by it gave an arbitrary key order. user_pseudo_id is unique
    # after the groupBy and therefore yields a stable numbering.
    row_window = Window.orderBy(F.col("user_pseudo_id"))

    df_with_sk = (
        df_users
        .withColumn("temp_row_id", F.row_number().over(row_window) + max_sk)
        .withColumn("user_sk",
                    F.concat(
                        F.lit(SK_PREFIX),
                        F.lpad(F.col("temp_row_id").cast("string"), SK_PADDING, "0")
                    ))
        .drop("temp_row_id")
    )

    final_df = df_with_sk.select(
        "user_sk",
        "user_id",
        F.col("user_pseudo_id").alias("pseudo_id"),
        "first_seen_time",
        "preferred_device",
        "home_country",
        "home_city",
        "first_touch_medium",
        "first_touch_source",
        "first_touch_campaign",
        "user_property_name",
        "user_property_value",
        "identified_user_flg",
        "start_dttm",
        "end_dttm",
        "ingestion_id"
    )

    return final_df