In [1]:
# [1]: Importar librerías necesarias
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col
from pyspark.sql.types import StringType
from IPython.display import clear_output
import time

# [2]: Crear sesión de Spark
spark = SparkSession.builder \
    .appName("KafkaWordCount") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# [3]: Leer stream desde Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "actividad-topic") \
    .load()

In [2]:
# [4]: Convertir el valor binario de Kafka a STRING (la frase completa)
frases_df = kafka_df.selectExpr("CAST(value AS STRING)")

# [5]: Escribir el stream a memoria (creamos una tabla temporal llamada "tabla_frases")
query = frases_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("tabla_frases") \
    .start()

In [None]:
# [6]: Bucle para visualizar las frases en tiempo real
try:
    while True:
        clear_output(wait=True)
        print("--- Frases recibidas desde Kafka ---")
        
        # Consultamos la tabla temporal en memoria
        spark.sql("SELECT * FROM tabla_frases").show(truncate=False)
        
        time.sleep(5)
except KeyboardInterrupt:
    print("Deteniendo el consumo de frases...")
    query.stop()

--- Frases recibidas desde Kafka ---
+---------------------------------------------------------------------+
|value                                                                |
+---------------------------------------------------------------------+
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Docker Compose enciende todo tu sistema de datos con un solo comando.|
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Docker Compose enciende todo tu sistema de datos con un solo comando.|
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Spark es el motor que procesa y analiza esos datos rápidamente.      |
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Kafka funciona como la tubería que transporta y guarda los mensajes. |
|Kafka envía los datos y Sp