
#
# MODULO 07 - EJERCICIO 06-A
# ALEXIS YURI M.


Primero es necesario instalar y configurar PySpark en el notebook de Colab. Para eso se ejecutan las siguientes celdas para instalar las librerías necesarias y crear un contexto de Spark.




In [31]:
# Se instala PySpark y findspark
!pip install pyspark findspark

# Se inicializa findspark para encontrar la instalación de Spark.
import findspark
findspark.init()

# Se importa SparkContext.
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Se crea una sesión de Spark si no existe.
try:
    spark.stop()
except:
    pass
spark = SparkSession.builder.master("local[*]").appName("Pipeline_mlib").getOrCreate()

sc = spark.sparkContext

print("SparkSession está lista para usar.")

SparkSession está lista para usar.


Paso 1: Lee un CSV distribuido usando spark.read.csv() y explora su estructura.


In [32]:
# Se crea un DataFrame de ejemplo simulando un archivo cvs.
data = spark.createDataFrame([
    ("F", 25, 1),
    ("M", 30, 0),
    ("F", 35, 1),
    ("M", 40, 0),
    ("F", 45, 1),
    ("M", 50, 0)
], ["sexo", "edad", "etiqueta"])


# Se muestra la estructura del DataFrame.
data.printSchema()
data.show()


root
 |-- sexo: string (nullable = true)
 |-- edad: long (nullable = true)
 |-- etiqueta: long (nullable = true)

+----+----+--------+
|sexo|edad|etiqueta|
+----+----+--------+
|   F|  25|       1|
|   M|  30|       0|
|   F|  35|       1|
|   M|  40|       0|
|   F|  45|       1|
|   M|  50|       0|
+----+----+--------+



Paso 2: Aplica StringIndexer para convertir variables categóricas.


In [33]:
# Se importa la librería StringIndexer.
from pyspark.ml.feature import StringIndexer

# Se indexa la columna 'sexo' y crea una nueva columna 'sexoIndex'.
indexer = StringIndexer(inputCol="sexo", outputCol="sexoIndex")
indexed_data = indexer.fit(data).transform(data)

print("DataFrame después de la indexación:")
indexed_data.show()


DataFrame después de la indexación:
+----+----+--------+---------+
|sexo|edad|etiqueta|sexoIndex|
+----+----+--------+---------+
|   F|  25|       1|      0.0|
|   M|  30|       0|      1.0|
|   F|  35|       1|      0.0|
|   M|  40|       0|      1.0|
|   F|  45|       1|      0.0|
|   M|  50|       0|      1.0|
+----+----+--------+---------+



Paso 3: Usa VectorAssembler para combinar las columnas de entrada.


In [34]:
# Se importa la librería.
from pyspark.ml.feature import VectorAssembler

# Se combinan las columnas de entrada en una sola columna 'features'.
assembler = VectorAssembler(
    inputCols=["sexoIndex", "edad"],
    outputCol="features"
)
assembled_data = assembler.transform(indexed_data)

print("DataFrame con la columna 'features' ensamblada:")
assembled_data.show(truncate=False)

DataFrame con la columna 'features' ensamblada:
+----+----+--------+---------+----------+
|sexo|edad|etiqueta|sexoIndex|features  |
+----+----+--------+---------+----------+
|F   |25  |1       |0.0      |[0.0,25.0]|
|M   |30  |0       |1.0      |[1.0,30.0]|
|F   |35  |1       |0.0      |[0.0,35.0]|
|M   |40  |0       |1.0      |[1.0,40.0]|
|F   |45  |1       |0.0      |[0.0,45.0]|
|M   |50  |0       |1.0      |[1.0,50.0]|
+----+----+--------+---------+----------+




Paso 4: Define el modelo de clasificación (LogisticRegression).

In [35]:
# Se importa la librería del modelo.
from pyspark.ml.classification import LogisticRegression

# Se define el modelo
lr = LogisticRegression(labelCol="etiqueta", featuresCol="features")


Paso 5: Encadena las etapas en un Pipeline.

In [36]:
# Se importa la librería del pipeline.
from pyspark.ml import Pipeline

# Se encadenan las etapas en un Pipeline.
pipeline = Pipeline(stages=[indexer, assembler, lr])

Paso 6: Divide en train y test, y entrena el modelo con .fit().

In [37]:
# Se dividen los datos 80/20 y se entrena el modelo
(train_df, test_df) = data.randomSplit([0.8, 0.2], seed=42)
pipeline_model = pipeline.fit(train_df)

print("Pipeline entrenado con éxito.")

Pipeline entrenado con éxito.


Paso 7: Evalúa con BinaryClassificationEvaluator.

In [38]:
# Se mportan las librerías necesarias para la evaluación.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Se realizan predicciones en el conjunto de prueba.
predictions = pipeline_model.transform(test_df)

# Se muestran algunas predicciones para verificar el resultado.
print("Predicciones en el conjunto de prueba:")
predictions.select("etiqueta", "prediction", "probability").show()



Predicciones en el conjunto de prueba:
+--------+----------+--------------------+
|etiqueta|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.99999967348773...|
|       1|       1.0|[3.26512258673585...|
+--------+----------+--------------------+



Paso 7: Muestra resultados e interpreta las métricas.

In [39]:
# Se evalua con BinaryClassificationEvaluator para el AUC.
binary_evaluator = BinaryClassificationEvaluator(labelCol="etiqueta", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = binary_evaluator.evaluate(predictions)

# Se evalua con MulticlassClassificationEvaluator para el F1-score.
multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="etiqueta", predictionCol="prediction", metricName="f1")
f1_score = multiclass_evaluator.evaluate(predictions)

print(f"\nResultados de la evaluación:")
print(f"Área Bajo la Curva (AUC): {auc:.4f}")
print(f"F1-score: {f1_score:.4f}")

# Se detiene la sesión de Spark.
spark.stop()



Resultados de la evaluación:
Área Bajo la Curva (AUC): 1.0000
F1-score: 1.0000


Para el ejercicio, se evalúa la capacidad del modelo para distinguir entre dos clases.

Las dos métricas principales obtenidas son el Área Bajo la Curva (AUC) y el F1-score.

- AUC (Area Under the ROC Curve): Mide la capacidad del modelo para clasificar correctamente los ejemplos positivos y negativos. Un valor de AUC de 1.0 es perfecto, mientras que 0.5 es un rendimiento aleatorio. Cuanto más cerca de 1.0, mejor.

- F1-score: Es un promedio de la precisión y el recall del modelo. Es muy útil cuando las clases en el conjunto de datos no están balanceadas. Un valor de F1-score de 1.0 indica un modelo perfecto, mientras que 0.0 indica que es ineficaz. Un valor cercano a 1.0 es excelente.

En resumen, valores altos en ambas métricas (AUC y F1-score) demuestran que el modelo de clasificación tiene un alto rendimiento y es fiable en su tarea.