In [None]:
import os, json, gzip, time, uuid, random
from typing import Iterable
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError, OperationTimeoutError
from pyspark.sql.functions import col

# --- Constants ---
# Service limits: 1 MB/event (Standard/Dedicated), 256 KB (Basic). Leave overhead headroom.
# NOTE(review): 900_000 is above the 256 KB Basic-tier limit quoted above, so this
# value presumably targets a Standard/Dedicated namespace — confirm the tier in use.
MAX_EVENT_BYTES = 900_000  # ~0.9 MB safety margin; max payload bytes per chunked event
MAX_RETRIES = 7  # number of send attempts made per batch by send_with_retry
BASE_DELAY = 0.5  # seconds; initial backoff delay, doubled after each failed attempt
MAX_BACKOFF = 30  # seconds; upper bound on the delay and on any single sleep

# --- Helpers ---
def to_bytes(obj) -> bytes:
    """Convert *obj* to an immutable ``bytes`` value.

    * ``bytes`` / ``bytearray`` are returned as ``bytes`` (a ``bytearray`` is copied).
    * ``str`` is encoded as UTF-8.
    * Anything else is serialized to compact JSON (no whitespace after
      separators) and then encoded as UTF-8.

    Raises:
        TypeError: propagated from ``json.dumps`` if *obj* is not JSON-serializable.
    """
    if isinstance(obj, str):
        text = obj
    elif isinstance(obj, (bytes, bytearray)):
        # Already binary: normalise to immutable bytes and skip encoding.
        return bytes(obj)
    else:
        text = json.dumps(obj, separators=(",", ":"))
    return text.encode("utf-8")

def gzip_bytes(b: bytes) -> bytes:
    """Return *b* compressed as a gzip stream using ``gzip.compress`` defaults."""
    compressed = gzip.compress(b)
    return compressed

def chunk_bytes(b: bytes, max_size: int) -> Iterable[bytes]:
    """Split *b* into consecutive slices of at most *max_size* bytes.

    Every slice except possibly the last has exactly *max_size* bytes.
    An empty *b* produces no slices.

    Args:
        b: The payload to split.
        max_size: Maximum slice length in bytes; must be positive.

    Returns:
        A lazy iterable over the slices, in order.

    Raises:
        ValueError: if *max_size* is not positive. A negative size would
            otherwise yield nothing and silently drop the whole payload.
    """
    if max_size <= 0:
        raise ValueError(f"max_size must be a positive integer, got {max_size!r}")
    # Validation above runs eagerly; the slicing itself stays lazy.
    return (b[start : start + max_size] for start in range(0, len(b), max_size))

def send_with_retry(producer: EventHubProducerClient, batch):
    """Send *batch* via ``producer.send_batch``, retrying with exponential backoff.

    Makes up to ``MAX_RETRIES`` attempts (always at least one). After each
    failed attempt except the last, sleeps for the current delay plus up to
    25% random jitter, capped at ``MAX_BACKOFF`` seconds, then doubles the
    delay (also capped at ``MAX_BACKOFF``).

    Args:
        producer: Client used to send the batch.
        batch: The batch object passed unchanged to ``producer.send_batch``.

    Raises:
        EventHubError, OperationTimeoutError: the error from the final
            attempt, re-raised immediately once attempts are exhausted.
    """
    # Guarantee at least one attempt so a misconfigured MAX_RETRIES (< 1)
    # cannot turn this into a silent no-op that drops the batch.
    attempts = max(1, MAX_RETRIES)
    delay = BASE_DELAY
    for attempt in range(1, attempts + 1):
        try:
            producer.send_batch(batch)
            return
        except (EventHubError, OperationTimeoutError):
            # NOTE(review): EventHubError looks like the SDK's base error class,
            # so non-transient failures are presumably retried too — consider
            # narrowing this to the transient error types.
            if attempt == attempts:
                # Out of attempts: fail now rather than sleeping first.
                raise
            jitter = random.uniform(0, delay * 0.25)
            time.sleep(min(delay + jitter, MAX_BACKOFF))
            delay = min(delay * 2, MAX_BACKOFF)

# --- Producer (use Key Vault in real code) ---
# The connection string is fetched through dbutils.secrets instead of being hard-coded.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get("kv-scope", "eh-connstr")
EVENT_HUB_NAME = "datatransferhub"

producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)

# Create a streaming iterator (avoids collect())
rows = (
    spark.table("samples.bakehouse.sales_customers")
    .select(col("value_hub").cast("string").alias("value"))
    .toLocalIterator()
)

with producer:
    # Choose a partition key strategy; here we use a corr_id per row (adjust as needed)
    # If you have a natural key (customerId, orderId), use that instead to keep ordering.
    current_batch = None  # batch currently being filled, or None when nothing is pending
    current_pk = None     # partition key that current_batch was created with

    def flush():
        """Send the pending batch if it holds any events, then clear it."""
        # `global`, not `nonlocal`: this code runs at module scope (a `with`
        # block does not create a function scope), where `nonlocal` is a
        # SyntaxError.
        global current_batch
        # The batch exposes its event count through len(), not a `.count` attribute.
        if current_batch is not None and len(current_batch) > 0:
            send_with_retry(producer, current_batch)
        current_batch = None

    for row in rows:
        payload = row["value"]
        corr_id = str(uuid.uuid4())  # replace with stable key for your domain
        pk = corr_id  # keep all chunks for a message in order on one partition

        # Prepare bytes (compress first to minimize chunk count)
        body_bytes = gzip_bytes(to_bytes(payload))

        # Start a new batch whenever the partition key changes, since a batch is
        # created with a single partition key.
        # NOTE: with a fresh UUID per row the key changes on every row, so each
        # row is currently flushed as its own batch.
        if current_batch is None or pk != current_pk:
            flush()
            current_batch = producer.create_batch(partition_key=pk)
            current_pk = pk

        # Build chunked events with reassembly metadata
        chunks = list(chunk_bytes(body_bytes, MAX_EVENT_BYTES))
        total = len(chunks)
        for idx, chunk in enumerate(chunks, start=1):
            ev = EventData(chunk)
            # App-level reassembly metadata (consumer must use this):
            # chunk_index is 1-based and runs up to chunks_total.
            ev.content_type = "application/json+gzip"
            ev.properties = {
                "corr_id": corr_id,
                "chunk_index": idx,
                "chunks_total": total,
                "compressed": True,
                "schema": "your-schema-v1"
            }

            try:
                current_batch.add(ev)
            except ValueError:
                if len(current_batch) == 0:
                    # An empty batch rejected the event, so the event alone
                    # exceeds the batch size limit; sending an empty batch and
                    # retrying cannot help (shouldn't happen with our
                    # MAX_EVENT_BYTES).
                    raise
                # Batch full: flush and start a new one for the same partition key
                send_with_retry(producer, current_batch)
                current_batch = producer.create_batch(partition_key=pk)
                # If the chunk still can't fit in a fresh batch, let the error propagate.
                current_batch.add(ev)

    # final flush
    flush()
