# Wine Quality notebook
Predicción de la calidad del vino

In [0]:
# List every database registered in the Spark catalog.
# Only the last expression of a cell is echoed automatically, so this result
# is printed explicitly; as a bare expression it was silently discarded.
print(spark.catalog.listDatabases())

# List the tables of the current database (echoed as the cell's output value).
spark.catalog.listTables()

Out[9]: [Table(name='wine1', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='winequality_red_3_csv', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False)]

In [0]:
# Load the "wine1" catalog table into a Spark DataFrame.
df = spark.table("wine1")
# Peek at the first row to inspect the column names and value types.
df.head()

Out[11]: Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)

In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

# Load the wine-quality table from the Spark catalog.
df = spark.table("wine1")

# Show the row count for each quality score (the classification target).
print("Distribución de clases de calidad:")
df.groupBy("quality").count().orderBy("quality").show()

# Columns fed to the model as input features.
feature_cols = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]

# Stage 1: pack the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")

# Stage 2: standardize the assembled vector (mean-centering and std scaling both on).
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="assembled_features",
    outputCol="features",
)

# Stage 3: random forest on the scaled features, with a fixed seed.
# numTrees and maxDepth are only base values here; the cross-validation grid
# built further down in this cell overrides them.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="quality",
    numTrees=100, maxDepth=10,
    bootstrap=True,
    seed=42,
)

pipeline = Pipeline(stages=[assembler, scaler, rf])

# 75 % / 25 % train/test split with a fixed seed.
train_data, test_data = df.randomSplit(weights=[0.75, 0.25], seed=42)

# Hyper-parameter grid explored by cross-validation.
# 'auto' is deliberately not listed for featureSubsetStrategy: for a
# classification forest with more than one tree Spark resolves 'auto' to
# 'sqrt', so listing both fitted every configuration twice. Dropping it cuts
# the grid from 81 to 54 combinations without shrinking the search space.
paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100, 200])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.minInstancesPerNode, [1, 2, 4])
    .addGrid(rf.featureSubsetStrategy, ['sqrt', 'log2'])
    .build())

# Evaluators: one for accuracy and one for F1, both comparing the "quality"
# label against the "prediction" column.
evaluator_accuracy, evaluator_f1 = (
    MulticlassClassificationEvaluator(
        labelCol="quality",
        predictionCol="prediction",
        metricName=metric_name,
    )
    for metric_name in ("accuracy", "f1")
)

# 5-fold cross-validation over the whole pipeline; accuracy selects the best
# parameter map and up to two models are fitted in parallel.
crossval = CrossValidator(
    numFolds=5,
    parallelism=2,
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_accuracy,
)

print("\nEntrenando el modelo...")
cv_model = crossval.fit(train_data)

# Score the held-out test split with the model selected by cross-validation.
predictions = cv_model.transform(test_data)

# Overall metrics on the test split.
accuracy = evaluator_accuracy.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)

print("\nMétricas de evaluación:")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")

# Confusion matrix with percentages: each (actual, predicted) count is
# expressed as a share of all test rows with that actual quality.
print("\nMatriz de confusión (con porcentajes):")
confusion_matrix = predictions.groupBy("quality", "prediction").count()
total_by_actual = predictions.groupBy("quality").count().withColumnRenamed("count", "total")
confusion_joined = confusion_matrix.join(total_by_actual, "quality")
# Look columns up by name with [] on the joined frame. Attribute access
# (`confusion_matrix.count`) resolves to the DataFrame.count *method*, not the
# "count" column, so the original division failed.
confusion_matrix_pct = confusion_joined.withColumn(
    "percentage",
    (confusion_joined["count"] / confusion_joined["total"]) * 100,
)
confusion_matrix_pct.orderBy("quality", "prediction").show()

# Best hyper-parameters: the random forest is the last stage of the best pipeline.
best_model = cv_model.bestModel.stages[-1]
print("\nMejores parámetros encontrados:")
# On a fitted forest model getNumTrees is a property, so it takes no call
# parentheses. The other getters are regular methods and must be called;
# without "()" the f-string prints the bound-method repr, not the value.
print(f"Número de árboles: {best_model.getNumTrees}")
print(f"Profundidad máxima: {best_model.getMaxDepth()}")
print(f"Instancias mínimas por nodo: {best_model.getMinInstancesPerNode()}")
print(f"Estrategia de subconjunto de características: {best_model.getFeatureSubsetStrategy()}")

# Feature importances of the best forest, paired with the feature names
# (same order as feature_cols) and listed from most to least important.
feature_importance = best_model.featureImportances
print("\nImportancia de características (ordenadas por importancia):")
feature_importances = sorted(
    zip(feature_cols, feature_importance),
    key=lambda pair: pair[1],
    reverse=True,
)
for feature, importance in feature_importances:
    print(f"{feature}: {importance:.4f}")

# A few sample predictions with the full (untruncated) class-probability vector.
print("\nEjemplos de predicciones con probabilidades:")
predictions.select("quality", "prediction", "probability").show(5, truncate=False)

# Per-class metrics: for each actual quality score, the fraction of its test
# rows that were predicted correctly (i.e. per-class recall; the printed label
# keeps the original "Accuracy" wording).
print("\nMétricas por clase:")
# Flag correct predictions once, outside the loop, since it does not depend on the class.
label_predictions = predictions.withColumn(
    "is_correct",
    (predictions.quality == predictions.prediction).cast("double"),
)
# One grouped aggregation replaces the per-class filter + `.mean()` calls:
# DataFrame has no `.mean()` method (only grouped data does), so the original
# raised AttributeError. Ordering by quality makes the printed order stable.
per_class_rows = (
    label_predictions
    .groupBy("quality")
    .agg({"is_correct": "avg"})
    .orderBy("quality")
    .collect()
)
for label in per_class_rows:
    label_value = label.quality
    accuracy_class = label[1]  # second column holds avg(is_correct)

    print(f"Clase {label_value}:")
    print(f"Accuracy: {accuracy_class:.4f}")
    print("---")

Distribución de clases de calidad:
+-------+-----+
|quality|count|
+-------+-----+
|      3|   10|
|      4|   53|
|      5|  681|
|      6|  638|
|      7|  199|
|      8|   18|
+-------+-----+


Entrenando el modelo...


In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Load the wine-quality table from the Spark catalog.
df = spark.table("wine1")

# Show the row count for each quality score (the classification target).
print("Distribución de clases de calidad:")
df.groupBy("quality").count().orderBy("quality").show()

# Columns fed to the model as input features.
feature_cols = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]

# Stage 1: pack the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")

# Stage 2: standardize the assembled vector (mean-centering and std scaling both on).
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="assembled_features",
    outputCol="features",
)

# Stage 3: random forest with fixed, hand-picked hyper-parameters (no search).
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="quality",
    numTrees=50, maxDepth=10,
    seed=42,
)

# Chain the three stages into one estimator.
pipeline = Pipeline(stages=[assembler, scaler, rf])

# 70 % / 30 % train/test split with a fixed seed.
train_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=42)

# Fit the whole pipeline on the training split.
print("\nEntrenando el modelo...")
model = pipeline.fit(train_data)

# Score the held-out test split.
predictions = model.transform(test_data)

# Evaluators: one for accuracy and one for F1, both comparing the "quality"
# label against the "prediction" column.
evaluator_accuracy, evaluator_f1 = (
    MulticlassClassificationEvaluator(
        labelCol="quality",
        predictionCol="prediction",
        metricName=metric_name,
    )
    for metric_name in ("accuracy", "f1")
)

# Overall metrics on the test split.
accuracy = evaluator_accuracy.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)

print(f"\nMétricas de evaluación:")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")

# Confusion matrix: row counts per (actual quality, predicted quality) pair.
print("\nMatriz de confusión:")
(
    predictions
    .groupBy("quality", "prediction")
    .count()
    .orderBy("quality", "prediction")
    .show()
)

# Feature importances of the fitted forest (last pipeline stage), paired with
# the feature names and listed from most to least important.
rf_model = model.stages[-1]
feature_importance = rf_model.featureImportances
print("\nImportancia de características (ordenadas por importancia):")
feature_importances = sorted(
    zip(feature_cols, feature_importance),
    key=lambda pair: pair[1],
    reverse=True,
)
for feature, importance in feature_importances:
    print(f"{feature}: {importance:.4f}")

# A few sample predictions alongside their class-probability vectors.
print("\nEjemplos de predicciones:")
predictions.select("quality", "prediction", "probability").show(5)

Distribución de clases de calidad:
+-------+-----+
|quality|count|
+-------+-----+
|      3|   10|
|      4|   53|
|      5|  681|
|      6|  638|
|      7|  199|
|      8|   18|
+-------+-----+


Entrenando el modelo...

Métricas de evaluación:
Accuracy: 0.6690
F1 Score: 0.6548

Matriz de confusión:
+-------+----------+-----+
|quality|prediction|count|
+-------+----------+-----+
|      3|       4.0|    1|
|      3|       5.0|    2|
|      4|       5.0|    4|
|      4|       6.0|    3|
|      5|       5.0|  149|
|      5|       6.0|   47|
|      5|       7.0|    1|
|      6|       5.0|   29|
|      6|       6.0|  113|
|      6|       7.0|   12|
|      7|       5.0|    2|
|      7|       6.0|   32|
|      7|       7.0|   23|
|      8|       6.0|    7|
|      8|       7.0|    1|
+-------+----------+-----+


Importancia de características (ordenadas por importancia):
alcohol: 0.1833
sulphates: 0.1132
volatile acidity: 0.0976
fixed acidity: 0.0972
total sulfur dioxide: 0.0961
density: 0.08