## Ejercicio Módulo 6 con Dataset Diamonds

0. Librerías a utilizar

In [1]:
import pandas as pd 
import seaborn as sns 
import requests

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType, NumericType
from pyspark.sql.functions import col, sum 
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import PipelineModel

1. Carga de datos de diamonds desde CSV con schema

In [3]:
spark = SparkSession.builder.appName("pipeline_diamonds").getOrCreate()
spark

In [4]:
url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv'
csv_path = 'diamonds.csv'

with open(csv_path, 'wb') as file:
    file.write(requests.get(url).content)
    
schema = StructType([
    StructField('carat', FloatType(), True),
    StructField('cut', StringType(), True),
    StructField('color', StringType(), True),
    StructField('clarity', StringType(), True),
    StructField('depth', FloatType(), True),
    StructField('table', IntegerType(), True),
    StructField('price', IntegerType(), True),
    StructField('x', FloatType(), True),
    StructField('y', FloatType(), True),
    StructField('z', FloatType(), True)
])

df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)
df.show(5)
df.printSchema()

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5|   55|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8|   61|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9|   65|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4|   58|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3|   58|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows

root
 |-- carat: float (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: float (nullable = true)
 |-- table: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)



In [5]:
# Borramos las filas donde las variables a predeicir sean NaN
df_regresion = df.dropna(subset=['price'])
df_clasificacion = df.dropna(subset=['cut'])

# Contamos nulos
df_regresion.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()
df_clasificacion.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|  924|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|  924|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



2. Pipeline regresión price con preprocesados: 
    * Imputer, StringIndexer, OneHotEncoder, MinMaxScaler o StandardScaler, VectorAssembler

In [6]:
df_regresion = df_regresion.withColumnRenamed('price', 'label')
df_regresion.show(3)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|label|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5|   55|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8|   61|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9|   65|  327|4.05|4.07|2.31|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 3 rows



In [7]:
numerical_cols = [field.name for field in df_regresion.schema.fields if isinstance(field.dataType, NumericType) and field.name != 'price']
categorical_cols = [field.name for field in df_regresion.schema.fields if isinstance(field.dataType, StringType)]
label_col = 'price'

In [8]:
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['cut_indexed', 'color_indexed', 'clarity_indexed']


In [9]:
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['cut_indexed_imputed', 'color_indexed_imputed', 'clarity_indexed_imputed']


In [10]:
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [11]:
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'label_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [12]:
# Escalar numéricas con MinMaxScaler
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [13]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [14]:
# Ensamblar todo: numéricas + categóricas y obtener features
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [15]:
from pyspark.ml.regression import RandomForestRegressor
regressor = RandomForestRegressor(seed=42)

In [16]:
df_train, df_test = df_regresion.randomSplit([0.8, 0.2], seed=42)

In [17]:
pipeline = Pipeline(stages = [
    # 2. Indexers para columnas categóricas: 'species', 'sex' 
    *indexers_features, # ponemos * porque es una lista de objetos
    # 3. Imputer para categóricas
    imputer_categorical,
    # 4. One Hot Encoders para categóricas
    *encoders_onehot, # ponemos * porque es una lista de objetos
    # 5. Imputer para numéricas
    imputer_numerical,
    # 6. Ensamblar numéricas + escalado
    assembler_numerical,
    scaler,
    # 7. Ensamblar numéricas escaladas + categóricas en una sola columna 'features'
    assembler_all,
    # 8. modelo
    regressor
])

In [18]:
pipeline_model = pipeline.fit(df_train)
df_pred = pipeline_model.transform(df_test)

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_r2 = RegressionEvaluator(metricName='r2')
evaluator_mae = RegressionEvaluator(metricName='mae')
evaluator_mse = RegressionEvaluator(metricName='mse')
evaluator_rmse = RegressionEvaluator(metricName='rmse')

print('r2', evaluator_r2.evaluate(df_pred))
print('mae', evaluator_mae.evaluate(df_pred))
print('mse', evaluator_mse.evaluate(df_pred))
print('rmse', evaluator_rmse.evaluate(df_pred))

r2 0.9827593975412918
mae 304.639249887946
mse 280516.28423282265
rmse 529.6378802850327


3. Pipeline clasificación multiclase sobre variable cut con preprocesados:
    * Imputer, StringIndexer, OneHotEncoder, MinMaxScaler o StandardScaler, VectorAssembler

In [20]:
numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType) and field.name != 'cut']
label_col = 'cut'

In [21]:
# Indexer para la columna a predecir "cut"
indexer_label = StringIndexer(
    inputCol=label_col,
    outputCol='label',
    handleInvalid='keep'
)

In [22]:
# Indexers para las features de la entrada que no son la columna label a predecir
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['color_indexed', 'clarity_indexed']


In [23]:
# Imputer con la moda para las columnas categóricas indexadas
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['color_indexed_imputed', 'clarity_indexed_imputed']


In [24]:
# one hot encoders para las categóricas indexadas imputadas
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [25]:
# Imputer con la mediana para la columnas numéricas
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'price_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [26]:
# Escalar numéricas con MinMaxScaler
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [27]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [28]:
# Ensamblar todo: numéricas + categóricas y obtener features
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [29]:
classifier = RandomForestClassifier(seed=42)

In [30]:
# Particionamiento de datos
df_train, df_test = df_clasificacion.randomSplit([0.8, 0.2], seed=42)

In [31]:
pipeline = Pipeline(stages = [
    indexer_label, 
    *indexers_features, 
    imputer_categorical,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    classifier
])

In [33]:
pipeline_model = pipeline.fit(df_train)
df_pred = pipeline_model.transform(df_test)

In [34]:
evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy')
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1')
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall')

In [35]:
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.6671271990420926
f1 0.6163093712469612
precision 0.6458667343549683
recall 0.6671271990420927


4. Gridsearch con CrossValidation sobre cualquiera de los pipelines

In [36]:
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.numTrees, [5, 10, 15, 20, 25, 30]) 
    .addGrid(classifier.maxDepth, [3, 5, 10, 15]) 
    .build()
)

In [37]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid, 
    evaluator=evaluator_f1,
    numFolds=3, 
    parallelism=4,
    seed=42
)
cv_model = crossval.fit(df_train)
df_pred = cv_model.transform(df_test)

In [38]:
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.7285622179239201
f1 0.7104826620172301
precision 0.7157801300796856
recall 0.72856221792392


In [39]:
best_model = cv_model.bestModel
best_rf = best_model.stages[-1] # accede a la última fase del pipeline que es el modelo classifier
print(best_rf.extractParamMap())
print(best_rf.getNumTrees)
print(best_rf.getOrDefault('maxDepth'))
print(best_rf.featureImportances)

{Param(parent='RandomForestClassifier_cb43ed48ccef', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_cb43ed48ccef', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_cb43ed48ccef', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClassifier_cb43ed48ccef', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Support

Exportar modelo 

In [40]:
pipeline_model.write().overwrite().save('pipeline_spark')

In [41]:
# Cargar el modelo
loaded_pipeline = PipelineModel.load('pipeline_spark')