## Data preprocessing

We now preprocess the data so that machine learning algorithms can be applied afterwards.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
input_path = '/content/drive/MyDrive/Colab_Notebooks/APBD/Evaluacion/data/output/clean_data_csv'
output_path= '/content/drive/MyDrive/Colab_Notebooks/APBD/Evaluacion/data/output/{}'

We load the required libraries and the helper functions used.

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor,LinearRegression
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.evaluation import RegressionEvaluator
from math import sqrt

def extract_columns_by_type(df, data_type):
  """Return `df` restricted to the columns whose Spark dtype string equals `data_type`."""
  columns_of_type = [col for col, dtype in df.dtypes if dtype == data_type]
  return df.select(columns_of_type)

We start the Spark engine and load the cleaned data. We also save the `Id` values of the rows we will need for the Kaggle submission.

In [None]:
spark = SparkSession.builder.master("local[*]").appName("nb03.1") \
    .getOrCreate()

In [None]:
full_df_all = spark.read.csv(input_path, header=True, inferSchema=True)
ids_test = full_df_all.filter(full_df_all.SalePrice.isNull()).select('Id')
full_df = full_df_all.drop('SalePrice')
full_df_all.show(3)

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|H

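We split the dataset into a training set (rows with a known `SalePrice`) and a test set (rows where `SalePrice` is null), and drop the `Id` column, since it has no explanatory value and we will not need it again.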

In [None]:
full_df = full_df.join(full_df_all.select('Id','SalePrice'), on="Id", how="left")
columns_train = [col for col in full_df.columns if col != 'Id']
columns_test = [col for col in full_df.columns if col != 'SalePrice' and col != 'Id']
train_df = full_df.filter(F.col('SalePrice').isNotNull()).select(*columns_train)
test_df = full_df.filter(F.col('SalePrice').isNull()).select(*columns_test)

In [None]:
train_df.show(3)
test_df.show(3)

+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|HouseStyl

We now build the pipeline that produces the DataFrames in the format required by the Spark ML algorithms.
1. We create the `StringIndexer` stages to convert the categorical variables into numeric indices.
2. We create the `VectorAssembler` to gather all predictor variables into a single column holding the feature vector.
3. We scale the predictor variables. We chose `StandardScaler`, which with its default settings divides each feature by its standard deviation without centering it (it does not map the data to [0, 1]; that is what `MinMaxScaler` does). Other scalers such as `MinMaxScaler` or `MaxAbsScaler` could have been used instead.
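
As a rough illustration of how these scalers differ, the following pure-Python sketch compares dividing by the sample standard deviation, which is what Spark's `StandardScaler` does with its default `withMean=False`, against min-max rescaling to [0, 1]. It is not Spark code: the function names and the sample values are made up for illustration.

```python
from statistics import mean, stdev


def standard_scale(values, with_mean=False):
    """Divide by the sample standard deviation; optionally centre first."""
    mu = mean(values) if with_mean else 0.0
    sigma = stdev(values)  # sample (n - 1) standard deviation
    return [(v - mu) / sigma for v in values]


def min_max_scale(values):
    """Rescale linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]


lot_area = [8450.0, 9600.0, 11250.0, 9550.0, 14260.0]  # illustrative values only

scaled_std = standard_scale(lot_area)
scaled_mm = min_max_scale(lot_area)

print(scaled_std)  # unit standard deviation, values not confined to [0, 1]
print(scaled_mm)   # values confined to [0, 1]
```

Standardized features keep their relative spread but are unbounded, whereas min-max scaling is bounded and more sensitive to extreme values.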

In [None]:
integer_features = extract_columns_by_type(test_df,'int').columns
categorical_features = extract_columns_by_type(test_df,'string').columns

In [None]:
indexers = [StringIndexer(inputCol=col, outputCol=col + "_n", handleInvalid='keep') for col in categorical_features]

selected_categorical_features = [col + '_n' for col in categorical_features]
selected_columns = integer_features + selected_categorical_features
assembler = VectorAssembler(inputCols=selected_columns, outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

pipeline = Pipeline(stages=indexers + [assembler] + [scaler])

We fit the pipeline on the training set only, so that the scaling does not use information from the test set. The `StringIndexer` stages are given `handleInvalid='keep'` so that, when a new dataset is transformed, a category that did not appear in the training data is assigned to an extra index instead of raising an error.
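
The behaviour of `handleInvalid='keep'` can be sketched in plain Python. This is a simplified stand-in for `StringIndexer`, assuming its default frequency-descending ordering with alphabetical tie-breaking. The helper names and the category values are hypothetical.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each label to an index, most frequent first (ties alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


def transform_keep(mapping, labels):
    """Unseen labels share one extra bucket whose index is len(mapping)."""
    unseen_index = float(len(mapping))
    return [mapping.get(lab, unseen_index) for lab in labels]


train_zoning = ["RL", "RL", "RM", "RL", "FV", "RM"]  # illustrative training values
test_zoning = ["RM", "RH", "RL"]                      # "RH" never seen in training

mapping = fit_string_indexer(train_zoning)
print(mapping)                               # {'RL': 0.0, 'RM': 1.0, 'FV': 2.0}
print(transform_keep(mapping, test_zoning))  # [1.0, 3.0, 0.0]
```

The mapping is learned from the training data alone, which is why the pipeline is fitted on `train_df` and only applied to `test_df`.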

In [None]:
preprocessing_pl = pipeline.fit(train_df)

We apply the transformation and select the required columns.

In [None]:
preprocessing_train_df = preprocessing_pl.transform(train_df)
preprocessing_test_df = preprocessing_pl.transform(test_df)

In [None]:
preprocessing_train_df = preprocessing_train_df.select(preprocessing_train_df.scaled_features.alias('features'), preprocessing_train_df.SalePrice.alias('label'))
preprocessing_test_df = preprocessing_test_df.select(preprocessing_test_df.scaled_features.alias('features'))
preprocessing_train_df.show(3)

+--------------------+------+
|            features| label|
+--------------------+------+
|(78,[0,1,2,3,4,5,...|208500|
|(78,[0,1,2,3,4,5,...|181500|
|(78,[0,1,2,3,4,5,...|223500|
+--------------------+------+
only showing top 3 rows



Let us briefly analyse the target variable to see whether any transformation is needed.

In [None]:
preprocessing_train_df.describe('label').show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|              1460|
|   mean|180921.19589041095|
| stddev| 79442.50288288663|
|    min|             34900|
|    max|            755000|
+-------+------------------+



In [None]:
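# relativeError=0.01 allows each quantile to be off by 1% in rank,
# so P99 may coincide with the maximum (as it does in the output below).
percentiles = preprocessing_train_df.approxQuantile("label", [0.25, 0.5, 0.75, 0.95, 0.99], 0.01)
print(f"Percentiles (Q1, Q2/median, Q3, P95, P99): {percentiles}")

Percentiles (Q1, Q2/median, Q3, P95, P99): [129000.0, 162000.0, 212000.0, 318061.0, 755000.0]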


In [None]:
preprocessing_train_df.select(F.skewness("label")).show()

+------------------+
|   skewness(label)|
+------------------+
|1.8809407460340333|
+------------------+



The mean (about 180,921) is considerably higher than the median (162,000), and the skewness coefficient (about 1.88) is high. Both point to a right-skewed distribution, so we apply a logarithmic transformation (`log1p`) to the target.
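
To make the reasoning concrete, here is a small pure-Python sketch of the population skewness (third central moment divided by the variance raised to 1.5) and of how `log1p` compresses a long right tail. The price list is invented for illustration and is not taken from the dataset.

```python
import math


def skewness(values):
    """Population skewness: third central moment over variance ** 1.5."""
    n = len(values)
    mu = sum(values) / n
    m2 = sum((v - mu) ** 2 for v in values) / n
    m3 = sum((v - mu) ** 3 for v in values) / n
    return m3 / m2 ** 1.5


# Illustrative prices with one very expensive house (long right tail).
prices = [100_000, 120_000, 130_000, 150_000, 160_000, 210_000, 750_000]
log_prices = [math.log1p(p) for p in prices]

print(skewness(prices))      # positive: long right tail
print(skewness(log_prices))  # smaller: the log compresses the tail
```

A value near zero would indicate a roughly symmetric distribution; the transformation reduces the skew here but does not remove it entirely.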

In [None]:
preprocessing_train_df = preprocessing_train_df.withColumn('label_log', F.log1p(preprocessing_train_df['label']))
preprocessing_train_df.show(3)

+--------------------+------+------------------+
|            features| label|         label_log|
+--------------------+------+------------------+
|(78,[0,1,2,3,4,5,...|208500| 12.24769911637256|
|(78,[0,1,2,3,4,5,...|181500|12.109016442313738|
|(78,[0,1,2,3,4,5,...|223500|12.317171167298682|
+--------------------+------+------------------+
only showing top 3 rows



## Machine learning models

Recall that we have one set of data for which the target variable is known, and another set, intended for the Kaggle competition submission, for which the house prices are unknown. We fit the models on the set for which all values are known.

In [None]:
df = preprocessing_train_df
df_test = preprocessing_test_df

We split the data into training and test sets.

In [None]:
global_seed = 123
err = dict()
train, test = df.randomSplit([0.8, 0.2], seed=global_seed)

Apart from the initial Lasso baseline, every model is tuned with cross-validation to choose the most suitable hyperparameters. Error is measured with the Root Mean Squared Error (RMSE), as specified by the Kaggle competition.
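
The idea behind cross-validated grid search can be sketched without Spark. The following is a minimal pure-Python illustration, not the `CrossValidator` implementation: the one-feature ridge estimator, the synthetic data, and all function names are assumptions made for the example.

```python
import math
import random


def rmse(y_true, y_pred):
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def k_fold_indices(n, k, seed):
    """Shuffle row indices once and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def fit_ridge_1d(xs, ys, lam):
    """Toy stand-in estimator: one-feature ridge regression through the origin."""
    return sum(x * y for x, y in zip(xs, ys)) / (sum(x * x for x in xs) + lam)


def cross_validate(xs, ys, lam, k=5, seed=123):
    """Mean validation RMSE of fit_ridge_1d over k folds."""
    scores = []
    for fold in k_fold_indices(len(xs), k, seed):
        held_out = set(fold)
        train = [i for i in range(len(xs)) if i not in held_out]
        w = fit_ridge_1d([xs[i] for i in train], [ys[i] for i in train], lam)
        scores.append(rmse([ys[i] for i in fold], [w * xs[i] for i in fold]))
    return sum(scores) / k


# Synthetic data: y = 2x plus small Gaussian noise.
rng = random.Random(0)
xs = [float(i) for i in range(1, 41)]
ys = [2.0 * x + rng.gauss(0.0, 1.0) for x in xs]

grid = [0.0, 10.0, 1000.0]
cv_scores = {lam: cross_validate(xs, ys, lam) for lam in grid}
best_lam = min(cv_scores, key=cv_scores.get)  # lower RMSE is better
print(cv_scores, best_lam)
```

Each candidate is scored only on rows it was not fitted on, and the candidate with the lowest average RMSE is kept.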

The models are trained on the transformed target, `label_log`, but the final performance of each model is measured on the original `label`.
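
The following pure-Python sketch shows why predictions must be mapped back before evaluation. The three prices are the `label` values displayed earlier; the constant offset of 0.05 on the log scale is a hypothetical model error introduced only for illustration.

```python
import math


def rmse(y_true, y_pred):
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


label = [208500.0, 181500.0, 223500.0]      # prices from the rows shown earlier
label_log = [math.log1p(v) for v in label]  # training target

# Hypothetical model output on the log scale: each prediction is off by 0.05.
pred_log = [v + 0.05 for v in label_log]

# expm1 is the inverse of log1p, i.e. exp(x) - 1.
pred = [math.expm1(v) for v in pred_log]

print(rmse(label_log, pred_log))  # error on the log scale
print(rmse(label, pred))          # error in price units
```

A small error on the log scale corresponds to a roughly proportional error in price, so the two RMSE values are not directly comparable.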

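### Linear regression

Let us first look at Lasso regression. This first fit is a baseline: it uses fixed hyperparameters and is trained directly on the original `label` rather than on `label_log`.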

In [None]:
lasso = LinearRegression(featuresCol="features", labelCol="label",
                               elasticNetParam=1.0, regParam=0.1,
                               predictionCol = 'pred_lasso')

lasso_model = lasso.fit(train)
predictions = lasso_model.transform(test)
#predictions = predictions.withColumn("pred_lasso", F.exp(predictions["pred_lasso_log"])-1)

rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_lasso",
    metricName="rmse"
)

print("Regularization parameter:", lasso_model.getRegParam())
print("Elastic net parameter:", lasso_model.getElasticNetParam())
print("RMSE on the test set:", rmse_evaluator.evaluate(predictions))

Regularization parameter: 0.1
Elastic net parameter: 1.0
RMSE on the test set: 48304.43583499618


In [None]:
err['lasso'] = rmse_evaluator.evaluate(predictions)

We now vary the hyperparameters, tuning the regularization strength (`regParam`) and the elastic net mixing parameter (`elasticNetParam`). An `elasticNetParam` of 1 corresponds to Lasso regression, while 0 corresponds to Ridge.
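
As a sketch of how the two parameters interact, the penalty that Spark's `LinearRegression` documentation describes is `regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)`, with `alpha = elasticNetParam`. The helper below computes only that penalty term in plain Python; the function name and the coefficient values are made up for illustration.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 + (1 - alpha) / 2 * L2 squared)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


weights = [0.5, -0.25, 0.0]  # illustrative coefficients

print(elastic_net_penalty(weights, 0.1, 1.0))  # pure Lasso (L1) term
print(elastic_net_penalty(weights, 0.1, 0.0))  # pure Ridge (L2) term
print(elastic_net_penalty(weights, 0.1, 0.5))  # equal mix of both
```

The L1 part tends to drive some coefficients exactly to zero, while the L2 part shrinks all of them smoothly.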

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="label_log",
                      predictionCol="pred_lr_log")

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.2]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

rmse_evaluator_log = RegressionEvaluator(
    labelCol="label_log",
    predictionCol="pred_lr_log",
    metricName="rmse"
)

crossval_lr = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator_log,
                          numFolds=5,
                          seed=global_seed)
lr_cv_model = crossval_lr.fit(train)

In [None]:
lr_model = lr_cv_model.bestModel
best_reg = lr_model.getRegParam()
best_elasticnet= lr_model.getElasticNetParam()
predictions = lr_model.transform(test)
predictions = predictions.withColumn("pred_lr", F.exp(predictions["pred_lr_log"]) - 1)

rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_lr",
    metricName="rmse"
)

error_lr = rmse_evaluator.evaluate(predictions)
print("Best regularization parameter:", best_reg)
print("Best elastic net parameter:", best_elasticnet)
print("RMSE on the test set:", error_lr)

Best regularization parameter: 0.1
Best elastic net parameter: 0.0
RMSE on the test set: 28932.951518146227


In [None]:
err['lr'] = error_lr

The best linear regression model found uses a Ridge penalty (`elasticNetParam = 0.0`).

Finally, let us look at the five variables with the highest and the lowest coefficients, respectively. According to the model, these are the variables that most influence the house price. Because the features were scaled, the coefficient magnitudes are comparable across variables.

In [None]:
coef_dict = dict(zip(selected_columns, lr_model.coefficients))
sorted_items = sorted(coef_dict.items(), key=lambda item: item[1], reverse=True)
dict(sorted_items[:5])

{'OverallQual': np.float64(0.07420234141086748),
 'GrLivArea': np.float64(0.040066000820650156),
 'GarageCars': np.float64(0.033364316288262905),
 '1stFlrSF': np.float64(0.03137225299493725),
 'OverallCond': np.float64(0.030319088852757498)}

In [None]:
coef_dict = dict(zip(selected_columns, lr_model.coefficients))
sorted_items = sorted(coef_dict.items(), key=lambda item: item[1])
dict(sorted_items[:5])

{'PoolArea': np.float64(-0.024840515162420154),
 'MSZoning_n': np.float64(-0.019023487521814063),
 'Functional_n': np.float64(-0.01733474103103184),
 'CentralAir_n': np.float64(-0.01724808424526503),
 'RoofMatl_n': np.float64(-0.015543803092882863)}

### RandomForestRegressor

We begin the search for the best model.

In [None]:
rf1 = RandomForestRegressor(predictionCol='pred_rf1_log', labelCol = 'label_log',
                            seed = global_seed)
paramGrid = ParamGridBuilder() \
    .addGrid(rf1.maxDepth, [4, 5, 7]) \
    .addGrid(rf1.numTrees, [10, 25, 50]) \
    .build()

rmse_evaluator_log = RegressionEvaluator(
    labelCol="label_log",
    predictionCol="pred_rf1_log",
    metricName="rmse"
)

crossval_rf = CrossValidator(estimator=rf1,
                          estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator_log,
                          numFolds=5,
                          seed=global_seed)
rf1_cv_model = crossval_rf.fit(train)

In [None]:
rf1_model = rf1_cv_model.bestModel
best_maxDepth = rf1_model._java_obj.getMaxDepth()
best_numTrees = rf1_model._java_obj.getNumTrees()
predictions = rf1_model.transform(test)
predictions = predictions.withColumn("pred_rf1", F.exp(predictions["pred_rf1_log"]) - 1)

rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_rf1",
    metricName="rmse"
)

error_rf1 = rmse_evaluator.evaluate(predictions)
err['rf1'] = error_rf1
print("Best maxDepth:", best_maxDepth)
print("Best numTrees:", best_numTrees)
print("RMSE on the test set:", error_rf1)

Best maxDepth: 7
Best numTrees: 50
RMSE on the test set: 30771.941822160436


In [None]:
rf2 = RandomForestRegressor(predictionCol='pred_rf2_log', labelCol='label_log',
                            seed = global_seed)
paramGrid = ParamGridBuilder() \
    .addGrid(rf2.maxDepth, [7,8]) \
    .addGrid(rf2.numTrees, [50,75,100]) \
    .build()

rmse_evaluator_log = RegressionEvaluator(
    labelCol="label_log",
    predictionCol="pred_rf2_log",
    metricName="rmse"
)

crossval_rf = CrossValidator(estimator=rf2,
                          estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator_log,
                          numFolds=5,
                          seed=global_seed)
rf2_cv_model = crossval_rf.fit(train)

In [None]:
rf2_model = rf2_cv_model.bestModel
best_maxDepth = rf2_model._java_obj.getMaxDepth()
best_numTrees = rf2_model._java_obj.getNumTrees()
predictions = rf2_model.transform(test)
predictions = predictions.withColumn("pred_rf2", F.exp(predictions["pred_rf2_log"])-1)


rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_rf2",
    metricName="rmse"
)

error_rf2 = rmse_evaluator.evaluate(predictions)
err['rf2'] = error_rf2
print("Best maxDepth:", best_maxDepth)
print("Best numTrees:", best_numTrees)
print("RMSE on the test set:", error_rf2)

Best maxDepth: 8
Best numTrees: 100
RMSE on the test set: 28566.298535547547


In [None]:
rf3 = RandomForestRegressor(predictionCol='pred_rf3_log', labelCol='label_log',
                            seed = global_seed)
paramGrid = ParamGridBuilder() \
    .addGrid(rf3.maxDepth, [8]) \
    .addGrid(rf3.numTrees, [110,120,130]) \
    .build()

rmse_evaluator_log = RegressionEvaluator(
    labelCol="label_log",
    predictionCol="pred_rf3_log",
    metricName="rmse"
)

crossval_rf = CrossValidator(estimator=rf3,
                          estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator_log,
                          numFolds=5,
                          seed=global_seed)
rf3_cv_model = crossval_rf.fit(train)

In [None]:
rf3_model = rf3_cv_model.bestModel
best_maxDepth = rf3_model._java_obj.getMaxDepth()
best_numTrees = rf3_model._java_obj.getNumTrees()
predictions = rf3_model.transform(test)
predictions = predictions.withColumn("pred_rf3", F.exp(predictions["pred_rf3_log"])-1)

rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_rf3",
    metricName="rmse"
)

error_rf3 = rmse_evaluator.evaluate(predictions)
err['rf3'] = error_rf3
print("Best maxDepth:", best_maxDepth)
print("Best numTrees:", best_numTrees)
print("RMSE on the test set:", error_rf3)

Best maxDepth: 8
Best numTrees: 130
RMSE on the test set: 29219.933389566326


We can inspect the feature importances of the first random forest model (`rf1_model`).

In [None]:
rf1_fi = pd.DataFrame({'feature': selected_columns, 'importance_rf1': rf1_model.featureImportances.toArray()})
rf1_fi.sort_values(by='importance_rf1', ascending=False).reset_index(drop=True)

|     | feature       | importance_rf1 |
|-----|---------------|----------------|
| 0   | OverallQual   | 0.249398       |
| 1   | GrLivArea     | 0.152396       |
| 2   | ExterQual_n   | 0.073520       |
| 3   | FullBath      | 0.068966       |
| 4   | GarageCars    | 0.060711       |
| ... | ...           | ...            |
| 73  | BsmtFinSF2    | 0.000177       |
| 74  | 3SsnPorch     | 0.000128       |
| 75  | MiscFeature_n | 0.000085       |
| 76  | Street_n      | 0.000076       |


`OverallQual`, `GrLivArea`, and `ExterQual` stand out. This is broadly consistent with the linear regression, where `OverallQual` and `GrLivArea` also had the two largest positive coefficients.

### Gradient Boosting Regressor

In [None]:
xgb = SparkXGBRegressor(
    features_col="features",
    label_col="label_log",
    prediction_col = 'pred_xgb_log'
)

paramGrid = ParamGridBuilder() \
    .addGrid(xgb.max_depth, [3,4]) \
    .addGrid(xgb.n_estimators, [200,250,300]) \
    .addGrid(xgb.learning_rate, [0.1, 0.2]) \
    .build()

rmse_evaluator_log = RegressionEvaluator(
    labelCol="label_log",
    predictionCol="pred_xgb_log",
    metricName="rmse"
)

crossval_xgb = CrossValidator(estimator=xgb,
                          estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator_log,
                          numFolds=5,
                          seed=global_seed)
xgb_cv_model = crossval_xgb.fit(train)

INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 200}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.2, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 200}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 250}
	dmatrix_kwargs: {'nthread': 1, 'missing': 

In [None]:
xgb_model = xgb_cv_model.bestModel
best_maxDepth = xgb_model.getOrDefault("max_depth")
best_nEstimators = xgb_model.getOrDefault("n_estimators")
best_learningrate = xgb_model.getOrDefault("learning_rate")
predictions = xgb_model.transform(test)
predictions = predictions.withColumn("pred_xgb", F.exp(predictions["pred_xgb_log"])-1)

rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_xgb",
    metricName="rmse"
)

error_xgb = rmse_evaluator.evaluate(predictions)
err['xgb'] = error_xgb
print("Best max_depth:", best_maxDepth)
print("Best n_estimators:", best_nEstimators)
print("Best learning rate:", best_learningrate)
print("RMSE on the test set:", error_xgb)

Best max_depth: 3
Best n_estimators: 250
Best learning rate: 0.2
RMSE on the test set: 25822.37839934211


In [None]:
xgb2 = SparkXGBRegressor(
    features_col="features",
    label_col="label_log",
    prediction_col = 'pred_xgb_log'
)

paramGrid = ParamGridBuilder() \
    .addGrid(xgb2.max_depth, [3]) \
    .addGrid(xgb2.n_estimators, [240,245,255,260]) \
    .addGrid(xgb2.learning_rate, [0.1, 0.2]) \
    .build()

rmse_evaluator_log = RegressionEvaluator(
    labelCol="label_log",
    predictionCol="pred_xgb_log",
    metricName="rmse"
)

crossval_xgb2 = CrossValidator(estimator=xgb2,
                          estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator_log,
                          numFolds=5,
                          seed=global_seed)
xgb2_cv_model = crossval_xgb2.fit(train)

INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 240}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.2, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 240}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 245}
	dmatrix_kwargs: {'nthread': 1, 'missing': 

In [None]:
xgb2_model = xgb2_cv_model.bestModel
best_maxDepth = xgb2_model.getOrDefault("max_depth")
best_nEstimators = xgb2_model.getOrDefault("n_estimators")
best_learningrate = xgb2_model.getOrDefault("learning_rate")
predictions = xgb2_model.transform(test)
predictions = predictions.withColumn("pred_xgb", F.exp(predictions["pred_xgb_log"])-1)

rmse_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="pred_xgb",
    metricName="rmse"
)

error_xgb2 = rmse_evaluator.evaluate(predictions)
err['xgb2'] = error_xgb2
print("Best max_depth:", best_maxDepth)
print("Best n_estimators:", best_nEstimators)
print("Best learning rate:", best_learningrate)
print("RMSE on the test set:", error_xgb2)

Best max_depth: 3
Best n_estimators: 245
Best learning rate: 0.2
RMSE on the test set: 25737.453593479153


We obtain a fairly good model, with a test RMSE of about 25,737. We do not keep increasing the number of trees, since doing so can lead to overfitting.

### Choosing the final model

Let us look at the dictionary in which we stored the test-set errors to find the model with the lowest value.

In [None]:
sorted(err.items(), key= lambda item: item[1])

[('xgb2', 25737.453593479153),
 ('xgb', 25822.37839934211),
 ('rf2', 28566.298535547547),
 ('lr', 28932.951518146227),
 ('rf3', 29219.933389566326),
 ('rf1', 30771.941822160436),
 ('lasso', 48304.43583499618)]

We see that XGBoost with a maximum tree depth of 3, 245 trees in total, and a learning rate of 0.2 gives the best results.

In [None]:
predictions = xgb2_model.transform(test)
predictions = predictions.withColumn("pred_xgb", F.exp(predictions["pred_xgb_log"]) - 1)

In [None]:
predictions.select('features','label','pred_xgb').show(5)

+--------------------+------+------------------+
|            features| label|          pred_xgb|
+--------------------+------+------------------+
|(78,[0,1,2,3,4,5,...|250000|293979.44762391143|
|(78,[0,1,2,3,4,5,...|173000|171962.49561500587|
|(78,[0,1,2,3,4,5,...|129000|158302.00002483465|
|(78,[0,1,2,3,4,5,...|163000|159095.66704804794|
|(78,[0,1,2,3,4,5,...|217000|210714.58012036403|
+--------------------+------+------------------+
only showing top 5 rows



Finally, we train the model on all the labelled data to produce the Kaggle submission.

In [None]:
xgb = SparkXGBRegressor(
    features_col="features",
    label_col="label_log",
    prediction_col = 'pred_xgb_log',
    max_depth = 3,
    n_estimators = 245,
    learning_rate = 0.2
)

xgb_model = xgb.fit(df)

predictions = xgb_model.transform(df_test)
predictions = predictions.withColumn("pred_xgb", F.exp(predictions["pred_xgb_log"]) - 1)

INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'learning_rate': 0.2, 'max_depth': 3, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 245}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


We build the output in the format required for the Kaggle submission.

In [None]:
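entrega = predictions.select('pred_xgb')
# Caution: monotonically_increasing_id() depends on partitioning, so this join only
# lines up rows if both DataFrames share the same partition layout and row order.
entrega = entrega.withColumn("index", F.monotonically_increasing_id())
ids_test = ids_test.withColumn("index", F.monotonically_increasing_id())
entrega  = entrega.join(ids_test, on="index")
entrega = entrega.select(entrega.Id, entrega.pred_xgb.alias('SalePrice'))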
entrega.show(3)

+----+------------------+
|  Id|         SalePrice|
+----+------------------+
|1461|128462.13009257859|
|1462|153786.48334555558|
|1463| 200719.7799579249|
+----+------------------+
only showing top 3 rows
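
The index-based join above depends on both DataFrames receiving identical generated ids. The pure-Python simulation below illustrates why that is fragile. It assumes the layout described in the Spark documentation for `monotonically_increasing_id` (partition index in the upper bits, row position within the partition in the lower 33 bits); the partition sizes and the helper name are hypothetical.

```python
def monotonic_ids(partition_sizes):
    """Simulate the documented layout: partition index in the upper bits,
    row position within the partition in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        ids.extend((partition_index << 33) + row for row in range(size))
    return ids


# The same six rows, split evenly into two partitions vs. unevenly.
ids_predictions = monotonic_ids([3, 3])
ids_lookup = monotonic_ids([4, 2])

print(ids_predictions)
print(ids_lookup)

# Only ids present on both sides would survive an inner join on "index".
matched = set(ids_predictions) & set(ids_lookup)
print(len(matched), "of", len(ids_predictions), "rows would match")
```

With different partition layouts, some rows are dropped and others are paired with the wrong partner (here the fourth prediction row shares its id with the fifth lookup row). Carrying `Id` through the pipeline and selecting it alongside the prediction would avoid relying on generated ids at all.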



Finally, we export the DataFrame as an unpartitioned `.csv`: `coalesce(1)` makes Spark write a single part file inside the output directory.

In [None]:
path = output_path.format('entrega_kaggle')
entrega.coalesce(1).write.mode('overwrite').option("header", "true").csv(path)

In [None]:
spark.stop()