#Structured Streaming con procesamiento por ventana y gestión de eventos tardíos

##Contexto: 🙌
Structured Streaming simplifica el procesamiento continuo usando APIs de DataFrame. En esta actividad se simulará el conteo de eventos por intervalos de tiempo, gestionando eventos tardíos con watermark.

##Consigna: ✍️
Simula el procesamiento de eventos recibidos por un socket TCP y aplica una ventana temporal de 10 minutos con actualización cada 5 minutos. Usa un watermark de 5 minutos para gestionar eventos atrasados.

##Paso a paso:

1- Inicia un SparkSession con soporte a Structured Streaming.

2- Lee desde un socket TCP con .readStream.format("socket").

3- Convierte el texto en palabras y asígnales una marca de tiempo con current_timestamp().

4- Aplica groupBy(window("timestamp", "10 minutes", "5 minutes")) + count().

5- Agrega .withWatermark("timestamp", "5 minutes").

6- Muestra los resultados en consola con .writeStream.outputMode("update").start().

In [19]:
# Instalar PySpark en Colab si no está
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
import time

# Crear SparkSession
spark = SparkSession.builder.appName("StructuredStreamingColab").getOrCreate()

# Fuente de streaming rápida (100 filas por segundo)
streaming_input = spark.readStream.format("rate").option("rowsPerSecond", 100).load()

# Simular categorías con columna word
events = streaming_input.withColumn("word", (col("value") % 5).cast("string"))

# Aplicar watermark corto
events_with_watermark = events.withWatermark("timestamp", "2 seconds")

# Ventana de 5 segundos con slide de 2 segundos
word_counts = events_with_watermark.groupBy(
    window("timestamp", "5 seconds", "2 seconds"), "word"
).count()

# Escribir resultados en memoria
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("tabla_test") \
    .start()

# ⏳ Esperar 20s y hacer primer muestreo
time.sleep(20)
print("Primer muestreo de datos:")
spark.sql("SELECT * FROM tabla_test ORDER BY window.start, word").show(truncate=False)

# Esperar otros 20s y hacer segundo muestreo
time.sleep(20)
print("Segundo muestreo de datos:")
spark.sql("SELECT * FROM tabla_test ORDER BY window.start, word").show(truncate=False)

# Esperar otros 20s y hacer tercer muestreo
time.sleep(20)
print("Tercer muestreo de datos:")
spark.sql("SELECT * FROM tabla_test ORDER BY window.start, word").show(truncate=False)

# Detener el stream
query.stop()


Primer muestreo de datos:
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

Segundo muestreo de datos:
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

Tercer muestreo de datos:
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|{2025-09-15 22:58:12, 2025-09-15 22:58:17}|0   |36   |
|{2025-09-15 22:58:12, 2025-09-15 22:58:17}|1   |36   |
|{2025-09-15 22:58:12, 2025-09-15 22:58:17}|2   |36   |
|{2025-09-15 22:58:12, 2025-09-15 22:58:17}|3   |35   |
|{2025-09-15 22:58:12, 2025-09-15 22:58:17}|4   |35   |
|{2025-09-15 22:58:14, 2025-09-15 22:58:19}|0   |76   |
|{2025-09-15 22:58:14, 2025-09-15 22:58:19}|1   |76   |
|{2025-09-15 22:58:14, 2025-09-15 22:58:19}|2   |76   |
|{2025-09-15 22:58:14, 2025-09-15 22:58:19}|3   |75   |
|{2025-09-15 22:58:14, 2025-09-15 22:58:19}|4   |75   |
|{2025-09-15 22:58:16, 2025-09-

##Conclusión del experimento de Structured Streaming en Colab:

###Generación y procesamiento de datos:

Se simuló un flujo de eventos usando la fuente rate de Spark, generando 100 filas por segundo.

Cada evento se asignó a una categoría (word = value % 5) y se agrupó en ventanas de 5 segundos con slide de 2 segundos.

Se aplicó un watermark de 2 segundos para gestionar posibles eventos atrasados.

Observación de los muestreos:

Se realizaron 3 muestreos en la tabla de memoria (tabla_test) para ver cómo se acumulaban los eventos en las ventanas.

En el primer muestreo, algunas ventanas aparecieron con pocos o ningún dato porque Spark todavía no había completado suficientes batches.

En los siguientes muestreos, se pudieron observar claramente los conteos por categoría, mostrando cómo Spark agrupa los eventos en ventanas temporales solapadas.

Nota sobre los tiempos:

Se utilizó menos tiempo de espera (20 segundos entre muestreos) que en un escenario real para que el flujo fuera operativo y rápido en Colab.

En un caso productivo, las ventanas podrían ser de varios minutos y los triggers más espaciados, dependiendo del volumen de datos y la frecuencia deseada de actualización.

Aprendizaje principal:

Structured Streaming permite procesar datos en tiempo casi real en Colab, aunque hay que ajustar tiempos de ventana, slide y triggers para poder visualizar resultados de manera efectiva en entornos limitados como notebooks.