In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import BinaryType
import json, urllib.request

def _connect_spark():
    """Open (or reuse) the Spark Connect session used by this notebook.

    The session targets the Spark Connect endpoint at ``sc://localhost:15002``
    and registers a catalog named ``local`` using the Iceberg ``SparkCatalog``
    class, of type ``hadoop``, with its warehouse at ``/opt/warehouse``.
    """
    iceberg_catalog_conf = {
        "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.local.type": "hadoop",
        "spark.sql.catalog.local.warehouse": "/opt/warehouse",
    }
    builder = SparkSession.builder.remote("sc://localhost:15002")
    for conf_key, conf_value in iceberg_catalog_conf.items():
        builder = builder.config(conf_key, conf_value)
    return builder.getOrCreate()


spark = _connect_spark()

# Executors (containers) reach Kafka through host.docker.internal
KAFKA_BOOTSTRAP = "host.docker.internal:9094"

# The client (this notebook) reaches the Schema Registry on the host
SCHEMA_REGISTRY = "http://localhost:8081"

TOPIC = "erp_avro.erp.orders"
# Registry subject for the topic's record values: "<topic>-value"
SCHEMA_SUBJECT = TOPIC + "-value"

In [None]:
# 1) Read the raw records from Kafka (this part already works)
def _read_kafka_batch(bootstrap_servers, topic):
    """Batch-read ``topic`` from its earliest to its latest offsets.

    Returns the DataFrame produced by Spark's ``kafka`` source, with message
    headers included.
    """
    reader_options = (
        ("kafka.bootstrap.servers", bootstrap_servers),
        ("subscribe", topic),
        ("startingOffsets", "earliest"),
        ("endingOffsets", "latest"),
        ("includeHeaders", "true"),  # optional
    )
    reader = spark.read.format("kafka")
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)
    return reader.load()


df_raw = _read_kafka_batch(KAFKA_BOOTSTRAP, TOPIC)

# 2) Fetch the Avro schema from the Registry (runs on the host / client side)
def _unwrap_json_string(text):
    """Strip redundant JSON string-encoding layers from ``text``.

    Returns the innermost string that is still a JSON document. A layer is
    only removed when what lies underneath is itself JSON for an object,
    array or string, so a bare quoted name such as ``'"string"'`` is returned
    unchanged.

    Raises:
        json.JSONDecodeError: if ``text`` is not valid JSON at all.
    """
    while True:
        outer = json.loads(text)
        if not isinstance(outer, str):
            return text
        try:
            inner = json.loads(outer)
        except json.JSONDecodeError:
            # The decoded string is plain text, not another JSON document.
            return text
        if not isinstance(inner, (dict, list, str)):
            return text
        text = outer


def fetch_latest_schema(registry_url, subject, timeout=10.0):
    """Return the latest schema registered under ``subject`` as a JSON string.

    Args:
        registry_url: Base URL of the Schema Registry; a trailing ``/`` is
            tolerated.
        subject: Registry subject name. It is percent-encoded before being
            placed in the URL path.
        timeout: Seconds to wait for the registry before giving up, so an
            unreachable registry fails instead of blocking the notebook.

    Returns:
        The value of the ``"schema"`` field of the registry response. If that
        value was JSON-encoded more than once, the extra layers are removed.

    Raises:
        urllib.error.URLError: if the registry cannot be reached or answers
            with an HTTP error status.
        KeyError: if the response has no ``"schema"`` field.
        json.JSONDecodeError: if the response or the schema is not valid JSON.
    """
    from urllib.parse import quote

    url = f"{registry_url.rstrip('/')}/subjects/{quote(subject, safe='')}/versions/latest"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        meta = json.load(resp)
    return _unwrap_json_string(meta["schema"])

value_schema = fetch_latest_schema(SCHEMA_REGISTRY, SCHEMA_SUBJECT)

# 3) Decode Avro with Confluent framing (strip the 5-byte header)
CONFLUENT_HEADER_BYTES = 5


def drop_header_udf(col):
    """Return ``col`` (a binary Column) without its first 5 bytes.

    Values shorter than 5 bytes, and nulls, are passed through unchanged.

    The name is kept so existing ``drop_header_udf(F.col(...))`` calls keep
    working, but this is now a native Spark expression rather than a Python
    UDF: the slicing runs inside Spark, with no per-row Python round trip and
    no Python worker needed on the Spark Connect server.
    """
    n_bytes = F.length(col)
    # substr is 1-based, so the payload starts at byte 6; asking for more
    # bytes than remain simply returns the rest of the value.
    payload = col.substr(F.lit(CONFLUENT_HEADER_BYTES + 1), n_bytes)
    return F.when(n_bytes >= CONFLUENT_HEADER_BYTES, payload).otherwise(col)


# NOTE(review): the header is discarded without being inspected, so every
# record is decoded with the latest registered schema. Records written with an
# older, incompatible schema version may fail to decode or decode incorrectly;
# confirm the subject's compatibility settings.
df_decoded = (
    df_raw
    .select(drop_header_udf(F.col("value")).alias("payload"))
    .select(from_avro(F.col("payload"), value_schema).alias("data"))
    .select("data.*")
)

df_decoded.printSchema()
df_decoded.show(10, truncate=False)

In [None]:
from pyspark.sql import functions as F

def _row_image_expr(field):
    """SQL expression selecting ``field`` from the relevant row image.

    For ops 'c', 'r' and 'u' the value is read from ``after``; for any other
    op (including a null op) it is read from ``before``.
    NOTE(review): presumably these are Debezium op codes (create / snapshot
    read / update, with deletes carrying only ``before``); confirm against
    the connector configuration.
    """
    return f"CASE WHEN op IN ('c','r','u') THEN after.{field} ELSE before.{field} END"


# Fields copied as-is from the row image, in output order.
_ORDER_FIELDS = ("order_id", "customer_id", "order_ts", "status", "amount", "currency")
# Fields converted to timestamp while being projected.
# NOTE(review): assumes these arrive in a form to_timestamp can cast
# (e.g. ISO-8601 strings); verify against the decoded schema.
_ORDER_TIMESTAMP_FIELDS = ("created_at", "updated_at")

df_events = df_decoded.selectExpr(
    "op",
    "op = 'r' as is_snapshot",
    "source.ts_ms as source_ts_ms",
    "source.db as src_db",
    "source.table as src_table",
    "transaction.id as tx_id",
    *[f"{_row_image_expr(name)} as {name}" for name in _ORDER_FIELDS],
    *[f"to_timestamp({_row_image_expr(name)}) as {name}" for name in _ORDER_TIMESTAMP_FIELDS],
)

df_events.show(truncate=False)
