In [0]:
housing=spark.read.option("header",True).option("inferschema",True).csv("/mnt/housing/data/cleaned/housing_cleaned.csv")

In [0]:
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_household: double (nullable = true)
 |-- population_per_household: double (nullable = true)



In [0]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
 .setInputCol("ocean_proximity")\
 .setOutputCol("ocean_proximity_index")

In [0]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
 .setInputCol("ocean_proximity_index")\
 .setOutputCol("ocean_proximity_encoded")

In [0]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
 .setInputCols(["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population","households", "median_income","rooms_per_household","bedrooms_per_household","population_per_household","ocean_proximity_encoded" ])\
 .setOutputCol("features")

In [0]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler()\
    .setInputCol('features')\
    .setOutputCol('scaled_features')

In [0]:
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
 .setStages([indexer, encoder, vectorAssembler,scaler])

In [0]:
fittedPipeline = transformationPipeline.fit(housing)

In [0]:
transformedHousing = fittedPipeline.transform(housing)

In [0]:
transformedHousing.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+----------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|rooms_per_household|bedrooms_per_household|population_per_household|ocean_proximity_index|ocean_proximity_encoded|            features|     scaled_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+----------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600

In [0]:
transformedHousing=transformedHousing.select(['scaled_features', 'median_house_value'])

In [0]:
transformedHousing.printSchema()

root
 |-- scaled_features: vector (nullable = true)
 |-- median_house_value: double (nullable = true)



In [0]:
train, test = transformedHousing.randomSplit([0.7, 0.3])

In [0]:
train.count()

Out[32]: 14313

In [0]:
test.count()

Out[33]: 6120

In [0]:
train.write.option("header", "true").option("schema","true").mode('overwrite').parquet('/mnt/housing/processed/traindata/')

In [0]:
datalocation='/mnt/housing/processed/traindata/'
files=dbutils.fs.ls(datalocation)
csv_file=[x.path for x in files if x.path.endswith(".parquet")][0]
dbutils.fs.mv(csv_file,datalocation.rstrip('/')+".parquet")
dbutils.fs.rm(datalocation,recurse=True)

Out[35]: True

In [0]:
test.write.option("header", "true").option("schema","true").mode('overwrite').parquet('/mnt/housing/processed/testdata/')

In [0]:
datalocation='/mnt/housing/processed/testdata/'
files=dbutils.fs.ls(datalocation)
csv_file=[x.path for x in files if x.path.endswith(".parquet")][0]
dbutils.fs.mv(csv_file,datalocation.rstrip('/')+".parquet")
dbutils.fs.rm(datalocation,recurse=True)

Out[37]: True

Linear Regression

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='scaled_features',labelCol='median_house_value')

In [0]:
lrModel = lr.fit(train)
lrpredictions=lrModel.transform(test).select("prediction", "median_house_value").rdd.map(lambda x: (float(x[0]), float(x[1])))

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
lr_metrics = RegressionMetrics(lrpredictions)
print("MSE: " + str(lr_metrics.meanSquaredError))
print("RMSE: " + str(lr_metrics.rootMeanSquaredError))
print("R-squared: " + str(lr_metrics.r2))
print("MAE: " + str(lr_metrics.meanAbsoluteError))
print("Explained variance: " + str(lr_metrics.explainedVariance))



MSE: 4920979212.52486
RMSE: 70149.69146421715
R-squared: 0.6326324105670907
MAE: 50290.14824273634
Explained variance: 8689765512.974714


DECISION TREE REGRESSION

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
dtr = DecisionTreeRegressor(featuresCol='scaled_features',labelCol='median_house_value')

In [0]:
dtrModel = dtr.fit(train)
dtrpredictions = dtrModel.transform(test).select("prediction", "median_house_value").rdd.map(lambda x: (float(x[0]), float(x[1])))

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
dtr_metrics = RegressionMetrics(dtrpredictions)
print("MSE: " + str(dtr_metrics.meanSquaredError))
print("RMSE: " + str(dtr_metrics.rootMeanSquaredError))
print("R-squared: " + str(dtr_metrics.r2))
print("MAE: " + str(dtr_metrics.meanAbsoluteError))
print("Explained variance: " + str(dtr_metrics.explainedVariance))



MSE: 4643138468.061013
RMSE: 68140.57871827192
R-squared: 0.6533741532430486
MAE: 48584.86841449843
Explained variance: 9047330595.107292


RANDOM FOREST REGRESSION

In [0]:
from pyspark.ml.regression import RandomForestRegressor
rfr = RandomForestRegressor(featuresCol='scaled_features',labelCol='median_house_value')

In [0]:
rfrModel = rfr.fit(train)
rfrpredictions = rfrModel.transform(test).select("prediction", "median_house_value").rdd.map(lambda x: (float(x[0]), float(x[1])))

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
rf_metrics = RegressionMetrics(dtrpredictions)
print("MSE: " + str(rf_metrics.meanSquaredError))
print("RMSE: " + str(rf_metrics.rootMeanSquaredError))
print("R-squared: " + str(rf_metrics.r2))
print("MAE: " + str(rf_metrics.meanAbsoluteError))
print("Explained variance: " + str(rf_metrics.explainedVariance))

MSE: 4643138468.061013
RMSE: 68140.57871827192
R-squared: 0.6533741532430486
MAE: 48584.86841449843
Explained variance: 9047330595.107292


GBT REGRESSION

In [0]:
from pyspark.ml.regression import GBTRegressor
gbtr = GBTRegressor(featuresCol='scaled_features',labelCol='median_house_value')

In [0]:
gbtrModel = gbtr.fit(train)
gbtr_predictions = gbtrModel.transform(test).select("prediction", "median_house_value").rdd.map(lambda x: (float(x[0]), float(x[1])))

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
metrics_gbt = RegressionMetrics(dtrpredictions)
print("MSE: " + str(metrics_gbt.meanSquaredError))
print("RMSE: " + str(metrics_gbt.rootMeanSquaredError))
print("R-squared: " + str(metrics_gbt.r2))
print("MAE: " + str(metrics_gbt.meanAbsoluteError))
print("Explained variance: " + str(metrics_gbt.explainedVariance))

MSE: 4643138468.061013
RMSE: 68140.57871827192
R-squared: 0.6533741532430486
MAE: 48584.86841449843
Explained variance: 9047330595.107292


CROSS VALIDATION AND HYPER PARAMETER TUNING

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

params = ParamGridBuilder().addGrid(rfr.maxDepth, [5,10,15]).build()

evaluator = RegressionEvaluator()\
 .setMetricName("rmse")\
 .setPredictionCol("prediction")\
 .setLabelCol("median_house_value")

cv = CrossValidator()\
 .setEstimator(rfr)\
 .setEvaluator(evaluator)\
 .setEstimatorParamMaps(params)\
 .setNumFolds(10) 
model = cv.fit(train)

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
out = model.transform(test)\
 .select("prediction", "median_house_value").rdd.map(lambda x: (float(x[0]), float(x[1])))

In [0]:
cv_metrics = RegressionMetrics(out)
print("MSE: " + str(cv_metrics.meanSquaredError))
print("RMSE: " + str(cv_metrics.rootMeanSquaredError))
print("R-squared: " + str(cv_metrics.r2))
print("MAE: " + str(cv_metrics.meanAbsoluteError))
print("Explained variance: " + str(cv_metrics.explainedVariance))



MSE: 2713738626.4935775
RMSE: 52093.556477683276
R-squared: 0.7974103172335066
MAE: 35100.78155408402
Explained variance: 9833223673.785948


BEST MODEL

In [0]:
bestModel = model.bestModel
print(bestModel)

RandomForestRegressionModel: uid=RandomForestRegressor_53a4dd1a0a27, numTrees=20, numFeatures=15


In [0]:
metrics=[lr_metrics,dtr_metrics,rf_metrics,metrics_gbt,cv_metrics]

In [0]:
from pyspark.sql.types import StructType,StructField,DoubleType,StringType

In [0]:
metrics_schema = StructType([ \
    StructField("Model",StringType(),True), \
    StructField("MSE",DoubleType(),True), \
    StructField("RMSE",DoubleType(),True), \
    StructField("R-squared",DoubleType(),True), \
    StructField("MAE", DoubleType(), True), \
    StructField("Explained_Variance", DoubleType(), True), \
  ])

In [0]:
model_names=['linear regession','decision tree regresstion','random forest regression','gbt regression', 'cv random forest regression']

In [0]:
metrics_data=[]
li=[]
j=0
for i in metrics:
    li.append(model_names[j])
    j=j+1
    li.append(i.meanSquaredError)
    li.append(i.rootMeanSquaredError)
    li.append(i.r2)
    li.append(i.meanAbsoluteError)
    li.append(i.explainedVariance)
    metrics_data.append(li)
    li=[]
print(metrics_data)

[['linear regession', 4920979212.52486, 70149.69146421715, 0.6326324105670907, 50290.14824273634, 8689765512.974714], ['decision tree regresstion', 4643138468.061013, 68140.57871827192, 0.6533741532430486, 48584.86841449843, 9047330595.107292], ['random forest regression', 4643138468.061013, 68140.57871827192, 0.6533741532430486, 48584.86841449843, 9047330595.107292], ['gbt regression', 4643138468.061013, 68140.57871827192, 0.6533741532430486, 48584.86841449843, 9047330595.107292], ['cv random forest regression', 2713738626.4935775, 52093.556477683276, 0.7974103172335066, 35100.78155408402, 9833223673.785948]]


In [0]:
metrics_df=spark.createDataFrame(data=metrics_data,schema=metrics_schema)

In [0]:
metrics_df.show(truncate=30)

+---------------------------+--------------------+------------------+------------------+-----------------+-------------------+
|                      Model|                 MSE|              RMSE|         R-squared|              MAE| Explained_Variance|
+---------------------------+--------------------+------------------+------------------+-----------------+-------------------+
|           linear regession|  4.92097921252486E9| 70149.69146421715|0.6326324105670907|50290.14824273634|8.689765512974714E9|
|  decision tree regresstion| 4.643138468061013E9| 68140.57871827192|0.6533741532430486|48584.86841449843|9.047330595107292E9|
|   random forest regression| 4.643138468061013E9| 68140.57871827192|0.6533741532430486|48584.86841449843|9.047330595107292E9|
|             gbt regression| 4.643138468061013E9| 68140.57871827192|0.6533741532430486|48584.86841449843|9.047330595107292E9|
|cv random forest regression|2.7137386264935775E9|52093.556477683276|0.7974103172335066|35100.78155408402|9.833

In [0]:
metrics_df.repartition(1).write.option("header", "true").option("schema","true").mode('overwrite').parquet('/mnt/housing/output/metrics/')

In [0]:
datalocation='/mnt/housing/output/metrics/'
files=dbutils.fs.ls(datalocation)
csv_file=[x.path for x in files if x.path.endswith(".parquet")][0]
dbutils.fs.mv(csv_file,datalocation.rstrip('/')+".parquet")
dbutils.fs.rm(datalocation,recurse=True)

Out[62]: True