Script en una única celda (streaming Kafka → predicción → Mongo + Kafka + Elasticsearch)

En dos terminales ejecutar:

```bash
docker exec -it kafka bash
```
y luego en uno esto:

```bash
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic mydata_prediction_response
```

y en otro:
```bash
/opt/kafka/bin/kafka-console-producer.sh   --broker-list kafka:9092   --topic mydata_prediction_request   --property "parse.key=false"   --property "key.separator=:"   --property "value.serializer=org.apache.kafka.common.serialization.StringSerializer"
```
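y, como ejemplo, en el producer (el campo `UUID` es opcional; si falta, el documento se indexa en Elasticsearch con un id aleatorio):
```json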
{"order_id": 1, "customer_id": "cust_123", "restaurant_id": "rest_456", "order_date_and_time": "2025-08-06T13:00:00", "delivery_date_and_time": "2025-08-06T13:45:00", "order_value": 1000, "delivery_fee": 100, "payment_method": "credit_card", "discounts_and_offers": "10% off", "commission_fee": 100, "payment_processing_fee": 20, "refunds/chargebacks": 0}
```

Puedes abrir Mongo Express y ver el resultado en http://localhost:8081/db/agile_data_science/mydata_prediction_response

Para ejecutar el formulario web (API Flask):
```bash
docker exec -it agile bash
``` 
y dentro de agile:
```bash
python /home/jovyan/Food_delivery/Deploying_Predictive_Systems/web/MY_flask_api.py
``` 

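Para ejecutar el notebook con papermill (la primera orden se lanza dentro del contenedor `agile`; la segunda, desde el host):
```bash
# Dentro del contenedor agile:
papermill "/home/jovyan/Food_delivery/Deploying_Predictive_Systems/Make_Predictions.ipynb" "/home/jovyan/Food_delivery/Deploying_Predictive_Systems/MY_Make predictions_out.ipynb"

# Desde el host, vía docker exec (exportando el entorno de Spark que necesita papermill):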
docker exec -it -u jovyan agile bash -lc '
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_HOME=/usr/local/spark
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.2-src.zip:$PYTHONPATH"

papermill "/home/jovyan/Food_delivery/Deploying_Predictive_Systems/Make_Predictions.ipynb" "/home/jovyan/Food_delivery/Deploying_Predictive_Systems/Make_Predictions_out.ipynb"
'
``` 


In [None]:
# -*- coding: utf-8 -*-
# CELDA ÚNICA: Streaming Kafka -> Enriquecido -> Predicción -> Mongo + Kafka + Elasticsearch

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
from pyspark.sql.functions import from_json, col, hour, dayofweek, when, current_timestamp
from pyspark.ml import PipelineModel

from pyspark import SparkContext  # needed by the leftover-context cleanup below

# Re-running this cell in the same kernel would otherwise let getOrCreate()
# hand back the previous session (with its old master/packages config), so
# stop whatever is still alive first. Both steps are best effort: a failure
# is reported but does not abort the cell.
try:
    # Stop the active SparkSession, if any.
    _active = SparkSession.getActiveSession()
    if _active is not None:
        _active.stop()
except Exception as exc:
    print("⚠️ No se pudo detener la SparkSession activa:", exc)

try:
    # Stop a SparkContext that is still running without a session.
    # _active_spark_context is PySpark's (private) handle to that context.
    if SparkContext._active_spark_context is not None:
        SparkContext._active_spark_context.stop()
except Exception as exc:
    print("⚠️ No se pudo detener el SparkContext activo:", exc)

print("🔧 Creando SparkSession...")
spark = (
    SparkSession.builder.appName("mydata-streaming-predict")
    .master("spark://agile:7077")
    # Bind the driver's sockets on all interfaces (container networking).
    .config("spark.driver.bindAddress", "0.0.0.0")
    # Kafka source/sink connector. Presumably it matches the cluster's
    # Spark 3.2.0 / Scala 2.12 build; confirm when upgrading Spark.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
print("✅ SparkSession creada")

# -------------------------
# 1) Load the trained pipeline
# -------------------------
# NOTE(review): relative path, resolved against the kernel's working
# directory (Jupyter / papermill cwd). Confirm it points at the training output.
model_path = "./models/pipeline_model.bin"
print("🔧 Cargando PipelineModel...")
modelo = PipelineModel.load(model_path)
print("✅ Modelo cargado")

# -------------------------
# 2) Input message schema (includes the optional UUID)
# -------------------------
# (field name, Spark type) pairs; every field is nullable, so missing JSON
# keys simply arrive as NULL.
_schema_fields = (
    ("UUID", StringType()),
    ("order_id", LongType()),
    ("customer_id", StringType()),
    ("restaurant_id", StringType()),
    ("order_date_and_time", TimestampType()),
    ("delivery_date_and_time", TimestampType()),
    ("order_value", DoubleType()),
    ("delivery_fee", DoubleType()),
    ("payment_method", StringType()),
    ("discounts_and_offers", StringType()),
    ("commission_fee", DoubleType()),
    ("payment_processing_fee", DoubleType()),
    ("refunds/chargebacks", DoubleType()),
)
schema = StructType([StructField(name, dtype, True) for name, dtype in _schema_fields])

# -------------------------
# 3) Read from Kafka
# -------------------------
print("🔌 Conectando a Kafka (topic: mydata_prediction_request)...")
kafka_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "mydata_prediction_request",
    # Only used when the query starts without a checkpoint; switch to
    # "earliest" to re-consume the topic from the beginning.
    "startingOffsets": "latest",
}
raw_stream = spark.readStream.format("kafka").options(**kafka_options).load()

# The Kafka `value` is binary: decode it to text, parse it with the schema,
# and flatten the resulting struct into top-level columns.
json_df = (
    raw_stream
    .select(col("value").cast("string").alias("json_data"))
    .select(from_json(col("json_data"), schema).alias("data"))
    .select("data.*")
)

# -------------------------
# 4) Feature enrichment: must mirror the training-time transformations exactly
# -------------------------
# Flags are 1/0 integers built with when/otherwise, so a NULL condition
# falls through to 0.
# Spark's dayofweek: 1 = Sunday ... 7 = Saturday.
weekend_flag = when(col("day_of_week").isin([1, 7]), 1).otherwise(0)
# Rush hours: 13-15 and 20-22, both ranges inclusive.
rush_hour_flag = when(
    col("hour_of_day").between(13, 15) | col("hour_of_day").between(20, 22), 1
).otherwise(0)
discount_flag = when(col("discounts_and_offers").isNotNull(), 1).otherwise(0)
# NOTE(review): a flat 10% of order_value whatever the offer text says;
# presumably this mirrors training. Confirm.
discount_amount = when(col("has_discount") == 1, col("order_value") * 0.1).otherwise(0.0)
refund_flag = when(col("refunds/chargebacks") > 0, 1).otherwise(0)

df_enriched = (
    json_df
    .withColumn("day_of_week", dayofweek("order_date_and_time"))
    .withColumn("hour_of_day", hour("order_date_and_time"))
    .withColumn("es_fin_de_semana", weekend_flag)
    .withColumn("es_hora_punta", rush_hour_flag)
    .withColumn("has_discount", discount_flag)
    .withColumn("discount_value", discount_amount)
    .withColumn("refunded", refund_flag)
)

# -------------------------
# 5) Column selection:
#    - the columns the pipeline needs
#    - + CONTEXT columns to keep (they pass through the model untouched)
# -------------------------
required_numeric = [
    "order_value", "delivery_fee", "commission_fee", "payment_processing_fee",
    "refunds/chargebacks", "discount_value", "has_discount", "refunded",
    "day_of_week", "hour_of_day", "es_fin_de_semana", "es_hora_punta"
]
required_categorical = ["payment_method", "discounts_and_offers"]
context_cols = ["UUID", "order_id", "order_date_and_time", "payment_method", "discounts_and_offers"]
# payment_method / discounts_and_offers are in both the categorical and the
# context lists. Selecting them twice would give the DataFrame duplicate
# column names, so de-duplicate while keeping first-seen order.
selected_cols = list(dict.fromkeys(required_numeric + required_categorical + context_cols))
features_df = df_enriched.select(*selected_cols)

# -------------------------
# 6) Prediction
# -------------------------
predicciones = modelo.transform(features_df)

# Output document (no joins). It holds the context columns that passed through
# the pipeline, the model's "prediction" column and an ingest timestamp.
output_columns = [
    "UUID", "order_id", "prediction", "order_date_and_time",
    "payment_method", "discounts_and_offers",
    "day_of_week", "hour_of_day", "es_fin_de_semana", "es_hora_punta",
]
resultado_enriquecido = predicciones.select(*output_columns).withColumn(
    "@ingest_ts", current_timestamp()
)

# -------------------------
# 7) foreachBatch: Mongo + Kafka + Elasticsearch (Bulk)
# -------------------------
def write_to_mongo_kafka_es(batch_df, epoch_id):
    import pymongo, json, urllib.request, urllib.error
    from uuid import uuid4

    client = None
    try:
        # Colectar microbatch (en dicts normales)
        rows = [r.asDict() for r in batch_df.collect()]
        print(f"🧱 Microbatch {epoch_id}: {len(rows)} docs")

        if not rows:
            return

        # --- Prepara versión SANEADA para ES (NUNCA con _id)
        es_rows = []
        for d in rows:
            d_es = dict(d)        # copia
            d_es.pop("_id", None) # por si viniera de otra etapa
            es_rows.append(d_es)

        # --- Construye y envía BULK NDJSON a ES ANTES de tocar Mongo
        ndjson_lines = []
        for d in es_rows:
            es_id = d.get("UUID") or str(uuid4())  # idempotencia
            ndjson_lines.append(json.dumps({"index": {"_index": "mydata_prediction_response", "_id": es_id}}))
            ndjson_lines.append(json.dumps(d, default=str))
        payload = ("\n".join(ndjson_lines) + "\n").encode("utf-8")

        try:
            req = urllib.request.Request(
                "http://elastic:9200/_bulk",
                data=payload,
                headers={"Content-Type": "application/x-ndjson"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=15) as resp:
                body = resp.read().decode("utf-8", errors="replace")
            # Parseamos y solo mostramos si errors=true
            es_resp = json.loads(body)
            if es_resp.get("errors", False):
                print("❌ ES bulk con errores:", body[:800], "...")
                
        except urllib.error.HTTPError as he:
            body = he.read().decode("utf-8", errors="replace")
            print(f"❌ ES HTTPError {he.code}: {body[:800]}")
        except Exception as e:
            print("❌ Error conectando a ES:", e)

        # --- Kafka (respuesta): usa el batch_df tal cual
        (batch_df
         .selectExpr("UUID as key", "to_json(struct(*)) as value")
         .write
         .format("kafka")
         .option("kafka.bootstrap.servers", "kafka:9092")
         .option("topic", "mydata_prediction_response")
         .save())

        # --- Mongo: inserta COPIAS para que PyMongo pueda inyectar _id sin contaminar nada
        client = pymongo.MongoClient("mongo")
        db = client["agile_data_science"]
        out = db["mydata_prediction_response"]
        out.create_index("UUID", unique=False)

        rows_for_mongo = [dict(d) for d in es_rows]  # copia independiente
        out.insert_many(rows_for_mongo)

    except Exception as e:
        print("❌ Error en foreachBatch:", e)
        try:
            if client is None:
                client = pymongo.MongoClient("mongo")
            db = client["agile_data_science"]
            db["mydata_prediction_errors"].insert_one({
                "epoch_id": int(epoch_id),
                "error": str(e),
                "note": "Fallo en foreachBatch",
            })
        except Exception as e2:
            print("❌ Error registrando error en Mongo:", e2)
    finally:
        try:
            client and client.close()
        except:
            pass

# -------------------------
# 8) Start the stream
# -------------------------
print("🚀 Iniciando streaming (Mongo + Kafka + Elasticsearch)...")
# Processed offsets are tracked here. Reusing the path resumes where the last
# run stopped; a new path starts fresh from the source's startingOffsets.
checkpoint_dir = "/tmp/checkpoints-foreachbatch-es-v6"
stream_writer = (
    resultado_enriquecido.writeStream
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .foreachBatch(write_to_mongo_kafka_es)
)
query = stream_writer.start()

print("⏳ Esperando microbatches...")
# Blocks this cell (and a papermill run) until the query stops or fails.
query.awaitTermination()