In [None]:
# =============================================================
# Real-Time Streaming with Azure Databricks + Event Hubs (Medallion)
# =============================================================

try:
    dbutils  # type: ignore
except NameError:
    # No `dbutils` in scope (e.g. running outside Databricks): install a small
    # in-memory stand-in that covers only the calls this notebook makes.
    class _DB:
        """Minimal replacement for `dbutils` exposing `widgets` and `fs.ls`."""

        class widgets:
            """Widget registry backed by a single class-level dictionary."""

            _vals = {}  # widget name -> stored value

            @staticmethod
            def text(name, defaultValue="", label=""):
                """Register a text widget; the stored value is its default."""
                _DB.widgets._vals[name] = defaultValue

            @staticmethod
            def dropdown(name, defaultValue, choices, label=""):
                """Register a dropdown; `choices` is accepted but not checked."""
                _DB.widgets.text(name, defaultValue, label)

            @staticmethod
            def get(name):
                """Return the stored value, or an empty string if unknown."""
                store = _DB.widgets._vals
                return store[name] if name in store else ""

            @staticmethod
            def removeAll():
                """Forget every registered widget."""
                _DB.widgets._vals.clear()

        class fs:
            """File-system stub: every listing is empty."""

            @staticmethod
            def ls(path):
                return []

    dbutils = _DB()

In [None]:
# =====================
# 1) PARAMETERS
# =====================
dbutils.widgets.removeAll()

# One row per widget: (name, default, label, choices).
# `choices` is None for free-text widgets and a list for dropdowns.
widget_specs = [
    # Event Hubs
    ("eh_connection", "REPLACE_WITH_EVENT_HUBS_CONNECTION_STRING", "Event Hubs Connection String", None),
    ("eh_consumer_group", "$Default", "Consumer Group", None),
    ("eh_start_position", "@latest", "Starting Position", ["@latest","@earliest"]),
    # Storage (ADLS Gen2 abfss:// URIs or mounted paths)
    ("bronze_path", "abfss://datalake@REPLACE.dfs.core.windows.net/bronze/iot_events", "Bronze Path", None),
    ("silver_path", "abfss://datalake@REPLACE.dfs.core.windows.net/silver/iot_events", "Silver Path", None),
    ("gold_path", "abfss://datalake@REPLACE.dfs.core.windows.net/gold/iot_kpis", "Gold Path", None),
    # Checkpoints (one per stream)
    ("ckp_bronze", "abfss://datalake@REPLACE.dfs.core.windows.net/_checkpoints/bronze/iot_events", "Bronze Checkpoint", None),
    ("ckp_silver", "abfss://datalake@REPLACE.dfs.core.windows.net/_checkpoints/silver/iot_events", "Silver Checkpoint", None),
    ("ckp_gold", "abfss://datalake@REPLACE.dfs.core.windows.net/_checkpoints/gold/iot_kpis", "Gold Checkpoint", None),
    # Table names (optional)
    ("tbl_bronze", "iot_raw_bronze", "Bronze Table", None),
    ("tbl_silver", "iot_clean_silver", "Silver Table", None),
    ("tbl_gold", "iot_kpi_gold", "Gold Table", None),
]


def _register_widgets(specs):
    """Create each widget in `specs`, in order, as a text box or a dropdown."""
    for widget_name, default, label, choices in specs:
        if choices is None:
            dbutils.widgets.text(widget_name, default, label)
        else:
            dbutils.widgets.dropdown(widget_name, default, choices, label)


_register_widgets(widget_specs)

# Snapshot the current widget values; later cells read only from `params`.
params = {spec[0]: dbutils.widgets.get(spec[0]) for spec in widget_specs}

# Echo the configuration, hiding any value whose key mentions "connection".
masked_params = {
    key: ("***" if "connection" in key else value)
    for key, value in params.items()
}
print("PARAMETERS:", masked_params)

In [None]:
# =====================
# 2) IMPORTS & SCHEMAS
# =====================
import json

from pyspark.sql.functions import col, from_json, to_timestamp, current_timestamp, window, expr, coalesce
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Schema of the JSON document carried in each event body (UTF-8), e.g.:
# { "deviceId":"dev-1", "site":"ams", "temperature":22.5, "humidity":0.45, "ts":1724175600 }
# Fields are listed as (name, Spark type) pairs and all use StructField's
# default nullability.
raw_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in (
        ("deviceId", StringType),
        ("site", StringType),
        ("temperature", DoubleType),
        ("humidity", DoubleType),
        ("ts", LongType),
    )
])

In [None]:
# =====================
# 3) EVENT HUBS → BRONZE (DELTA)
# =====================
# The connector expects `eventhubs.startingPosition` as a JSON-serialised
# EventPosition, not a bare token. Start of stream is offset "-1" and end of
# stream is "@latest", so translate the widget's "@earliest" choice; any other
# value is passed through as the offset unchanged.
start_choice = params["eh_start_position"]
starting_position = {
    "offset": {"@earliest": "-1"}.get(start_choice, start_choice),
    "seqNo": -1,            # not used when an offset is given
    "enqueuedTime": None,   # not used when an offset is given
    "isInclusive": True,
}

eh_conf = {
    # Connector releases from 2.3.15 require the connection string to be
    # encrypted with the connector's own helper before it is passed as an option.
    "eventhubs.connectionString": (
        spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils
        .encrypt(params["eh_connection"])
    ),
    "eventhubs.consumerGroup": params["eh_consumer_group"],
    "eventhubs.startingPosition": json.dumps(starting_position),
}

# Raw Event Hubs stream; the payload arrives in the binary `body` column.
raw_stream = (spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
)

# Bronze keeps every source column and adds the decoded body plus ingest time.
bronze_df = (raw_stream
    .withColumn("body_str", col("body").cast("string"))
    .withColumn("ingest_ts", current_timestamp())
)

# Append-only Delta sink; the writer is only defined here, not started.
bronze_writer = (bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", params["ckp_bronze"])
    .option("path", params["bronze_path"])
    .trigger(processingTime="10 seconds")
)

# bronze_query = bronze_writer.start()
# display(bronze_query)
print("Bronze stream defined. Uncomment start() to run.")

In [None]:
# =====================
# 4) BRONZE → SILVER (PARSE, CLEANSE, DEDUP)
# =====================
# Stream the Bronze Delta table back in as the Silver source.
bronze_read = spark.readStream.format("delta").load(params["bronze_path"])

# Decode the JSON body with raw_schema, then flatten it to snake_case columns.
parsed = (
    bronze_read
    .select(from_json(col("body_str"), raw_schema).alias("json"))
    .select(
        col("json.deviceId").alias("device_id"),
        col("json.site").alias("site"),
        col("json.temperature").alias("temperature"),
        col("json.humidity").alias("humidity"),
        # `ts` is declared as a long in raw_schema and is treated here as
        # epoch seconds; change this if producers send ISO-8601 strings.
        to_timestamp(col("json.ts")).alias("event_time"),
    )
)

# Drop rows without a device id, then de-duplicate on (device_id, event_time)
# under a 5-minute event-time watermark.
# NOTE(review): rows whose event_time is null are not filtered out here;
# confirm that is intended.
clean = (
    parsed
    .where(col("device_id").isNotNull())
    .withWatermark("event_time", "5 minutes")
    .dropDuplicates(["device_id", "event_time"])
)

# Append-only Delta sink; the writer is only defined here, not started.
silver_writer = (
    clean.writeStream
    .format("delta")
    .outputMode("append")
    .options(
        checkpointLocation=params["ckp_silver"],
        path=params["silver_path"],
    )
    .trigger(processingTime="10 seconds")
)

# silver_query = silver_writer.start()
# display(silver_query)
print("Silver stream defined. Uncomment start() to run.")

In [None]:
# =====================
# 5) SILVER → GOLD (WINDOWED KPIs)
# =====================
# Stream the Silver Delta table in as the Gold source.
silver_read = spark.readStream.format("delta").load(params["silver_path"])

# Per-device, per-site KPIs over 1-minute tumbling event-time windows.
kpi = (
    silver_read
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "1 minute"),
        col("site"),
        col("device_id"),
    )
    .agg(*[
        expr(aggregate)
        for aggregate in (
            "avg(temperature) as avg_temperature",
            "avg(humidity) as avg_humidity",
            "count(*) as reading_count",
        )
    ])
    # Flatten the window struct into explicit start/end columns.
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("site"),
        col("device_id"),
        col("avg_temperature"),
        col("avg_humidity"),
        col("reading_count"),
    )
)

# NOTE(review): in "complete" mode the full result is re-emitted on every
# trigger and the watermark above cannot evict aggregation state; confirm this
# is preferred over "append", which emits each window once it is finalised.
gold_writer = (
    kpi.writeStream
    .format("delta")
    .outputMode("complete")
    .options(
        checkpointLocation=params["ckp_gold"],
        path=params["gold_path"],
    )
    .trigger(processingTime="1 minute")
)

# gold_query = gold_writer.start()
# display(gold_query)
print("Gold stream defined. Uncomment start() to run.")

In [None]:
# =====================
# 6) TABLE REGISTRATION (OPTIONAL)
# =====================
# Register each medallion layer as an external Delta table over its storage
# path. Statements are built first, then executed in bronze → silver → gold order.
register_statements = (
    f"CREATE TABLE IF NOT EXISTS {params['tbl_bronze']} USING DELTA LOCATION '{params['bronze_path']}'",
    f"CREATE TABLE IF NOT EXISTS {params['tbl_silver']} USING DELTA LOCATION '{params['silver_path']}'",
    f"CREATE TABLE IF NOT EXISTS {params['tbl_gold']}   USING DELTA LOCATION '{params['gold_path']}'",
)
for statement in register_statements:
    spark.sql(statement)
print("Tables created if metastore/unity catalog is configured.")

In [None]:
# =====================
# 7) OPTIONAL EVENT GENERATOR → EVENT HUBS
# =====================
# %pip install azure-eventhub  (on Databricks)
import json
import random
import time

# The generator is optional: only a missing package disables it.
try:
    from azure.eventhub import EventHubProducerClient, EventData
    HAVE_EH = True
except ImportError:
    HAVE_EH = False
    print("Install azure-eventhub to enable generator.")

eh_conn = params["eh_connection"]
batches = 3            # number of batches to send
events_per_batch = 50  # synthetic events per batch

# The client is built from the connection string alone, so the string must
# name the target hub via EntityPath. Check for it up front so a namespace-level
# string takes the "not executed" branch instead of raising inside the SDK.
has_entity_path = "entitypath=" in eh_conn.lower()

if HAVE_EH and eh_conn.startswith("Endpoint") and has_entity_path:
    # Context manager closes the producer even if batching or sending fails.
    with EventHubProducerClient.from_connection_string(conn_str=eh_conn) as producer:
        for b in range(batches):
            batch = producer.create_batch()
            for _ in range(events_per_batch):
                # Same field names and types as raw_schema expects.
                payload = {
                    "deviceId": f"dev-{random.randint(1,5)}",
                    "site": random.choice(["ams","utrecht","rotterdam"]),
                    "temperature": round(random.uniform(18.0, 30.0),2),
                    "humidity": round(random.uniform(0.3, 0.8),2),
                    "ts": int(time.time())
                }
                batch.add(EventData(json.dumps(payload)))
            producer.send_batch(batch)
            print(f"Sent batch {b+1}/{batches}")
            time.sleep(1)  # pause between batches
else:
    print("Generator not executed. Provide a valid Event Hubs connection string including EntityPath.")

In [None]:
# =====================
# 8) EXAMPLE ANALYTICS (SQL VIA PYTHON)
# =====================
gold_tbl = params['tbl_gold']

# Latest 20 KPI rows, newest window first.
latest_windows_sql = f"SELECT * FROM {gold_tbl} ORDER BY window_end DESC LIMIT 20"
spark.sql(latest_windows_sql).show(truncate=False)

# Top 20 rows from the last 30 minutes, ordered by average temperature and
# then by reading count, both descending.
hottest_recent_sql = f"""
SELECT device_id, site, avg_temperature, avg_humidity, reading_count, window_start, window_end
FROM {gold_tbl}
WHERE window_end > now() - INTERVAL 30 MINUTES
ORDER BY avg_temperature DESC, reading_count DESC
LIMIT 20
"""
spark.sql(hottest_recent_sql).show(truncate=False)

In [None]:
# =====================
# 9) STREAM HELPERS
# =====================
def stop_all_streams():
    """Stop every streaming query currently active on the `spark` session.

    Each query is announced by name (or by id when it has no name) before it
    is stopped. An exception from one query is printed and the loop moves on
    to the next query.
    """
    for query in spark.streams.active:
        try:
            label = query.name or query.id
            print("Stopping:", label)
            query.stop()
        except Exception as err:
            print("Error:", err)


print("Call stop_all_streams() to stop all active streams.")