In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
spark_conf = SparkConf()
spark_conf.setMaster('local[*]')
spark_conf.setAppName('Spark Price predictions')

<pyspark.conf.SparkConf at 0x1c661407550>

In [5]:
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

In [18]:
data = spark.read.options(**{"inferSchema": True, "header": True}).csv("../train.csv")

selecting high correlation features based on previous statistic results

In [20]:
feature_target_list = ['YearRemodAdd',
                      'YearBuilt',
                      'TotRmsAbvGrd',
                      'FullBath',
                      '1stFlrSF',
                      'TotalBsmtSF',
                      'GarageArea',
                      'GarageCars',
                      'GrLivArea',
                      'OverallQual',
                      'SalePrice']
data_high_corr = data.select(feature_target_list)

ML lib

In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

Transforming df for ml 

In [21]:
vectorAssembler = VectorAssembler(inputCols=feature_target_list[:-2], outputCol='features')
vdf_train_high_corr = vectorAssembler.transform(data_high_corr)
vdf_train_high_corr.select(['features', 'SalePrice']).show(truncate=False)

+-------------------------------------------------------+---------+
|features                                               |SalePrice|
+-------------------------------------------------------+---------+
|[2003.0,2003.0,8.0,2.0,856.0,856.0,548.0,2.0,1710.0]   |208500   |
|[1976.0,1976.0,6.0,2.0,1262.0,1262.0,460.0,2.0,1262.0] |181500   |
|[2002.0,2001.0,6.0,2.0,920.0,920.0,608.0,2.0,1786.0]   |223500   |
|[1970.0,1915.0,7.0,1.0,961.0,756.0,642.0,3.0,1717.0]   |140000   |
|[2000.0,2000.0,9.0,2.0,1145.0,1145.0,836.0,3.0,2198.0] |250000   |
|[1995.0,1993.0,5.0,1.0,796.0,796.0,480.0,2.0,1362.0]   |143000   |
|[2005.0,2004.0,7.0,2.0,1694.0,1686.0,636.0,2.0,1694.0] |307000   |
|[1973.0,1973.0,7.0,2.0,1107.0,1107.0,484.0,2.0,2090.0] |200000   |
|[1950.0,1931.0,8.0,2.0,1022.0,952.0,468.0,2.0,1774.0]  |129900   |
|[1950.0,1939.0,5.0,1.0,1077.0,991.0,205.0,1.0,1077.0]  |118000   |
|[1965.0,1965.0,5.0,1.0,1040.0,1040.0,384.0,1.0,1040.0] |129500   |
|[2006.0,2005.0,11.0,3.0,1182.0,1175.0,736.0,3.0

Spliting between train and test

In [37]:
splits = vdf_train_high_corr.randomSplit(weights=[0.7, 0.3])
df_train = splits[0]
df_test = splits[1]

In [43]:
lr = LinearRegression(featuresCol= 'features', labelCol='SalePrice', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(df_train)
display("Coefficients: " + str(lr_model.coefficients))
display("Intercept: " + str(lr_model.intercept))

'Coefficients: [691.5792677071958,383.3711502709877,520.131599554052,-7433.711358088316,7.461129934263919,33.6365329429784,7.026929713949023,18410.787887794293,67.56553625070083]'

'Intercept: -2121034.4671841715'

In [44]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 40785.840661
r2: 0.737006


In [45]:
lr_predictions = lr_model.transform(df_test)
lr_predictions.select("prediction","SalePrice","features").show(truncate=False)

+------------------+---------+------------------------------------------------------+
|prediction        |SalePrice|features                                              |
+------------------+---------+------------------------------------------------------+
|149442.4772968609 |117500   |[1950.0,1880.0,8.0,2.0,1178.0,1008.0,205.0,1.0,2210.0]|
|171967.63640888175|122500   |[1950.0,1885.0,11.0,2.0,1246.0,777.0,560.0,2.0,2290.0]|
|142450.72397768917|137000   |[1950.0,1910.0,9.0,2.0,908.0,1020.0,440.0,1.0,1928.0] |
|129003.84104908863|105000   |[1950.0,1915.0,6.0,1.0,694.0,672.0,936.0,3.0,1214.0]  |
|121982.83459070558|128000   |[1950.0,1915.0,6.0,1.0,841.0,806.0,216.0,1.0,1647.0]  |
|115033.20444269711|107400   |[1950.0,1915.0,8.0,2.0,840.0,840.0,379.0,1.0,1605.0]  |
|133581.6395658427 |98000    |[1950.0,1916.0,7.0,1.0,624.0,624.0,513.0,3.0,1344.0]  |
|179449.07605061587|136000   |[1950.0,1916.0,10.0,2.0,1664.0,714.0,216.0,1.0,2526.0]|
|171032.8279574872 |129500   |[1950.0,1917.0,8.0,1.0,1

1st test evaluation method

In [46]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="SalePrice",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

R Squared (R2) on test data = 0.712811


2nd test evaluation method

In [47]:
test_result = lr_model.evaluate(df_test)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
print("r2 on test data = %g" % test_result.r2)

Root Mean Squared Error (RMSE) on test data = 42410.3
r2 on test data = 0.712811
