## Prueba Final: Machine Learning con Spark y Docker
## Predicción de la Calidad del Vino con PySpark en un Entorno Dockerizado
### ROGER DIEGO FLORES CONDORI
### Parte 2: Desarrollo del Proyecto en JupyterLab

### Dentro del notebook que creaste, sigue los siguientes pasos.
---
### Paso 1: Configuración e Inicio de Spark

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TitanicML").getOrCreate()
# La SparkSession ya está creada. Podemos verificar su configuración.
print("SparkSession disponible. Configuración:")
print(spark.sparkContext.getConf().getAll())

SparkSession disponible. Configuración:
[('spark.app.id', 'local-1753051537974'), ('spark.driver.host', '11bdd832def2'), ('spark.executor.id', 'driver'), ('spark.app.name', 'TitanicML'), ('spark.driver.memory', '1g'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.k

---
### Paso 2: Carga y Exploración de Datos (ETL)

In [4]:
# Cargar el dataset, indicando que tiene encabezado e infiriendo el esquema.
df = spark.read.csv('/data/wine.csv', header=True, inferSchema=True, sep=';')
print("Dataset cargado correctamente.")


Dataset cargado correctamente.


In [5]:
# Imprimir el esquema del DataFrame para entender los tipos de datos
print("Esquema del DataFrame:")
df.printSchema()

# Mostrar las primeras 10 filas del DataFrame
print("\nPrimeras 10 filas:")
df.show(10)

Esquema del DataFrame:
root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)


Primeras 10 filas:
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------

In [6]:
num_filas = df.count()
num_columnas = len(df.columns)

print(f"Cantidad de registros: {num_filas}")
print(f"Cantidad de columnas: {num_columnas}")


Cantidad de registros: 1599
Cantidad de columnas: 12


In [7]:
# Resumen estadístico 
df.summary().show()


+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|summary|     fixed acidity|   volatile acidity|        citric acid|    residual sugar|           chlorides|free sulfur dioxide|total sulfur dioxide|             density|                 pH|         sulphates|           alcohol|           quality|
+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|  count|              1599|               1599|               1599|              1599|                1599|               1599|                1599|                1599|               1599|              1599|              1599|              1599|
|   mean

---
### Paso 3: Preparación de Datos y Feature Engineering
#### Transformación de la variable objetivo: Crea una nueva columna label que sea 1.0 si quality es mayor que 5 (buena calidad) y 0.0 en caso contrario (calidad estándar).

In [8]:
from pyspark.sql.functions import expr


df = df.withColumn(
    'label',
    expr("CASE WHEN quality > 5 THEN 1.0 ELSE 0.0 END")
)

df.select('quality', 'label').show(10)
df.groupBy('label').count().show()



+-------+-----+
|quality|label|
+-------+-----+
|      5|  0.0|
|      5|  0.0|
|      5|  0.0|
|      6|  1.0|
|      5|  0.0|
|      5|  0.0|
|      5|  0.0|
|      7|  1.0|
|      7|  1.0|
|      5|  0.0|
+-------+-----+
only showing top 10 rows

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  744|
|  1.0|  855|
+-----+-----+



#### División de Datos: Divide el dataset en entrenamiento (80%) y prueba (20%).

In [9]:
# Semilla para reproducibilidad
seed = 42

# División del dataset en entrenamiento y prueba (80% - 20%)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=seed)

print(f"Tamaño del conjunto de entrenamiento: {train_data.count()} registros")
print(f"Tamaño del conjunto de prueba: {test_data.count()} registros")


Tamaño del conjunto de entrenamiento: 1324 registros
Tamaño del conjunto de prueba: 275 registros


---
### Paso 4: Entrenamiento de Modelos de Machine Learning

In [10]:
from pyspark.ml.feature import VectorAssembler

# Definir las columnas predictoras
caracteristicas = [c for c in df.columns if c not in ['quality', 'label']]

# Crear el ensamblador de características
ensamblador = VectorAssembler(inputCols=caracteristicas, outputCol="caracteristicas")

# Aplicar el ensamblador
datos_entrenamiento = ensamblador.transform(train_data)
datos_prueba = ensamblador.transform(test_data)


#### 1. Regresión Logística (LogisticRegression)

In [11]:
from pyspark.ml.classification import LogisticRegression

# Crear y ajustar el modelo de regresión logística
regresion_logistica = LogisticRegression(featuresCol="caracteristicas", labelCol="label")
modelo_reg_log = regresion_logistica.fit(datos_entrenamiento)

print("Entrenamiento de Regresión Logística finalizado.")


Entrenamiento de Regresión Logística finalizado.


#### 2. Árbol de Decisión (DecisionTreeClassifier)

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier

# Crear y ajustar el modelo de árbol de decisión
arbol_decision = DecisionTreeClassifier(featuresCol="caracteristicas", labelCol="label")
modelo_arbol = arbol_decision.fit(datos_entrenamiento)

print("Entrenamiento de Árbol de Decisión completado.")


Entrenamiento de Árbol de Decisión completado.


---
### Paso 5: Evaluación de Modelos

#### Realiza predicciones sobre el conjunto de prueba para cada modelo.

In [13]:
# Predicciones en los datos de prueba
resultados_reg_log = modelo_reg_log.transform(datos_prueba)
resultados_arbol = modelo_arbol.transform(datos_prueba)


#### Usa BinaryClassificationEvaluator con la métrica Área Bajo la Curva ROC (AUC) para evaluar cada modelo.

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Crear evaluador
evaluador = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

# Evaluación de Regresión Logística
roc_reg_log = evaluador.evaluate(resultados_reg_log)
print(f"AUC - Regresión Logística: {roc_reg_log}")

# Evaluación de Árbol de Decisión
roc_arbol = evaluador.evaluate(resultados_arbol)
print(f"AUC - Árbol de Decisión: {roc_arbol}")


AUC - Regresión Logística: 0.8350663129973478
AUC - Árbol de Decisión: 0.702970822281167


#### Imprime el AUC de cada modelo y concluye cuál tuvo el mejor rendimiento.

In [16]:
print("\n--- Conclusión del experimento ---")

if roc_reg_log > roc_arbol:
    print("La Regresión Logística presentó un mejor desempeño en términos de AUC, lo que indica una mayor capacidad para discriminar entre clases.")
elif roc_arbol > roc_reg_log:
    print("El Árbol de Decisión obtuvo un mejor AUC, lo que sugiere un mejor rendimiento en esta tarea de clasificación.")
else:
    print("Ambos modelos tuvieron el mismo AUC, por lo tanto su rendimiento es equivalente en este caso.")



--- Conclusión del experimento ---
La Regresión Logística presentó un mejor desempeño en términos de AUC, lo que indica una mayor capacidad para discriminar entre clases.
