# Data Science Module 6 Exercise

Dataset: diamonds

* PART 1 (10 %) Load the diamonds data from CSV with a schema: https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv

* PART 2 (40 %) Regression pipeline for price, with preprocessing
  * Imputer, StringIndexer, OneHotEncoder, MinMaxScaler or StandardScaler, VectorAssembler

* PART 3 (40 %) Multiclass classification pipeline for the cut variable, with preprocessing
  * Imputer, StringIndexer, OneHotEncoder, MinMaxScaler or StandardScaler, VectorAssembler

* PART 4 (10 %) Grid search with cross-validation on either of the pipelines

Any model can be used: for example, RandomForest for both tasks, or RandomForestRegressor for regression and MultilayerPerceptronClassifier for classification.

Notebook file name: m6_nombre_apellido.ipynb (replace nombre and apellido with your first and last name)

Use PySpark MLlib and PySpark DataFrames.

Update WSL (the Windows Subsystem for Linux that Ubuntu runs on):

wsl --update

# Test Docker

First, open the Docker Desktop application.

docker run hello-world

If everything works, you should see:

Hello from Docker!
This message shows that your installation appears to be working correctly.


Then run the following in the cmd or Ubuntu console:

docker run -it --name pyspark -p 8888:8888 jupyter/pyspark-notebook

The container starts and prints a URL that includes an access token. Open that URL in the browser, and also enter it as the kernel (existing Jupyter server) in Visual Studio Code.

In [433]:
import pyspark
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession
import requests
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType, NumericType
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor, GBTRegressor, LinearRegression
from pyspark.sql.functions import col, sum  # note: this shadows Python's built-in sum() for the rest of the notebook


In [434]:
spark = SparkSession.builder.appName('pipeline_diamonds').getOrCreate()
spark

In [435]:
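# Download the dataset from this URL and save it locally:
url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/master/diamonds.csv'
csv_path = 'diamonds.csv'
response = requests.get(url, timeout=30)
response.raise_for_status()  # stop on HTTP errors instead of writing an error page to disk
with open(csv_path, 'wb') as file:
    file.write(response.content)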

In [436]:
# Define the file schema explicitly:
schema = StructType([
    StructField('carat', FloatType(), True),
    StructField('cut', StringType(), True),
    StructField('color', StringType(), True),
    StructField('clarity', StringType(), True),
    StructField('depth', FloatType(), True),
    StructField('table', FloatType(), True),
    StructField('price', IntegerType(), True),
    StructField('x', FloatType(), True),
    StructField('y', FloatType(), True),
    StructField('z', FloatType(), True),
])
df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)

In [437]:
df.columns

['carat', 'cut', 'color', 'clarity', 'depth', 'table', 'price', 'x', 'y', 'z']
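df.columns only echoes the schema passed to the reader, not the file header. When header=True is combined with an explicit schema, Spark's CSV reader by default (enforceSchema=True) applies the schema by position and ignores the header names. A file with reordered columns would therefore load without error but with mislabeled columns. Setting enforceSchema=False makes Spark validate the header instead. The sketch below is a quick cross-check in plain Python that compares the first CSV line with the schema order. The helper header_matches_schema is hypothetical and not part of PySpark.

```python
import csv
import io

# Column order of the StructType defined in the schema cell (carat ... z)
SCHEMA_COLUMNS = ['carat', 'cut', 'color', 'clarity', 'depth', 'table',
                  'price', 'x', 'y', 'z']


def header_matches_schema(csv_text, expected=SCHEMA_COLUMNS):
    """Hypothetical helper: True if the CSV header lists exactly `expected`, in order."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return [name.strip() for name in header] == expected


# Inline sample; with the downloaded file you could pass
# open('diamonds.csv').readline() instead.
sample = ('carat,cut,color,clarity,depth,table,price,x,y,z\n'
          '0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43\n')
print(header_matches_schema(sample))               # True
print(header_matches_schema('cut,carat,color\n'))  # False
```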

In [438]:
# equivalent to pandas describe()
df.summary().show()

+-------+------------------+---------+-----+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|             carat|      cut|color|clarity|             depth|             table|            price|                 x|                 y|                 z|
+-------+------------------+---------+-----+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|             53940|    53940|53940|  53940|             53940|             53940|            53940|             53940|             53940|             53940|
|   mean|0.7979397459442544|     NULL| NULL|   NULL|61.749404890324215|57.457183908399585|3932.799721913237| 5.731157212872659| 5.734525955793015|3.5387337920972493|
| stddev| 0.474011242836904|     NULL| NULL|   NULL| 1.432621320665403|2.2344905638396657|3989.439738146397|1.1217607437465076|1.1421346736743894| 0.705698843275196|

In [439]:
df.show(5)  # equivalent to pandas head()

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [440]:
# Inspect the schema:
df.printSchema()

root
 |-- carat: float (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: float (nullable = true)
 |-- table: float (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)



In [441]:
df.dtypes

[('carat', 'float'),
 ('cut', 'string'),
 ('color', 'string'),
 ('clarity', 'string'),
 ('depth', 'float'),
 ('table', 'float'),
 ('price', 'int'),
 ('x', 'float'),
 ('y', 'float'),
 ('z', 'float')]

In [442]:
#help(df.withColumn)

In [443]:
# group the data and count the rows in each group
# equivalent to pandas value_counts()
df.groupBy('cut').count().show()

+---------+-----+
|      cut|count|
+---------+-----+
|  Premium|13791|
|    Ideal|21551|
|     Good| 4906|
|     Fair| 1610|
|Very Good|12082|
+---------+-----+
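These counts show that cut is imbalanced. Ideal alone covers roughly 40% of the rows, while Fair covers about 3%. A useful reference for the classification metrics later is the accuracy of a trivial model that always predicts the most frequent class. The plain-Python sketch below computes it, using counts copied from the output above.

```python
import builtins

# Row counts per cut, copied from the groupBy('cut').count() output above
cut_counts = {
    'Premium': 13791,
    'Ideal': 21551,
    'Good': 4906,
    'Fair': 1610,
    'Very Good': 12082,
}

# builtins.sum: inside the notebook, `sum` is shadowed by pyspark.sql.functions.sum
total = builtins.sum(cut_counts.values())
majority_class = max(cut_counts, key=cut_counts.get)
baseline_accuracy = cut_counts[majority_class] / total

# Share of each class, most frequent first
for cut, n in sorted(cut_counts.items(), key=lambda kv: kv[1], reverse=True):
    print(f'{cut:>9}: {n / total:.3f}')

print(total, majority_class, round(baseline_accuracy, 4))  # 53940 Ideal 0.3995
```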



In [444]:
df.groupBy('color').count().show()

+-----+-----+
|color|count|
+-----+-----+
|    F| 9542|
|    E| 9797|
|    D| 6775|
|    J| 2808|
|    G|11292|
|    I| 5422|
|    H| 8304|
+-----+-----+



In [445]:
df.groupBy('clarity').count().show()

+-------+-----+
|clarity|count|
+-------+-----+
|   VVS2| 5066|
|    SI1|13065|
|     IF| 1790|
|     I1|  741|
|   VVS1| 3655|
|    VS2|12258|
|    SI2| 9194|
|    VS1| 8171|
+-------+-----+



# PRICE REGRESSION

In [446]:
help(LinearRegression)

Help on class LinearRegression in module pyspark.ml.regression:

class LinearRegression(_JavaRegressor, _LinearRegressionParams, pyspark.ml.util.JavaMLWritable, pyspark.ml.util.JavaMLReadable)
 |  LinearRegression(*, featuresCol: str = 'features', labelCol: str = 'label', predictionCol: str = 'prediction', maxIter: int = 100, regParam: float = 0.0, elasticNetParam: float = 0.0, tol: float = 1e-06, fitIntercept: bool = True, standardization: bool = True, solver: str = 'auto', weightCol: Optional[str] = None, aggregationDepth: int = 2, loss: str = 'squaredError', epsilon: float = 1.35, maxBlockSizeInMB: float = 0.0)
 |  
 |  Linear regression.
 |  
 |  The learning objective is to minimize the specified loss function, with regularization.
 |  This supports two kinds of loss:
 |  
 |  * squaredError (a.k.a squared loss)
 |  * huber (a hybrid of squared error for relatively small errors and absolute error for     relatively large ones, and we estimate the scale parameter from training data

In [447]:
# Since we are going to predict 'price', drop rows where 'price' is null:
df_regresion = df.dropna(subset=['price'])

# count the nulls in every column: equivalent to pandas df.isna().sum()
df_regresion.select([sum(col(c).isNull().cast('int')).alias(c) for c in df_regresion.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|    0|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



In [448]:
# Rename 'price' to 'label' so it is used as the target column
df_regresion = df_regresion.withColumnRenamed('price', 'label')
df_regresion.show(3)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|label|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 3 rows



In [449]:
# select the names of the columns the preprocessing will be applied to
# identify numerical and categorical columns (the 'label' target is excluded)
numerical_cols = [field.name for field in df_regresion.schema.fields if isinstance(field.dataType, NumericType) and field.name != 'label']
categorical_cols = [field.name for field in df_regresion.schema.fields if isinstance(field.dataType, StringType)]
label_col = 'label'

In [450]:
print(numerical_cols)
print(categorical_cols)

['carat', 'depth', 'table', 'x', 'y', 'z']
['cut', 'color', 'clarity']


In [451]:
# Create indexers for the categorical input features (not the label column to predict)
# one StringIndexer object per categorical column to index
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['cut_indexed', 'color_indexed', 'clarity_indexed']
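StringIndexer's default stringOrderType is frequencyDesc: the most frequent label gets index 0, and Spark 3.x breaks ties alphabetically. With handleInvalid='keep', unseen labels and nulls are not dropped. Instead, they go to one extra index equal to the number of labels. The plain-Python sketch below mimics that behaviour. The helpers fit_string_indexer and transform_keep are hypothetical and are not the Spark implementation.

```python
from collections import Counter


# Hypothetical helpers that mimic StringIndexer (frequencyDesc + handleInvalid='keep')
def fit_string_indexer(values):
    """Return {label: index}: most frequent label first, ties broken alphabetically."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def transform_keep(values, mapping):
    """Map labels to indices; unseen labels and None go to the extra index len(mapping)."""
    unknown = len(mapping)
    return [float(mapping.get(v, unknown)) for v in values]


train_cut = ['Ideal', 'Premium', 'Ideal', 'Good', 'Premium', 'Ideal', 'Fair']
mapping = fit_string_indexer(train_cut)
print(mapping)  # {'Ideal': 0, 'Premium': 1, 'Fair': 2, 'Good': 3}
print(transform_keep(['Good', 'Very Good', None], mapping))  # [3.0, 4.0, 4.0]
```

With the full-data counts from the groupBy('cut') output earlier, cut would be indexed as Ideal 0, Premium 1, Very Good 2, Good 3, Fair 4. The pipeline actually fits the indexer on df_train, but the counts are far enough apart that the ordering there should be the same.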


In [452]:
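# Impute missing values in the indexed categorical columns using the mode
# (with handleInvalid='keep', StringIndexer already maps nulls to an extra index,
#  so this step normally finds nothing to impute)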
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['cut_indexed_imputed', 'color_indexed_imputed', 'clarity_indexed_imputed']


In [453]:
# Create one-hot encoders for the imputed categorical columns
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']
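OneHotEncoder turns each index into a binary vector, which Spark stores as a sparse vector. With the default dropLast=True, a column with n categories produces vectors of length n - 1, and the last category is encoded as all zeros. Without this, the entries of each vector would always sum to one and be linearly dependent. A dense plain-Python sketch of the rule follows; one_hot is a hypothetical helper.

```python
# Hypothetical helper mimicking OneHotEncoder's encoding of a single index
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list for a category index; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


print(one_hot(0, 5))                   # [1.0, 0.0, 0.0, 0.0]
print(one_hot(4, 5))                   # [0.0, 0.0, 0.0, 0.0]
print(one_hot(4, 5, drop_last=False))  # [0.0, 0.0, 0.0, 0.0, 1.0]
```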


In [454]:
# Impute missing values in the numerical columns using the median
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'x_imputed', 'y_imputed', 'z_imputed']
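Imputer treats nulls (and NaN, its default missingValue) as missing. It computes the statistic on the remaining values and writes the filled column under the new name. The null count above shows that this dataset has no gaps, so here the imputers leave the values unchanged. Spark computes the median approximately, controlled by the relativeError parameter. The plain-Python sketch below uses an exact median instead, and its helper names (is_missing, fit_median, impute) are hypothetical.

```python
import math
import statistics


# Hypothetical helpers mimicking Imputer(strategy='median') on one column
def is_missing(v):
    """None and NaN both count as missing."""
    return v is None or (isinstance(v, float) and math.isnan(v))


def fit_median(values):
    """Median of the non-missing values (exact, unlike Spark's approximate median)."""
    return statistics.median([v for v in values if not is_missing(v)])


def impute(values, fill_value):
    """Replace missing entries with fill_value and keep the others."""
    return [fill_value if is_missing(v) else v for v in values]


carat = [0.23, None, 0.29, float('nan'), 0.31]
median = fit_median(carat)
print(median)                 # 0.29
print(impute(carat, median))  # [0.23, 0.29, 0.29, 0.29, 0.31]
```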


In [455]:
# (Optional) scale the numerical columns with MinMaxScaler
# assemble the imputed numerical columns into one vector, then scale it
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [456]:
# Assemble all the features (scaled numerical + encoded categorical) into a single 'features' column
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']
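MinMaxScaler learns each feature's minimum and maximum during fit, and since the whole pipeline is fitted on df_train, those statistics come from the training data only. It then rescales with (x - E_min) / (E_max - E_min) * (max - min) + min, where the output range [min, max] defaults to [0, 1]. A constant feature is mapped to 0.5 * (max + min). Because the statistics come from training data, test values outside the training range end up outside [0, 1], which is expected. Tree ensembles such as RandomForest are largely insensitive to this kind of monotonic rescaling, which is why the step is marked optional. It matters more for models such as LinearRegression or MultilayerPerceptronClassifier. A plain-Python sketch follows; fit_min_max and transform_min_max are hypothetical helpers.

```python
# Hypothetical helpers mimicking MinMaxScaler on a single feature
def fit_min_max(column):
    """Learn (E_min, E_max) from the training values."""
    return min(column), max(column)


def transform_min_max(column, stats, new_min=0.0, new_max=1.0):
    """(x - E_min) / (E_max - E_min) * (new_max - new_min) + new_min."""
    e_min, e_max = stats
    if e_max == e_min:
        # constant feature: every value maps to the middle of the output range
        return [0.5 * (new_max + new_min) for _ in column]
    scale = (new_max - new_min) / (e_max - e_min)
    return [(x - e_min) * scale + new_min for x in column]


stats = fit_min_max([0.25, 0.5, 2.25])               # fitted on "training" values only
print(transform_min_max([0.25, 0.5, 2.25], stats))   # [0.0, 0.125, 1.0]
print(transform_min_max([1.25, 3.25], stats))        # [0.5, 1.5] (3.25 exceeds the training max)
print(transform_min_max([7.0, 7.0], (7.0, 7.0)))     # [0.5, 0.5]
```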


In [457]:
# Assemble everything: numerical + categorical, producing 'features'
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [458]:
# Create a Random Forest Regressor (alternative without renaming price to label:)
# regressor = RandomForestRegressor(seed=42, labelCol='price')
regressor = RandomForestRegressor(seed=42)

In [459]:
# Split the data into training and test sets
# the split is done before (and outside) the pipeline
df_train, df_test = df_regresion.randomSplit([0.8, 0.2], seed=42)
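randomSplit does not cut the data at an exact 80/20 boundary. Spark normalizes the weights and samples rows independently, so the split sizes vary slightly around the requested proportions. With a fixed seed, the split is reproducible as long as the input data and its partitioning stay the same. The plain-Python sketch below illustrates the idea only; random_split is a hypothetical helper, not Spark's implementation.

```python
import math
import random


# Hypothetical helper: weighted random split where each row is assigned independently
def random_split(rows, weights, seed):
    total = math.fsum(weights)
    bounds, acc = [], 0.0
    for w in weights:                  # cumulative upper bounds, e.g. [0.8, 1.0]
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # first bucket whose upper bound exceeds u; the last bucket absorbs rounding error
        i = next((k for k, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[i].append(row)
    return splits


train, test = random_split(range(53940), [0.8, 0.2], seed=42)
print(len(train), len(test), round(len(train) / 53940, 3))  # near, not necessarily exactly, 80/20
```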

In [460]:
# Build the pipeline with all the preprocessing stages and the model
pipeline = Pipeline(stages = [
    # 1. Indexers for the categorical columns: 'cut', 'color' and 'clarity'
    *indexers_features,  # unpacked with * because it is a list of stages
    # 2. Imputer for the categorical columns
    imputer_categorical,
    # 3. One-hot encoders for the categorical columns
    *encoders_onehot,  # unpacked with * because it is a list of stages
    # 4. Imputer for the numerical columns: 'carat', 'depth', 'table', 'x', 'y' and 'z'
    imputer_numerical,
    # 5. Assemble the numerical columns + scale them
    assembler_numerical,
    scaler,
    # 6. Assemble scaled numerical + categorical columns into a single 'features' column
    assembler_all,
    # 7. Model
    regressor
])

In [461]:
# Train the model on the training data
# the whole pipeline is fitted on df_train only; the fitted model then transforms df_test
pipeline_model = pipeline.fit(df_train)

In [462]:
# Make predictions on the test set
df_pred = pipeline_model.transform(df_test)
df_pred.show(4)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+-----------+-------------+---------------+-------------------+---------------------+-----------------------+--------------------------+----------------------------+------------------------------+-------------+-------------+-------------+---------+---------+---------+--------------------+-----------------------+--------------------+-----------------+
|carat|    cut|color|clarity|depth|table|label|   x|   y|   z|cut_indexed|color_indexed|clarity_indexed|cut_indexed_imputed|color_indexed_imputed|clarity_indexed_imputed|cut_indexed_imputed_onehot|color_indexed_imputed_onehot|clarity_indexed_imputed_onehot|carat_imputed|depth_imputed|table_imputed|x_imputed|y_imputed|z_imputed|    numeric_features|numeric_features_scaled|            features|       prediction|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+-----------+-------------+---------------+-------------------+---------------------+-----------------------+--

In [463]:
# Evaluate the model with several regression metrics
evaluator_r2 = RegressionEvaluator(metricName='r2')
evaluator_mae = RegressionEvaluator(metricName='mae')
evaluator_mse = RegressionEvaluator(metricName='mse')
evaluator_rmse = RegressionEvaluator(metricName='rmse')

print('r2', evaluator_r2.evaluate(df_pred))
print('mae', evaluator_mae.evaluate(df_pred))
print('mse', evaluator_mse.evaluate(df_pred))
print('rmse', evaluator_rmse.evaluate(df_pred))

r2 0.9070336070647529
mae 684.109865656974
mse 1512592.3852326302
rmse 1229.8749469895833
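RegressionEvaluator reads the 'label' and 'prediction' columns by default, which is why renaming price to label was enough. mae and rmse are in price units and mse is in squared price units. rmse is simply the square root of mse: 1229.87² ≈ 1.5126 million, which matches the mse printed above. r2 compares the squared error with the total variance of the label. The plain-Python sketch below implements the four formulas and checks them on made-up numbers; regression_metrics is a hypothetical helper.

```python
import math


# Hypothetical helper reproducing RegressionEvaluator's r2, mae, mse and rmse
def regression_metrics(y_true, y_pred):
    n = len(y_true)
    mean_y = math.fsum(y_true) / n
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mse = math.fsum(e * e for e in errors) / n
    mae = math.fsum(abs(e) for e in errors) / n
    ss_tot = math.fsum((t - mean_y) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - (mse * n) / ss_tot                            # 1 - SS_res / SS_tot
    return {'r2': r2, 'mae': mae, 'mse': mse, 'rmse': math.sqrt(mse)}


metrics = regression_metrics([300, 400, 500, 600], [310, 390, 520, 580])
print(metrics)  # mse 250.0, mae 15.0, r2 about 0.98, rmse about 15.81
```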


In [464]:
# Grid search and cross-validation to tune the model
paramGrid = (
    ParamGridBuilder()
    .addGrid(regressor.numTrees, [5, 10, 15, 20, 25, 30])  # default is 20
    .addGrid(regressor.maxDepth, [3, 5, 10, 15])  # default is 5 (allowed range 0 to 30)
    .build()
)

In [465]:
# Cross-validation for price
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,  # parameter grid for hyperparameter tuning (grid search)
    evaluator=evaluator_r2,
    numFolds=3,  # 3 is the default
    parallelism=4, 
    seed=42
)

In [466]:
# Train the model with cross-validation
cv_model = crossval.fit(df_train)

In [467]:
# Make predictions with the best model found
df_pred = cv_model.transform(df_test)

In [468]:
# Evaluate the best model
print('r2', evaluator_r2.evaluate(df_pred))
print('mae', evaluator_mae.evaluate(df_pred))
print('mse', evaluator_mse.evaluate(df_pred))
print('rmse', evaluator_rmse.evaluate(df_pred))

r2 0.9661739441819888
mae 378.4620268242738
mse 550360.5425286874
rmse 741.8628866095725
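The tuned model clearly improves on the default one: r2 goes from 0.907 to 0.966, and rmse drops from about 1230 to about 742. The search is not cheap, though. Because the estimator is the whole pipeline, CrossValidator refits every stage (indexers, imputers, scaler and forest) for each parameter combination on each fold. It then refits the best combination on all of df_train. The plain-Python snippet below counts the fits implied by the grid above.

```python
from itertools import product

# Values from the ParamGridBuilder and CrossValidator cells above
num_trees = [5, 10, 15, 20, 25, 30]
max_depth = [3, 5, 10, 15]
num_folds = 3

param_maps = list(product(num_trees, max_depth))  # one entry per parameter combination
cv_fits = len(param_maps) * num_folds             # each combination is fitted on each fold
total_fits = cv_fits + 1                          # plus the final refit of the best combination

print(len(param_maps), cv_fits, total_fits)  # 24 72 73
```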


In [469]:
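# Get the parameters of the best model
best_model = cv_model.bestModel
best_rf = best_model.stages[-1]  # last pipeline stage: the fitted random forest
print(best_rf.extractParamMap())
print(best_rf.getNumTrees)  # number of trees; a property on the fitted model, so no parentheses
print(best_rf.getOrDefault('maxDepth'))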
print(best_rf.featureImportances)

{Param(parent='RandomForestRegressor_53afc85602a1', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestRegressor_53afc85602a1', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestRegressor_53afc85602a1', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestRegressor_53afc85602a1', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported o

## Classification


In [470]:
# Drop rows with null values in the 'cut' column:
# since we are going to predict 'cut', rows without it are useless
df_class = df.dropna(subset=['cut'])

# count the nulls in every column: equivalent to pandas df.isna().sum()
df_class.select([sum(col(c).isNull().cast('int')).alias(c) for c in df_class.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|    0|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



In this case, the cut variable is used as the label.

In [471]:
# Identify numerical and categorical columns for the classification
# ('cut' is the target, so it is excluded from the categorical features)
numerical_cols = [field.name for field in df_class.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df_class.schema.fields if isinstance(field.dataType, StringType) and field.name != 'cut']
label_col = 'cut'

In [472]:
# Create an indexer for the target column 'cut'
# its output, 'label', is the column the classifier learns to predict
indexer_label = StringIndexer(
    inputCol=label_col,  # index 'cut', the column we want to predict
    outputCol='label',
    handleInvalid='keep'
)

In [473]:
# Create indexers for the categorical input features (not the label column to predict)
# one StringIndexer object per categorical column to index
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['color_indexed', 'clarity_indexed']


In [474]:
# Impute missing values in the indexed categorical columns using the mode
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['color_indexed_imputed', 'clarity_indexed_imputed']


In [475]:
# Create one-hot encoders for the imputed categorical columns
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [476]:
# Impute missing values in the numerical columns using the median
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'price_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [477]:
# Assemble the imputed numerical columns into one vector and scale it
# (Optional) scale the numerical columns with MinMaxScaler
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [478]:
# Assemble all the features (scaled numerical + encoded categorical) into a single 'features' column
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']
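VectorAssembler concatenates its input columns, scalars and vectors alike, into one vector in the order given by inputCols. Each value's position in 'features' therefore follows all_columns, and featureImportances uses the same indices. The dense plain-Python sketch below illustrates this with made-up input vectors; assemble is a hypothetical helper. Spark itself may store the result as a sparse vector.

```python
# Hypothetical helper mimicking VectorAssembler on one row (dense lists instead of Spark vectors)
def assemble(*parts):
    """Flatten scalars and lists into one feature list, keeping the input order."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(v) for v in part)
        else:
            features.append(float(part))
    return features


numeric_scaled = [0.1, 0.5, 0.2]   # made-up values standing in for numeric_features_scaled
color_onehot = [0.0, 1.0, 0.0]     # made-up one-hot vectors
clarity_onehot = [0.0, 0.0]
print(assemble(numeric_scaled, color_onehot, clarity_onehot))
# [0.1, 0.5, 0.2, 0.0, 1.0, 0.0, 0.0, 0.0]
print(assemble(0.23, [1.0, 0.0]))  # [0.23, 1.0, 0.0]
```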


In [479]:
# Assemble everything: numerical + categorical, producing 'features'
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [480]:
# Create a Random Forest Classifier
classifier = RandomForestClassifier(seed=42)

In [481]:
# Split the data into training and test sets
# the split is done before (and outside) the pipeline
df_train, df_test = df_class.randomSplit([0.8, 0.2], seed=42)

In [482]:
# Build the pipeline with all the preprocessing stages and the classification model
pipeline = Pipeline(stages = [
    # 1. StringIndexer for 'cut', the column to predict (produces 'label')
    indexer_label,
    # 2. Indexers for the categorical columns: 'color' and 'clarity'
    *indexers_features,  # unpacked with * because it is a list of stages
    # 3. Imputer for the categorical columns
    imputer_categorical,
    # 4. One-hot encoders for the categorical columns
    *encoders_onehot,  # unpacked with * because it is a list of stages
    # 5. Imputer for the numerical columns
    imputer_numerical,
    # 6. Assemble the numerical columns + scale them
    assembler_numerical,
    scaler,
    # 7. Assemble scaled numerical + categorical columns into a single 'features' column
    assembler_all,
    # 8. Classification model
    classifier
])

In [483]:
# Train the model on the training data
pipeline_model = pipeline.fit(df_train)

In [484]:
# Make predictions on the test set
df_pred = pipeline_model.transform(df_test)

In [485]:
# Evaluate the classification model with several metrics
evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy')
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1')
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall')

In [486]:
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.6690614350188818
f1 0.6194954681688324
precision 0.647494841273576
recall 0.6690614350188818
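MulticlassClassificationEvaluator computes these metrics on the 'label' and 'prediction' columns. 'f1' is the per-class F1 score averaged with weights equal to each class's share of the true labels. weightedPrecision and weightedRecall use the same weighting. Weighted recall always equals accuracy, because each class contributes its true positives divided by the total row count. That is why accuracy and recall both print 0.6690614350188818 above. The plain-Python sketch below applies these definitions to a small made-up sample; weighted_metrics is a hypothetical helper.

```python
from collections import Counter


# Hypothetical helper reproducing accuracy, weightedPrecision, weightedRecall and f1
def weighted_metrics(y_true, y_pred):
    n = len(y_true)
    support = Counter(y_true)     # rows that truly belong to each class
    predicted = Counter(y_pred)   # rows predicted as each class
    hits = Counter(t for t, p in zip(y_true, y_pred) if t == p)  # true positives per class

    accuracy = len([t for t, p in zip(y_true, y_pred) if t == p]) / n
    w_precision = w_recall = w_f1 = 0.0
    for cls, count in support.items():
        tp = hits[cls]
        precision = tp / predicted[cls] if predicted[cls] else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = count / n        # class weight = its share of the true labels
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return {'accuracy': accuracy, 'weightedPrecision': w_precision,
            'weightedRecall': w_recall, 'f1': w_f1}


y_true = ['Ideal', 'Ideal', 'Premium', 'Good', 'Ideal', 'Premium']
y_pred = ['Ideal', 'Premium', 'Premium', 'Ideal', 'Ideal', 'Premium']
m = weighted_metrics(y_true, y_pred)
print(m)  # accuracy and weightedRecall are both 4/6
```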


## Grid search and cross-validation

In [487]:
# Grid search and cross-validation to tune the classification model
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.numTrees, [5, 10, 15, 20, 25, 30])  # default is 20
    .addGrid(classifier.maxDepth, [3, 5, 10, 15])  # default is 5, allowed range [0, 30]
    .build()
)

In [488]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,  # parameter grid for hyperparameter tuning (grid search)
    evaluator=evaluator_f1,
    numFolds=3,  # 3 folds is already the default
    parallelism=4,
    seed=42
)

In [None]:
# Train the model with cross-validation
cv_model = crossval.fit(df_train)

In [None]:
# Make predictions with the best model found
df_pred = cv_model.transform(df_test)

In [489]:
# Evaluate the best classification model
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.6690614350188818
f1 0.6194954681688324
precision 0.647494841273576
recall 0.6690614350188818


In [490]:
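# Get the parameters of the best classification model
# (cv_model must be the classification CrossValidatorModel from the cells above; if they were not run,
#  it still holds the regression model, as the RandomForestRegressor params in the output show)
best_model = cv_model.bestModel
best_rf = best_model.stages[-1]  # last pipeline stage: the fitted random forest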
print(best_rf.extractParamMap())
print(best_rf.getNumTrees)
print(best_rf.getOrDefault('maxDepth'))
print(best_rf.featureImportances)

{Param(parent='RandomForestRegressor_53afc85602a1', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestRegressor_53afc85602a1', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestRegressor_53afc85602a1', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestRegressor_53afc85602a1', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported o

## Export the model

In [None]:
# Save the trained model (at this point pipeline_model is the untuned classification pipeline)
pipeline_model.write().overwrite().save('pipeline_spark_diamonds')