# Apache Kafka y Spark Streaming: Fundamentos y Práctica

Este cuaderno proporciona una explicación detallada de Apache Kafka, su arquitectura, su integración con Apache Spark Streaming y un ejemplo práctico simulado.


## 1. ¿Qué es Apache Kafka?

Apache Kafka es una plataforma distribuida de transmisión de eventos (streaming) diseñada para manejar flujos de datos en tiempo real. Es escalable, tolerante a fallos y de alto rendimiento.

### Conceptos Clave:

* **Evento (Mensaje):** La unidad básica de datos en Kafka. Es un registro de algo que sucedió (ej. "usuario X hizo clic en botón Y").
* **Productor (Producer):** Aplicación que publica (escribe) eventos en Kafka.
* **Consumidor (Consumer):** Aplicación que se suscribe a (lee) y procesa eventos de Kafka.
* **Tópico (Topic):** Categoría donde se organizan y almacenan los eventos. Similar a una carpeta en un sistema de archivos o una tabla en una base de datos.
* **Partición (Partition):** Los tópicos se dividen en particiones para permitir el paralelismo. Cada partición es una secuencia ordenada e inmutable de registros.
* **Broker:** Servidor que almacena los datos. Un clúster de Kafka está formado por múltiples brokers.
* **Offset:** Identificador único secuencial asignado a cada mensaje dentro de una partición.

## 2. Arquitectura de Kafka

La arquitectura de Kafka se basa en un registro de confirmación (commit log) distribuido. Los productores añaden mensajes al final del log, y los consumidores leen desde una posición específica (offset).

### Flujo de Datos:

1.  **Productor** envía un mensaje a un **Tópico**.
2.  El mensaje se almacena en una **Partición** de un **Broker**.
3.  Un **Consumidor** (o grupo de consumidores) lee el mensaje del Tópico.
4.  El Consumidor procesa el mensaje y actualiza su **Offset** para saber qué ha leído.

## 3. Rol de Kafka en esta Aplicación MLOps

En este proyecto, Kafka actúa como el sistema nervioso central para la ingesta de datos en tiempo real.

### Escenario:

1.  **Simulación de Datos (Productor):** Generaremos datos sintéticos de viajes de taxi (similar al dataset de Nueva York) que simulan eventos en tiempo real.
2.  **Ingesta (Kafka):** Estos datos se envían a un tópico de Kafka llamado `taxi_trips`.
3.  **Procesamiento (Spark Streaming):** Spark lee estos datos desde Kafka, realiza transformaciones y extracciones de características (feature engineering) al vuelo.
4.  **Almacenamiento/ML (MLflow/Postgres):** Los datos procesados pueden usarse para re-entrenar modelos o para inferencia en tiempo real.

## 4. Configuración del Entorno

Primero, asegurémonos de tener las librerías necesarias instaladas.

In [None]:
!pip install kafka-python pyspark

## 5. Creación del Productor de Kafka (Simulador)

Vamos a crear un productor que envíe datos JSON simulados de viajes de taxi a un tópico de Kafka.

In [None]:
from kafka import KafkaProducer
import json
import time
import random
from datetime import datetime

# Configuración de Kafka
# Usamos el puerto interno definido en KAFKA_ADVERTISED_LISTENERS (PLAINTEXT)
KAFKA_BROKER = 'kafka:29092' 
TOPIC_NAME = 'taxi_trips'

# Inicializar el Productor
try:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        request_timeout_ms=5000  # 5 segundos de tiempo de espera
    )
    print(f"Productor Kafka inicializado correctamente conectando a {KAFKA_BROKER}.")
except Exception as e:
    print(f"Error al conectar con Kafka: {e}")


### Función para Generar Datos de Taxi Simulados

In [None]:
def generate_trip_data():
    """Genera un diccionario con datos simulados de un viaje de taxi."""
    return {
        'vendor_id': random.choice([1, 2]),
        'pickup_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'passenger_count': random.randint(1, 6),
        'trip_distance': round(random.uniform(0.5, 20.0), 2),
        'pickup_longitude': round(random.uniform(-74.0, -73.9), 6),
        'pickup_latitude': round(random.uniform(40.7, 40.8), 6),
        'dropoff_longitude': round(random.uniform(-74.0, -73.9), 6),
        'dropoff_latitude': round(random.uniform(40.7, 40.8), 6),
        'payment_type': random.choice([1, 2]), # 1=Credit Card, 2=Cash
        'fare_amount': round(random.uniform(5.0, 50.0), 2),
        'tip_amount': round(random.uniform(0.0, 10.0), 2)
    }

### Enviar Datos al Tópico (Ejecutar esto en segundo plano o detener manualmente)

In [None]:
# Enviar 10 mensajes como prueba
print(f"Enviando mensajes al tópico '{TOPIC_NAME}'...")
for i in range(10):
    data = generate_trip_data()
    producer.send(TOPIC_NAME, data)
    print(f"Enviado: {data}")
    time.sleep(1) # Simular llegada en tiempo real

producer.flush()
print("Envío completado.")

## 6. Configuración de Spark Streaming

Ahora configuraremos Spark para leer estos datos desde Kafka. Spark Structured Streaming trata los datos de Kafka como un DataFrame infinito.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Iniciar Spark Session con soporte para Kafka
# Nota: En un entorno real, se deben incluir los paquetes de spark-sql-kafka
# spark = SparkSession.builder \
#     .appName("KafkaSparkStreaming") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
#     .getOrCreate()

# Para este ejemplo simulado en el entorno actual, usaremos una sesión básica
spark = SparkSession.builder \
    .appName("KafkaSparkStreamingBasic") \
    .master("local[*]") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

## 7. Definición del Esquema de Datos

Kafka envía datos como bytes. Necesitamos decirle a Spark cómo interpretar el JSON que estamos enviando.

In [None]:
schema = StructType([
    StructField("vendor_id", IntegerType()),
    StructField("pickup_datetime", StringType()),
    StructField("passenger_count", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("pickup_longitude", DoubleType()),
    StructField("pickup_latitude", DoubleType()),
    StructField("dropoff_longitude", DoubleType()),
    StructField("dropoff_latitude", DoubleType()),
    StructField("payment_type", IntegerType()),
    StructField("fare_amount", DoubleType()),
    StructField("tip_amount", DoubleType())
])

## 8. Lectura del Stream (Simulación)

El siguiente código muestra cómo se configuraría la lectura del stream desde Kafka. 

**Nota:** Como la ejecución de `readStream` es bloqueante y requiere el paquete de Kafka (que se descarga al inicio), aquí mostraremos la lógica. En tu entorno Docker con las dependencias correctas, esto funcionará.

In [None]:
# SIMULACIÓN: Como no tenemos los JARs de Kafka-Spark cargados en esta sesión básica,
# usaremos el formato 'rate' para simular un stream y poder ejecutar el resto del código.
# En producción, descomenta las líneas de Kafka.

# df_raw = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
#     .option("subscribe", TOPIC_NAME) \
#     .option("startingOffsets", "earliest") \
#     .load()

# Simulación con 'rate' (genera filas con timestamp y value)
df_raw = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

# Para la simulación, crearemos columnas dummy para que coincidan con el esquema esperado
from pyspark.sql.functions import lit

df_parsed = df_raw.withColumn("fare_amount", (col("value") % 50).cast("double")) \
                  .withColumn("tip_amount", (col("value") % 10).cast("double")) \
                  .withColumn("vendor_id", lit(1)) \
                  .select("fare_amount", "tip_amount", "vendor_id")

print("Stream de lectura inicializado (Simulación).")

## 9. Transformaciones en Tiempo Real

Una vez que tenemos el DataFrame de streaming, podemos aplicar transformaciones como si fuera un DataFrame estático.

In [None]:
# Ejemplo de transformaciones sobre el stream
def process_stream(df_stream):
    # 1. Calcular el costo total (tarifa + propina)
    df_out = df_stream.withColumn("total_cost", col("fare_amount") + col("tip_amount"))
    
    # 2. Filtrar viajes con costo negativo o cero (datos sucios)
    df_out = df_out.filter(col("total_cost") > 0)
    
    return df_out

# APLICAMOS LA TRANSFORMACIÓN
# Esto crea el DataFrame 'df_processed' que faltaba antes
df_processed = process_stream(df_parsed)

print("Transformaciones aplicadas. DataFrame 'df_processed' listo.")

## 10. Salida del Stream (Sink)

Finalmente, debemos decidir dónde enviar los resultados. Para depuración, usamos `console` o `memory`. En producción, podría ser otra base de datos, un archivo Parquet o incluso otro tópico de Kafka.

In [None]:
# Escribir el stream a la consola (para ver los resultados)
# Usamos 'append' porque no hay agregaciones
try:
    query = df_processed.writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    # Esperar unos segundos para ver la salida y luego detener
    time.sleep(10)
    query.stop()
    print("Streaming finalizado.")
except Exception as e:
    print(f"Error en streaming: {e}")

## 11. Consumidor de Prueba con Python (kafka-python)

Para verificar que los datos se están escribiendo correctamente en Kafka sin depender de Spark Streaming por ahora, usaremos un consumidor simple de Python.

In [None]:
from kafka import KafkaConsumer

# Inicializar Consumidor
try:
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BROKER,
        auto_offset_reset='earliest', # Leer desde el principio si no hay offset guardado
        enable_auto_commit=True,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        consumer_timeout_ms=5000 # Dejar de escuchar después de 5 segundos de inactividad
    )
    print("Consumidor Kafka inicializado.")
except Exception as e:
    print(f"Error al conectar Consumidor: {e}")

### Leer Mensajes

In [None]:
print(f"Leyendo mensajes del tópico '{TOPIC_NAME}'...")
msg_count = 0
for message in consumer:
    print(f"Recibido: {message.value}")
    msg_count += 1
    if msg_count >= 5: # Leer solo 5 mensajes para el ejemplo
        break
print("Lectura finalizada.")

## 12. Concepto Avanzado: Window Operations en Spark Streaming

Una de las capacidades más potentes de Spark Streaming es realizar agregaciones sobre ventanas de tiempo (ej. "promedio de propina en los últimos 10 minutos").

In [None]:
from pyspark.sql.functions import window

# Ejemplo conceptual de agregación por ventana
def aggregate_by_window(df_stream):
    # Asegurarse de que hay una columna de timestamp
    # df_stream = df_stream.withColumn("timestamp", ...)
    
    # Agrupar por ventanas de 10 minutos, deslizándose cada 5 minutos
    # windowed_counts = df_stream.groupBy(
    #     window(col("timestamp"), "10 minutes", "5 minutes"),
    #     col("vendor_id")
    # ).count()
    
    # return windowed_counts
    pass

## 13. Integración con MLflow (Conceptual)

¿Cómo encaja MLflow aquí? 

1.  **Entrenamiento:** Spark lee un lote histórico de Kafka (o data lake), entrena un modelo y lo guarda en MLflow.
2.  **Inferencia:** Spark Streaming carga el modelo desde MLflow (`model = mlflow.spark.load_model(...)`) y lo aplica a cada micro-lote de datos que llega de Kafka para hacer predicciones en tiempo real.

In [None]:
import mlflow.spark

# Cargar un modelo previamente entrenado
# model_uri = "models:/TaxiFarePrediction/Production"
# model = mlflow.spark.load_model(model_uri)

# Aplicar predicción al stream
# prediction = model.transform(df_stream)

## 14. Monitoreo y Checkpointing

En aplicaciones de streaming, es crucial guardar el estado (checkpointing) para recuperarse de fallos.

In [None]:
# Configuración de Checkpoint
CHECKPOINT_DIR = "/tmp/spark_checkpoints/taxi_app"

# query = df_processed.writeStream \
#     .option("checkpointLocation", CHECKPOINT_DIR) \
#     ...
#     .start()

## 15. Resumen del Flujo de Datos

1.  **Fuente:** Script Python (Simulador) -> Kafka Topic `taxi_trips`.
2.  **Canal:** Kafka Broker gestiona la cola de mensajes.
3.  **Procesador:** Spark Streaming lee, limpia y agrega datos.
4.  **Inteligencia:** Modelo de MLflow aplica predicciones.
5.  **Destino:** Resultados se muestran en consola o se guardan en base de datos.

## 16. Limpieza de Recursos

Es buena práctica cerrar las conexiones al finalizar.

In [None]:
try:
    producer.close()
    consumer.close()
    print("Conexiones a Kafka cerradas.")
except:
    pass

spark.stop()
print("Spark Session detenida.")

## 17. Anexo: Comandos Útiles de Kafka (CLI)

Si necesitas depurar Kafka desde la terminal del contenedor:

* **Listar tópicos:** `kafka-topics --list --bootstrap-server localhost:9092`
* **Crear tópico:** `kafka-topics --create --topic test --bootstrap-server localhost:9092`
* **Consumir consola:** `kafka-console-consumer --topic test --from-beginning --bootstrap-server localhost:9092`

## 18. Próximos Pasos

1.  Implementar el DAG de Airflow para orquestar el re-entrenamiento del modelo.
2.  Conectar Spark Streaming a una base de datos real (Postgres) para persistir resultados.
3.  Visualizar los datos en tiempo real con una herramienta de dashboarding.

--- Fin del Cuaderno ---