# House Prices: Primeros modelos.

#### *Álvaro Sánchez Castañeda*

Una vez tenemos nuestros datos limpios (**preprocesado.ipynb**), podemos proceder a construír modelos predictivos.

**Resumen:**
- Transformación de los datos.
- Definición de metrica **RMSLE**.
- Entrenamiento y evaluación de modelos.

### Inicializamos Spark y cargamos datos y librerias.

In [1]:
import sys

# Añade la ruta de tu carpeta Spark.
spark_path = "C:/Users/AlvaroSanchez91/Apps/spark-2.1.0-bin-hadoop2.7"

sys.path.append(spark_path + '/python')
sys.path.append(spark_path + '/python/lib/py4j-0.10.4-src.zip')

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("nb06") \
    .getOrCreate()

#LIBRERIAS.
import time
import numpy as np
import pandas as pd
from math import sqrt
from operator import add
from pyspark.sql.functions import col, count, sum as agg_sum
from pyspark.sql.functions import avg
from pyspark.sql.functions import log1p
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import Evaluator

#DATOS
path = 'C:/Users/AlvaroSanchez91/Desktop/Master Big Data Sevilla/APBD - Arquitecturas y Paradigmas para Big Data/trabajo_final/HousePrices_clean_csv'
full_df = spark.read.csv(path, header=True, inferSchema=True)

#FIJAMOS SEMILLA PARA USARLA POSTERIORMENTE.
global_seed=1

## Preprocesado.

 - ###  Transformaciónes.
 Haremos algunas **transformaciones logaritmicas** (en la exploración grafica de los datos se ve que puede ser conveniente en algunos casos). Tambien transformaremos las variables categoricas a **variables dummies** (esto ya se hizo, pero aquí se muestra otra alternativa mas eficiente). Por último, transformaremos los datos al formato que usan los modelos de la librería **ml**.

In [2]:
##############################
#           LOG1P
##############################

#TRANSFORMACIONES LOGARITMICAS
#Sustituimos las siguientes variables por su transformación.
loglist=['MasVnrArea','OpenPorchSF','BsmtFinSF2','LotArea','1stFlrSF','BsmtUnfSF','BsmtFinSF1','WoodDeckSF']
for c in loglist:
    full_df=full_df.withColumn(c,log1p(full_df[c]))
    
# SEPARACIÓN EN VARIABLES CATEGÓRICAS Y NUMÉRICAS (PREDICTORAS)
tipos=dict(full_df.dtypes)

var_cat=[c for c in tipos if tipos[c]=='string']
var_num=[c for c in tipos if tipos[c]!='string']
#No usamos ni Id ni SalePrice para predecir.
var_num.remove('SalePrice')
var_num.remove('Id')

########################
#        DUMMIES
########################

#Tenemos muchas columnas a transformar, crearemos diccionarios que contengan transformadores y estimadores.

#STRINGINDEXER.
#A cada columna categórica le asociamos un StringIndexer (añadimos _n a las columnas transformadas).
StringIndexers={c:StringIndexer(inputCol=c, outputCol=c+'_n') for c in var_cat}

#ONEHOTENCODER.
#Transforma las salidas de los StringIndexers en variables dummies (añadimos _dummies a las columnas transformadas).
encoders={c:OneHotEncoder(inputCol=c+'_n', outputCol=c+'_dummies') for c in var_cat}

#VECTORASSEMBLER.
#Une las columnas deseadas en una de tipo vector (formato requerido por los modelos de ml).
#Necesitamos crear un vector con las columnas dummies y las variables numéricas originales.
assembler = VectorAssembler(inputCols=[c+'_dummies' for c in var_cat]+var_num, outputCol="features")

#PIPELINE.
#Concatenamos los estimadores anteriores para ajustar y transformar de forma mas sencilla.
#Pasos del pipeline:
piplist=list(StringIndexers.values())+list(encoders.values())+[assembler] 
pip=Pipeline(stages=piplist)
#Ajustamos.
pippre=pip.fit(full_df)
#Con la siguiente orden podemos mostrar la estructura del vector features.
#pippre.transform(full_df).select('features').first().features.toArray()

Ya podemos transformar nuestros datos, teniendolos listos para entrenar modelos de Machine Learning.

In [3]:
#SEPARACION TEST DE KAGGLE.
test_kaggle=full_df.filter(full_df.SalePrice.isNull())
datos=full_df.filter(full_df.SalePrice.isNotNull())
test_kaggle.select('Id').count()

1459

In [4]:
#TRANSFORMACION TEST DE KAGGLE.
test_kaggle_trans=pippre.transform(test_kaggle)
#features contiene variables predictoras, label contiene variable objetivo.
test_kaggle_trans=test_kaggle_trans.select('features',test_kaggle_trans.SalePrice.alias('label'),'Id')
#Necesitamos Id para que kaggle evalue nuestras predicciones.
test_kaggle_trans.show(5)

+--------------------+-----+----+
|            features|label|  Id|
+--------------------+-----+----+
|(260,[0,5,13,17,2...| null|1461|
|(260,[0,5,15,18,2...| null|1462|
|(260,[0,5,13,17,2...| null|1463|
|(260,[0,5,13,17,2...| null|1464|
|(260,[0,5,13,17,2...| null|1465|
+--------------------+-----+----+
only showing top 5 rows



In [5]:
#TRANSFORMACION DATOS DE ENTRENAMIENTO.
datos_trans=pippre.transform(datos)
#features contiene variables predictoras, label contiene variable objetivo.
datos_trans=datos_trans.select('features',datos_trans.SalePrice.alias('label'))
datos_trans.show(5)

+--------------------+------+
|            features| label|
+--------------------+------+
|(260,[0,5,13,17,2...|208500|
|(260,[0,5,13,17,2...|181500|
|(260,[0,5,13,17,2...|223500|
|(260,[0,5,13,17,2...|140000|
|(260,[0,5,13,17,2...|250000|
+--------------------+------+
only showing top 5 rows



- ### Partición train test.
Ya tenemos nuestros datos de entrenamiento y el dato test para que Kaggle evalue nuestras predicciones. Por el momento nos olvidaremos de este conjunto test, y por test nos referiremos a un subconjunto de los datos de entrenamientos que usaremos nosotros para evaluar nuestros modelos.

In [6]:
#PARTICION TRAIN TEST.
train, test = datos_trans.randomSplit([0.8, 0.2], seed=global_seed)

No será necesario estandarizar ahora, los modelos permiten hacerlo internamente.

## Modelos.

 Para cada modelo se optimizarán los parámetros mediante validación cruzada.

 - ### Definición de evaluador.
Kaggle evalua nuestras predicciones usando la metrica **RMSLE**, de modo que es conveniente que también la usemos nosotros para evaluar nuestros modelos.

In [7]:
class MyEvaluator(Evaluator):
    '''
    When a userID is predicted when it is not already trained (all userID  data is used on validation 
    group and none of them to train), prediction is nan,  so RegressionEvaluator returns Nan.
    To solve this we must change RegressionEvaluator by MiValidacion
    '''
    def __init__(self,predictionCol='prediction', targetCol='label'):        
        super(MyEvaluator, self).__init__()
        self.predictionCol=predictionCol
        self.targetCol=targetCol
        
    def _evaluate(self, dataset):       
        error=self.rmsle(dataset,self.predictionCol,self.targetCol)
        #print ("Error: {}".format(error))
        return error
    
    def isLargerBetter(self):
        return False
    
    @staticmethod 
    def rmsle(dataset,predictionCol,targetCol):
        return sqrt(dataset.dropna().select(avg((log1p(dataset[targetCol]) - log1p(dataset[predictionCol])) ** 2)).first()[0])

- ### Random Forest.

In [8]:
###################################
#   GRID SEARCH
###################################

#MODELO.
rf1 = RandomForestRegressor(predictionCol='pred_rf1', seed=global_seed)
#REJILLA.
paramGrid = ParamGridBuilder() \
    .addGrid(rf1.minInstancesPerNode, [1,20,50,100]) \
    .addGrid(rf1.maxDepth, [5, 7, 10,20]) \
    .addGrid(rf1.numTrees, [10, 50, 100]) \
    .build()
#OPTIMIZAMOS MEDIANTE VALIDACIÓN CRUZADA.
crossval = CrossValidator(estimator=rf1,
                          estimatorParamMaps=paramGrid,
                          evaluator=MyEvaluator(predictionCol='pred_rf1'),
                          numFolds=3, 
                          seed=global_seed) 
t0 = time.time()
rf1_cv_model = crossval.fit(train)
tt = time.time() - t0
print('Tiempo ajuste de parámetros {} min.'.format(round(tt/60,2)))

#TABLA CON RESULTADOS
par_map = rf1_cv_model.getEstimatorParamMaps()
lpars = [{par.name: value for par, value in par_comb.items()} for par_comb in par_map]
pars_df = pd.DataFrame(lpars)
pars_df['score'] = rf1_cv_model.avgMetrics
pars_df.sort_values(by='score', ascending=False)

Tiempo ajuste de parámetros 11.56 min.


Unnamed: 0,maxDepth,minInstancesPerNode,numTrees,score
27,10,100,10,0.214751
3,5,100,10,0.214751
15,7,100,10,0.214751
39,20,100,10,0.214751
31,10,100,50,0.214529
19,7,100,50,0.214529
7,5,100,50,0.214529
43,20,100,50,0.214529
11,5,100,100,0.212768
23,7,100,100,0.212768


In [9]:
###############################################
#        MODELO CON MEJORES PARÁMETROS
###############################################

#EVALUAMOS MEDIANTE TEST.
test=rf1_cv_model.bestModel.transform(test)
test.show(5)
evaluator=MyEvaluator(predictionCol='pred_rf1')
modelos_eval=pd.DataFrame([('rf1',evaluator.evaluate(test))],columns=['Modelo','Score'])
modelos_eval

+--------------------+------+------------------+
|            features| label|          pred_rf1|
+--------------------+------+------------------+
|(260,[0,5,13,17,2...|138000|         147253.89|
|(260,[0,5,13,17,2...|191000|196734.56833333336|
|(260,[0,5,13,17,2...|157000| 149846.2142857143|
|(260,[0,5,13,17,2...|175000|178066.22863247863|
|(260,[0,5,13,17,2...|128000|         137382.74|
+--------------------+------+------------------+
only showing top 5 rows



Unnamed: 0,Modelo,Score
0,rf1,0.142148


- ### Gradient Boosting.

In [10]:
###################################
#   GRID SEARCH
###################################

#MODELO.
gbt = GBTRegressor(predictionCol='pred_gbt',featuresCol='features', seed=global_seed)
#REJILLA.
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.minInstancesPerNode , [20,25,30,40]) \
    .addGrid(gbt.maxDepth, [2,3,4]) \
    .addGrid(gbt.maxIter , [50,100]) \
    .build()
    
#OPTIMIZAMOS MEDIANTE VALIDACIÓN CRUZADA.
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=MyEvaluator(predictionCol='pred_gbt'),
                          numFolds=3, 
                          seed=global_seed) 
crossval
t0 = time.time()
gbt_cv_model = crossval.fit(train)
tt = time.time() - t0
print('Tiempo ajuste de parámetros {} min.'.format(round(tt/60,2)))

#TABLA CON RESULTADOS.
par_map = gbt_cv_model.getEstimatorParamMaps()
lpars = [{par.name: value for par, value in par_comb.items()} for par_comb in par_map]
pars_df = pd.DataFrame(lpars)
pars_df['score'] = gbt_cv_model.avgMetrics
pars_df.sort_values(by='score', ascending=False)

Tiempo ajuste de parámetros 27.84 min.


Unnamed: 0,maxDepth,maxIter,minInstancesPerNode,score
14,4,50,30,0.170625
8,4,50,25,0.168977
20,4,50,40,0.166085
2,4,50,20,0.165908
13,3,50,30,0.164371
7,3,50,25,0.164216
17,4,100,30,0.162874
12,2,50,30,0.162086
0,2,50,20,0.161097
1,3,50,20,0.161013


In [11]:
###############################################
#        MODELO CON MEJORES PARÁMETROS
###############################################

#SCORE EN TEST.
test=gbt_cv_model.bestModel.transform(test)
test.show(5)
evaluator=MyEvaluator(predictionCol='pred_gbt')
modelos_eval=pd.concat([modelos_eval,pd.DataFrame([('gbt',evaluator.evaluate(test))],columns=['Modelo','Score'])])
modelos_eval

+--------------------+------+------------------+------------------+
|            features| label|          pred_rf1|          pred_gbt|
+--------------------+------+------------------+------------------+
|(260,[0,5,13,17,2...|138000|         147253.89|134756.88606935611|
|(260,[0,5,13,17,2...|191000|196734.56833333336|191681.64487029784|
|(260,[0,5,13,17,2...|157000| 149846.2142857143| 157621.6790281814|
|(260,[0,5,13,17,2...|175000|178066.22863247863|168021.15734718143|
|(260,[0,5,13,17,2...|128000|         137382.74|139843.53313775416|
+--------------------+------+------------------+------------------+
only showing top 5 rows



Unnamed: 0,Modelo,Score
0,rf1,0.142148
0,gbt,0.156955


- ### Elastic Net.

In [12]:
###################################
#   GRID SEARCH
###################################

#MODELO.
lr = LinearRegression(predictionCol='pred_lr',featuresCol='features')

#REJILLA.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam , [0, 0.001, 0.01, 0.05, 0.1, 0.15, 0.2]) \
    .addGrid(lr.elasticNetParam, [0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.8, 0.9]) \
    .build()

#OPTIMIZAMOS MEDIANTE VALIDACIÓN CRUZADA.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MyEvaluator(predictionCol='pred_lr'),
                          numFolds=3, 
                          seed=global_seed) 
t0 = time.time()
lr_cv_model = crossval.fit(train)
tt = time.time() - t0
print('Tiempo ajuste de parámetros {} min.'.format(round(tt/60,2)))

#TABLA CON RESULTADOS.
par_map = lr_cv_model.getEstimatorParamMaps()
lpars = [{par.name: value for par, value in par_comb.items()} for par_comb in par_map]
pars_df = pd.DataFrame(lpars)
pars_df['score'] = lr_cv_model.avgMetrics
pars_df.sort_values(by='score', ascending=False)

Tiempo ajuste de parámetros 11.05 min.


Unnamed: 0,elasticNetParam,regParam,score
44,0.90,0.100,0.176055
23,0.25,0.010,0.175449
24,0.30,0.010,0.175449
28,0.05,0.050,0.175448
22,0.20,0.010,0.175200
21,0.15,0.010,0.175125
5,0.25,0.000,0.175096
1,0.05,0.000,0.175096
2,0.10,0.000,0.175096
6,0.30,0.000,0.175096


In [13]:
###############################################
#        MODELO CON MEJORES PARÁMETROS
###############################################

#SCORE EN TEST.
test=test.drop('pred_lr')
test=lr_cv_model.bestModel.transform(test)
evaluator=MyEvaluator(predictionCol='pred_lr')
evaluator.evaluate(test)
modelos_eval=pd.concat([modelos_eval,pd.DataFrame([('lr',evaluator.evaluate(test))],columns=['Modelo','Score'])])
modelos_eval

Unnamed: 0,Modelo,Score
0,rf1,0.142148
0,gbt,0.156955
0,lr,0.182645


- ### Ensamblado de modelos.

In [14]:
#PREDECIMOS SOBRE TRAIN.
train=gbt_cv_model.bestModel.transform(train)
train=lr_cv_model.bestModel.transform(train)
train=rf1_cv_model.bestModel.transform(train)
predassembler = VectorAssembler(inputCols=['pred_lr','pred_gbt'], outputCol="features_asembler")

#TRANSFORMAMOS EN VECTOR.
train=predassembler.transform(train)
test=predassembler.transform(test)


#COMBINACIÓN LINEAL MODELOS.
lr_asembler = LinearRegression(predictionCol='pred_asembler',featuresCol='features_asembler')
lr_asembler_model=lr_asembler.fit(train)
print(lr_asembler_model.coefficients)

#EVALUAMOS EN TEST.
test=lr_asembler_model.transform(test)
evaluator=MyEvaluator(predictionCol='pred_asembler')
modelos_eval=pd.concat([modelos_eval,pd.DataFrame([('assembling',evaluator.evaluate(test))],columns=['Modelo','Score'])])

modelos_eval

[0.669492400091,0.353362268401]


Unnamed: 0,Modelo,Score
0,rf1,0.142148
0,gbt,0.156955
0,lr,0.182645
0,assembling,0.163539


- ### Model Stacking.

In [15]:
#PARTICION TRAIN EN DOS CONJUNTOS.
train1, train2 = train.randomSplit([0.5, 0.5], seed=global_seed)

###########################################
#ENTRENAMOS LOS MODELOS CON train1.
###########################################

#ELASTIC NET
lr_ms = LinearRegression(predictionCol='pred_lr_ms',featuresCol='features',elasticNetParam=0.25,regParam=0.2)
lr_ms_model=lr_ms.fit(train1)
train2=lr_ms_model.transform(train2)
test=lr_ms_model.transform(test)

#GRADIENT BOOSTING
gbt_ms = GBTRegressor(predictionCol='pred_gbt_ms',featuresCol='features',maxDepth=2,maxIter=100,minInstancesPerNode=30, seed=global_seed)
gbt_ms_model=gbt_ms.fit(train1)
train2=gbt_ms_model.transform(train2)
test=gbt_ms_model.transform(test)

#RANDOM FOREST.
rf_ms = RandomForestRegressor(predictionCol='pred_rf1_ms',numTrees=50,maxDepth=20, seed=global_seed)
rf_ms_model=rf_ms.fit(train1)
train2=rf_ms_model.transform(train2)
test=rf_ms_model.transform(test)

#UNIMOS PRED A FEATURES.
ms_assembler = VectorAssembler(inputCols=['pred_lr_ms','pred_gbt_ms','pred_rf1_ms','features'], outputCol="features_ms")
train2=ms_assembler.transform(train2)
test=ms_assembler.transform(test)

###########################################
#ENTRENAMOS MODELO FINAL CON train2.
###########################################

#MODELO.
gbt_wrapper = GBTRegressor(predictionCol='pred_ms',featuresCol='features_ms', seed=global_seed)
#REJILLA.
paramGrid = ParamGridBuilder() \
    .addGrid(gbt_wrapper.minInstancesPerNode , [20,25,30,40]) \
    .addGrid(gbt_wrapper.maxDepth, [2,3,4]) \
    .addGrid(gbt_wrapper.maxIter , [50,100]) \
    .build()
    
#OPTIMIZAMOS MEDIANTE VALIDACIÓN CRUZADA.
crossval = CrossValidator(estimator=gbt_wrapper,
                          estimatorParamMaps=paramGrid,
                          evaluator=MyEvaluator(predictionCol='pred_ms'),
                          numFolds=3, 
                          seed=global_seed) 
crossval
t0 = time.time()
gbt_wrapper_model = crossval.fit(train2)
tt = time.time() - t0
print('Tiempo ajuste de parámetros {} min.'.format(round(tt/60,2)))

#TABLA CON RESULTADOS.
par_map = gbt_wrapper_model.getEstimatorParamMaps()
lpars = [{par.name: value for par, value in par_comb.items()} for par_comb in par_map]
pars_df = pd.DataFrame(lpars)
pars_df['score'] = gbt_cv_model.avgMetrics
pars_df.sort_values(by='score', ascending=False)

Tiempo ajuste de parámetros 27.39 min.


Unnamed: 0,maxDepth,maxIter,minInstancesPerNode,score
14,4,50,30,0.170625
8,4,50,25,0.168977
20,4,50,40,0.166085
2,4,50,20,0.165908
13,3,50,30,0.164371
7,3,50,25,0.164216
17,4,100,30,0.162874
12,2,50,30,0.162086
0,2,50,20,0.161097
1,3,50,20,0.161013


In [16]:
#EVALUAMOS EN TEST.
test=gbt_wrapper_model.bestModel.transform(test)
evaluator=MyEvaluator(predictionCol='pred_ms')
evaluator.evaluate(test)
modelos_eval=pd.concat([modelos_eval,pd.DataFrame([('MS',evaluator.evaluate(test))],columns=['Modelo','Score'])])
modelos_eval

Unnamed: 0,Modelo,Score
0,rf1,0.142148
0,gbt,0.156955
0,lr,0.182645
0,assembling,0.163539
0,MS,0.158298


### Submissions.
 Hemos evaluado distintos modelos, lo conveniente sería escoger los mejores y reentrenarlos usando los mejores parámetros (según lo aquí visto). De momento nos limitaremos a usar los ya construidos para predecir sobre los datos **test_kaggle** y ver que resultados nos da la plataforma.

In [17]:
#######################################
#        ENSAMBLADO
#######################################

#PREDECIMOS MEDIANTE GBT Y LR.
test_kaggle_trans=gbt_cv_model.bestModel.transform(test_kaggle_trans)
test_kaggle_trans=lr_cv_model.bestModel.transform(test_kaggle_trans)

#TRANSFORMAMO SEN VECTOR.
test_kaggle_trans=predassembler.transform(test_kaggle_trans)

#COMBINACIÓN LINEAL DE PREDICCIONES.
test_kaggle_trans=lr_asembler_model.transform(test_kaggle_trans)

#TRAEMOS LOS DATOS A LOCAL Y GUARDAMOS EN CSV.
#Esto lo podemos hacer por que no estamos ante un verdadero problema Big Data.
submission=test_kaggle_trans.drop('SalePrice').select('Id',test_kaggle_trans['pred_asembler'].alias('SalePrice'))
local=submission.collect()
local=pd.DataFrame(local,columns=['Id','SalePrice'])
local.to_csv('submission_asembler.csv',index=False)

#######################################
#        MODEL STACKING
#######################################

#ELASTIC NET, GRADIENT BOOSTING Y RANDOM FOREST.
test_kaggle_trans=lr_ms_model.transform(test_kaggle_trans)
test_kaggle_trans=gbt_ms_model.transform(test_kaggle_trans)
test_kaggle_trans=rf_ms_model.transform(test_kaggle_trans)

#UNIMOS PRED A FEATURES.
test_kaggle_trans=ms_assembler.transform(test_kaggle_trans)

#MODELO WRAPPER.
test_kaggle_trans=gbt_wrapper_model.bestModel.transform(test_kaggle_trans)

#TRAEMOS LOS DATOS A LOCAL Y GUARDAMOS EN CSV.
#Esto lo podemos hacer por que no estamos ante un verdadero problema Big Data.
submission=test_kaggle_trans.drop('SalePrice').select('Id',test_kaggle_trans['pred_ms'].alias('SalePrice'))
local=submission.collect()
local=pd.DataFrame(local,columns=['Id','SalePrice'])
local.to_csv('submission_ms.csv',index=False)

In [18]:
#spark.stop()