# Streaming to Silver

Incrementally processes real-time events from Eventhouse (cusn schema) to Silver Delta tables.

## Data Flow
```
Tables/cusn.* (Eventhouse) --> Silver (Delta)
```

## Usage
Schedule this notebook to run **every 5 minutes** via Fabric pipeline.

Watermarks stored in `ag._watermarks` track the last processed timestamp for each source table, so every run only picks up events newer than the previous run's watermark.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
from datetime import datetime, timezone
import os

In [None]:
# =============================================================================
# PARAMETERS
# =============================================================================

def get_env(var_name, default=None):
    """Return the environment variable ``var_name``, or ``default`` if it is unset."""
    try:
        return os.environ[var_name]
    except KeyError:
        return default

# Silver target database and Eventhouse source schema. Each can be overridden
# by an environment variable of the same name.
SILVER_DB = get_env("SILVER_DB", "ag")
BRONZE_SCHEMA = get_env("BRONZE_SCHEMA", "cusn")

# The watermark table lives inside the Silver database.
WATERMARK_TABLE = SILVER_DB + "._watermarks"

print(f"Configuration: SILVER_DB={SILVER_DB}, BRONZE_SCHEMA={BRONZE_SCHEMA}")

In [None]:
# =============================================================================
# WATERMARK MANAGEMENT
# =============================================================================

def ensure_watermark_table():
    """Create the Delta watermark table if it does not exist yet, then print its name."""
    create_stmt = f"""
        CREATE TABLE IF NOT EXISTS {WATERMARK_TABLE} (
            source_table STRING,
            last_processed_ts TIMESTAMP,
            updated_at TIMESTAMP
        )
        USING DELTA
    """
    spark.sql(create_stmt)
    print(f"Watermark table: {WATERMARK_TABLE}")

def get_watermark(source_table):
    """
    Return the last processed timestamp recorded for ``source_table``.

    Args:
        source_table: Source table name; it is the lookup key in the
            watermark table.

    Returns:
        The stored ``last_processed_ts`` for the source table, or
        1970-01-01 UTC when there is no usable watermark (no row for the
        table, a NULL value, or the watermark table cannot be resolved).

    Raises:
        Any Spark error other than ``AnalysisException``. These are left to
        propagate on purpose: silently falling back to the epoch would make
        the caller re-append the whole source table.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

    try:
        # The aggregate always yields exactly one row; its value is NULL
        # when no watermark row matches. The DataFrame API also avoids
        # splicing the table name into a SQL string.
        latest = (
            spark.table(WATERMARK_TABLE)
            .filter(F.col("source_table") == source_table)
            .agg(F.max("last_processed_ts"))
            .collect()[0][0]
        )
    except AnalysisException as exc:
        # Unresolvable watermark table/column: treat as "never processed".
        print(f"  Watermark lookup failed for {source_table}, starting from epoch: {exc}")
        return epoch

    # A NULL watermark would make every later "ts > watermark" comparison
    # evaluate to NULL and filter out all rows, so treat it as missing.
    if latest is None:
        return epoch
    return latest

def update_watermark(source_table, new_ts):
    """
    Upsert the watermark row for ``source_table``.

    Sets ``last_processed_ts`` to ``new_ts`` and ``updated_at`` to the
    current UTC time, inserting the row when the source table has no
    watermark yet.
    """
    stamp = datetime.now(timezone.utc)
    merge_stmt = f"""
        MERGE INTO {WATERMARK_TABLE} AS target
        USING (SELECT '{source_table}' AS source_table) AS source
        ON target.source_table = source.source_table
        WHEN MATCHED THEN UPDATE SET 
            last_processed_ts = '{new_ts}',
            updated_at = '{stamp}'
        WHEN NOT MATCHED THEN INSERT 
            (source_table, last_processed_ts, updated_at)
            VALUES ('{source_table}', '{new_ts}', '{stamp}')
    """
    spark.sql(merge_stmt)

# Create the watermark table when this cell runs, before the cells below use it.
ensure_watermark_table()

In [None]:
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================

def cast_id_columns(df):
    """
    Cast the known integer-valued columns of ``df`` to fixed Spark types.

    Dimension keys are cast to ``long``; the other listed numeric columns
    are cast to ``int``. Listed columns that are absent from ``df`` are
    skipped, and columns not listed here are left untouched.
    """
    # Dimension IDs -> long
    long_columns = (
        "store_id",
        "dc_id",
        "truck_id",
        "customer_id",
        "product_id",
        "geography_id",
    )
    # Other numeric columns -> int
    int_columns = (
        "line_number",
        "line_num",
        "quantity",
        "count",
        "item_count",
        "dwell_seconds",
        "rssi",
    )

    target_types = {name: "long" for name in long_columns}
    target_types.update({name: "int" for name in int_columns})

    available = df.columns
    result = df
    for name, spark_type in target_types.items():
        if name not in available:
            continue
        result = result.withColumn(name, F.col(name).cast(spark_type))

    return result

def streaming_table_exists(table_name):
    """Return True when ``table_name`` resolves inside the bronze schema, else False."""
    qualified_name = f"{BRONZE_SCHEMA}.{table_name}"
    try:
        spark.table(qualified_name)
    except AnalysisException:
        return False
    return True

def process_events(source_table, target_table, transform_fn, ts_col="ingest_timestamp"):
    """
    Process new events from Eventhouse and append them to Silver.

    Only rows whose ``ts_col`` is later than the stored watermark are read.
    The batch's upper bound is fixed before anything is written, so the
    watermark never moves past rows that were not part of the write.

    Args:
        source_table: Eventhouse source (e.g., "receipt_created")
        target_table: Silver target (e.g., "fact_receipts")
        transform_fn: Schema transformation function
        ts_col: Timestamp column for watermarking

    Returns:
        Number of new events found for this batch (0 when the source table
        is missing or there is nothing new).

    Note:
        The append and the watermark update are separate steps. If the
        watermark update fails after a successful append, the next run will
        append the same rows again.
    """
    print(f"\n{BRONZE_SCHEMA}.{source_table} -> {SILVER_DB}.{target_table}")

    if not streaming_table_exists(source_table):
        print(f"  Skipping: source not found")
        return 0

    last_ts = get_watermark(source_table)
    print(f"  Watermark: {last_ts}")

    df_stream = spark.table(f"{BRONZE_SCHEMA}.{source_table}")
    is_after_watermark = F.col(ts_col) > last_ts

    # Take the batch's upper bound and size in a single pass, up front. The
    # source keeps receiving events, so computing max(ts) after the write
    # could advance the watermark past rows that were never written.
    stats = (
        df_stream.filter(is_after_watermark)
        .agg(F.max(ts_col), F.count(F.lit(1)))
        .collect()[0]
    )
    max_ts, new_count = stats[0], stats[1]

    if new_count == 0 or max_ts is None:
        print(f"  No new events")
        return 0

    print(f"  Processing {new_count} events")

    # Rows arriving after the bound was taken are left for the next run.
    df_new = df_stream.filter(is_after_watermark & (F.col(ts_col) <= max_ts))

    # Transform and cast ID columns
    df_transformed = cast_id_columns(transform_fn(df_new))

    df_transformed.write.format("delta").mode("append").saveAsTable(f"{SILVER_DB}.{target_table}")

    update_watermark(source_table, max_ts)
    print(f"  Appended {new_count} rows, watermark -> {max_ts}")

    return new_count

In [None]:
# =============================================================================
# TRANSFORM FUNCTIONS
# =============================================================================

DEFAULT_RECEIPT_TYPE = "SALE"
DEFAULT_DISCOUNT = 0.0


def _cents(column_name, alias):
    """Return ``column_name`` times 100, rounded and cast to bigint, named ``alias``."""
    return F.round(F.col(column_name) * 100).cast("bigint").alias(alias)


def transform_receipt_created(df):
    """
    Project receipt-created events onto the receipt columns.

    ``discount_amount`` and ``receipt_type`` are filled from the module
    defaults, and ``return_for_receipt_id_ext`` is always NULL.
    """
    return df.select(
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("receipt_id").alias("receipt_id_ext"),
        F.col("tender_type").alias("payment_method"),
        F.lit(DEFAULT_DISCOUNT).cast("string").alias("discount_amount"),
        F.col("tax"),
        _cents("tax", "tax_cents"),
        F.col("subtotal"),
        F.col("total"),
        _cents("total", "total_cents"),
        F.lit(DEFAULT_RECEIPT_TYPE).alias("receipt_type"),
        _cents("subtotal", "subtotal_cents"),
        F.col("customer_id").cast("long"),
        F.col("store_id").cast("long"),
        F.lit(None).cast("string").alias("return_for_receipt_id_ext"),
    )


def transform_receipt_line_added(df):
    """Project receipt-line events, renaming line/price columns and adding cent amounts."""
    return df.select(
        F.col("receipt_id").alias("receipt_id_ext"),
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("product_id").cast("long"),
        F.col("line_number").cast("int").alias("line_num"),
        F.col("quantity").cast("int"),
        F.col("unit_price"),
        F.col("extended_price").alias("ext_price"),
        _cents("unit_price", "unit_cents"),
        _cents("extended_price", "ext_cents"),
        F.col("promo_code"),
    )


def transform_payment_processed(df):
    """Project payment events, renaming the id/method columns and adding ``amount_cents``."""
    return df.select(
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("transaction_id").alias("payment_id"),
        F.col("receipt_id"),
        F.col("payment_method").alias("tender_type"),
        F.col("amount"),
        _cents("amount", "amount_cents"),
    )


def transform_inventory_updated(df):
    """Project inventory events; ``balance`` is emitted as NULL."""
    return df.select(
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("store_id").cast("long"),
        F.col("product_id").cast("long"),
        F.col("quantity_delta").alias("delta"),
        F.lit(None).cast("long").alias("balance"),  # Balance calculated later
        F.col("reason"),
    )


def transform_customer_entered(df):
    """Project customer-entered events; ``customer_id`` is emitted as NULL."""
    return df.select(
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("store_id").cast("long"),
        F.lit(None).cast("long").alias("customer_id"),  # Not always available
        F.col("zone"),
        F.col("dwell_time").cast("int").alias("dwell_seconds"),
        F.col("customer_count").cast("int").alias("count"),
    )


def transform_truck_arrived(df):
    """Project truck-arrival events: ``eta`` from ``arrival_time``, NULL ``etd``, status ARRIVED."""
    return df.select(
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("truck_id"),
        F.col("dc_id").cast("long"),
        F.col("store_id").cast("long"),
        F.col("shipment_id"),
        F.col("arrival_time").alias("eta"),
        F.lit(None).cast("timestamp").alias("etd"),
        F.lit("ARRIVED").alias("status"),
    )


def transform_truck_departed(df):
    """Project truck-departure events: NULL ``eta``, ``etd`` from ``departure_time``, status DEPARTED."""
    return df.select(
        F.col("ingest_timestamp").alias("event_ts"),
        F.col("truck_id"),
        F.col("dc_id").cast("long"),
        F.col("store_id").cast("long"),
        F.col("shipment_id"),
        F.lit(None).cast("timestamp").alias("eta"),
        F.col("departure_time").alias("etd"),
        F.lit("DEPARTED").alias("status"),
    )

In [None]:
# Driver cell: run every configured event feed once and report the total
# number of events processed.
print("=" * 60)
print("STREAMING TO SILVER")
print("=" * 60)

total = 0

# Transaction events
total += process_events("receipt_created", "fact_receipts", transform_receipt_created)
total += process_events("receipt_line_added", "fact_receipt_lines", transform_receipt_line_added)
total += process_events("payment_processed", "fact_payments", transform_payment_processed)

# Inventory events
total += process_events("inventory_updated", "fact_store_inventory_txn", transform_inventory_updated)

# Customer events
total += process_events("customer_entered", "fact_foot_traffic", transform_customer_entered)

# Truck events (both feeds append to the same Silver table)
total += process_events("truck_arrived", "fact_truck_moves", transform_truck_arrived)
total += process_events("truck_departed", "fact_truck_moves", transform_truck_departed)

# TODO: Add remaining event types:
# - stockout_detected -> fact_stockouts
# - reorder_triggered -> fact_reorders
# - store_opened/closed -> fact_store_ops
# - ad_impression -> fact_marketing
# - promotion_applied -> fact_promotions
# - online_order_* -> fact_online_order_*

print("\n" + "=" * 60)
print(f"COMPLETE: {total} events processed")
print("=" * 60)

In [None]:
# Print every row of the watermark table without truncating column values.
print("\nCurrent Watermarks:")
spark.read.table(WATERMARK_TABLE).show(truncate=False)