In [0]:
%pip install azure-eventhub
dbutils.library.restartPython()

In [0]:
%run ./config

In [0]:

# COMMAND ----------

# MAGIC %md
# MAGIC ## Define Schema

# COMMAND ----------

# Schema used to parse the JSON order event; every field is declared nullable.
event_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("customer_name", StringType()),
        ("location", StringType()),
        ("product_id", StringType()),
        ("product_name", StringType()),
        ("category", StringType()),
        ("brand", StringType()),
        ("order_status", StringType()),
        ("payment_method", StringType()),
        ("quantity", IntegerType()),
        ("discount_pct", DoubleType()),
        ("total_amount", DoubleType()),
        ("base_price", DoubleType()),
        ("unit_price", DoubleType()),
        ("order_timestamp", StringType()),
    )
])

print("‚úì Event schema defined")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stream 1: Read from Event Hub and Parse Events

# COMMAND ----------

print("=" * 70, "STARTING EVENT HUB STREAMING", "=" * 70, sep="\n")

# Open the streaming source through the Kafka connector using KAFKA_OPTIONS.
raw_stream = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()

# Decode the message value as a string, parse it with event_schema, flatten the
# parsed struct and keep the Kafka timestamp/offset/partition as metadata.
parsed_stream = (
    raw_stream
    .select(
        col("value").cast("string").alias("json_value"),
        col("timestamp"),
        col("offset"),
        col("partition"),
    )
    .withColumn("parsed_data", from_json(col("json_value"), event_schema))
    .select(
        "parsed_data.*",
        col("timestamp").alias("event_time"),
        col("offset").alias("kafka_offset"),
        col("partition").alias("partition_id"),
    )
    # Ingestion time stamped by this job.
    .withColumn("bronze_timestamp", current_timestamp())
    # The raw string is kept; this adds a typed copy next to it.
    .withColumn("order_timestamp_parsed", to_timestamp(col("order_timestamp")))
)

print("‚úì Stream parsing configured")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stream 2: Write Orders to Bronze Table

# COMMAND ----------

print("\n" + "=" * 70, "STARTING ORDERS STREAM", "=" * 70, sep="\n")

# Project the order-level columns (product_id is the FK to products) together
# with the Kafka metadata and the ingestion timestamps.
orders_stream = parsed_stream.select(
    "order_id",
    "customer_id",
    "customer_name",
    "location",
    "product_id",  # FK to products
    "order_status",
    "payment_method",
    "quantity",
    "discount_pct",
    "total_amount",
    "order_timestamp",
    "order_timestamp_parsed",
    "event_time",
    "kafka_offset",
    "partition_id",
    "bronze_timestamp",
)

# Append the stream to the bronze orders Delta table in 10-second micro-batches.
orders_query = (
    orders_stream.writeStream
    .format("delta")
    .outputMode("append")
    .options(checkpointLocation=orders_checkpoint, mergeSchema="true")
    .trigger(processingTime="10 seconds")
    .toTable(bronze_orders_table)
)

print(
    "‚úì Orders stream started",
    f"  Query ID: {orders_query.id}",
    f"  Target: {bronze_orders_table}",
    f"  Checkpoint: {orders_checkpoint}",
    sep="\n",
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stream 3: Deduplicate and Write Products to Bronze Table

# COMMAND ----------

print("\n" + "="*70)
print("STARTING PRODUCTS STREAM")
print("="*70)


def upsert_products(batch_df, batch_id):
    """Merge one micro-batch of product rows into the bronze products table.

    Defined before the streaming query is started so the callback already
    exists when the first micro-batch is delivered.

    Args:
        batch_df: Micro-batch DataFrame passed in by ``foreachBatch``. The MERGE
            reads product_id, product_name, category, brand, base_price,
            unit_price and bronze_timestamp from it.
        batch_id: Micro-batch identifier; only used in the progress messages.

    Raises:
        Exception: Any MERGE failure other than a missing target table is
            re-raised unchanged.
    """
    # Count once and reuse the value in the log lines instead of recomputing it.
    row_count = batch_df.count()
    if row_count == 0:
        return

    # The temp view is registered on the session that owns the micro-batch, so
    # the MERGE must run on that same session. Using the notebook-global
    # `spark` can fail to resolve the view, and that error would then be
    # mistaken for "target table missing" below.
    session = batch_df.sparkSession
    batch_df.createOrReplaceTempView("products_batch")

    merge_query = f"""
    MERGE INTO {bronze_products_table} target
    USING products_batch source
    ON target.product_id = source.product_id
    WHEN MATCHED THEN
        UPDATE SET
            target.product_name = source.product_name,
            target.category = source.category,
            target.brand = source.brand,
            target.base_price = source.base_price,
            target.unit_price = source.unit_price,
            target.bronze_timestamp = source.bronze_timestamp
    WHEN NOT MATCHED THEN
        INSERT (product_id, product_name, category, brand, base_price, unit_price, bronze_timestamp)
        VALUES (source.product_id, source.product_name, source.category, source.brand,
                source.base_price, source.unit_price, source.bronze_timestamp)
    """

    try:
        session.sql(merge_query)
        print(f"  Batch {batch_id}: Upserted {row_count} products")
    except Exception as e:
        # First run: the target table does not exist yet, so create it from
        # this batch.
        if "TABLE_OR_VIEW_NOT_FOUND" in str(e):
            batch_df.write.format("delta").mode("append").saveAsTable(bronze_products_table)
            print(f"  Batch {batch_id}: Created table and inserted {row_count} products")
        else:
            raise


# Project the product attributes and drop repeated product_id values.
# NOTE(review): on a streaming DataFrame dropDuplicates is presumably stateful
# across micro-batches (not "per batch"), and without a watermark that state is
# never purged - confirm this is the intended behavior.
products_stream = (parsed_stream
    .select(
        col("product_id"),
        col("product_name"),
        col("category"),
        col("brand"),
        col("base_price"),
        col("unit_price"),
        col("bronze_timestamp")
    )
    .dropDuplicates(["product_id"])
)

# Hand every micro-batch to upsert_products. The sink is foreachBatch, so no
# Delta format/mergeSchema options are set on the writer itself.
products_query = (products_stream
    .writeStream
    .foreachBatch(upsert_products)
    .outputMode("append")
    .option("checkpointLocation", products_checkpoint)
    .trigger(processingTime="10 seconds")
    .start()
)

print(f"‚úì Products stream started")
print(f"  Query ID: {products_query.id}")
print(f"  Target: {bronze_products_table}")
print(f"  Checkpoint: {products_checkpoint}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Monitor Streaming Queries

# COMMAND ----------

print("\n" + "=" * 70, "ACTIVE STREAMING QUERIES", "=" * 70, sep="\n")

# Report id, name, status and the latest progress entry of each active query.
for stream in spark.streams.active:
    print(
        f"\nQuery ID: {stream.id}",
        f"  Name: {stream.name or 'unnamed'}",
        f"  Status: {stream.status['message']}",
        f"  Is Active: {stream.isActive}",
        sep="\n",
    )

    # Nothing more to show until the query has reported progress.
    if not stream.recentProgress:
        continue

    latest = stream.recentProgress[-1]
    print("  Recent Progress:")
    print(f"    - Batch: {latest.get('batchId', 'N/A')}")
    print(f"    - Input Rows: {latest.get('numInputRows', 0)}")
    print(f"    - Processing Rate: {latest.get('processedRowsPerSecond', 0):.2f} rows/sec")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Wait and Monitor (Run for 30 seconds)

# COMMAND ----------

import time

print("\n" + "="*70)
print("MONITORING STREAMS FOR 30 SECONDS")
print("="*70)

# Poll both bronze tables six times, five seconds apart (30 seconds in total).
for i in range(6):
    time.sleep(5)

    # A failed read is reported as "not created yet". `Exception` is caught
    # rather than using a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate and the cell can be cancelled while it is polling.
    try:
        orders_count = spark.table(bronze_orders_table).count()
        print(f"\n‚è±Ô∏è  {(i+1)*5}s - Orders: {orders_count} records")
    except Exception:
        print(f"\n‚è±Ô∏è  {(i+1)*5}s - Orders table not created yet")

    try:
        products_count = spark.table(bronze_products_table).count()
        print(f"    Products: {products_count} records")
    except Exception:
        print(f"    Products table not created yet")

print("\n‚úì Monitoring complete")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verify Bronze Tables

# COMMAND ----------

print("\n" + "=" * 70, "BRONZE LAYER VERIFICATION", "=" * 70, sep="\n")

# Show the row count of the bronze orders table and, when it has rows, the ten
# most recently ingested orders plus order count / revenue per location.
# count/sum below are expected to be the pyspark.sql.functions versions.
try:
    orders_df = spark.table(bronze_orders_table)
    orders_count = orders_df.count()

    print(
        f"\nüìä Orders Table: {bronze_orders_table}",
        f"   Total Records: {orders_count}",
        sep="\n",
    )

    if orders_count > 0:
        print("\n   Latest 10 orders:")
        display(orders_df.sort(col("bronze_timestamp").desc()).limit(10))

        print("\n   Orders by Location:")
        display(
            orders_df.groupBy("location")
            .agg(
                count("*").alias("order_count"),
                sum("total_amount").alias("total_revenue"),
            )
            .sort(col("order_count").desc())
        )
except Exception as err:
    print(f"\n‚ö†Ô∏è  Orders table not available: {err}")

# COMMAND ----------

# Show the row count of the bronze products table and, when it has rows, the
# full product list plus a count per category/brand pair.
try:
    products_df = spark.table(bronze_products_table)
    products_count = products_df.count()

    print(
        f"\nüìä Products Table: {bronze_products_table}",
        f"   Total Records: {products_count}",
        sep="\n",
    )

    if products_count > 0:
        print("\n   All products:")
        display(products_df.sort("product_id"))

        print("\n   Products by Category:")
        display(products_df.groupBy("category", "brand").count().sort("category", "brand"))
except Exception as err:
    print(f"\n‚ö†Ô∏è  Products table not available: {err}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stream Management

# COMMAND ----------

# Option 1: leave the streams running; only an informational message is printed.
print(
    "\nüí° Streams are running continuously",
    "   They will process new events as they arrive in Event Hub",
    "\n   To stop streams, run the next cell",
    sep="\n",
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stop All Streams (Run when done)

# COMMAND ----------

# Disabled by default: the stop logic below is wrapped in a bare triple-quoted
# string, so it is evaluated as a string literal and never executed.
# Remove the surrounding triple quotes to stop every active streaming query.
"""
print("=" * 70)
print("STOPPING ALL STREAMING QUERIES")
print("=" * 70)

for stream in spark.streams.active:
    print(f"\nStopping: {stream.id}")
    stream.stop()
    print(f"  ‚úì Stopped")

print("\n‚úì All streams stopped")
print("\nNote: You can restart streams by re-running the stream cells above")
"""

# COMMAND ----------

# MAGIC %md
# MAGIC ## Summary

# COMMAND ----------

print("\n" + "=" * 70, "‚úì BRONZE LAYER STREAMING SETUP COMPLETE", "=" * 70, sep="\n")

# Print table counts, active query status and a static description of the
# pipeline. Any failure inside the block is reported as "tables not created yet".
try:
    orders_count = spark.table(bronze_orders_table).count()
    products_count = spark.table(bronze_products_table).count()

    print(
        "\nCurrent State:",
        f"  Orders: {orders_count} records",
        f"  Products: {products_count} records",
        "\nStreaming Status:",
        sep="\n",
    )
    print(f"  Active Queries: {len(spark.streams.active)}")

    for stream in spark.streams.active:
        print(f"    - {stream.id}: {stream.status['message']}")

    print("\nData Flow:")
    print(
        f"  Event Hub ({eh_name})",
        "    ‚Üì Kafka Streaming",
        "  Parsed Events",
        "    ‚Üì Split & Transform",
        "  Bronze Tables",
        "    ‚Ä¢ Orders (with product_id FK)",
        "    ‚Ä¢ Products (deduplicated)",
        "\n" + "=" * 70,
        "Next Steps:",
        "  1. Monitor streams using 'Monitor Streaming Queries' cell",
        "  2. Run Silver Layer notebook to join Orders with Products",
        "  3. Stop streams when done using 'Stop All Streams' cell",
        "=" * 70,
        sep="\n",
    )

except Exception:
    print("\n‚ö†Ô∏è  Tables not created yet. Wait for streams to process data.")

In [0]:
%sql
select count(*) from   `na-dbxtraining`.biju_bronze.products

In [0]:
# Three-part table names; only the catalog part is wrapped in backticks.
bronze_orders_table = "`{}`.{}.orders".format(catalog, schema_bronze)
bronze_products_table = "`{}`.{}.products".format(catalog, schema_bronze)

In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer - Orders Stream
# MAGIC Reads events from Event Hub and creates Orders table

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

# --- Event Hub connection settings -------------------------------------------
eh_namespace = "evhns-natraining.servicebus.windows.net"
eh_name = "evh-natraining-biju"
keyvault_scope = "dbx-ss-kv-natraining-2"
secret_name = "evh-natraining-read-write"
shared_access_key_name = "SharedAccessKeyToSendAndListen"

# --- Unity Catalog target ----------------------------------------------------
catalog = "na-dbxtraining"
schema_bronze = "biju_bronze"

# Three-part table name; only the catalog part is wrapped in backticks.
bronze_orders_table = "`{}`.{}.orders".format(catalog, schema_bronze)

# Checkpoints live under a fixed volume path, in a folder named after the
# catalog with '-' replaced by '_'.
checkpoint_base = "/Volumes/na-dbxtraining/biju_raw/biju_vol/checkpoints/" + catalog.replace("-", "_")
orders_checkpoint = checkpoint_base + "/bronze_orders"

print(
    "=" * 70,
    "ORDERS STREAM CONFIGURATION",
    "=" * 70,
    f"Event Hub: {eh_name}",
    f"Orders Table: {bronze_orders_table}",
    f"Checkpoint: {orders_checkpoint}",
    "=" * 70,
    sep="\n",
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Get Secret from Key Vault

# COMMAND ----------

# Read the secret from the configured secret scope; on failure log the reason
# and re-raise so the notebook stops here.
try:
    secret_value = dbutils.secrets.get(key=secret_name, scope=keyvault_scope)
    print("‚úì Successfully retrieved secret from Key Vault")
except Exception as err:
    print(f"‚úó Error retrieving secret: {err}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC ## Build Connection String and Kafka Options

# COMMAND ----------

# Event Hubs connection string assembled from the namespace, key name and secret.
connection_string = f"Endpoint=sb://{eh_namespace}/;SharedAccessKeyName={shared_access_key_name};SharedAccessKey={secret_value}"

# Options for the Kafka source: SASL_SSL/PLAIN on port 9093, with the
# connection string passed as the password of the "$ConnectionString" user.
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": eh_namespace + ":9093",
    "subscribe": eh_name,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="$ConnectionString" '
        f'password="{connection_string}";'
    ),
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "30000",
    "failOnDataLoss": "false",
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": "10000",
}

print("‚úì Kafka options configured for streaming")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Define Orders Schema

# COMMAND ----------

# Schema used to parse the JSON order event; every field is declared nullable.
event_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("customer_name", StringType()),
        ("location", StringType()),
        ("product_id", StringType()),
        ("product_name", StringType()),
        ("category", StringType()),
        ("brand", StringType()),
        ("order_status", StringType()),
        ("payment_method", StringType()),
        ("quantity", IntegerType()),
        ("discount_pct", DoubleType()),
        ("total_amount", DoubleType()),
        ("base_price", DoubleType()),
        ("unit_price", DoubleType()),
        ("order_timestamp", StringType()),
    )
])

print("‚úì Event schema defined")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Read from Event Hub and Parse Events

# COMMAND ----------

print("=" * 70, "STARTING EVENT HUB STREAMING FOR ORDERS", "=" * 70, sep="\n")

# Open the streaming source through the Kafka connector using KAFKA_OPTIONS.
raw_stream = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()

# Decode the message value as a string, parse it with event_schema, flatten the
# parsed struct and keep the Kafka timestamp/offset/partition as metadata.
parsed_stream = (
    raw_stream
    .select(
        col("value").cast("string").alias("json_value"),
        col("timestamp"),
        col("offset"),
        col("partition"),
    )
    .withColumn("parsed_data", from_json(col("json_value"), event_schema))
    .select(
        "parsed_data.*",
        col("timestamp").alias("event_time"),
        col("offset").alias("kafka_offset"),
        col("partition").alias("partition_id"),
    )
    # Ingestion time stamped by this job.
    .withColumn("bronze_timestamp", current_timestamp())
    # The raw string is kept; this adds a typed copy next to it.
    .withColumn("order_timestamp_parsed", to_timestamp(col("order_timestamp")))
)

print("‚úì Stream parsing configured")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Select Orders Columns

# COMMAND ----------

print("\n" + "=" * 70, "CONFIGURING ORDERS STREAM", "=" * 70, sep="\n")

# Project the order-level columns (product_id is the FK to products) together
# with the Kafka metadata and the ingestion timestamps.
orders_stream = parsed_stream.select(
    "order_id",
    "customer_id",
    "customer_name",
    "location",
    "product_id",  # FK to products
    "order_status",
    "payment_method",
    "quantity",
    "discount_pct",
    "total_amount",
    "order_timestamp",
    "order_timestamp_parsed",
    "event_time",
    "kafka_offset",
    "partition_id",
    "bronze_timestamp",
)

print("‚úì Orders columns selected")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Write Orders Stream to Bronze Table

# COMMAND ----------

print("\n" + "="*70)
print("STARTING ORDERS STREAM WRITE")
print("="*70)

# Trigger settings live in one place so the status line printed below cannot
# drift from what the query actually uses.
#   {"once": True}                      - single run, used here for testing
#   {"processingTime": "10 seconds"}    - continuous 10-second micro-batches
# NOTE(review): newer Spark releases document availableNow=True as the
# replacement for once=True - confirm for the target runtime before switching.
orders_trigger = {"once": True}

# Append the orders stream to the bronze Delta table.
orders_query = (orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", orders_checkpoint)
    .option("mergeSchema", "true")
    .trigger(**orders_trigger)
    .toTable(bronze_orders_table)
)

# Report the trigger that is really in effect (the message used to be
# hard-coded to "10 seconds" even though the query ran with once=True).
orders_trigger_label = ", ".join(f"{key}={value}" for key, value in orders_trigger.items())

print(f"‚úì Orders stream started")
print(f"  Query ID: {orders_query.id}")
print(f"  Target: {bronze_orders_table}")
print(f"  Checkpoint: {orders_checkpoint}")
print(f"  Trigger: {orders_trigger_label}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Monitor Orders Stream

# COMMAND ----------

print("\n" + "=" * 70, "ACTIVE ORDERS STREAM", "=" * 70, sep="\n")

# Report status and latest progress for the orders query only.
for stream in spark.streams.active:
    if stream.id != orders_query.id:
        continue

    print(
        f"\nQuery ID: {stream.id}",
        f"  Name: {stream.name or 'unnamed'}",
        f"  Status: {stream.status['message']}",
        f"  Is Active: {stream.isActive}",
        sep="\n",
    )

    # Nothing more to show until the query has reported progress.
    if not stream.recentProgress:
        continue

    latest = stream.recentProgress[-1]
    print("  Recent Progress:")
    print(f"    - Batch: {latest.get('batchId', 'N/A')}")
    print(f"    - Input Rows: {latest.get('numInputRows', 0)}")
    print(f"    - Processing Rate: {latest.get('processedRowsPerSecond', 0):.2f} rows/sec")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Wait and Monitor (30 seconds)

# COMMAND ----------

import time

print("\n" + "="*70)
print("MONITORING ORDERS STREAM FOR 30 SECONDS")
print("="*70)

# Poll the bronze orders table six times, five seconds apart (30 seconds total).
for i in range(6):
    time.sleep(5)

    # A failed read is reported as "not created yet". `Exception` is caught
    # rather than using a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate and the cell can be cancelled while it is polling.
    try:
        orders_count = spark.table(bronze_orders_table).count()
        print(f"\n‚è±Ô∏è  {(i+1)*5}s - Orders: {orders_count} records")
    except Exception:
        print(f"\n‚è±Ô∏è  {(i+1)*5}s - Orders table not created yet")

print("\n‚úì Monitoring complete")
print("\n‚úì Monitoring complete")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verify Orders Table

# COMMAND ----------

print("\n" + "=" * 70, "ORDERS TABLE VERIFICATION", "=" * 70, sep="\n")

# Show the row count of the bronze orders table and, when it has rows, its
# schema, the ten most recently ingested orders and two breakdowns.
# count/sum below are the star-imported pyspark.sql.functions versions.
try:
    orders_df = spark.table(bronze_orders_table)
    orders_count = orders_df.count()

    print(
        f"\nüìä Orders Table: {bronze_orders_table}",
        f"   Total Records: {orders_count}",
        sep="\n",
    )

    if orders_count > 0:
        print("\n   Schema:")
        orders_df.printSchema()

        print("\n   Latest 10 orders:")
        display(orders_df.sort(col("bronze_timestamp").desc()).limit(10))

        print("\n   Orders by Location:")
        display(
            orders_df.groupBy("location")
            .agg(
                count("*").alias("order_count"),
                sum("total_amount").alias("total_revenue"),
            )
            .sort(col("order_count").desc())
        )

        print("\n   Orders by Status:")
        display(orders_df.groupBy("order_status").count().sort(col("count").desc()))

except Exception as err:
    print(f"\n‚ö†Ô∏è  Orders table not available: {err}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stop Orders Stream (Run when done)

# COMMAND ----------

# Disabled by default: the stop logic below is wrapped in a bare triple-quoted
# string, so it is evaluated as a string literal and never executed.
# Remove the surrounding triple quotes to stop the orders query if it is active.
"""
print("=" * 70)
print("STOPPING ORDERS STREAM")
print("=" * 70)

if orders_query.isActive:
    print(f"\nStopping: {orders_query.id}")
    orders_query.stop()
    print(f"  ‚úì Stopped")
else:
    print("\nStream is not active")

print("\n‚úì Orders stream stopped")
"""

# COMMAND ----------

# MAGIC %md
# MAGIC ## Summary

# COMMAND ----------

print("\n" + "=" * 70, "‚úì ORDERS STREAM SETUP COMPLETE", "=" * 70, sep="\n")

# Print the orders row count, the query state and a static description of the
# pipeline. Any failure inside the block is reported as "table not created yet".
try:
    orders_count = spark.table(bronze_orders_table).count()

    print(
        "\nCurrent State:",
        f"  Orders: {orders_count} records",
        "\nStreaming Status:",
        sep="\n",
    )
    print(f"  Query Active: {orders_query.isActive}")
    print(f"  Query ID: {orders_query.id}")

    print("\nData Flow:")
    print(
        f"  Event Hub ({eh_name})",
        "    ‚Üì Kafka Protocol",
        "  Raw Kafka Stream",
        "    ‚Üì Parse JSON",
        "  Parsed Events",
        "    ‚Üì Select Orders Columns",
        "  Bronze Orders Table",
        "\n" + "=" * 70,
        "Next Steps:",
        "  1. Run Bronze-Products notebook to create products table",
        "  2. Run Silver Layer notebook to join Orders with Products",
        "  3. Stop stream when done using 'Stop Orders Stream' cell",
        "=" * 70,
        sep="\n",
    )

except Exception:
    print("\n‚ö†Ô∏è  Orders table not created yet. Wait for stream to process data.")

In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer - Products Stream
# MAGIC Reads events from Event Hub and creates deduplicated Products table

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

# --- Event Hub connection settings -------------------------------------------
eh_namespace = "evhns-natraining.servicebus.windows.net"
eh_name = "evh-natraining-biju"
keyvault_scope = "dbx-ss-kv-natraining-2"
secret_name = "evh-natraining-read-write"
shared_access_key_name = "SharedAccessKeyToSendAndListen"

# --- Unity Catalog target ----------------------------------------------------
catalog = "na-dbxtraining"
schema_bronze = "biju_bronze"

# Three-part table name; only the catalog part is wrapped in backticks.
bronze_products_table = "`{}`.{}.products".format(catalog, schema_bronze)

# Checkpoints live under a fixed volume path, in a folder named after the
# catalog with '-' replaced by '_'.
checkpoint_base = "/Volumes/na-dbxtraining/biju_raw/biju_vol/checkpoints/" + catalog.replace("-", "_")
products_checkpoint = checkpoint_base + "/bronze_products"

print(
    "=" * 70,
    "PRODUCTS STREAM CONFIGURATION",
    "=" * 70,
    f"Event Hub: {eh_name}",
    f"Products Table: {bronze_products_table}",
    f"Checkpoint: {products_checkpoint}",
    "=" * 70,
    sep="\n",
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Get Secret from Key Vault

# COMMAND ----------

# Read the secret from the configured secret scope; on failure log the reason
# and re-raise so the notebook stops here.
try:
    secret_value = dbutils.secrets.get(key=secret_name, scope=keyvault_scope)
    print("‚úì Successfully retrieved secret from Key Vault")
except Exception as err:
    print(f"‚úó Error retrieving secret: {err}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC ## Build Connection String and Kafka Options

# COMMAND ----------

# Event Hubs connection string assembled from the namespace, key name and secret.
connection_string = f"Endpoint=sb://{eh_namespace}/;SharedAccessKeyName={shared_access_key_name};SharedAccessKey={secret_value}"

# Options for the Kafka source: SASL_SSL/PLAIN on port 9093, with the
# connection string passed as the password of the "$ConnectionString" user.
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": eh_namespace + ":9093",
    "subscribe": eh_name,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="$ConnectionString" '
        f'password="{connection_string}";'
    ),
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "30000",
    "failOnDataLoss": "false",
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": "10000",
}

print("‚úì Kafka options configured for streaming")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Define Products Schema

# COMMAND ----------

# Schema used to parse the full JSON order event (the product fields are
# extracted from it later); every field is declared nullable.
event_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("customer_name", StringType()),
        ("location", StringType()),
        ("product_id", StringType()),
        ("product_name", StringType()),
        ("category", StringType()),
        ("brand", StringType()),
        ("order_status", StringType()),
        ("payment_method", StringType()),
        ("quantity", IntegerType()),
        ("discount_pct", DoubleType()),
        ("total_amount", DoubleType()),
        ("base_price", DoubleType()),
        ("unit_price", DoubleType()),
        ("order_timestamp", StringType()),
    )
])

print("‚úì Event schema defined")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Read from Event Hub and Parse Events

# COMMAND ----------

print("=" * 70, "STARTING EVENT HUB STREAMING FOR PRODUCTS", "=" * 70, sep="\n")

# Open the streaming source through the Kafka connector using KAFKA_OPTIONS.
raw_stream = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()

# Decode the message value as a string, parse it with event_schema, flatten the
# parsed struct and keep the Kafka timestamp/offset/partition as metadata.
parsed_stream = (
    raw_stream
    .select(
        col("value").cast("string").alias("json_value"),
        col("timestamp"),
        col("offset"),
        col("partition"),
    )
    .withColumn("parsed_data", from_json(col("json_value"), event_schema))
    .select(
        "parsed_data.*",
        col("timestamp").alias("event_time"),
        col("offset").alias("kafka_offset"),
        col("partition").alias("partition_id"),
    )
    # Ingestion time stamped by this job.
    .withColumn("bronze_timestamp", current_timestamp())
)

print("‚úì Stream parsing configured")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Select and Deduplicate Products

# COMMAND ----------

print("\n" + "=" * 70, "CONFIGURING PRODUCTS STREAM", "=" * 70, sep="\n")

# Project the product attributes out of each event and drop repeated
# product_id values.
# NOTE(review): on a streaming DataFrame dropDuplicates is presumably stateful
# across micro-batches rather than per batch - confirm the intended scope.
products_stream = parsed_stream.select(
    "product_id",
    "product_name",
    "category",
    "brand",
    "base_price",
    "unit_price",
    "bronze_timestamp",
).dropDuplicates(["product_id"])

print("‚úì Products columns selected with deduplication")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Define Upsert Function

# COMMAND ----------

def upsert_products(batch_df, batch_id):
    """Merge one micro-batch of product rows into the bronze products table.

    Rows whose product_id already exists in the target are updated; the rest
    are inserted. If the target table does not exist yet, it is created from
    the batch.

    Args:
        batch_df: Micro-batch DataFrame passed in by ``foreachBatch``. The MERGE
            reads product_id, product_name, category, brand, base_price,
            unit_price and bronze_timestamp from it.
        batch_id: Micro-batch identifier; only used in the progress messages.

    Raises:
        Exception: Any MERGE failure other than a missing target table is
            logged and re-raised unchanged.
    """
    # Count once and reuse the value in the log lines instead of recomputing it.
    row_count = batch_df.count()
    if row_count == 0:
        print(f"  Batch {batch_id}: No products to process")
        return

    # The temp view is registered on the session that owns the micro-batch, so
    # the MERGE must run on that same session. Using the notebook-global
    # `spark` can fail to resolve the view, and that error would then be
    # mistaken for "target table missing" below.
    session = batch_df.sparkSession
    batch_df.createOrReplaceTempView("products_batch")

    merge_query = f"""
    MERGE INTO {bronze_products_table} target
    USING products_batch source
    ON target.product_id = source.product_id
    WHEN MATCHED THEN
        UPDATE SET
            target.product_name = source.product_name,
            target.category = source.category,
            target.brand = source.brand,
            target.base_price = source.base_price,
            target.unit_price = source.unit_price,
            target.bronze_timestamp = source.bronze_timestamp
    WHEN NOT MATCHED THEN
        INSERT (product_id, product_name, category, brand, base_price, unit_price, bronze_timestamp)
        VALUES (source.product_id, source.product_name, source.category, source.brand,
                source.base_price, source.unit_price, source.bronze_timestamp)
    """

    try:
        session.sql(merge_query)
        print(f"  Batch {batch_id}: Upserted {row_count} products")
    except Exception as e:
        # First run: the target table does not exist yet, so create it from
        # this batch.
        if "TABLE_OR_VIEW_NOT_FOUND" in str(e):
            batch_df.write.format("delta").mode("append").saveAsTable(bronze_products_table)
            print(f"  Batch {batch_id}: Created table and inserted {row_count} products")
        else:
            print(f"  Batch {batch_id}: Error - {str(e)}")
            raise

print("‚úì Upsert function defined")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Write Products Stream to Bronze Table

# COMMAND ----------

print("\n" + "="*70)
print("STARTING PRODUCTS STREAM WRITE")
print("="*70)

# Trigger settings live in one place so the status line printed below cannot
# drift from what the query actually uses.
#   {"once": True}                      - single run, used here for testing
#   {"processingTime": "10 seconds"}    - continuous 10-second micro-batches
# NOTE(review): newer Spark releases document availableNow=True as the
# replacement for once=True - confirm for the target runtime before switching.
products_trigger = {"once": True}

# Hand every micro-batch to upsert_products, which MERGEs it into the table.
products_query = (products_stream
    .writeStream
    .foreachBatch(upsert_products)
    .option("checkpointLocation", products_checkpoint)
    .trigger(**products_trigger)
    .start()
)

# Report the trigger that is really in effect (the message used to be
# hard-coded to "10 seconds" even though the query ran with once=True).
products_trigger_label = ", ".join(f"{key}={value}" for key, value in products_trigger.items())

print(f"‚úì Products stream started")
print(f"  Query ID: {products_query.id}")
print(f"  Target: {bronze_products_table}")
print(f"  Checkpoint: {products_checkpoint}")
print(f"  Trigger: {products_trigger_label}")
print(f"  Method: foreachBatch with MERGE (upsert)")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Monitor Products Stream

# COMMAND ----------

print("\n" + "=" * 70, "ACTIVE PRODUCTS STREAM", "=" * 70, sep="\n")

# Report status and latest progress for the products query only.
for stream in spark.streams.active:
    if stream.id != products_query.id:
        continue

    print(
        f"\nQuery ID: {stream.id}",
        f"  Name: {stream.name or 'unnamed'}",
        f"  Status: {stream.status['message']}",
        f"  Is Active: {stream.isActive}",
        sep="\n",
    )

    # Nothing more to show until the query has reported progress.
    if not stream.recentProgress:
        continue

    latest = stream.recentProgress[-1]
    print("  Recent Progress:")
    print(f"    - Batch: {latest.get('batchId', 'N/A')}")
    print(f"    - Input Rows: {latest.get('numInputRows', 0)}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Wait and Monitor (30 seconds)

# COMMAND ----------

import time

print("\n" + "="*70)
print("MONITORING PRODUCTS STREAM FOR 30 SECONDS")
print("="*70)

# Poll the bronze products table six times, five seconds apart (30 seconds total).
for i in range(6):
    time.sleep(5)

    # A failed read is reported as "not created yet". `Exception` is caught
    # rather than using a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate and the cell can be cancelled while it is polling.
    try:
        products_count = spark.table(bronze_products_table).count()
        print(f"\n‚è±Ô∏è  {(i+1)*5}s - Products: {products_count} unique products")
    except Exception:
        print(f"\n‚è±Ô∏è  {(i+1)*5}s - Products table not created yet")

print("\n‚úì Monitoring complete")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verify Products Table

# COMMAND ----------

print("\n" + "=" * 70, "PRODUCTS TABLE VERIFICATION", "=" * 70, sep="\n")

# Show the row count of the bronze products table and, when it has rows, its
# schema, the full product list and two per-category breakdowns.
# count/min/max/avg below are the star-imported pyspark.sql.functions versions.
try:
    products_df = spark.table(bronze_products_table)
    products_count = products_df.count()

    print(
        f"\nüìä Products Table: {bronze_products_table}",
        f"   Total Unique Products: {products_count}",
        sep="\n",
    )

    if products_count > 0:
        print("\n   Schema:")
        products_df.printSchema()

        print("\n   All products:")
        display(products_df.sort("product_id"))

        print("\n   Products by Category:")
        display(products_df.groupBy("category", "brand").count().sort("category", "brand"))

        print("\n   Price Range by Category:")
        display(
            products_df.groupBy("category")
            .agg(
                count("*").alias("product_count"),
                min("base_price").alias("min_price"),
                max("base_price").alias("max_price"),
                avg("base_price").alias("avg_price"),
            )
            .sort("category")
        )

except Exception as err:
    print(f"\n‚ö†Ô∏è  Products table not available: {err}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Stop Products Stream (Run when done)

# COMMAND ----------

# Disabled by default: the stop logic below is wrapped in a bare triple-quoted
# string, so it is evaluated as a string literal and never executed.
# Remove the surrounding triple quotes to stop the products query if it is active.
"""
print("=" * 70)
print("STOPPING PRODUCTS STREAM")
print("=" * 70)

if products_query.isActive:
    print(f"\nStopping: {products_query.id}")
    products_query.stop()
    print(f"  ‚úì Stopped")
else:
    print("\nStream is not active")

print("\n‚úì Products stream stopped")
"""

# COMMAND ----------

# MAGIC %md
# MAGIC ## Summary

# COMMAND ----------

print("\n" + "=" * 70, "‚úì PRODUCTS STREAM SETUP COMPLETE", "=" * 70, sep="\n")

# Print the products row count, the query state and a static description of
# the pipeline. Any failure inside the block is reported as "table not created yet".
try:
    products_count = spark.table(bronze_products_table).count()

    print(
        "\nCurrent State:",
        f"  Unique Products: {products_count} records",
        "\nStreaming Status:",
        sep="\n",
    )
    print(f"  Query Active: {products_query.isActive}")
    print(f"  Query ID: {products_query.id}")

    print("\nData Flow:")
    print(
        f"  Event Hub ({eh_name})",
        "    ‚Üì Kafka Protocol",
        "  Raw Kafka Stream",
        "    ‚Üì Parse JSON",
        "  Parsed Events",
        "    ‚Üì Extract Product Fields",
        "  Deduplicated Products",
        "    ‚Üì MERGE (Upsert)",
        "  Bronze Products Table (Unique)",
        "\nKey Features:",
        "  ‚Ä¢ Deduplication within each batch",
        "  ‚Ä¢ MERGE operation across batches",
        "  ‚Ä¢ Updates existing products",
        "  ‚Ä¢ Inserts new products",
        "  ‚Ä¢ Maintains unique product_id constraint",
        "\n" + "=" * 70,
        "Next Steps:",
        "  1. Ensure Bronze-Orders notebook is also running",
        "  2. Run Silver Layer notebook to join Orders with Products",
        "  3. Stop stream when done using 'Stop Products Stream' cell",
        "=" * 70,
        sep="\n",
    )

except Exception:
    print("\n‚ö†Ô∏è  Products table not created yet. Wait for stream to process data.")

In [0]:
%sql
select * from `na-dbxtraining`.biju_bronze.products