# Hands-on - Procesamiento de Streaming para ML con Apache Spark y Kafka

En este hands-on, exploraremos cómo trabajar con flujos de datos en tiempo real utilizando Apache Kafka como broker de mensajes y Apache Spark Structured Streaming para el procesamiento y la inferencia de Machine Learning. Simularemos un escenario donde se generan eventos de datos, se ingieren a través de Kafka, y luego Spark procesa estos datos en tiempo real para realizar inferencias con un modelo de ML pre-entrenado.

## 1. Requisitos Previos

Asegúrate de tener instalados los siguientes componentes en tu sistema:

* **Docker y Docker Compose:** Para levantar el servicio de Kafka de manera sencilla.
    * [Instalar Docker](https://docs.docker.com/get-docker/)
    * [Instalar Docker Compose](https://docs.docker.com/compose/install/)
* **Java Development Kit (JDK) 8 o superior:** Requerido por Apache Spark. **Importante:** Descarga e instala el JDK antes de instalar Spark.
    * [Descargar JDK](https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html) (o versión más reciente)
* **Python 3.8+:** Para ejecutar los scripts de productor y el entorno PySpark.
    * [Descargar Python](https://www.python.org/downloads/)
* **Poetry:** Para gestionar las dependencias del proyecto Python.
    * [Instalar Poetry](https://python-poetry.org/docs/#installation)
* **Apache Spark (Opcional, pero recomendado para el entorno):** Aunque `pyspark` se instalará con Poetry, tener una instalación local de Spark puede ayudar con la configuración de `SPARK_HOME`.
    * [Descargar Spark](https://spark.apache.org/downloads.html) (versión compatible con tu Python y Scala/Java)
    * **Para Windows:** Descarga la versión pre-compilada de Spark y descomprímela en una ruta simple (ej. `C:\spark`). También necesitarás descargar `winutils.exe` (puedes buscar "winutils.exe hadoop version" para encontrar una compatible con tu Spark y Hadoop).
        Coloca `winutils.exe` en `SPARK_HOME/bin` o `HADOOP_HOME/bin` si configuras `HADOOP_HOME`.

### Configuración de Variables de Entorno para Spark

Es crucial que las siguientes variables de entorno estén configuradas correctamente para que PySpark funcione. Si has instalado Spark localmente, asegúrate de que `SPARK_HOME` apunte a tu directorio de instalación de Spark.

#### Para Linux/macOS (añadir a `~/.bashrc` o `~/.zshrc`):

```bash
export JAVA_HOME="/path/to/your/jdk/installation"
export SPARK_HOME="/path/to/your/spark/installation"
export PATH="$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH"
```

#### Para Windows:

1.  **JAVA_HOME:**
    * Ve a `Panel de Control` -> `Sistema y Seguridad` -> `Sistema` -> `Configuración avanzada del sistema` -> `Variables de entorno`.
    * En `Variables del sistema`, haz clic en `Nueva...`.
    * **Nombre de la variable:** `JAVA_HOME`
    * **Valor de la variable:** `C:\Program Files\Java\jdk-XX.X.X` (reemplaza con tu ruta de instalación de JDK).
2.  **SPARK_HOME:**
    * En `Variables del sistema`, haz clic en `Nueva...`.
    * **Nombre de la variable:** `SPARK_HOME`
    * **Valor de la variable:** `C:\spark` (reemplaza con tu ruta de instalación de Spark).
3.  **Path:**
    * En `Variables del sistema`, busca la variable `Path` y haz clic en `Editar...`.
    * Haz clic en `Nueva` y añade `%JAVA_HOME%\bin`.
    * Haz clic en `Nueva` y añade `%SPARK_HOME%\bin`.
    * **Opcional (para PySpark):** Si usas un entorno Python específico, puedes configurar `PYSPARK_PYTHON`. Por ejemplo, `C:\Users\YourUser\AppData\Local\Programs\Python\Python39\python.exe` (ajusta tu ruta de Python).
        * **Nombre de la variable:** `PYSPARK_PYTHON`
        * **Valor de la variable:** `C:\path\to\your\python.exe`
4.  Haz clic en `Aceptar` en todas las ventanas para guardar los cambios.

Reinicia tu terminal (o Visual Studio Code, etc.) después de configurar estas variables para que los cambios surtan efecto. Si estás usando un entorno virtual de Poetry, PySpark intentará encontrar Spark por sí mismo si `SPARK_HOME` está configurado globalmente.

## 2. Estructura del Proyecto

Vamos a organizar los archivos de la siguiente manera. Crea una carpeta principal para el hands-on, llamada `streaming2`. Dentro de ella, crearemos los archivos.

```
streaming2/
├── docker-compose.yml
├── producer.py
├── spark_consumer.py
├── ml_model/
│   └── simple_model.pkl
├── pyproject.toml
└── poetry.lock
```

## 3. Configuración del Entorno (Kafka con Docker Compose)

Primero, crearemos el archivo `docker-compose.yml` para levantar un servidor Kafka y Zookeeper. Spark Structured Streaming se conectará a este Kafka para consumir los datos.

**Paso 3.1: Crear el archivo `docker-compose.yml`**

Crea un archivo llamado `docker-compose.yml` en la raíz de tu carpeta `streaming2/` y pega el siguiente contenido:

In [None]:
%%writefile streaming2/docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9094:9094" # Puerto para conexión externa
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

**Paso 3.2: Levantar Kafka**

Abre una terminal (no desde Jupyter, ya que necesitamos un proceso en segundo plano), navega hasta la carpeta `streaming2/` y ejecuta el siguiente comando para levantar los servicios de Kafka y Zookeeper en segundo plano:

```bash
docker compose up -d
```

Puedes verificar que los contenedores estén corriendo con `docker compose ps`.

## 4. Preparación del Modelo de Machine Learning

Para este hands-on, usaremos un modelo muy simple pre-entrenado. Spark puede cargar modelos de diferentes formatos, pero `joblib` es común para modelos de `scikit-learn`.

**Paso 4.1: Crear la carpeta `ml_model`**

Puedes crearla directamente con el siguiente comando en una celda de código (asegúrate de que el directorio `streaming2` exista):


In [None]:
import os

os.makedirs('streaming2/ml_model', exist_ok=True)
print("Carpeta 'streaming2/ml_model' creada.")

**Paso 4.2: Generar y guardar un modelo simple**

Ejecuta la siguiente celda de código para generar el modelo y guardarlo. Asegúrate de tener `scikit-learn` y `joblib` instalados en tu entorno de Jupyter. Si no, puedes instalarlos con `!pip install scikit-learn joblib`.

In [None]:
import joblib
from sklearn.linear_model import LogisticRegression
import numpy as np

# Datos dummy para entrenar un modelo simple
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]])
y = np.array([0, 0, 0, 1, 1, 1, 1])

model = LogisticRegression()
model.fit(X, y)

# Guardar el modelo en la ruta esperada por el consumidor de Spark
joblib.dump(model, 'streaming2/ml_model/simple_model.pkl')
print("Modelo simple guardado como streaming2/ml_model/simple_model.pkl")

## 5. Configuración de Dependencias con Poetry

Usaremos Poetry para gestionar las dependencias de Python, incluyendo `pyspark`.

**Paso 5.1: Inicializar Poetry y añadir dependencias**

Desde la terminal, asegúrate de estar en la carpeta `streaming2/` y ejecuta los siguientes comandos:

```bash
cd streaming2/
poetry init --no-interaction
poetry add kafka-python pandas scikit-learn pyspark
```

Esto creará los archivos `pyproject.toml` y `poetry.lock` en tu directorio.

## 6. Implementación del Productor de Datos

El productor generará eventos (simulando datos de sensores o métricas) y los enviará a un tópico de Kafka. Este es el mismo productor que en el ejemplo anterior.

**Paso 6.1: Crear el archivo `producer.py`**

Crea un archivo llamado `producer.py` en la raíz de tu carpeta `streaming2/` y pega el siguiente contenido. Luego, no olvides guardar el archivo.

In [None]:
%%writefile streaming2/producer.py
import time
import json
import random
from kafka import KafkaProducer

def serialize_json(obj):
    return json.dumps(obj).encode('utf-8')

# Configuración del productor Kafka
producer = KafkaProducer(
    bootstrap_servers=['localhost:9094'], # Usar el puerto expuesto por Docker Compose
    value_serializer=serialize_json
)

topic_name = 'sensor_data'

print(f"Enviando datos al tópico: {topic_name}")

try:
    for i in range(100):
        sensor_id = random.randint(1, 5)
        temperature = round(random.uniform(20.0, 30.0), 2)
        humidity = round(random.uniform(40.0, 60.0), 2)
        pressure = round(random.uniform(900.0, 1100.0), 2)

        data = {
            'sensor_id': sensor_id,
            'timestamp': time.time(),
            'temperature': temperature,
            'humidity': humidity,
            'pressure': pressure
        }

        producer.send(topic_name, value=data)
        print(f"Sent: {data}")
        time.sleep(1) # Enviar un evento cada segundo

except KeyboardInterrupt:
    print("Deteniendo productor...")
finally:
    producer.close()
    print("Productor cerrado.")

## 7. Implementación del Consumidor y Procesamiento ML en Tiempo Real con Apache Spark

Este script de PySpark leerá los datos del tópico Kafka, los parseará, aplicará un modelo de ML para la inferencia y mostrará los resultados. Utilizaremos una UDF (User-Defined Function) para cargar el modelo y hacer predicciones.

**Paso 7.1: Crear el archivo `spark_consumer.py`**

Crea un archivo llamado `spark_consumer.py` en la raíz de tu carpeta `streaming2/` y pega el siguiente contenido. No olvides guardar el archivo.

In [None]:
%%writefile streaming2/spark_consumer.py
import json
import joblib
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, ArrayType

# Inicializar SparkSession
# Es importante incluir los paquetes de Kafka para Spark Structured Streaming
spark = SparkSession.builder \
    .appName("SparkKafkaMLStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \ # Ajusta la versión de Spark si es necesario
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("SparkSession creada.")

# Definir el esquema de los datos que esperamos de Kafka
schema = StructType([
    StructField("sensor_id", LongType(), True),
    StructField("timestamp", DoubleType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("pressure", DoubleType(), True)
])

# Función para cargar el modelo (se ejecuta en cada worker/executor)
model = None # Global para que se cargue una vez por proceso
def load_model():
    global model
    if model is None:
        # La ruta del modelo debe ser accesible para los workers de Spark
        # Si se ejecuta en un cluster, este archivo debe ser distribuido a los workers
        model = joblib.load('streaming2/ml_model/simple_model.pkl')
        print("Modelo de ML cargado en el worker.")
    return model

# UDF para realizar la inferencia
def predict_udf(temperature, humidity):
    local_model = load_model()
    features = np.array([[temperature, humidity]])
    prediction = local_model.predict(features)[0]
    prediction_proba = local_model.predict_proba(features)[0].tolist()
    return int(prediction), prediction_proba

# Registrar la UDF. El tipo de retorno es un StructType con la predicción y las probabilidades.
prediction_schema = StructType([
    StructField("prediction", LongType(), True),
    StructField("prediction_proba", ArrayType(DoubleType()), True)
])
predict_spark_udf = udf(predict_udf, prediction_schema)

# Leer datos de Kafka con Structured Streaming
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9094") \
    .option("subscribe", "sensor_data") \
    .load()

print("Conectado a Kafka para streaming.")

# Parsear el valor JSON y aplicar la UDF
parsed_df = kafka_df.selectExpr("CAST(value AS STRING) as json_data") \
    .select(from_json(col("json_data"), schema).alias("data")) \
    .select("data.*")

# Aplicar la UDF para obtener las predicciones
prediction_df = parsed_df.withColumn(
    "ml_result", 
    predict_spark_udf(col("temperature"), col("humidity"))
).select(
    col("sensor_id"),
    col("timestamp"),
    col("temperature"),
    col("humidity"),
    col("ml_result.prediction").alias("prediction"),
    col("ml_result.prediction_proba").alias("prediction_proba")
)

# Iniciar el query de streaming y mostrar los resultados en la consola
query = prediction_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

print("Streaming iniciado. Esperando datos...")

# Esperar hasta que el query termine (por ejemplo, por KeyboardInterrupt)
query.awaitTermination()
print("Streaming detenido.")

## 8. Ejecución del Hands-on

Para ver el flujo de datos en acción, necesitarás abrir dos terminales separadas, ambas navegando a la carpeta `streaming2/`.

**Paso 8.1: Iniciar el Consumidor de Spark (Terminal 1)**

En tu primera terminal, navega a la carpeta `streaming2/` y ejecuta el consumidor de Spark. Asegúrate de que tu entorno Poetry esté activado o usa `poetry run`.

```bash
cd streaming2/
poetry run python spark_consumer.py
```

Spark tardará un momento en inicializarse. Deberías ver los mensajes de Spark y finalmente "Streaming iniciado. Esperando datos..."

**Paso 8.2: Iniciar el Productor (Terminal 2)**

En una **segunda terminal**, también navega a la carpeta `streaming2/` y ejecuta el productor. Este comenzará a enviar datos a Kafka.

```bash
cd streaming2/
poetry run python producer.py
```

Verás cómo el productor envía mensajes en la Terminal 2. Simultáneamente, en la Terminal 1 (donde corre el consumidor de Spark), deberías ver los micro-batches de datos siendo procesados por Spark y las predicciones de ML impresas en la consola.

¡Felicidades! Has implementado un sistema de procesamiento de streaming robusto con Kafka y Apache Spark para realizar inferencia de Machine Learning en tiempo real.

## 9. Limpieza

Una vez que hayas terminado con el hands-on, es importante detener los servicios de Docker para liberar recursos y detener el proceso de Spark Streaming.

**Paso 9.1: Detener los procesos de Python (Productor y Consumidor)**

En ambas terminales donde ejecutaste el `producer.py` y `spark_consumer.py`, presiona `Ctrl+C` para detener los scripts.

**Paso 9.2: Detener los contenedores de Docker**

En la carpeta `streaming2/`, ejecuta:

```bash
docker compose down
```

**Paso 9.3: Limpieza completa (opcional)**

Si deseas eliminar también las imágenes de Docker y los volúmenes para una limpieza más profunda, puedes usar:

```bash
docker compose down --rmi all --volumes
```