# Uso de pipelines

Pasos:

1. Indexar columna label 'island': 0, 1, 2

2. Indexar columnas categóricas features: 'species' 0 1 2 y 'sex' 0 1

3. Imputer columnas categóricas features indexadas: 0, 1, 2  y 0, 1 se rellenan con moda. (obligatorio imputer después después indexer ya que imputer no admite texto)

4. (Opcional) OneHotEncoder para columnas categóricas features indexadas imputadas, es decir, aplicar encoder a las columnas del punto 3.

5. Imputer columnas numéricas con la mediana

6. MinMaxScaler para las columnas numéricas imputadas

7. VectorAssembler para ensamblar numéricas + categóricas

8. Crear modelo: RandomForestClassifier

9. Particionamiento de datos en df_train, df_test

10. Pipeline

In [52]:
import seaborn as sns
import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType
from pyspark.sql.functions import col, sum
from pyspark.sql.types import NumericType, StringType
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
spark = SparkSession.builder.appName('uso_pipelines').getOrCreate()

In [3]:
url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/penguins.csv'
csv_path = 'penguins.csv'
with open(csv_path, 'wb') as file:
    file.write(requests.get(url).content)


In [4]:
    
schema = StructType([
    StructField('species', StringType(), True),
    StructField('island', StringType(), True),
    StructField('bill_length_mm', FloatType(), True),
    StructField('bill_depth_mm', FloatType(), True),
    StructField('flipper_length_mm', FloatType(), True),
    StructField('body_mass_g', IntegerType(), True),
    StructField('sex', StringType(), True),
])
df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)
df.show(5)
df.printSchema()

+-------+---------+--------------+-------------+-----------------+-----------+------+
|species|   island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+--------------+-------------+-----------------+-----------+------+
| Adelie|Torgersen|          39.1|         18.7|            181.0|       3750|  MALE|
| Adelie|Torgersen|          39.5|         17.4|            186.0|       3800|FEMALE|
| Adelie|Torgersen|          40.3|         18.0|            195.0|       3250|FEMALE|
| Adelie|Torgersen|          NULL|         NULL|             NULL|       NULL|  NULL|
| Adelie|Torgersen|          36.7|         19.3|            193.0|       3450|FEMALE|
+-------+---------+--------------+-------------+-----------------+-----------+------+
only showing top 5 rows

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- bill_length_mm: float (nullable = true)
 |-- bill_depth_mm: float (nullable = true)
 |-- flipper_length_mm: float 

In [5]:
# Como vamos a predecir island borramos filas donde island sea nan:
df = df.dropna(subset=['island']) #columna a predecir

# contar nulos en todas las columnas: equivalente a pandas df.isna().sum()
df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()

+-------+------+--------------+-------------+-----------------+-----------+---+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|sex|
+-------+------+--------------+-------------+-----------------+-----------+---+
|      0|     0|             2|            2|                2|          2| 11|
+-------+------+--------------+-------------+-----------------+-----------+---+



In [6]:
# seleccionar los nombres de las columnas a las que aplicar Preprocesados
numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType) and field.name != 'island']
label_col = 'island'

In [7]:
# Indexer para 'island' la columna a predecir
indexer_label = StringIndexer(
    inputCol= label_col,
    outputCol='label',
    handleInvalid='keep'
)

In [17]:
# Indexers para las features de la entrada que no son la columna label a predecir
# crea un objeto StringIndexer por cada columna categórica a indexar
indexers_features = [
    StringIndexer( inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['species_indexed', 'sex_indexed']


In [9]:
# Imputer con la moda para las columnas categóricas indexadas
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols= [c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['species_indexed_imputed', 'sex_indexed_imputed']


In [10]:
# one hot encoders para las categóricas indexadas imputadas
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot')
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['species_indexed_imputed_onehot', 'sex_indexed_imputed_onehot']


In [11]:
# Imputer con la mediana para la columnas numéricas
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols= [c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['bill_length_mm_imputed', 'bill_depth_mm_imputed', 'flipper_length_mm_imputed', 'body_mass_g_imputed']


In [38]:
# (Opcional) escalar numéricas con MinMaxScaler
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [13]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot

In [14]:
# Ensamblar todo: numéricas + categóricas y obtener features
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features')

In [31]:
classifier = RandomForestClassifier(seed=42)

In [32]:
# Particionamiento de datos
df_train, df_test = df.randomSplit([0.8, 0.2], seed=42)

In [40]:
pipeline = Pipeline(stages = [
    # 1. indexer para la columna 'island' StringIndexer
    indexer_label,
    # 2. indexer para columnas categóricas 'species' y 'sex'
    *indexers_features, #ponemos * prorque es una lista de objetos 
    # 3. Imputer para categóricas
    imputer_categorical,
    # 4. OneHotEncoders para categóricas
    *encoders_onehot,
    # 5. Imputer para columnas numericas
    imputer_numerical,
    # 6.ensamblar numéricas + escalado
    assembler_numerical,
    scaler,
    # 7. ensamblar todas las columnas (numericas escaladas + categoricas en features)
    assembler_all,
    # 8. modelo de clasificacion
    classifier
])

In [41]:
pipeline_model = pipeline.fit(df_train)
df_pred = pipeline_model.transform(df_test)

In [43]:
evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy')
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1')
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall')

In [47]:
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.6037735849056604
f1 0.6368579517843184
precision 0.6937556154537285
recall 0.6037735849056604


## GridSearch y validación cruzada

In [50]:
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.numTrees, [5, 10, 15, 20, 25, 30]) # por defecto es 20
    .addGrid(classifier.maxDepth, [3, 5, 10, 15]) # por defecto es 5 (rango de 0 a 30)
    .build()
)

In [53]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid, #parametros para grid search hyper parameter tuning
    evaluator=evaluator_f1,
    numFolds=3, # 3 por defecto
    parallelism=4, 
    seed=42
)
cv_model = crossval.fit(df_train)
df_pred = cv_model.transform(df_test)

In [54]:
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.6415094339622641
f1 0.6595386045996479
precision 0.6801257861635219
recall 0.6415094339622642


In [61]:
best_model = cv_model.bestModel
best_rf =best_model.stages[-1]
print(best_rf.extractParamMap())
print(best_rf.getNumTrees)
print(best_rf.getOrDefault('maxDepth'))
print(best_rf.featureImportances)

{Param(parent='RandomForestClassifier_fceb85e06d54', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_fceb85e06d54', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_fceb85e06d54', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClassifier_fceb85e06d54', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Support

## Exportar modelo

In [62]:
pipeline_model.write().overwrite().save('pipeline_spark')