# Análisis de Streaming de Datos con Kafka y PySpark
Este notebook implementa un sistema de procesamiento de datos en tiempo real usando Kafka y PySpark Streaming.

In [None]:
# Importa las clases necesarias de PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType
import logging

In [None]:
# Configura el nivel de log a WARN para reducir los mensajes INFO
spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

In [None]:
# Define la estructura de los datos entrantes
# Especifica tipos de datos para cada campo:
#   sensor_id: Entero
#   temperature: Flotante
#   humidity: Flotante
#   timestamp: Marca temporal
schema = StructType([
    StructField("sensor_id", IntegerType()),
    StructField("temperature", FloatType()),
    StructField("humidity", FloatType()),
    StructField("timestamp", TimestampType())
])

In [None]:
# Configura la lectura de streaming desde Kafka
# Conecta al broker Kafka local
# Suscribe al topic "sensor_data"
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load()

In [None]:
# Parsea los mensajes JSON de Kafka
# Convierte a DataFrame estructurado según el esquema definido
parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

In [None]:
# Agrupa datos en ventanas de 1 minuto
# Calcula promedios de temperatura y humedad por sensor
# Utiliza ventanas deslizantes para análisis en tiempo real
windowed_stats = parsed_df \
    .groupBy(window(col("timestamp"), "1 minute"), "sensor_id") \
    .agg({"temperature": "avg", "humidity": "avg"})

In [None]:
# Configura la salida del streaming
# Modo "complete": muestra todos los resultados agregados
# Escribe resultados en la consola
# Mantiene el streaming activo
query = windowed_stats \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()