In [None]:
pip install pyspark==3.5.2

In [None]:
# ---- Imports básicos de Spark SQL y funciones de streaming ----
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# ---- Crea/obtiene la SparkSession ----
# Agregamos el paquete del conector Kafka para Spark 3.5.x (Scala 2.12)
spark = (SparkSession.builder
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2")
         .appName("orders-streaming")
         .getOrCreate())

# Baja el ruido de logs
spark.sparkContext.setLogLevel("WARN")

# ---- Esquema del JSON que llega en el value de Kafka ----
# Define los campos esperados para poder parsear el mensaje como JSON
schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("amount", DoubleType()),
    StructField("status", StringType()),
    StructField("event_time", StringType()),  # llega como string ISO8601; luego lo casteamos a timestamp
])

# Imprime la versión de Scala activa (útil para diagnosticar mismatches)
print("Scala runtime:", spark.sparkContext._jvm.scala.util.Properties.versionString())

# ---- Fuente de datos: Kafka (lectura streaming) ----
# Lee del tópico "orders" del broker "kafka:9092".
# Nota: por defecto startingOffsets=latest; si quieres leer desde el principio, añade .option("startingOffsets","earliest")
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    # .option("startingOffsets", "earliest")
    # .option("failOnDataLoss", "false")  # útil en demos si hay rotación de offsets
  .load())

# ---- Parseo del payload JSON ----
# 1) value viene en binario -> CAST a STRING
# 2) Aplica from_json con el schema definido
# 3) Aplana la estructura para tener columnas reales (order_id, customer_id, etc.)
json_df = (kafka_df
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*"))

# ---- Agregación 1: suma de montos por ventana tumbling de 1 minuto ----
# Castea event_time a timestamp y agrupa por ventana de 1 min
agg_by_min = (json_df
    .withColumn("ts", col("event_time").cast("timestamp"))
    .groupBy(window(col("ts"), "1 minute"))
    .sum("amount")
    .withColumnRenamed("sum(amount)", "amount_sum"))

# ---- Agregación 2: conteo por estado y cliente en ventanas de 1 minuto ----
agg_status_client = (json_df
    .withColumn("ts", col("event_time").cast("timestamp"))
    .groupBy(
        window(col("ts"), "1 minute"),
        col("status"),
        col("customer_id")
    )
    .count())

# ---- Sinks: imprime resultados por consola (modo update) ----
# q1 muestra la suma de montos por minuto
q1 = (agg_by_min
    .writeStream
    .outputMode("update")     # solo actualiza filas afectadas por nuevas llegadas
    .format("console")
    .option("truncate", "false")
    .option("numRows", 50)
    .start())

# q2 muestra conteos por estado/cliente por minuto
q2 = (agg_status_client
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .option("numRows", 50)
    .start())

# ---- Mantiene la app viva hasta que la detengas ----
spark.streams.awaitAnyTermination()