In [None]:
# ============================================================
#   - **Conexão**: Event Hubs (connection string no AKV via Secret Scope)
#   - **Persistência**: Delta *managed* em `catalog.schema.tabela`
#   - **Clustering**: **Delta Liquid Clustering** por `ingestion_date`
#   - **Trigger**: `once`, com **checkpoint dedicado** por tópico/tabela
#   - **Uso**: 1 notebook parametrizado → 3 tarefas (1 por tópico)
# ============================================================


# ===================== PARAMETERS (Widgets) =====================
def _require_widget(name: str) -> str:
    """Return the value of notebook widget *name*, failing fast when it is blank.

    Args:
        name: Name of the notebook widget to read.

    Returns:
        The widget value exactly as returned by ``dbutils.widgets.get``.

    Raises:
        ValueError: If the widget value is empty or whitespace-only.
    """
    value = dbutils.widgets.get(name)
    if not value or not value.strip():
        # Explicit raise instead of `assert`: asserts are removed under `python -O`,
        # which would let a blank parameter slip through to SQL / path building.
        raise ValueError(f"Param '{name}' é obrigatório")
    return value


# Every parameter below ends up in the table name, the secret lookup or the
# checkpoint path, so all of them are required.
CATALOG = _require_widget("catalog")
SCHEMA = _require_widget("schema")
TABLE_NAME = _require_widget("table_name")
EVENTHUB_NAME = _require_widget("eventhub_name")
SECRET_SCOPE = _require_widget("secret_scope")
SECRET_KEY = _require_widget("secret_key")
# Trailing slashes are dropped so the checkpoint path can be joined with "/".
CHECKPOINT_BASE = _require_widget("checkpoint_base").rstrip("/")

# Fully qualified name of the destination table.
FQN = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}"
print(f"Destino: {FQN}")

# ===================== IMPORTS & UTILS =====================
from pyspark.sql import functions as F
from pyspark.sql import types as T
import json

def _read_secret(scope: str, key: str) -> str:
    """Fetch a secret value through ``dbutils.secrets.get``.

    Args:
        scope: Name of the secret scope to read from.
        key: Name of the secret inside that scope.

    Returns:
        The secret value returned by ``dbutils.secrets.get``.

    Raises:
        RuntimeError: If the lookup fails for any reason. The original
            exception is attached as ``__cause__``.
    """
    try:
        return dbutils.secrets.get(scope=scope, key=key)
    except Exception as e:
        # Chain explicitly so the underlying failure is reported as the direct
        # cause rather than as an incidental "during handling" context.
        raise RuntimeError(
            f"Falha ao ler secret '{scope}:{key}'. Verifique Secret Scope/AKV. Detalhe: {e}"
        ) from e

def _ensure_table_with_liquid(fqn: str) -> None:
    """Create the bronze Delta table if it does not exist yet.

    The table is created with liquid clustering on ``ingestion_date`` and with
    Change Data Feed enabled. Both are declared in the CREATE statement itself:
    a separate ALTER could fail after the table was created, and later runs
    would never retry it because they only act when the table is missing.

    Args:
        fqn: Fully qualified table name (``catalog.schema.table``). It is
            interpolated into the SQL text as-is.
    """
    if spark.catalog.tableExists(fqn):
        return

    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {fqn} (
        body             STRING,
        partition        INT,
        offset           STRING,
        sequenceNumber   LONG,
        enqueuedTime     TIMESTAMP,
        partitionKey     STRING,
        ingestion_ts     TIMESTAMP,
        ingestion_date   DATE
        )
        USING DELTA
        CLUSTER BY (ingestion_date)
        TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """)

# ===================== CONNECTION: EVENT HUBS =====================
# The hub name is appended as EntityPath to the connection string read from
# the secret scope.
conn_str = _read_secret(SECRET_SCOPE, SECRET_KEY)
_entity_conn_str = f"{conn_str};EntityPath={EVENTHUB_NAME}"

# The connection string is passed to the connector in encrypted form.
_encrypted_conn_str = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(_entity_conn_str)

# NOTE(review): offset "-1" presumably means "start of stream" — confirm
# against the Event Hubs connector documentation.
_starting_position = '{"offset": "-1", "seqNo": -1, "enqueuedTime": null, "isInclusive": true}'

eh_options = {
    "eventhubs.connectionString": _encrypted_conn_str,
    "eventhubs.startingPosition": _starting_position,
}

# ===================== CREATE TABLE IF IT DOES NOT EXIST =====================
_ensure_table_with_liquid(FQN)

# ===================== READ STREAM & BRONZE NORMALIZATION =====================
# Source columns kept in bronze, each paired with the type it is cast to.
_BRONZE_CASTS = (
    ("body", "string"),
    ("partition", "int"),
    ("offset", "string"),
    ("sequenceNumber", "long"),
    ("enqueuedTime", "timestamp"),
    ("partitionKey", "string"),
)

_stream_reader = spark.readStream.format("eventhubs").options(**eh_options)
raw = _stream_reader.load()

# Project and cast the source columns, keeping their original names.
_projected = raw.select(
    *[F.col(name).cast(dtype).alias(name) for name, dtype in _BRONZE_CASTS]
)

# Stamp each row with the ingestion timestamp and derive the date column
# from it (the table is clustered by ingestion_date).
_stamped = _projected.withColumn("ingestion_ts", F.current_timestamp())
bronze_df = _stamped.withColumn("ingestion_date", F.to_date("ingestion_ts"))

# ===================== WRITE STREAM (TRIGGER ONCE) =====================
# Dedicated checkpoint directory per event hub / table pair.
checkpoint_path = f"{CHECKPOINT_BASE}/{EVENTHUB_NAME}/{TABLE_NAME}"

# `toTable` resolves FQN as a catalog table name. `start(FQN)` would treat the
# string as a filesystem path and write a path-based Delta table instead of
# appending to the managed table ensured earlier in this notebook.
# NOTE(review): a checkpoint already used with the old path-based sink has
# recorded its offsets as processed; reset it if that data must be re-ingested
# into the managed table.
query = (
    bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .toTable(FQN)
)

# Block until the run finishes; a failed query surfaces here as an exception.
query.awaitTermination()

print(f"[OK] Ingestão concluída (trigger once) → {FQN}")
print(f"Checkpoint: {checkpoint_path}")