In [0]:

# Replace with your Event Hub connection string
connectionString = "EVENT_KEY"

# The eventhubs connector reads the connection string from the
# "eventhubs.connectionString" option, and the value must be encrypted with
# EventHubsUtils.encrypt. An unrecognised key such as 'EVENT_HUB_KEY' leaves the
# source without a connection string.
ehConf = {
  'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}

# Read stream from Event Hub
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

df.printSchema()


In [0]:
%pip install azure-eventhub

In [0]:
# Restart the Python process so the package installed by the %pip cell above can be imported.
# NOTE(review): a restart presumably discards Python variables defined in earlier cells
# (e.g. `df` / `ehConf` from the first cell) — re-run those cells if they are still needed.
dbutils.library.restartPython()

In [0]:
import threading

from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = "EVENT_KEY"    # placeholder — TODO: read from a secret store, not a literal
CONSUMER_GROUP = "traffic-app"  # or $Default consumer group
EVENTHUB_NAME = "traffic-iot"
RECEIVE_SECONDS = 60            # how long to sample events before closing the client


def on_event(partition_context, event):
    """Print one received event and record it as the partition's checkpoint.

    Args:
        partition_context: context for the partition the event was read from.
        event: the received event; its body is printed as a string.
    """
    print(f"Received event from partition: {partition_context.partition_id}")
    print(event.body_as_str())
    # NOTE(review): no checkpoint_store is passed to the client below, so this checkpoint
    # is presumably held in memory only and lost once the client closes.
    partition_context.update_checkpoint(event)


client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group=CONSUMER_GROUP,
    eventhub_name=EVENTHUB_NAME
)

# client.receive() blocks until the client is closed. Without a stop condition this cell
# would run forever and none of the cells below would execute, so close the client from
# a timer once RECEIVE_SECONDS have elapsed.
stop_timer = threading.Timer(RECEIVE_SECONDS, client.close)
stop_timer.daemon = True
stop_timer.start()
try:
    with client:
        client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" = read from beginning of stream
        )
finally:
    # If receive() raised before the timer fired, do not leave the timer pending.
    stop_timer.cancel()


In [0]:
import json

from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import col, from_json, coalesce, to_timestamp

# ---- 0) Toggle for the optional console sink in step 6.
# NOTE(review): every started streaming query opens its own Event Hubs receivers on the
# consumer group configured below. Several queries on one consumer group may disconnect
# each other (e.g. ReceiverDisconnectedException) — if that happens, set this to False
# or give the console query its own consumer group.
ENABLE_CONSOLE_DEBUG = True

# ---- 1) Schema: keep everything as STRING, cast later (avoids nulls on type mismatch)
schema = (StructType()
          .add("vehicle_id", StringType())
          .add("speed", StringType())
          .add("timestamp", StringType())     # some producers use "timestamp"
          .add("event_time", StringType()))   # others use "event_time"

# ---- 2) Connection settings. The connector expects the connection string encrypted
#         with EventHubsUtils.encrypt.
#         TODO: load the real connection string from a secret store instead of a literal.
connectionString = "EVENT_KEY"

# Start from the earliest retained events. Serialising a dict with json.dumps guarantees
# well-formed JSON (null / true) instead of relying on a hand-written string literal.
starting_position = {
    "offset": "-1",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True,
}

eh_conf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    "eventhubs.consumerGroup": "$Default",
    "eventhubs.startingPosition": json.dumps(starting_position),
}

# ---- 3) Read Event Hubs stream
raw_stream = (spark.readStream
              .format("eventhubs")
              .options(**eh_conf)
              .load())

# ---- 4) Parse JSON safely
parsed = raw_stream.select(
    col("enqueuedTime").alias("ingest_time"),
    from_json(col("body").cast("string"), schema).alias("data")
)

iot_df = (parsed
    .select(
        col("data.vehicle_id").cast("int").alias("vehicle_id"),
        col("data.speed").cast("int").alias("speed"),
        # raw payload time: "timestamp" if present, otherwise "event_time"
        coalesce(col("data.timestamp"), col("data.event_time")).alias("ts_raw"),
        col("ingest_time")
    )
    .withColumn(
        "event_time",
        # prefer the parsed payload time; fall back to the Event Hubs enqueued time
        # when the payload time is missing or cannot be parsed
        coalesce(
            to_timestamp(col("ts_raw")),
            col("ingest_time").cast("timestamp")
        )
    )
    .drop("ts_raw")
)

# ---- 5) Write to Delta (BRONZE) with a dedicated checkpoint
delta_query = (iot_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/bronze/_checkpoint_iot")
    .option("mergeSchema", "true")    # allow new/changed columns
    .start("/mnt/bronze/iotdata"))

# ---- 6) OPTIONAL: Also stream to console so you can verify rows immediately
debug_query = None
if ENABLE_CONSOLE_DEBUG:
    debug_query = (iot_df.writeStream
        .format("console")
        .option("truncate", False)
        .option("numRows", 20)
        .start())


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col, to_timestamp, current_timestamp, coalesce

# NOTE(review): this cell reads `raw_stream` (the Event Hubs source) defined in the
# bronze cell, so that cell must have been run first.

# Set to False to skip the console sink in step 5 (each extra streaming query opens its
# own Event Hubs receivers on the same consumer group).
ENABLE_SILVER_CONSOLE = True

# 1) Schema: parse every field as a STRING and cast afterwards. Declaring vehicle_id/speed
#    as IntegerType makes from_json yield null when a producer sends them as JSON strings,
#    and payloads that carry "event_time" instead of "timestamp" would lose their time.
schema = StructType([
    StructField("vehicle_id", StringType(), True),
    StructField("speed", StringType(), True),
    StructField("timestamp", StringType(), True),   # some producers use "timestamp"
    StructField("event_time", StringType(), True),  # others use "event_time"
])

# 2) Parse + derive event_time. Output columns (and types) stay:
#    vehicle_id int, speed int, timestamp string, event_time timestamp, ingest_time timestamp
parsed_df = (raw_stream
    .select(
        col("enqueuedTime").alias("_enqueued_time"),
        from_json(col("body").cast("string"), schema).alias("data"),
    )
    .select(
        col("data.vehicle_id").cast("int").alias("vehicle_id"),
        col("data.speed").cast("int").alias("speed"),
        # raw payload time string: "timestamp" if present, otherwise "event_time"
        coalesce(col("data.timestamp"), col("data.event_time")).alias("timestamp"),
        col("_enqueued_time"),
    )
    # Parse the payload time; fall back to the Event Hubs enqueued time when it is missing
    # or unparseable, so rows are not dropped by the gold layer's event_time watermark.
    .withColumn(
        "event_time",
        coalesce(to_timestamp(col("timestamp")), col("_enqueued_time").cast("timestamp")),
    )
    .withColumn("ingest_time", current_timestamp())       # processing time, for lineage
    .drop("_enqueued_time")
)

# 3) Paths
silver_path = "dbfs:/mnt/silver/iotdata"
checkpoint_path = "dbfs:/mnt/silver/_checkpoint_iot"

# 4) Write to Silver (Delta Lake)
delta_query = (parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")   # allow schema evolution
    .start(silver_path)
)

# 5) Debug Console Sink (for monitoring)
console_query = None
if ENABLE_SILVER_CONSOLE:
    console_query = (parsed_df.writeStream
        .format("console")
        .outputMode("append")
        .option("truncate", False)
        .start()
    )


In [0]:

# 6) ---- Run this cell on its own, once the silver stream has committed data ----
# One-off batch (non-streaming) read of the Delta table at `silver_path`.
silver_df = spark.read.load(silver_path, format="delta")

# Preview the first 10 rows without truncating long values.
silver_df.show(n=10, truncate=False)


In [0]:
from pyspark.sql.functions import col, avg, count, window

# 1. Read the silver Delta table as a stream
silver_df = (
    spark.readStream
    .format("delta")
    .load("/mnt/silver/iotdata")
)

# 2. Aggregate for gold: per-vehicle average speed and event count per 5-minute window
gold_df = (
    silver_df
    # Track event time with a 10-minute lateness bound; events arriving later than the
    # watermark may be dropped, and expired window state can be discarded.
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "5 minutes"),  # tumbling window of 5 mins
        col("vehicle_id")
    )
    .agg(
        avg("speed").alias("avg_speed"),
        count("*").alias("event_count")
    )
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("vehicle_id"),
        col("avg_speed"),
        col("event_count")
    )
)

# 3. Write to gold (Delta table)
query = (
    gold_df.writeStream
    .format("delta")
    # Append mode writes each window exactly once, only after the watermark has passed the
    # window's end, so gold rows lag by at least the 10-minute watermark delay. Append mode
    # on a streaming aggregation is only allowed because a watermark is defined above.
    .outputMode("append")
    .option("checkpointLocation", "/mnt/gold/_checkpoint_iot")
    .start("/mnt/gold/iotdata")
)


In [0]:
# Load the gold Delta folder as a batch DataFrame. This does NOT register a temporary
# view; the SQL cell below queries the same path directly via delta.`<path>`.
gold_df_read = spark.read.format("delta").load("/mnt/gold/iotdata")

# Show 10 rows without truncating long values
gold_df_read.show(10, truncate=False)

In [0]:
%sql
-- Ten most recent gold aggregates (latest window first), read straight from the Delta path.
SELECT g.window_start,
       g.vehicle_id,
       g.avg_speed,
       g.event_count
FROM delta.`/mnt/gold/iotdata` AS g
ORDER BY g.window_start DESC
LIMIT 10;
