In [0]:
%pip install azure-eventhub


In [0]:
%restart_python

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
from pyspark.sql.functions import col
import json

In [0]:
# Register the access key for storage account `streaminputsa01` in this Spark session's config
# (the abfss:// paths used throughout the notebook point at that account).
# `$value` is a redaction placeholder, not valid Python.
# NOTE(review): supply the key from a secret store (e.g. dbutils.secrets.get) rather than
# pasting it into the notebook.
spark.conf.set(
    "fs.azure.account.key.streaminputsa01.dfs.core.windows.net",
    $value
)


In [0]:
# ==== 1) Config ====
from pyspark.sql import functions as F
from azure.eventhub import EventHubProducerClient, EventData

SRC_DIR       = "abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/manual/product-reviews/"
ARCHIVE_ROOT  = "abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/archive/product-reviews/"
CHK_ROOT      = "abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/_chk/files_to_eh_phase1_json"

# FIRST run must process all existing files:
INCLUDE_EXISTING = "false"            # <-- Phase 1 only. We'll flip to "false" in Phase 2.

# Choose your input style:
MULTILINE = False                    # False = NDJSON (one JSON per line); True = pretty/multiline JSON

# Event Hubs
EH_CONNECTION_STR = $connection_str
MAX_EVENT_BYTES   = 950_000          # EH ~1MB/event safety
MAX_FILES_PER_TRIGGER = 5
TRIGGER_INTERVAL      = "45 seconds"



In [0]:
# ==== 2) Streaming source for JSON ====
# Both branches build `json_lines`, a streaming DataFrame with two columns:
#   json      - one JSON document per row, as a string
#   file_path - the file the row was read from (used later to archive the file)
# The per-micro-batch file cap (MAX_FILES_PER_TRIGGER) is set here, on the Auto Loader
# reader: rate limiting is a source option, so it has to be given to readStream.
if MULTILINE:
    # Each file contains one or more pretty-printed JSON objects/arrays.
    # Read as JSON (records), then re-serialize each record to a compact JSON string.
    stream_df = (
        spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "json")
             .option("multiLine", "true")
             .option("cloudFiles.includeExistingFiles", INCLUDE_EXISTING)
             .option("cloudFiles.maxFilesPerTrigger", MAX_FILES_PER_TRIGGER)
             .option("cloudFiles.schemaLocation", f"{CHK_ROOT}/schema")
             .load(SRC_DIR)
    )
    json_lines = (
        stream_df
        .select(F.to_json(F.struct("*")).alias("json"),
                F.input_file_name().alias("file_path"))
        # drop rows that produced no JSON text
        .where(F.col("json").isNotNull() & (F.length("json") > 0))
    )
else:
    # NDJSON: each line is a full JSON object already
    json_lines = (
        spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "text")
             .option("cloudFiles.includeExistingFiles", INCLUDE_EXISTING)
             .option("cloudFiles.maxFilesPerTrigger", MAX_FILES_PER_TRIGGER)
             .option("cloudFiles.schemaLocation", f"{CHK_ROOT}/schema")
             .load(SRC_DIR)
             .select(F.col("value").alias("json"),
                     F.input_file_name().alias("file_path"))
             # skip blank lines
             .where(F.col("json").isNotNull() & (F.length("json") > 0))
    )


In [0]:
import time
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventDataSendError

# Largest encoded payload (bytes) that will be published; EH ~1MB/event safety.
MAX_EVENT_BYTES = 950_000
# Flush every N events even if the batch is not "full".
BATCH_MAX_EVENTS = 400
# Retry budget per send: total wait up to ~ (2+4+8+16+32+32)=94s worst case.
RETRY_MAX_ATTEMPTS = 6
# Delay before the first retry, in seconds; safe_send doubles it per attempt, capped at 32s.
RETRY_BASE_SECONDS = 2.0

def safe_send(producer, batch):
    """Send ``batch`` via ``producer``, retrying with capped exponential back-off.

    An empty batch is never sent. Any ``Exception`` raised while sending is
    retried; ``EventDataSendError`` is logged as throttling, everything else as
    a transient error. After ``RETRY_MAX_ATTEMPTS`` failed retries the last
    exception is re-raised to the caller.
    """
    failures = 0
    while True:
        try:
            if len(batch) > 0:
                producer.send_batch(batch)
            return
        except Exception as err:
            failures += 1
            if failures > RETRY_MAX_ATTEMPTS:
                raise
            # Both failure kinds share the same back-off; only the log label differs.
            if isinstance(err, EventDataSendError):
                label = "Throttled"
            else:
                label = "Transient send error"
            # 2s, 4s, 8s, ... doubling per attempt, never more than 32s
            delay = min(RETRY_BASE_SECONDS * (2 ** (failures - 1)), 32.0)
            print(f"[EH] {label} (attempt {failures}) — sleeping {delay:.1f}s")
            time.sleep(delay)

def publish_then_archive(batch_df, epoch_id: int):
    """foreachBatch sink: publish each row's JSON to Event Hubs, then archive the source files.

    Args:
        batch_df: Micro-batch DataFrame with a ``json`` column (text that is
            UTF-8 encoded and sent as the event body) and a ``file_path``
            column (the file each row came from).
        epoch_id: Micro-batch id passed in by Structured Streaming; only used
            in log lines.

    Raises:
        Exception: Whatever ``safe_send`` re-raises once its retries are
            exhausted, or any error raised while building a batch. In that
            case no file is archived; the producer is closed either way.
    """
    # 1) Publish
    producer = EventHubProducerClient.from_connection_string(
        EH_CONNECTION_STR, eventhub_name="product-reviews"
    )
    sent, skipped, in_batch = 0, 0, 0

    # try/finally: release the producer's connection even when a send finally
    # fails and the exception propagates to the streaming query.
    try:
        batch = producer.create_batch()

        # Iterate rows on the driver lazily instead of collecting the whole micro-batch.
        rows = batch_df.select("json").toLocalIterator()

        for row in rows:
            b = row["json"].encode("utf-8")
            if len(b) > MAX_EVENT_BYTES:
                # too large for a single event: count it and move on
                skipped += 1
                continue
            try:
                batch.add(EventData(b))
                in_batch += 1
            except ValueError:
                # batch would overflow → flush current batch, start a new one
                safe_send(producer, batch)
                batch = producer.create_batch()
                batch.add(EventData(b))
                in_batch = 1

            # proactive flush to avoid building huge batches that trigger throttling
            if in_batch >= BATCH_MAX_EVENTS:
                safe_send(producer, batch)
                batch = producer.create_batch()
                in_batch = 0
                # small pacing to reduce server-busy
                time.sleep(0.1)

            sent += 1

        # flush any remainder
        safe_send(producer, batch)
    finally:
        producer.close()
    print(f"[epoch {epoch_id}] Sent {sent} events; skipped {skipped} oversized.")

    # 2) Archive files from this micro-batch (best-effort)
    # Only reached when publishing succeeded. A failed move is logged, not raised.
    files = [r.file_path for r in batch_df.select("file_path").distinct().collect()]
    for src in files:
        # keep the original file name, drop it directly under ARCHIVE_ROOT
        dst = ARCHIVE_ROOT + src.split("/")[-1]
        try:
            dbutils.fs.mv(src, dst, recurse=False)
        except Exception as e:
            print(f"[epoch {epoch_id}] WARN archive {src} -> {dst}: {e}")



In [0]:
# Start the files -> Event Hubs stream: every micro-batch is handed to publish_then_archive,
# progress is checkpointed under CHK_ROOT/cp, and a batch is triggered every TRIGGER_INTERVAL.
# NOTE(review): maxFilesPerTrigger is a source-side (readStream) option; set here on the
# writer it probably does not limit how many files Auto Loader picks up per batch — confirm.
query = (
    json_lines.writeStream
              .foreachBatch(publish_then_archive)
              .option("checkpointLocation", f"{CHK_ROOT}/cp")
              .option("maxFilesPerTrigger", MAX_FILES_PER_TRIGGER)
              .trigger(processingTime=TRIGGER_INTERVAL)
              .start()
)

In [0]:
# Most recent progress report of the files -> Event Hubs query.
query.lastProgress

In [0]:
# List what is currently sitting in the landing folder the stream reads from.
# Uses SRC_DIR from the config cell so this check cannot drift from the stream's input path.
display(dbutils.fs.ls(SRC_DIR))

path,name,size,modificationTime
abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/manual/product-reviews/review_part_1.json,review_part_1.json,63032189,1754834138000


In [0]:
# Stop the files -> Event Hubs stream. `stop` is a method: without the call parentheses the
# cell only displays the bound method and the query keeps running.
query.stop()

In [0]:
# Check whether the query is still running, together with its current status.
query.isActive, query.status

In [0]:
# Re-list the landing folder. `src` reuses SRC_DIR from the config cell rather than repeating
# the path literal, so the listing always matches the stream's input location.
src = SRC_DIR
display(dbutils.fs.ls(src))

path,name,size,modificationTime
abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/manual/product-reviews/review_part_1.json,review_part_1.json,63032189,1754754782000
abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/manual/product-reviews/review_part_2.json,review_part_2.json,63664806,1754810625000


In [0]:
import urllib
from pyspark.sql.functions import col, desc

# CHANGE if you store secrets in Key Vault
EH_NAMESPACE = "retail-eh-ns"
EH_NAME      = "product-reviews"
POLICY       = "read-policy"
EH_KEY       = $eh_key   # or paste once for testing

eh_conn = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={POLICY};SharedAccessKey={EH_KEY};EntityPath={EH_NAME}"
encrypted = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn)
eh_conf = {
  "eventhubs.connectionString": encrypted,
  "consumerGroup": "$Default",
  # start at latest so we only read new messages your producer sends
  "startingPosition": """{"offset":"-1","seqNo":-1,"enqueuedTime":null,"isInclusive":false}""",
  "maxEventsPerTrigger": "2000"
}

CHECKPOINT_BASE = "abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/_chk/eh_consumer_simple"
BRONZE_PATH     = "abfss://retail-input-data@streaminputsa01.dfs.core.windows.net/delta/bronze_product_reviews"


In [0]:
# Show which Maven packages were requested for this session. A default is supplied so the
# cell reports "<not set>" instead of raising when the key is absent.
# NOTE(review): a connector installed as a cluster library may not appear here — confirm.
spark.conf.get("spark.jars.packages", "<not set>")

In [0]:
# Streaming read from Event Hubs through the external connector ("eventhubs" format),
# keeping only the event body and its enqueue timestamp under shorter column names.
raw = (
    spark.readStream
         .format("eventhubs")  # Ensure the correct format is used with the external package
         .options(**eh_conf)
         .load()
         .select(
             col("body").alias("payload"),
             col("enqueuedTime").alias("enq_ts"),
         )
)




In [0]:
# Append the raw events to the bronze Delta table at BRONZE_PATH, checkpointing under
# CHECKPOINT_BASE/bronze.
q_bronze = (
    raw.writeStream
       .format("delta")
       .outputMode("append")
       .option("checkpointLocation", f"{CHECKPOINT_BASE}/bronze")
       .start(BRONZE_PATH)
)

In [0]:
# Most recent progress report of the Event Hubs -> bronze Delta query.
q_bronze.lastProgress

In [0]:
# Peek at the five rows with the latest enqueue timestamps in the bronze Delta table.
(
    spark.read
         .format("delta")
         .load(BRONZE_PATH)
         .orderBy(desc("enq_ts"))
         .limit(5)
         .show(truncate=False)
)