
Las cinco fases que se realizarán en este proyecto forman un flujo de trabajo completo de ciencia de datos aplicado a un contexto de streaming de datos utilizando PySpark, desde la preparación de datos hasta la evaluación del modelo en tiempo real. El objetivo principal de este flujo de trabajo es implementar y evaluar un modelo de Machine Learning (ML), específicamente un modelo de regresión logística, para clasificar entradas de datos en streaming en tiempo real. Esto es particularmente útil en aplicaciones donde las decisiones o predicciones deben actualizarse continuamente a medida que llegan nuevos datos, como puede ser el análisis de sentimientos de reseñas de clientes o la detección de fraude en transacciones financieras.

### Objetivo General

El objetivo es demostrar cómo se puede preparar, implementar, y evaluar un modelo de ML para datos en streaming utilizando las capacidades de procesamiento en tiempo real de Apache Spark y su extensión PySpark. Esto incluye la preparación de datos, el entrenamiento y la evaluación del modelo, y la implementación del modelo para hacer predicciones en tiempo real sobre datos de streaming.

### Descripción de lo que se hizo en cada fase:

1. **Fase 1: Preparación del Modelo de ML de Regresión Logística**
   - Se preparó y entrenó un modelo de regresión logística usando un conjunto de datos estático para predecir la satisfacción del cliente basada en varias características relacionadas con el servicio de aerolíneas. Este modelo se construyó dentro de un pipeline de ML que incluye la preparación de datos como la indexación de variables categóricas y ensamblaje de características.

2. **Fase 2: Preparación de los Datos**
   - Se dividió un archivo de datos estático en 20 partes iguales para simular un flujo de datos en streaming. Estas partes se almacenaron en un directorio específico que luego se usaría como fuente de datos en streaming.

3. **Fase 3: Creación de la Fuente de Streaming**
   - Se configuró una fuente de datos en streaming leyendo los datos divididos desde el directorio especificado. Se aplicó el modelo de ML preparado en la Fase 1 a estos datos en streaming, transformando y prediciendo en tiempo real.

4. **Fase 4: Implementación de una Consulta Adicional**
   - Se creó una consulta adicional sobre el stream resultante para almacenar las predicciones en memoria, lo que permite realizar consultas SQL sobre estos datos en tiempo real para análisis adicionales o para evaluar el rendimiento del modelo.

5. **Fase 5: Evaluación del Modelo en Streaming**
   - Se utilizó un `MulticlassClassificationEvaluator` para evaluar la precisión del modelo en tiempo real. Se generó un DataFrame a partir de la consulta almacenada en memoria para calcular y imprimir la métrica de precisión del modelo en clasificar correctamente las entradas de datos en streaming.

Este flujo de trabajo demuestra el poder de Spark y PySpark para manejar y procesar grandes volúmenes de datos en tiempo real, aplicando modelos de ML complejos y evaluando su rendimiento continuamente a medida que llegan nuevos datos. Es un enfoque valioso en entornos dinámicos donde el contexto y los datos cambian rápidamente, permitiendo a las organizaciones adaptarse y responder de manera más efectiva.


A partir del dataset encuesta_aerolinea.csv con resultados de una encuesta de una aerolínea, generad un modelo de Regresión Logística que permita clasificar la satisfacción del cliente entre las dos posibles alternativas que se indican en la columna (satisfaction_binary) del dataset: 1 para “satisfied” o 0 para “neutral or dissatisfied”. 

Se deben extraer las siguientes columnas para la generación del modelo: Gender, Age, Inflight wifi service,  Departure/Arrival time convenient, Ease of Online booking, Gate location, Food and drink, Online boarding, Seat comfort, Inflight entertainment, On-board service, Leg room service, Baggage handling, Check service, Inflight service, Cleanliness, satisfaction y satisfaction_binary.

Como método de preparación del DataFrame, se elige eliminar todas las observaciones que contengan NaN, utilizando el método na.drop() de la siguiente manera: <mi_dataframe>.na.drop()

La columna “satisfaction_binary” contiene la representación binaria 1 o 0 de las categorías de la columna “satisfaction”. La columna “satisfaction_binary” debe pasarse como el parámetro de labelCol al crear el objeto LogisticRegression.

Las columnas categóricas (Gender) deben convertirse a una representación vectorial utilizando StringIndexer() y OneHotEncoder().

**Los pasos para la implementación del modelo puede reducirse a los siguientes:**

1. Extracción de las columnas requeridas en un nuevo DataFrame.
2. Limpiar el Dataframe eliminando los NaN.
3. Convertir columnas categóricas utilizando StringIndexer y OneHotEncoder.
4. Generar el vector con las features para el entrenamiento del modelo con VectorAssembler()
5. Construir el objeto LogisticRegression pasando los parámetros para featuresCol y labelCol.
6. Construir el objeto Pipeline con los diferentes pasos de transformación.
7. Separa el conjunto de datos en conjuntos de entrenamiento y evaluación.
8. Entrenamos el modelo con el método fit() del Pipeline.
9. Construimos el objeto BinaryClassificationEvaluator().

10\. Evaluar la precisión (accuracy) del modelo.

11\. Utilizar la libreria handyspark para obtener el gráfico de ROC (opcional).

In [0]:
%fs
ls dbfs:/FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/encuesta_aerolinea-1.csv,encuesta_aerolinea-1.csv,11986211,1706643440000
dbfs:/FileStore/tables/encuesta_aerolinea.csv,encuesta_aerolinea.csv,11986211,1706643304000
dbfs:/FileStore/tables/encuesta_aerolinea_simplificado-1.csv,encuesta_aerolinea_simplificado-1.csv,11986211,1705509312000
dbfs:/FileStore/tables/encuesta_aerolinea_simplificado.csv,encuesta_aerolinea_simplificado.csv,11986211,1705509276000


In [0]:
lines = sc.textFile('dbfs:/FileStore/tables/encuesta_aerolinea.csv')
df_preview = spark.read.format("csv").option("inferSchema","true").option("header","true").load("dbfs:/FileStore/tables/encuesta_aerolinea.csv")
df_limited = df_preview.limit(5)  # Limita a 5 registros
df_limited.display()


_c0,id,Gender,Customer Type,Age,Type of Travel,Class,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,Food and drink,Online boarding,Seat comfort,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes,satisfaction
0,70172,Male,Loyal Customer,13,Personal Travel,Eco Plus,460,3,4,3,1,5,3,5,5,4,3,4,4,5,5,25,18,neutral or dissatisfied
1,5047,Male,disloyal Customer,25,Business travel,Business,235,3,2,3,3,1,3,1,1,1,5,3,1,4,1,1,6,neutral or dissatisfied
2,110028,Female,Loyal Customer,26,Business travel,Business,1142,2,2,2,2,5,5,5,5,4,3,4,4,4,5,0,0,satisfied
3,24026,Female,Loyal Customer,25,Business travel,Business,562,2,5,5,5,2,2,2,2,2,5,3,1,4,2,11,9,neutral or dissatisfied
4,119299,Male,Loyal Customer,61,Business travel,Business,214,3,3,3,3,4,5,5,3,3,4,4,3,3,3,0,0,satisfied


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType
from pyspark.sql.functions import col

In [0]:
schema = StructType([
    StructField("C0", IntegerType(), True),
    StructField("ID", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("customer_type", StringType(), True),
    StructField("age", FloatType(), True),
    StructField("type_of_travel", StringType(), True),
    StructField("class", StringType(), True),
    StructField("flight_distance", FloatType(), True),
    StructField("wifi", FloatType(), True),
    StructField("time_convenient", FloatType(), True),
    StructField("booking", FloatType(), True),
    StructField("gate", FloatType(), True),
    StructField("food", FloatType(), True),
    StructField("boarding", FloatType(), True),
    StructField("comfort", FloatType(), True),
    StructField("entertainment", FloatType(), True),
    StructField("on_board_service", FloatType(), True),
    StructField("room_service", FloatType(), True),
    StructField("handling", FloatType(), True),
    StructField("checking", FloatType(), True),
    StructField("inflight", FloatType(), True),
    StructField("cleanliness", FloatType(), True),
    StructField("departures_delay", FloatType(), True),
    StructField("arrivals_delay", FloatType(), True),
    StructField("satisfaction", StringType(), True), 
])

In [0]:
df_raw = spark.read.format("csv").option("header", True).schema(schema).load('dbfs:/FileStore/tables/encuesta_aerolinea.csv')
df = df_raw.na.drop()


MLlib es la biblioteca de aprendizaje automático (machine learning) de Apache Spark. Está diseñada para simplificar la construcción de aplicaciones de aprendizaje automático a gran escala, integrándose de manera fluida con el resto del ecosistema de Spark. 

MLlib proporciona una amplia variedad de algoritmos y utilidades comunes de aprendizaje automático, incluyendo clasificación, regresión, clustering, filtrado colaborativo (como sistemas de recomendación), reducción de dimensionalidad, y selección de modelos. Aprovecha la distribución de datos y el procesamiento en paralelo de Spark, lo que la hace adecuada para manejar grandes volúmenes de datos de manera eficiente.

Aunque MLlib está escrita en Scala, es accesible desde otros lenguajes de programación soportados por Spark, como Python (a través de PySpark) y Java.

Por lo tanto, MLlib es una biblioteca robusta y versátil para realizar aprendizaje automático en plataformas distribuidas, aprovechando la potencia y escala de Apache Spark. Es especialmente útil para aplicaciones que requieren manejar grandes conjuntos de datos y realizar operaciones de aprendizaje automático complejas.

In [0]:
from pyspark.sql.functions import when

# Seleccionar las columnas necesarias
selected_columns = [
    "gender", "age", "wifi", "time_convenient", "booking",
    "gate", "food", "boarding", "comfort", "entertainment",
    "on_board_service", "room_service", "handling", "checking",
    "inflight", "cleanliness", "satisfaction"
]

df_selected = df.select(selected_columns)

# Convertir la columna 'satisfaction' en binaria
df_selected = df_selected.withColumn(
    "satisfaction_binary", 
    when(col("satisfaction") == "satisfied", 1).otherwise(0)
)

In [0]:
#Se hace una copia del df_selected para usarlo a partir de la Fase 2
df_proyecto = df_selected

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Indexar y codificar las columnas categóricas
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
gender_encoder = OneHotEncoder(inputCol="genderIndex", outputCol="genderVec")

# Asssembler de todas las características en un vector
assembler_inputs = ["genderVec"] + selected_columns[1:-2]  # excluyendo la columna 'satisfaction' y 'satisfaction_binary'
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")



En esta parte, se trabaja con la transformación de características, específicamente indexando y codificando las variables categóricas y combinando todas las características relevantes en un solo vector.

1. **Importar las funciones necesarias de PySpark ML**:

   - `StringIndexer`, `OneHotEncoder`, y `VectorAssembler` son transformadores de PySpark ML que ayudan a convertir datos en formatos adecuados para modelos de aprendizaje automático.
   - `Pipeline` es una forma de encadenar múltiples transformaciones y estimadores, lo que facilita la construcción y gestión de flujos de trabajo de aprendizaje automático.

2. **Indexar y codificar las columnas categóricas**:

   - `StringIndexer`: Convierte la columna categórica 'gender' en índices numéricos. Esto es necesario porque muchos algoritmos de aprendizaje automático prefieren trabajar con números en lugar de texto.
   - `OneHotEncoder`: Transforma los índices numéricos generados por `StringIndexer` en un formato de codificación one-hot. En la codificación one-hot, cada categoría única en la columna se representa como un vector binario.

3. **Assembler de todas las características en un vector**:

   - Aquí, se crea una lista `assembler_inputs` que contiene el nombre de la columna codificada ('genderVec') y todas las otras columnas seleccionadas, excepto 'satisfaction' y 'satisfaction_binary'.
   - `VectorAssembler` se utiliza para combinar todas estas columnas en una única columna de características llamada 'features'. Esto es importante porque los modelos de aprendizaje automático en PySpark, como la regresión logística, esperan que los datos de entrada estén en un único vector de características.

Estas operaciones están preparando los datos para ser utilizados en un modelo de aprendizaje automático, asegurando que todas las variables estén en el formato adecuado y combinadas en una estructura que el modelo puede procesar eficientemente.

In [0]:
#Configura y agrega la regresión logística al pipeline.

from pyspark.ml.classification import LogisticRegression

# Configurar la regresión logística
lr = LogisticRegression(labelCol="satisfaction_binary", featuresCol="features")


# Creación del pipeline
preprocessing_stages = [gender_indexer, gender_encoder, assembler,lr]
pipeline = Pipeline(stages=preprocessing_stages)



Aquí, se configura el estimador de regresión logística y se integra junto con las etapas de preprocesamiento previas en un solo pipeline. Veamos cada paso en detalle:

1. **Importar LogisticRegression de PySpark ML**:
   `LogisticRegression` es la clase de PySpark ML que proporciona la implementación del algoritmo de regresión logística para la clasificación binaria.

2. **Configurar la regresión logística**:

   Aquí, se crea una instancia del modelo de regresión logística (`lr`). 
   - `labelCol="satisfaction_binary"` indica que la columna 'satisfaction_binary' será utilizada como la etiqueta (o variable dependiente) para el modelo. Esta es la columna que se transformó anteriormente a formato binario.
   - `featuresCol="features"` especifica que la columna 'features' contiene las características (o variables independientes) que el modelo usará para hacer predicciones. Esta columna fue generada por el `VectorAssembler` y contiene todas las características relevantes en un formato vectorial.

3. **Creación del pipeline**:
   - Aquí, se crea una lista `preprocessing_stages` que incluye todas las etapas de preprocesamiento (`gender_indexer`, `gender_encoder`, `assembler`) y el modelo de regresión logística (`lr`).
   - Luego, se crea un objeto `Pipeline` con estas etapas. En PySpark, un `Pipeline` es una secuencia de transformadores y estimadores (como modelos de aprendizaje automático) que se ejecutan en un orden específico. Este enfoque facilita la gestión del flujo de trabajo, ya que permite que los datos pasen a través de todas las etapas de preprocesamiento y modelización de manera eficiente y ordenada.

Este código finaliza la configuración del flujo de trabajo para el modelo de regresión logística. Una vez que el `Pipeline` está configurado, puedes entrenarlo con un conjunto de datos y luego usarlo para hacer predicciones.

In [0]:
# Dividir en conjunto de entrenamiento y prueba
train_data, test_data = df_selected.randomSplit([0.7, 0.3], seed=42)

# Entrenar el modelo
model = pipeline.fit(train_data)

# Realizar predicciones
predictions = model.transform(test_data)

# Evaluar el modelo
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="satisfaction_binary", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Área bajo la curva ROC: {auc}")


Área bajo la curva ROC: 0.87703653846697



Este último fragmento de código completa el flujo de trabajo del modelo de regresión logística en PySpark, abarcando la división de los datos, el entrenamiento del modelo, la realización de predicciones y la evaluación del modelo. Expliquemos cada paso:

1. **Dividir en conjunto de entrenamiento y prueba**:

   Aquí, se divide el DataFrame `df_selected` en dos subconjuntos: uno para entrenamiento (`train_data`) y otro para pruebas (`test_data`). La división se hace de forma que el 70% de los datos se asignan al entrenamiento y el 30% a las pruebas. El `seed` se establece para garantizar la reproducibilidad de los resultados.

2. **Entrenar el modelo**:
   ```python
   model = pipeline.fit(train_data)
   ```
   Esta línea entrena el `pipeline`, que incluye todas las etapas de preprocesamiento y el modelo de regresión logística, utilizando el conjunto de datos de entrenamiento (`train_data`).

3. **Realizar predicciones**:
  
   Una vez que el modelo está entrenado, se utiliza para hacer predicciones en el conjunto de datos de prueba (`test_data`). El resultado, `predictions`, es un DataFrame que incluye las predicciones del modelo.

4. **Evaluar el modelo**:
 
   - Aquí, se importa `BinaryClassificationEvaluator` de PySpark ML, una clase que proporciona herramientas para evaluar modelos de clasificación binaria.
   - Se cre un evaluador `evaluator` configurado para usar la métrica "areaUnderROC" (área bajo la curva ROC). La curva ROC es una herramienta común para evaluar el rendimiento de los modelos de clasificación binaria.
   - Finalmente, se utiliza el evaluador para calcular el AUC (área bajo la curva ROC) en las predicciones realizadas. Un valor más alto de AUC indica generalmente un mejor rendimiento del modelo.

Este flujo de trabajo completo demuestra cómo preparar datos, configurar un modelo de regresión logística, entrenarlo y evaluarlo usando PySpark. El área bajo la curva ROC es una métrica útil para comprender el rendimiento general del modelo en términos de su capacidad para distinguir entre las clases.


El fichero original encuesta_aerolinea.csv debe repartirse entre 20 partes iguales utilizando el método repartition() y las partes del fichero deberán guardarse en un un directorio que constituirá la fuente del stream de datos.

El directorio de fuente del stream debe ser borrado completamente con cada ejecución del stream para que la simulación funcione correctamente. El directorio de origen de los datos debe ser siempre el siguiente: dbfs:/FileStore/tables/proyecto/streaming/

In [0]:
# Repartir en 20 partes
df_repartitioned = df_proyecto.repartition(20)

# Borrar el directorio de destino si existe (Ajustar según el entorno de ejecución)
dbutils.fs.rm("dbfs:/FileStore/tables/proyecto/streaming/", recurse=True)

# Guardar las partes en el directorio
df_repartitioned.write.format("parquet").save("dbfs:/FileStore/tables/proyecto/streaming/")


1. **Repartir DataFrame en 20 partes:** `df_proyecto.repartition(20)` redistribuye el DataFrame `df_proyecto` en 20 particiones para optimizar el procesamiento paralelo.

2. **Eliminar directorio existente:** `dbutils.fs.rm("dbfs:/FileStore/tables/proyecto/streaming/", recurse=True)` usa Databricks Utilities para eliminar un directorio y todo su contenido en DBFS, si ya existe, para evitar conflictos de datos.

3. **Guardar DataFrame en formato Parquet:** `df_repartitioned.write.format("parquet").save("dbfs:/FileStore/tables/proyecto/streaming/")` guarda el DataFrame reparticionado en el sistema de archivos de Databricks (DBFS) en formato Parquet, que es eficiente para el almacenamiento y análisis de grandes volúmenes de datos.


Se deberá crear la fuente de datos para el stream con el método readStream() y los parámetros requeridos para su correcta implementación.

Se deberá aplicar el método transform() sobre el modelo generado pasando como parámetro el stream de fuente previamente creado y visualizar el stream de datos resultante con el  método display().

In [0]:
schema1 = StructType([
    StructField("gender", StringType(), True),
    StructField("age", FloatType(), True),
    StructField("wifi", FloatType(), True),
    StructField("time_convenient", FloatType(), True),
    StructField("booking", FloatType(), True),
    StructField("gate", FloatType(), True),
    StructField("food", FloatType(), True),
    StructField("boarding", FloatType(), True),
    StructField("comfort", FloatType(), True),
    StructField("entertainment", FloatType(), True),
    StructField("on_board_service", FloatType(), True),
    StructField("room_service", FloatType(), True),
    StructField("handling", FloatType(), True),
    StructField("checking", FloatType(), True),
    StructField("inflight", FloatType(), True),
    StructField("cleanliness", FloatType(), True),
    StructField("satisfaction", StringType(), True), 
    StructField("satisfaction_binary", IntegerType(), True)

])

# Crear la fuente de datos para el stream
streamingInputDF = (
  spark
    .readStream
    .schema(schema1)  # Asegúrate de definir el esquema adecuado
    .format("parquet")
    .load("dbfs:/FileStore/tables/proyecto/streaming/")
)

# Aplicar el modelo al stream de datos
streamingPredictions = model.transform(streamingInputDF)

# Visualizar el stream de datos resultante 
display(streamingPredictions.limit(6))




gender,age,wifi,time_convenient,booking,gate,food,boarding,comfort,entertainment,on_board_service,room_service,handling,checking,inflight,cleanliness,satisfaction,satisfaction_binary,genderIndex,genderVec,features,rawPrediction,probability,prediction
Male,53.0,3.0,5.0,3.0,3.0,3.0,3.0,3.0,3.0,3.0,2.0,1.0,4.0,1.0,3.0,neutral or dissatisfied,0,1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 15, values -> List(0.0, 53.0, 3.0, 5.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 1.0, 4.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(1.8417562805588128, -1.8417562805588128))","Map(vectorType -> dense, length -> 2, values -> List(0.86315628725477, 0.13684371274523))",0.0
Female,58.0,5.0,5.0,5.0,5.0,2.0,5.0,4.0,4.0,4.0,4.0,4.0,3.0,4.0,3.0,satisfied,1,0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 15, values -> List(1.0, 58.0, 5.0, 5.0, 5.0, 5.0, 2.0, 5.0, 4.0, 4.0, 4.0, 4.0, 4.0, 3.0, 4.0))","Map(vectorType -> dense, length -> 2, values -> List(-1.784327266256624, 1.784327266256624))","Map(vectorType -> dense, length -> 2, values -> List(0.1437696263849696, 0.8562303736150304))",1.0
Female,66.0,3.0,4.0,3.0,5.0,2.0,3.0,5.0,1.0,1.0,3.0,1.0,5.0,1.0,5.0,neutral or dissatisfied,0,0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 15, values -> List(1.0, 66.0, 3.0, 4.0, 3.0, 5.0, 2.0, 3.0, 5.0, 1.0, 1.0, 3.0, 1.0, 5.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5222076592829952, -1.5222076592829952))","Map(vectorType -> dense, length -> 2, values -> List(0.8208633396273257, 0.17913666037267428))",0.0
Male,39.0,5.0,1.0,1.0,1.0,5.0,5.0,5.0,5.0,4.0,5.0,5.0,5.0,5.0,5.0,satisfied,1,1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 15, values -> List(0.0, 39.0, 5.0, 1.0, 1.0, 1.0, 5.0, 5.0, 5.0, 5.0, 4.0, 5.0, 5.0, 5.0, 5.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.164917611882765, 4.164917611882765))","Map(vectorType -> dense, length -> 2, values -> List(0.015293471594832478, 0.9847065284051675))",1.0
Female,34.0,4.0,2.0,4.0,4.0,5.0,4.0,5.0,5.0,1.0,1.0,4.0,2.0,4.0,5.0,satisfied,1,0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 15, values -> List(1.0, 34.0, 4.0, 2.0, 4.0, 4.0, 5.0, 4.0, 5.0, 5.0, 1.0, 1.0, 4.0, 2.0, 4.0))","Map(vectorType -> dense, length -> 2, values -> List(0.3538350975341018, -0.3538350975341018))","Map(vectorType -> dense, length -> 2, values -> List(0.5875472701083173, 0.4124527298916827))",0.0
Male,48.0,4.0,4.0,4.0,4.0,5.0,5.0,4.0,4.0,4.0,4.0,4.0,5.0,4.0,3.0,satisfied,1,1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 15, values -> List(0.0, 48.0, 4.0, 4.0, 4.0, 4.0, 5.0, 5.0, 4.0, 4.0, 4.0, 4.0, 4.0, 5.0, 4.0))","Map(vectorType -> dense, length -> 2, values -> List(-2.1340192101458753, 2.1340192101458753))","Map(vectorType -> dense, length -> 2, values -> List(0.10583403797251299, 0.894165962027487))",1.0



Este código configura y utiliza un flujo de datos (streaming) en Apache Spark para procesar datos en tiempo real, aplicando un modelo de machine learning a estos datos a medida que llegan. 

1. **Definición del esquema (`schema1`):** 
   - Se define un esquema llamado `schema1` utilizando `StructType`. Este esquema especifica las columnas, sus tipos de datos, y si pueden contener valores nulos (`True` indica que sí pueden ser nulos). 

2. **Creación de la fuente de datos para el stream (`streamingInputDF`):**
   - Utiliza `spark.readStream` para leer datos en tiempo real con el formato `parquet` desde una ubicación especificada en DBFS (`"dbfs:/FileStore/tables/proyecto/streaming/"`), aplicando el esquema `schema1` definido previamente. Esto prepara los datos para ser procesados en streaming.

3. **Aplicar el modelo al stream de datos (`streamingPredictions`):**
   - Aplica un modelo de machine learning (`model`) al DataFrame de streaming (`streamingInputDF`). Esto implica que el modelo realizará predicciones o transformaciones en los datos en tiempo real a medida que llegan.

4. **Visualizar el stream de datos resultante:**
   - `display(streamingPredictions.limit(6))` muestra las primeras 6 filas del DataFrame resultante con las predicciones o transformaciones aplicadas. En el contexto de Databricks, `display` es una función que facilita la visualización de DataFrames y streams de datos.

Este flujo de trabajo permite procesar y analizar datos en tiempo real utilizando Spark Structured Streaming, lo cual es ideal para escenarios donde es crucial obtener información instantánea o aplicar modelos de machine learning a datos que se actualizan constantemente, como en sistemas de recomendación, monitoreo en tiempo real, o análisis de satisfacción del cliente.


Se creará una segunda consulta (query) sobre el stream resultante del paso anterior a partir del método writeStream() con un modo de salida “append”, formato “memory” y nombre “encuestaClassification”.

In [0]:
query = streamingPredictions.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("encuestaClassification") \
    .start()



La parte del código que has proporcionado se utiliza para iniciar un flujo de escritura (write stream) en Apache Spark, configurado para procesar y almacenar los resultados de las predicciones del modelo en memoria. Esto permite consultar y visualizar los resultados en tiempo real. Te detallo cada componente de esta operación:

1. **`streamingPredictions.writeStream`:** Inicia la configuración de un flujo de escritura para el DataFrame `streamingPredictions`, que contiene las predicciones o transformaciones aplicadas a los datos de entrada en tiempo real.

2. **`.outputMode("append")`:** Configura el modo de salida del stream. El modo `"append"` significa que solo los nuevos registros (filas) resultantes de las predicciones se añadirán al resultado final. Esto es típico en situaciones donde los datos se van acumulando con el tiempo y no necesitas reescribir o actualizar registros existentes.

3. **`.format("memory")`:** Establece el formato de salida del stream a `"memory"`. Esto permite que los resultados del stream se almacenen en la memoria del cluster, lo cual es útil para pruebas, depuración, y visualización rápida de los datos procesados en tiempo real. 

4. **`.queryName("encuestaClassification")`:** Asigna un nombre a la consulta del stream, en este caso, `"encuestaClassification"`. Este nombre se utiliza para referenciar y consultar los datos resultantes almacenados en memoria a través de SQL o API de DataFrame.

5. **`.start()`:** Inicia la ejecución del flujo de escritura configurado. Una vez iniciado, Spark comenzará a procesar los datos de entrada en tiempo real, aplicará las transformaciones o predicciones del modelo y almacenará los resultados en memoria según la configuración especificada.

Con este flujo, se pueden realizar consultas en tiempo real a los datos resultantes utilizando el nombre de la consulta `"encuestaClassification"` para analizar los datos procesados, realizar más transformaciones, o visualizar los resultados directamente en herramientas de Databricks o Spark SQL. Esta capacidad es especialmente valiosa para aplicaciones de análisis en tiempo real, donde la inmediatez de los datos procesados es crucial.


Crear un objeto evaluador del tipo MulticlassClassificationEvaluator para obtener la métrica “accuracy”.

A partir de la consulta “encuestaClassification” generar un Dataframe utilizando spark.sql() y pasar este Dataframe resultante como parámetro al evaluador utilizando el método evaluate(). 

Finalmente, imprimir la métrica de “accuracy” resultante para el modelo.

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Crear el evaluador
evaluator = MulticlassClassificationEvaluator(
    labelCol="satisfaction_binary", 
    predictionCol="prediction", 
    metricName="accuracy"
)

# Usar spark.sql para seleccionar los datos de la consulta en memoria
accuracyDF = spark.sql("SELECT satisfaction_binary, prediction FROM encuestaClassification")

# Evaluar la precisión
accuracy = evaluator.evaluate(accuracyDF)
print(f"Accuracy: {accuracy}")


Accuracy: 0.8167827995072374
