In [1]:
import os
import numpy as np # linear algebra
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns 
import statsmodels.api as sm

import copy
from statsmodels.formula.api import ols
from collections import Counter, defaultdict
from sklearn.model_selection import train_test_split
import LibrairiePerso_v4_8 as ownLibrary

'''
import sys
sys.path.append("c:\python38\lib\site-packages")
'''
#import importlib
#importlib.reload(ownLibrary)

pd.set_option('display.max_columns', 500)

In [2]:
#pip install statsmodels

In [3]:
import os
cwd = os.getcwd()


train_rwrk = pd.read_csv(cwd + "\\data\\train_step_2.csv", sep=",")
#y_train=X_train['SalePrice']

test_rwrk = pd.read_csv(cwd + "\\data\\test_step_2.csv", sep=",")
#y_test=X_test['SalePrice']

submission = pd.read_csv(cwd + "\\data\\submission_step_2.csv", sep=",")

'''
for df in [X_train, X_test]:
    df.drop(['SalePrice'], axis=1, inplace=True)
'''

"\nfor df in [X_train, X_test]:\n    df.drop(['SalePrice'], axis=1, inplace=True)\n"

# Linear Regression with SparkML

In [4]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [5]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [6]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline

from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [7]:
#pip install pyarrow

In [8]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark_train = spark.createDataFrame(train_rwrk)
spark_test = spark.createDataFrame(test_rwrk)

In [9]:
spark_train.createOrReplaceTempView('spark_train')
spark.sql("SELECT Id, BsmtUnfSF, hasgarage, YearBuilt, SalePrice_log, SalePrice FROM spark_train limit 2").show()

+---+---------+---------+---------+------------------+---------+
| Id|BsmtUnfSF|hasgarage|YearBuilt|     SalePrice_log|SalePrice|
+---+---------+---------+---------+------------------+---------+
|211|      396|        0|        1| 11.49272275765271|    98000|
|319|      360|        1|        3|12.468436909997664|   260000|
+---+---------+---------+---------+------------------+---------+



## Convert feature to Int and define their type

In [10]:
for feature in spark_train.columns:
    spark_train = spark_train.withColumn(feature, spark_train[feature].cast(IntegerType()))
    spark_test = spark_test.withColumn(feature, spark_test[feature].cast(IntegerType()))

In [11]:
other_features = ['Id', 'SalePrice_log', 'SalePrice']
quantitative_features = ['MasVnrArea', 'BsmtFinSF1', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'GrLivArea', 'GarageArea', 'ScreenPorch', 'MiscVal', 'TotalSF', 'Total_sqr_footage', 'Total_Bathrooms', 'Total_porch_sf', '1stFlrSF²', '1stFlrSFGarageArea', 'GarageArea²', 'PR3_6', 'PR3_7', 'PR3_8', 'PR3_9', 'PR3_10', 'PR3_11', 'PR3_12', 'PR3_13', 'PR3_14', 'PR3_15', 'PR3_16', 'PR3_17', 'PR3_18', 'PR3_19', 'PR3_20', 'PR3_21', 'PR3_22', 'PR3_23', 'PR3_24', 'PR3_25', 'PR3_26', 'PR3_27', 'PR3_28', 'PR3_29', 'PR3_30', 'PR3_31', 'PR3_32', 'PR3_33', 'PR3_34', 'PR3_35', 'PR3_36', 'PR3_37', 'PR3_38', 'PR3_39', 'PR3_40', 'PR3_41', 'PR3_42', 'PR3_43', 'PR3_44', 'PR3_45', 'PR3_46', 'PR3_47', 'PR3_48', 'PR3_49', 'PR3_50', 'PR3_51', 'PR3_52', 'PR3_53', 'PR3_54', 'PR3_55']
boolean_features = [ 'haspool', 'has2ndfloor', 'hasgarage', 'hasbsmt', 'hasfireplace']
categorical_features = ['MSSubClass', 'MSZoning', 'LotFrontage', 'LotArea', 'Alley', 'LotShape', 'LandContour', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'BldgType', 'HouseStyle', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'RoofStyle', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'BsmtFinSF2', 'HeatingQC', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'TotRmsAbvGrd', 'Functional', 'GarageType', 'GarageYrBlt', 'GarageFinish', 'GarageCars', 'GarageQual', 'GarageCond', 'PavedDrive', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', 'Fence', 'MoSold', 'YrSold', 'YrBltAndRemod', 'NewFirePlaces', 'NewExterQualCond', 'NewCentrAirElec', 'NewKitchen', 'NewSale']

## Pipeline Stages

In [12]:
indexed_features = [var+'_indexed' for var in categorical_features ]
encoded_features = [var+'_ohe' for var in categorical_features ]

indexer = StringIndexer()
indexer.setInputCols(categorical_features)
indexer.setOutputCols(indexed_features)
indexed_train = indexer.fit(spark_train).transform(spark_train)
indexed_test = indexer.fit(spark_test).transform(spark_test)

encoder = OneHotEncoder()
encoder.setInputCols(indexed_features)
encoder.setOutputCols(encoded_features)
encoded_train = encoder.fit(indexed_train).transform(indexed_train)
encoded_test = encoder.fit(indexed_test).transform(indexed_test)

vectorAssembler = VectorAssembler(inputCols=quantitative_features+boolean_features+encoded_features, outputCol="quant_features")
vectorized_train = vectorAssembler.transform(encoded_train)
vectorized_test = vectorAssembler.transform(encoded_test)

In [13]:
pipeline = Pipeline(stages = [indexer, encoder, vectorAssembler])

df_train = pipeline.fit(spark_train).transform(spark_train)
df_test = pipeline.fit(spark_test).transform(spark_test)


In [14]:
df_train.createOrReplaceTempView('df_train')
#spark_train.show()
spark.sql("SELECT Id, quant_features, SalePrice_log, SalePrice FROM df_train limit 5").show()

+----+--------------------+-------------+---------+
|  Id|      quant_features|SalePrice_log|SalePrice|
+----+--------------------+-------------+---------+
| 211|(196,[1,2,3,4,6,1...|           11|    98000|
| 319|(196,[0,1,2,3,4,5...|           12|   260000|
| 240|(196,[1,2,3,4,5,6...|           11|   113000|
| 987|(196,[2,3,4,5,6,7...|           11|   117000|
|1417|(196,[2,3,4,5,6,7...|           11|   122500|
+----+--------------------+-------------+---------+



## First model with Spark using all features

In [15]:
lr = LinearRegression(featuresCol = 'quant_features', labelCol='SalePrice', maxIter=10, regParam=0.3, elasticNetParam=0.8)
supermodel = lr.fit(df_train)
supermodel.summary.r2


0.8985154906421481

In [38]:
predictions = supermodel.transform(df_train)
predictions.select("Id","prediction").show(5);

+----+------------------+
|  Id|        prediction|
+----+------------------+
| 211|   96560.428756378|
| 319| 335061.5902159068|
| 240|115373.79903877417|
| 987|114709.72185573752|
|1417|158447.88400895565|
+----+------------------+
only showing top 5 rows



In [22]:
predictions = supermodel.transform(df_test)
result= predictions.select('prediction').collect()

Py4JJavaError: An error occurred while calling o1762.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 18.0 failed 1 times, most recent failure: Lost task 1.0 in stage 18.0 (TID 122) (DESKTOP-2KQMR2K executor driver): org.apache.spark.SparkException: Failed to execute user defined function(PredictionModel$$Lambda$4440/137865441: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 195, y.size = 196
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:115)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:736)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:686)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1(Predictor.scala:251)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1$adapted(Predictor.scala:250)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(PredictionModel$$Lambda$4440/137865441: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 195, y.size = 196
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:115)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:736)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:686)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1(Predictor.scala:251)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1$adapted(Predictor.scala:250)
	... 17 more


## RFE

In [26]:
pipeline = Pipeline(stages = [indexer, encoder])

df_train_rfe = pipeline.fit(spark_train).transform(spark_train)
df_test_rfe = pipeline.fit(spark_test).transform(spark_test)


In [27]:
usable_features = quantitative_features+boolean_features+encoded_features
def model_with_spark(train, usable_features):
    vectorAssembler_rfe = VectorAssembler(inputCols=usable_features, outputCol="quant_features")
    vectorized_train = vectorAssembler_rfe.transform(train)

    lr = LinearRegression(featuresCol = 'quant_features', labelCol='SalePrice', maxIter=10, regParam=0.3, elasticNetParam=0.8)
    model = lr.fit(vectorized_train)
    predictions = model.transform(vectorized_train)
    #predictions.select("prediction").show(5);
    result_pdf = predictions.select("prediction").toPandas()
    return result_pdf

spark_df = model_with_spark(df_train_rfe, df_train_rfe.columns)

In [36]:
spark_df

Unnamed: 0,prediction
0,110121.601213
1,304717.837082
2,121359.107042
3,119739.209683
4,149559.354121
...,...
1017,189840.434462
1018,336276.683660
1019,304539.700956
1020,119860.003461


In [32]:
def recursive_feature_elimination_rf(X_train, X_train_features_to_use, tol=0.0001):

    features_to_remove = []
    count = 1
    # initial model using all the features
    y_pred_test = model_with_spark(X_train, X_train_features_to_use)
    y_test = X_train.select("SalePrice").toPandas()
    auc_score_all = ownLibrary.rsquared(y_test['SalePrice'], y_pred_test['prediction'])
    print('r2 for all features : ' + str(round(auc_score_all,4)))
    
    for feature in X_train_features_to_use:
        count += 1
        
        features_to_use = [x for x in X_train_features_to_use if x not in features_to_remove + [feature]]
        y_pred_test = model_with_spark(X_train, features_to_use)
        auc_score_int = ownLibrary.rsquared(y_test['SalePrice'], y_pred_test['prediction'])

        diff_auc = auc_score_all - auc_score_int
    
        if diff_auc >= tol:

            score = diff_auc
            
        else:

            auc_score_all = auc_score_int
            score = auc_score_int
            
            features_to_remove.append(feature)
        #if count==6:
        #    break
        print('r2 : ' + str(round(auc_score_int,4)) + ' features to remove count : ' + str(len(features_to_remove)))
    print('total features to remove: ', len(features_to_remove))  
    features_to_keep = [x for x in X_train_features_to_use if x not in features_to_remove]
    print('total features to keep: ', len(features_to_keep))
    
    return features_to_keep

In [33]:
features_to_keep, score = recursive_feature_elimination_rf(df_train_rfe, usable_features)

r2 for all features : 0.8987
r2 : 0.8982 features to remove count : 0
r2 : 0.8987 features to remove count : 1
r2 : 0.8988 features to remove count : 2
r2 : 0.899 features to remove count : 3
r2 : 0.8973 features to remove count : 3
r2 : 0.8984 features to remove count : 3
r2 : 0.8998 features to remove count : 4
r2 : 0.8989 features to remove count : 4
r2 : 0.8995 features to remove count : 4
r2 : 0.8998 features to remove count : 5
r2 : 0.8995 features to remove count : 5
r2 : 0.9012 features to remove count : 6
r2 : 0.9018 features to remove count : 7
r2 : 0.8989 features to remove count : 7
r2 : 0.8989 features to remove count : 7
r2 : 0.9006 features to remove count : 7
r2 : 0.8998 features to remove count : 7
r2 : 0.8956 features to remove count : 7
r2 : 0.8984 features to remove count : 7
r2 : 0.8978 features to remove count : 7
r2 : 0.9003 features to remove count : 7
r2 : 0.8981 features to remove count : 7
r2 : 0.8997 features to remove count : 7
r2 : 0.8994 features to remov

ValueError: too many values to unpack (expected 2)

In [121]:
y_pred_train = model_with_spark(df_train_rfe, features_to_keep)
y_train = df_train_rfe.select("SalePrice").toPandas()
rsquared = ownLibrary.rsquared(y_train['SalePrice'], y_pred_train['prediction'])
rsquared

0.9043316452720312

In [124]:
y_pred_test = model_with_spark(df_test_rfe, features_to_keep)
y_test = df_test_rfe.select("SalePrice").toPandas()
rsquared = ownLibrary.rsquared(y_test['SalePrice'], y_pred_test['prediction'])
rsquared

0.9037777937689194

## RFA

In [None]:
def recursive_feature_addition(dataset, dataset_features_to_use, model):
    colonnes = dataset_features_to_use
    features_to_keep = [colonnes[0]]

    # set this value according to you.
    threshold = 0.0001

    # create your prefered model and  fit it to the training data.
    model_one_feature = model
    model_one_feature.fit(X_train[features_to_keep], y_train)

    # evaluate against your metric.
    y_pred_test = model_one_feature.predict(X_test[features_to_keep])
    score =  rsquared(y_test, y_pred_test)

    # start iterating from the feature.
    for feature in colonnes[1:]:    
        # fit model with  the selected features and the feature to be evaluated
        #model = LinearRegression()
        model.fit(X_train[features_to_keep + [feature]], y_train)
        y_pred_test = model.predict(X_test[features_to_keep + [feature]])
        score_int =  rsquared(y_test, y_pred_test)

        # determine the drop in the roc-auc
        diff_score = score_int - score

        # compare the drop in roc-auc with the threshold
        if diff_score >= threshold:
            
            # if the increase in the roc is bigger than the threshold
            # we keep the feature and re-adjust the roc-auc to the new value
            # considering the added feature
            score = score_int
            features_to_keep.append(feature)

    # print the feature to keep.
    #print(features_to_keep)
    print(score)
    print(len(features_to_keep))
    return features_to_keep, score

In [None]:
def recursive_feature_elimination_rf(X_train, X_train_features_to_use, tol=0.0001):

    features_to_remove = []
    count = 1
    # initial model using all the features
    y_pred_test = model_with_spark(X_train, X_train_features_to_use)
    y_test = X_train.select("SalePrice").toPandas()
    auc_score_all = ownLibrary.rsquared(y_test['SalePrice'], y_pred_test['prediction'])
    print('r2 for all features : ' + str(round(auc_score_all,4)))
    
    for feature in X_train_features_to_use:
        count += 1
        
        features_to_use = [x for x in X_train_features_to_use if x not in features_to_remove + [feature]]
        y_pred_test = model_with_spark(X_train, features_to_use)
        auc_score_int = ownLibrary.rsquared(y_test['SalePrice'], y_pred_test['prediction'])

        diff_auc = auc_score_all - auc_score_int
    
        if diff_auc >= tol:

            score = diff_auc
            
        else:

            auc_score_all = auc_score_int
            score = auc_score_int
            
            features_to_remove.append(feature)
        #if count==6:
        #    break
        print('r2 : ' + str(round(auc_score_int,4)) + ' features to remove count : ' + str(len(features_to_remove)))
    print('total features to remove: ', len(features_to_remove))  
    features_to_keep = [x for x in X_train_features_to_use if x not in features_to_remove]
    print('total features to keep: ', len(features_to_keep))
    
    return features_to_keep

In [75]:
#https://gist.github.com/colbyford/184097b0ec37b2b35667dab2da57d349


lr = LinearRegression(featuresCol = 'quant_features', labelCol='SalePrice')

# Create ParamGrid for Cross Validation

lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.05, 0.1, 0.3, 0.5])
             #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.7, 0.8, 0.9, 1.0])
             #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20])
             #  .addGrid(lr.maxIter, [1, 5, 10])
             .build())


# Evaluate model
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SalePrice", metricName="rmse")
                                  
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)

# Run cross validations
lrcvModel = lrcv.fit(df_train)
print(lrcvModel)
                                  


CrossValidatorModel_2934efed8682


In [76]:
lrcvModel.bestModel.summary.rootMeanSquaredError

28262.27894925193

In [77]:
lrcvModel.bestModel.summary.r2

0.8721715816934545

In [78]:
print ('Best Param (regParam): ', lrcvModel.bestModel._java_obj.getRegParam())
print ('Best Param (MaxIter): ', lrcvModel.bestModel._java_obj.getMaxIter())
print ('Best Param (elasticNetParam): ', lrcvModel.bestModel._java_obj.getElasticNetParam())


Best Param (regParam):  0.05
Best Param (MaxIter):  10
Best Param (elasticNetParam):  0.7


In [79]:
# Get Model Summary Statistics
lrcvSummary = lrcvModel.bestModel.summary
#print("Coefficient Standard Errors: " + str(lrcvSummary.coefficientStandardErrors))
#print("P Values: " + str(lrcvSummary.pValues)) # Last element is the intercept

# Use test set here so we can measure the accuracy of our model on new data
lrpredictions = lrcvModel.transform(df_test)
lrpredictions_train = lrcvModel.transform(df_train)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
print('RMSE:', lrevaluator.evaluate(lrpredictions))
                                  

RMSE: 27488.32960360495


In [80]:
lrpredictions_train.select("Id","prediction").show(5);

+----+------------------+
|  Id|        prediction|
+----+------------------+
| 211| 97650.06711397617|
| 319| 352638.2941583546|
| 240|115739.37570982237|
| 987|118567.21734835854|
|1417|161679.35340367886|
+----+------------------+
only showing top 5 rows

