CELL 1 — Event Hub Connection Setup

In [None]:
import os

# Event Hub connection
# Use the CONSUMER POLICY (Listen) created at the EVENT HUB level (not the namespace
# level): its connection string carries the `EntityPath=<hub-name>` segment that the
# Spark Event Hubs connector needs to know which hub to read.
# Supply it through the EVENTHUB_CONSUMER_CONN environment variable (or a secret store)
# rather than pasting the key here, where it would be saved and shared with the notebook.
CONSUMER_CONN = os.environ.get(
    "EVENTHUB_CONSUMER_CONN",
    "your-consumer-policy-primary-connection-string",
)

# Fail fast with a readable message instead of an opaque JVM error when the stream starts.
if "EntityPath=" not in CONSUMER_CONN:
    raise ValueError(
        "CONSUMER_CONN must be an Event Hub-level connection string containing "
        "'EntityPath=...'; set the EVENTHUB_CONSUMER_CONN environment variable."
    )

# From PySpark, the connector expects the connection string to be encrypted on the
# JVM side (EventHubsUtils.encrypt) before it is passed in the options dict.
ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(CONSUMER_CONN)
}


StatementMeta(spkecommerce, 7, 2, Finished, Available, Finished)

🧪 CELL 2 — Define Schema and Read Stream

In [3]:
from pyspark.sql.functions import from_json, col, explode, current_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

# One line item inside a cart: the product and how many units of it.
product_schema = StructType([
    StructField("productId", IntegerType()),
    StructField("quantity", IntegerType()),
])

# Shape of a FakeStore cart event as it arrives in the Event Hub message body.
schema = StructType([
    StructField("id", IntegerType()),
    StructField("userId", IntegerType()),
    StructField("date", StringType()),  # kept as raw text; not converted to a timestamp here
    StructField("products", ArrayType(product_schema)),
])

# 1) Subscribe to the Event Hub as a streaming source.
df_eh = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# 2) The payload sits in the binary `body` column; decode it to a JSON text column.
df_string = df_eh.selectExpr("CAST(body AS STRING) as json_str")

# 3) Parse the JSON against `schema`, then promote the struct's fields to top-level columns.
parsed_struct = from_json(col("json_str"), schema).alias("data")
df_parsed = df_string.select(parsed_struct).select("data.*")


StatementMeta(spkecommerce, 7, 4, Finished, Available, Finished)

🧪 CELL 3 — Flatten and Add Metadata

In [5]:
from pyspark.sql.functions import col, current_timestamp, explode_outer

# Flatten the nested products array: one output row per (cart, product) line item.
# explode_outer (rather than explode) keeps carts whose `products` is null or empty,
# e.g. events whose JSON did not parse, which from_json yields as nulls. Each such cart
# becomes a single row with null product columns, so the bronze layer does not silently
# lose them. Filter them out in a downstream layer if they are not wanted.
df_flat = (
    df_parsed
    .withColumn("product", explode_outer(col("products")))
    .select(
        col("id").alias("cart_id"),
        col("userId").alias("user_id"),
        col("date").alias("cart_date"),
        col("product.productId").alias("product_id"),
        col("product.quantity").alias("quantity"),
        # Processing-time stamp added by Spark; distinct from the cart's own `date`.
        current_timestamp().alias("ingest_ts"),
    )
)


StatementMeta(spkecommerce, 7, 6, Finished, Available, Finished)

🧪 CELL 4 — Write to Bronze Streaming

In [6]:
# Paths: data and checkpoints live side by side under the bronze FakeStore root.
BRONZE_ROOT = "abfss://datalake@stecomdata123.dfs.core.windows.net/bronze/fakestore"
bronze_path = f"{BRONZE_ROOT}/stream/"
checkpoint_path = f"{BRONZE_ROOT}/checkpoints/stream/"

# Name the query so that re-running this cell can find and stop the previous
# instance first. Spark refuses to start a second active query with the same name,
# or one that resumes from a checkpoint already in use by an active query.
QUERY_NAME = "fakestore_bronze_stream"

for active_query in spark.streams.active:
    if active_query.name == QUERY_NAME:
        active_query.stop()

# Start the structured stream: append each 30-second micro-batch as parquet files.
query = (
    df_flat.writeStream
    .queryName(QUERY_NAME)
    .format("parquet")
    .option("path", bronze_path)
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .trigger(processingTime="30 seconds")
    .start()
)

print("Streaming consumer started.")

# Inspect the most recent micro-batch with `query.lastProgress`; stop with `query.stop()`.


StatementMeta(spkecommerce, 7, 7, Finished, Available, Finished)

Streaming consumer started.
