# Arquitectura de Streaming en tiempo real. 
#### Autores: Yahya El Baroudi, Samuel Corrionero, Ismael Gonz√°lez y Jairo Farf√°n. 


### 1. Introducci√≥n y Objetivos
El objetivo de esta pr√°ctica es implementar una arquitectura de **Big Data en Streaming** capaz de ingerir, procesar y analizar datos generados en tiempo real.

Hemos simulado un entorno de red social (similar a Twitter/X) para detectar **Trending Topics** (hashtags m√°s populares) cada minuto. Para ello, utilizamos las siguientes tecnolog√≠as:
* **Docker:** Para la virtualizaci√≥n de la infraestructura.
* **Apache Kafka:** Como broker de mensajer√≠a para desacoplar el productor del consumidor.
* **Apache Spark (Structured Streaming):** Para el procesamiento de datos utilizando la API de **Dataframes**, tal y como se especifica en el temario.

### 2. Infraestructura (Docker)
Desplegamos un cl√∫ster de Kafka y Zookeeper utilizando `docker-compose`. Esto garantiza la portabilidad del proyecto y permite levantar los servicios necesarios sin instalaciones complejas en el sistema operativo anfitri√≥n.

In [None]:
import time

print("üèóÔ∏è Levantando infraestructura Docker...")
!docker compose -f docker/docker-compose.yml up -d

print("‚è≥ Esperando 30 segundos a que Kafka arranque...")
time.sleep(30) # Damos tiempo a que "caliente"
print("‚úÖ Infraestructura lista.")

üèóÔ∏è Levantando infraestructura Docker...
[1A[1B[0G[?25l[+] Running 0/1
 [33m‚†ã[0m Network docker_default  Creating                                        [34m0.1s [0m
[?25h[1A[1B[0G[?25l[+] Running 0/1
 [33m‚†ã[0m Network docker_default  Creating                                        [34m0.1s [0m
[?25h[1A[1A[0G[?25l[+] Running 2/3
 [32m‚úî[0m Network docker_default  [32mCreated[0m                                         [34m0.1s [0m
 [32m‚úî[0m Container zookeeper     [32mCreated[0m                                         [34m0.1s [0m
 [33m‚†ã[0m Container kafka         Creating                                        [34m0.0s [0m
[?25h[1A[1A[0G[?25l[+] Running 2/3
 [32m‚úî[0m Network docker_default  [32mCreated[0m                                         [34m0.1s [0m
 [32m‚úî[0m Container zookeeper     [32mCreated[0m                                         [34m0.1s [0m
 [33m‚†ã[0m Container kafka         Creating            

### 3. Ingesta de Datos (El Productor)

Para simular el flujo de datos, utilizamos un script en Python (`producer.py`) que act√∫a como generador de eventos.

* **Funcionamiento:** Genera tweets sint√©ticos en formato JSON con campos aleatorios (usuario, texto, hashtag).
* **Ejecuci√≥n en Notebook:** Dado que el productor se ejecuta en un bucle infinito, utilizamos la librer√≠a `subprocess` para lanzarlo en un hilo en segundo plano (background). Esto permite que el Notebook siga disponible para ejecutar las celdas de Spark sin bloquearse.

In [31]:

import subprocess

print("twitter_simulator: üöÄ Iniciando simulador de tweets en segundo plano...")
# Esto lanza el script sin bloquear la celda
productor_process = subprocess.Popen(["python", "src/productor.py"])
print(f"‚úÖ Productor corriendo (PID: {productor_process.pid})")

twitter_simulator: üöÄ Iniciando simulador de tweets en segundo plano...
‚úÖ Productor corriendo (PID: 35624)


üîÑ Iniciando simulador de Twitter hacia localhost:9092...
üì© Enviando: qu√© complicado es configurar #Streaming
üì© Enviando: qu√© complicado es configurar #Streaming
üì© Enviando: estoy aprendiendo mucho con #IA
üì© Enviando: estoy aprendiendo mucho con #IA
üì© Enviando: repasando conceptos de #Examen
üì© Enviando: repasando conceptos de #Examen


### 4.1. Configuraci√≥n de Librer√≠as y Dependencias

En este primer paso, importamos las funciones necesarias de **PySpark SQL** para trabajar con Dataframes.

Un punto cr√≠tico aqu√≠ es la configuraci√≥n din√°mica del entorno: detectamos la versi√≥n de Spark instalada y forzamos la descarga del paquete `.jar` (`spark-sql-kafka`) necesario para conectar Spark con Kafka, ya que este conector no viene instalado por defecto. Tambi√©n preparamos el directorio donde guardaremos los resultados (`data/`).

In [None]:
import os
import sys
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, window, count, max as max_
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark

# Configuraci√≥n del entorno: Descarga autom√°tica del conector de Kafka
spark_version = pyspark.__version__.split("+")[0]
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version} pyspark-shell'

# Creamos directorio para persistencia de datos
output_dir = Path("data")
output_dir.mkdir(exist_ok=True)

üì© Enviando: ma√±ana tengo examen de #IA
üì° Escuchando... La tabla aparecer√° abajo üëá
üì© Enviando: incre√≠ble la velocidad de #Examen
üì© Enviando: incre√≠ble la velocidad de #Examen
üì© Enviando: ma√±ana tengo examen de #Spark
üì© Enviando: ma√±ana tengo examen de #Spark
üì© Enviando: ma√±ana tengo examen de #IA
üì© Enviando: ma√±ana tengo examen de #IA
üì© Enviando: estoy aprendiendo mucho con #Kafka
üì© Enviando: estoy aprendiendo mucho con #Kafka
üì© Enviando: estoy aprendiendo mucho con #IA
üì© Enviando: estoy aprendiendo mucho con #IA
üì© Enviando: incre√≠ble la velocidad de #Python
üì© Enviando: incre√≠ble la velocidad de #Python
üì© Enviando: estoy aprendiendo mucho con #BigData
üì© Enviando: estoy aprendiendo mucho con #BigData
üì© Enviando: repasando conceptos de #Examen
üì© Enviando: repasando conceptos de #Examen
üì© Enviando: estoy aprendiendo mucho con #Streaming
üì© Enviando: estoy aprendiendo mucho con #Streaming
üì© Enviando: estoy aprendiend

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/cofer/anaconda3/envs/arqesp/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cofer/anaconda3/envs/arqesp/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cofer/anaconda3/envs/arqesp/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt



üõë Simulador detenido por el usuario.


KeyboardInterrupt: 

### 4.2. Inicializaci√≥n de la SparkSession

La `SparkSession` es el punto de entrada para programar con la API de Dataframes y Datasets.

Configuramos el `master` como `local[*]` para utilizar todos los n√∫cleos de la CPU disponibles en la m√°quina. Adem√°s, ajustamos `shuffle.partitions` a 2 (por defecto son 200). Esto es una optimizaci√≥n crucial para entornos locales con pocos datos, ya que evita crear cientos de tareas vac√≠as innecesarias.

In [None]:
# Iniciar la sesi√≥n de Spark
spark = SparkSession.builder \
    .appName("JupyterStreaming") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# Reducimos el nivel de log para mantener la salida limpia
spark.sparkContext.setLogLevel("ERROR")

### 4.3. Conexi√≥n y Lectura desde Kafka

Utilizamos `spark.readStream` para establecer una conexi√≥n continua con el broker de Kafka. Esto crea un *Streaming DataFrame*, que representa una tabla de datos infinita que crece constantemente.

* **bootstrap.servers:** Apunta a nuestro contenedor Docker (`localhost:9092`).
* **subscribe:** Escucha el t√≥pico `tweets_topic` donde el Productor est√° escribiendo.
* **startingOffsets:** Configurado en `latest` para leer solo los datos nuevos y no procesar todo el historial desde el principio.

In [None]:
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "tweets_topic") \
    .option("startingOffsets", "latest") \
    .load()

### 4.4. Estructuraci√≥n de Datos (Schema Enforcement)

Kafka env√≠a los datos en formato binario (bytes). Para aplicar la l√≥gica de **Spark Dataframes**, debemos transformar estos bytes en una estructura tabular con columnas tipadas.

Definimos un `StructType` que coincide con el JSON generado por nuestro productor (`usuario`, `texto`, `hashtag`, `timestamp`). Usamos la funci√≥n `from_json` para parsear la columna y `select` para aplanar la estructura.

In [None]:
# Definici√≥n estricta del esquema de datos
schema = StructType([
    StructField("usuario", StringType(), True),
    StructField("texto", StringType(), True),
    StructField("hashtag_principal", StringType(), True),
    StructField("timestamp", DoubleType(), True),
])

# Transformaci√≥n: De binario -> JSON -> Columnas
df_parsed = df_raw.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*") \
    .where(col("hashtag_principal").isNotNull()) \
    .withColumn("ts", current_timestamp())

### 4.5. Agregaci√≥n por Ventanas de Tiempo

Aqu√≠ reside la l√≥gica principal del Streaming. En lugar de contar hashtags desde el inicio de los tiempos, utilizamos operaciones de ventana (`window`).

Agrupamos los datos en intervalos de **60 segundos** bas√°ndonos en la hora de llegada (`ts`). Esto nos permite calcular los *Trending Topics* din√°micamente minuto a minuto. El `watermark` nos ayuda a gestionar datos que llegan con un ligero retraso, descartando aquellos que sean demasiado viejos.

In [None]:
conteo = df_parsed \
    .withWatermark("ts", "1 minutes") \
    .groupBy(window(col("ts"), "60 seconds"), col("hashtag_principal")) \
    .agg(count("*").alias("total")) \
    .orderBy(col("total").desc())

### 4.6. Funci√≥n de Salida Personalizada (ForeachBatch)

Spark Streaming procesa los datos en micro-lotes (*micro-batches*). Definimos una funci√≥n `mostrar_en_jupyter` que se ejecutar√° cada vez que un micro-lote est√© listo.

Esta funci√≥n realiza dos tareas:
1.  **Persistencia:** Guarda el lote procesado como un archivo CSV en el disco para auditor√≠a.
2.  **Visualizaci√≥n:** Convierte los datos a un DataFrame de Pandas para mostrarlos de forma tabular y est√©tica en el Notebook.

In [None]:
def mostrar_en_jupyter(batch_df, batch_id):
    if batch_df.isEmpty():
        return
    
    # Conversi√≥n a Pandas para visualizaci√≥n
    pdf = batch_df.limit(10).toPandas()
    
    print(f"üìä Actualizaci√≥n del Stream - Batch ID: {batch_id}")
    
    # Guardado en disco
    csv_path = output_dir / f"batch_{batch_id}.csv"
    pdf.to_csv(csv_path, index=False)
    print(f"Guardado en {csv_path}")
    
    # Impresi√≥n de tabla
    print(pdf.to_string(index=False))

### 4.7. Ejecuci√≥n del Query

Finalmente, iniciamos el flujo de procesamiento.
* **outputMode("complete"):** Indicamos que queremos ver la tabla completa recalculada en cada actualizaci√≥n (necesario para agregaciones con ordenamiento).
* **trigger:** Spark intentar√° procesar nuevos datos cada 5 segundos.
* **awaitTermination:** Mantiene la celda ejecut√°ndose indefinidamente para escuchar nuevos datos.

In [None]:
print("üì° Escuchando... La tabla aparecer√° abajo üëá")

query = conteo.writeStream \
    .outputMode("complete") \
    .foreachBatch(mostrar_en_jupyter) \
    .trigger(processingTime="5 seconds") \
    .start()

# NOTA: Pulsa el bot√≥n "Stop" (Cuadrado) en el men√∫ superior para detener la ejecuci√≥n.
query.awaitTermination()

### 4.8. Apagamos el productor

In [None]:

# Ejecuta esto SOLO cuando hayas parado la celda anterior
print("üõë Apagando el productor...")
productor_process.kill()


üõë Apagando el productor...


### 5. Persistencia y Auditor√≠a

Adem√°s de la visualizaci√≥n en tiempo real, el sistema persiste los resultados procesados en disco. Cada lote procesado se guarda como un archivo CSV en la carpeta `data/`.

A continuaci√≥n, leemos estos archivos generados para verificar que el hist√≥rico de tendencias se ha almacenado correctamente.

### üìÇ Visualizaci√≥n de batches grabados
Los archivos CSV generados por cada batch se guardan en `data/` con el nombre `batch_<runId>.csv`. La siguiente celda lista esos archivos y los carga con pandas para que puedas inspeccionar los resultados despu√©s de parar el stream.

In [34]:
from pathlib import Path
import pandas as pd

print("üìÇ Batches guardados:")
for path in sorted(Path("data").glob("batch_*.csv")):
    print(f"- {path.name}")
    display(pd.read_csv(path))

üìÇ Batches guardados:
- batch_1.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Spark,1


- batch_10.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,11
1,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,10
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,10
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,9
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,8
5,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,7
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,7
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,6
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
9,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3


- batch_11.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,15
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,15
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,14
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,12
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,12
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,11
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,6
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,4
8,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#Spark,3
9,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#RealTime,2


- batch_12.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,17
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,16
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,16
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,13
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,13
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,12
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,7
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,4
8,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#Spark,3
9,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#RealTime,2


- batch_13.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,19
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,18
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,14
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,14
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,12
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,7
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,6
8,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#Spark,3
9,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#RealTime,2


- batch_14.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
8,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#Spark,3
9,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#RealTime,2


- batch_15.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
8,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Python,3
9,"Row(start=Timestamp('2025-12-15 18:34:00'), en...",#Spark,3


- batch_16.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
8,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Spark,5
9,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Python,4


- batch_17.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
8,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Spark,6
9,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Python,5


- batch_18.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
8,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Spark,7
9,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#IA,6


- batch_19.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
8,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Spark,7
9,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#IA,7


- batch_2.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
1,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3
2,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Spark,2
3,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#BigData,1
4,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Examen,1


- batch_20.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
7,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Spark,8
8,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Streaming,7
9,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#IA,7


- batch_21.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Examen,23
1,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Spark,19
2,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#BigData,18
3,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Kafka,15
4,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#RealTime,15
5,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#Python,13
6,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Spark,10
7,"Row(start=Timestamp('2025-12-15 18:35:00'), en...",#IA,8
8,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#IA,8
9,"Row(start=Timestamp('2025-12-15 18:36:00'), en...",#Python,7


- batch_3.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
1,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,2
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,2
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,2
5,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Spark,2
6,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#BigData,1
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,1
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Examen,1
9,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,1


- batch_4.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,4
1,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,3
3,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,3
5,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,3
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,2
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,2
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Spark,2
9,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,2


- batch_5.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,7
1,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,4
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,4
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,4
4,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
5,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,3
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,3
8,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,3
9,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,2


- batch_6.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,9
1,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,6
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,5
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,5
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,4
5,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,4
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,4
7,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3
9,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,3


- batch_7.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,10
1,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,7
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,7
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,6
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,6
5,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,5
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,5
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,4
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
9,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3


- batch_8.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,11
1,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,8
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,8
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,8
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,6
5,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,6
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,6
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,5
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
9,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3


- batch_9.csv


Unnamed: 0,window,hashtag_principal,total
0,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Examen,11
1,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#IA,9
2,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#BigData,8
3,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Streaming,8
4,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#RealTime,7
5,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Kafka,7
6,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Spark,7
7,"Row(start=Timestamp('2025-12-15 18:40:00'), en...",#Python,6
8,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#Kafka,3
9,"Row(start=Timestamp('2025-12-15 18:39:00'), en...",#IA,3


### 5. Desconexi√≥n de infraestructura. 

In [35]:

print("üõë Apagando infraestructura...")
!docker compose -f docker/docker-compose.yml down

üõë Apagando infraestructura...
[1A[1B[0G[?25l[+] Running 0/1
 [33m‚†ã[0m Container kafka  Stopping                                               [34m0.1s [0m
[?25h[1A[1B[0G[?25l[+] Running 0/1
 [33m‚†ã[0m Container kafka  Stopping                                               [34m0.1s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m‚†ô[0m Container kafka  Stopping                                               [34m0.2s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m‚†ô[0m Container kafka  Stopping                                               [34m0.2s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m‚†π[0m Container kafka  Stopping                                               [34m0.3s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m‚†π[0m Container kafka  Stopping                                               [34m0.3s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m‚†∏[0m Container kafka  Stopping                                               

25/12/15 18:44:03 ERROR MicroBatchExecution: Query [id = d0068d75-9763-4318-8c6a-f65a68286287, runId = 4f546360-88a1-4ca3-87d6-46eaefb32fd7] terminated with error
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
	at org.apache.sp