In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("housing-price-prediction-regression") \
    .config("master", "local[*]") \
    .getOrCreate()


In [2]:
housing = spark.read.csv("housing.csv", header=True, inferSchema = True, sep=' ').withColumnRenamed('MV','label')

In [3]:
housing.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
CRIM,506,3.6135235573122535,8.601545105332491,0.00632,88.9762
ZN,506,11.363636363636363,23.32245299451514,0.0,100.0
INDUS,506,11.136778656126504,6.860352940897589,0.46,27.74
CHAS,506,0.0691699604743083,0.2539940413404101,0,1
NOX,506,0.5546950592885372,0.11587767566755584,0.385,0.871
RM,506,6.284634387351787,0.7026171434153232,3.561,8.78
AGE,506,68.57490118577078,28.148861406903595,2.9,100.0
DIS,506,3.795042687747034,2.10571012662761,1.1296,12.1265
RAD,506,9.549407114624506,8.707259384239366,1,24


In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline


vectorAssemblerFeatures = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
vectorAssemblerFeaturesAndLabel = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT','label'], outputCol = 'featuresAndLabel')

pipeline = Pipeline().setStages([vectorAssemblerFeatures,vectorAssemblerFeaturesAndLabel])

model = pipeline.fit(housing)

housing_transformed = model.transform(housing)

In [5]:
housing_transformed.select('label', 'features','featuresAndLabel').limit(10).toPandas()

Unnamed: 0,label,features,featuresAndLabel
0,24.0,"[0.00632, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2,...","[0.00632, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2,..."
1,21.6,"[0.02731, 0.0, 7.07, 0.0, 0.469, 6.421, 78.9, ...","[0.02731, 0.0, 7.07, 0.0, 0.469, 6.421, 78.9, ..."
2,34.7,"[0.02729, 0.0, 7.07, 0.0, 0.469, 7.185, 61.1, ...","[0.02729, 0.0, 7.07, 0.0, 0.469, 7.185, 61.1, ..."
3,33.4,"[0.03237, 0.0, 2.18, 0.0, 0.458, 6.998, 45.8, ...","[0.03237, 0.0, 2.18, 0.0, 0.458, 6.998, 45.8, ..."
4,36.2,"[0.06905, 0.0, 2.18, 0.0, 0.458, 7.147, 54.2, ...","[0.06905, 0.0, 2.18, 0.0, 0.458, 7.147, 54.2, ..."
5,28.7,"[0.02985, 0.0, 2.18, 0.0, 0.458, 6.43, 58.7, 6...","[0.02985, 0.0, 2.18, 0.0, 0.458, 6.43, 58.7, 6..."
6,22.9,"[0.08829, 12.5, 7.87, 0.0, 0.524, 6.012, 66.6,...","[0.08829, 12.5, 7.87, 0.0, 0.524, 6.012, 66.6,..."
7,27.1,"[0.14455, 12.5, 7.87, 0.0, 0.524, 6.172, 96.1,...","[0.14455, 12.5, 7.87, 0.0, 0.524, 6.172, 96.1,..."
8,16.5,"[0.21124, 12.5, 7.87, 0.0, 0.524, 5.631, 100.0...","[0.21124, 12.5, 7.87, 0.0, 0.524, 5.631, 100.0..."
9,18.9,"[0.17004, 12.5, 7.87, 0.0, 0.524, 6.004, 85.9,...","[0.17004, 12.5, 7.87, 0.0, 0.524, 6.004, 85.9,..."


In [6]:
from pyspark.ml.stat import Correlation
import pandas as pd

correlationRow=Correlation.corr(housing_transformed, "featuresAndLabel").head()
correlationArray=correlationRow[0].toArray()
pd.DataFrame(data=correlationArray)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,1.0,-0.200469,0.406583,-0.055892,0.420972,-0.219247,0.352734,-0.37967,0.625505,0.582764,0.289946,-0.385064,0.455621,-0.388305
1,-0.200469,1.0,-0.533828,-0.042697,-0.516604,0.311991,-0.569537,0.664408,-0.311948,-0.314563,-0.391679,0.17552,-0.412995,0.360445
2,0.406583,-0.533828,1.0,0.062938,0.763651,-0.391676,0.644779,-0.708027,0.595129,0.72076,0.383248,-0.356977,0.6038,-0.483725
3,-0.055892,-0.042697,0.062938,1.0,0.091203,0.091251,0.086518,-0.099176,-0.007368,-0.035587,-0.121515,0.048788,-0.053929,0.17526
4,0.420972,-0.516604,0.763651,0.091203,1.0,-0.302188,0.73147,-0.76923,0.611441,0.668023,0.188933,-0.380051,0.590879,-0.427321
5,-0.219247,0.311991,-0.391676,0.091251,-0.302188,1.0,-0.240265,0.205246,-0.209847,-0.292048,-0.355501,0.128069,-0.613808,0.69536
6,0.352734,-0.569537,0.644779,0.086518,0.73147,-0.240265,1.0,-0.747881,0.456022,0.506456,0.261515,-0.273534,0.602339,-0.376955
7,-0.37967,0.664408,-0.708027,-0.099176,-0.76923,0.205246,-0.747881,1.0,-0.494588,-0.534432,-0.232471,0.291512,-0.496996,0.249929
8,0.625505,-0.311948,0.595129,-0.007368,0.611441,-0.209847,0.456022,-0.494588,1.0,0.910228,0.464741,-0.444413,0.488676,-0.381626
9,0.582764,-0.314563,0.72076,-0.035587,0.668023,-0.292048,0.506456,-0.534432,0.910228,1.0,0.460853,-0.441808,0.543993,-0.468536


In [7]:
from pyspark.ml.stat import ChiSquareTest

testResult = ChiSquareTest.test(housing_transformed, "features", "label")

resultRow=testResult.head()
pValues=resultRow[0].toArray()
degreesOfFreedom=resultRow[1]
statistics=resultRow[2].toArray()

pd.DataFrame(data=pValues)



Unnamed: 0,0
0,0.1742444
1,3.64682e-09
2,0.04608009
3,0.269119
4,0.04780706
5,0.603553
6,0.9632714
7,0.8141983
8,0.0002858113
9,0.1131694


In [8]:
pd.DataFrame(data=degreesOfFreedom)

Unnamed: 0,0
0,114684
1,5700
2,17100
3,228
4,18240
5,101460
6,80940
7,93708
8,1824
9,14820


In [9]:
pd.DataFrame(data=statistics)

Unnamed: 0,0
0,115132.920833
1,6339.35707
2,17412.666685
3,240.707994
4,18559.477608
5,101341.107738
6,80221.285412
7,93321.067976
8,2039.331005
9,15028.594872


In [10]:
train, test = housing_transformed.select('features','label').randomSplit([0.7, 0.3], seed=12345)

In [11]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=100, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train)

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
prediction=lr_model.transform(test)
lr_evaluator_r2 = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="r2")
lr_evaluator_rmse = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="rmse")
print("Linear Regression R2 {}".format(lr_evaluator_r2.evaluate(prediction)))
print("Linear Regression RMSE {}".format(lr_evaluator_rmse.evaluate(prediction)))

Linear Regression R2 0.6376649953104635
Linear Regression RMSE 4.826714491476544


In [13]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.3]) \
    .addGrid(lr.maxIter, [10, 100, 500]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="r2"),
                          numFolds=4)

cvModel = crossval.fit(train)

prediction = cvModel.transform(test)
selected = prediction.select("prediction",'label')

bestModelSummary=cvModel.bestModel.summary
print("Linear Regression R2  for best Model {}".format(bestModelSummary.r2))
print("Linear Regression RMSE  for best Model {}".format(bestModelSummary.rootMeanSquaredError))

Linear Regression R2  for best Model 0.755880450495389
Linear Regression RMSE  for best Model 4.7596957888247395
