<font size=5>

Regression with boston_data.csv. The dataset was downloaded from Kaggle and is used to predict Boston housing prices.



</font>

| Code   | Description   |
|:---|:---|
|**CRIM** | per capita crime rate by town |
|**ZN**  | proportion of residential land zoned for lots over 25,000 sq.ft. |
|**INDUS**  | proportion of non-retail business acres per town |
|**CHAS**  | Charles River dummy variable (= 1 if tract bounds river; 0 otherwise) |
|**NOX**  | nitric oxides concentration (parts per 10 million) |
|**RM**  | average number of rooms per dwelling |
|**AGE**  | proportion of owner-occupied units built prior to 1940 |
|**DIS**  | weighted distances to five Boston employment centres |
|**RAD**  | index of accessibility to radial highways |
|**TAX**  | full-value property-tax rate per $10,000 |
|**PTRATIO**  | pupil-teacher ratio by town |
|**B**  | 1000(Bk - 0.63)^2 where Bk is the proportion of blacks by town |
|**LSTAT**  | % lower status of the population |
|**MEDV**  | Median value of owner-occupied homes in \$1000's |



<font size=5>medv is the label; all other columns are features. </font>

<font size=5> Import PySpark libraries, create SparkContext and SQL context, then load the csv data file. </font>

In [None]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

sc= SparkContext()
sqlContext = SQLContext(sc)
boston_house_df = sqlContext.read.format('csv').options(header='true', inferschema='true')\
.load('file:///opt/hadoop/jentekllc/Spark/datasets/BostonHousing.csv')

<font size=5> Show statistics of each column, including feature columns and label column (medv)  </font>

In [None]:
boston_house_df.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|:---|:---|:---|:---|:---|:---|
| crim | 506 | 3.6135235573122535 | 8.601545105332491 | 0.00632 | 88.9762 |
| zn | 506 | 11.363636363636363 | 23.32245299451514 | 0.0 | 100.0 |
| indus | 506 | 11.136778656126504 | 6.860352940897589 | 0.46 | 27.74 |
| chas | 506 | 0.0691699604743083 | 0.2539940413404101 | 0 | 1 |
| nox | 506 | 0.5546950592885372 | 0.11587767566755584 | 0.385 | 0.871 |
| rm | 506 | 6.284634387351787 | 0.7026171434153232 | 3.561 | 8.78 |
| age | 506 | 68.57490118577078 | 28.148861406903595 | 2.9 | 100.0 |
| dis | 506 | 3.795042687747034 | 2.10571012662761 | 1.1296 | 12.1265 |
| rad | 506 | 9.549407114624506 | 8.707259384239366 | 1 | 24 |


<font size=5>

We need to find the correlation between each feature column and the label medv. The correlation coefficient ranges from -1 to 1: the closer it is to -1 or 1, the more strongly the feature is negatively or positively correlated with medv; the closer it is to 0, the weaker the linear relationship between the feature and medv.

   
    
</font>
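
To make the -1 to 1 range concrete, here is a minimal plain-Python sketch of the Pearson correlation coefficient. It is only an illustration: the function name `pearson_corr` and the toy values are assumptions, not taken from the dataset, and it is not how Spark computes the statistic internally.

```python
import math

def pearson_corr(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of co-deviations and sums of squared deviations from the means.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical toy values, not rows from the Boston dataset.
rooms = [4.0, 5.0, 6.0, 7.0, 8.0]
price_up = [10.0, 14.0, 19.0, 25.0, 30.0]    # rises with rooms
price_down = [30.0, 25.0, 19.0, 14.0, 10.0]  # falls as rooms rise

print(pearson_corr(rooms, price_up))    # close to +1
print(pearson_corr(rooms, price_down))  # close to -1
```

A value near zero would indicate little linear relationship, which is how the weaker features in the correlation listing can be read.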

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import six
for i in boston_house_df.columns:
    if not( isinstance(boston_house_df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to medv for ", i, boston_house_df.stat.corr('medv',i))

Correlation to medv for  crim -0.38830460858681154
Correlation to medv for  zn 0.3604453424505433
Correlation to medv for  indus -0.4837251600283728
Correlation to medv for  chas 0.1752601771902987
Correlation to medv for  nox -0.4273207723732821
Correlation to medv for  rm 0.6953599470715401
Correlation to medv for  age -0.3769545650045961
Correlation to medv for  dis 0.249928734085904
Correlation to medv for  rad -0.38162623063977735
Correlation to medv for  tax -0.46853593356776674
Correlation to medv for  ptratio -0.5077866855375622
Correlation to medv for  b 0.3334608196570661
Correlation to medv for  lstat -0.7376627261740145
Correlation to medv for  medv 1.0


<font size=5>

Spark ML requires the features of a dataset to be assembled into a single vector column before the dataset can be fitted to an ML model.
VectorAssembler combines the listed input columns of a Spark DataFrame into one vector column.

</font>
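
Conceptually, the assembling step packs several numeric columns of each row into one list-like value. The sketch below mimics that idea in plain Python for a single row. The helper `assemble` is hypothetical and only illustrates the shape of the result; it is not the VectorAssembler implementation. The sample values are the first two feature values and the label of the first row shown in the notebook output.

```python
def assemble(row, input_cols, output_col="features"):
    """Return a copy of `row` with the input columns packed into one list."""
    assembled = dict(row)
    assembled[output_col] = [float(row[c]) for c in input_cols]
    return assembled

# crim, zn and medv of the first row; the remaining columns are left out here.
row = {"crim": 0.00632, "zn": 18.0, "medv": 24.0}
out = assemble(row, ["crim", "zn"])

print(out["features"], out["medv"])
```

The label column stays separate from the assembled vector, which is why the notebook keeps only `features` and `medv` afterwards.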

In [None]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'features')
#vectorAssembler = VectorAssembler(inputCols = ['rm'], outputCol = 'features')
vector_house_df = vectorAssembler.transform(boston_house_df)
vector_house_df = vector_house_df.select(['features', 'medv'])
vector_house_df.show(2)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
+--------------------+----+
only showing top 2 rows



<font size=5>  

Now randomly split the vectorized Spark DataFrame (dataset) into training data (about 70%) and testing data (about 30%)
    
    
</font>
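
A weighted random split of this kind assigns each row independently, so the resulting sizes only approximate the requested proportions. The following plain-Python sketch shows the idea under that assumption. The function `random_split` and the seed value are hypothetical, and the draws will not match Spark's.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point leftover.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(506))  # stand-in for the 506 rows of the dataset
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # sizes vary around 354 and 152
```

Passing a fixed seed makes the split repeatable; without one, each run produces different training and testing sets and therefore different metrics.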

In [None]:
splits = vector_house_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
print(test_df.count())

138


In [None]:
train_df.show(2)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.01096,55.0,2.2...|22.0|
+--------------------+----+
only showing top 2 rows



<font size=5>

Let's start with Linear Regression and fit the model on train_df
    
</font>

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.008705106328913166,0.006023590381011056,-0.057906071607189094,3.1847344475277866,-6.187200885207628,4.203725365598978,-8.763688222585704e-05,-0.6276537162738738,0.0,0.0,-0.799411786344425,0.008868403042993868,-0.4929121373907858]
Intercept: 20.292695035558626
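
Two of the printed coefficients (the ninth and tenth, which correspond to rad and tax in the input column order) are exactly 0.0. This is what one would expect from the L1 part of elastic-net regularization, which can shrink coefficients all the way to zero. As a hedged summary of the objective documented for Spark ML linear regression, with $\lambda$ standing for `regParam` (0.3 here), $\alpha$ for `elasticNetParam` (0.8 here), $m$ for the number of training rows, $a_j$ for the coefficients and $b$ for the intercept:

$$
\min_{a,\,b}\ \frac{1}{2m}\sum_{i=1}^{m}\Big(\sum_{j=1}^{n} a_j x_{ij} + b - y_i\Big)^2
+ \lambda\Big(\alpha\sum_{j=1}^{n}\lvert a_j\rvert + \frac{1-\alpha}{2}\sum_{j=1}^{n} a_j^2\Big)
$$

With $\alpha = 0.8$ most of the penalty is L1. Details such as internal feature standardization are omitted from this sketch.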


<font size=5>
Linear Regression produces slope coefficients and an intercept:

$$y = a_1 x_1 + a_2 x_2 + \dots + a_n x_n + b$$

$a_1, a_2, \dots, a_n$ are the coefficients, one for each feature

$b$ is the intercept

$x_1, x_2, \dots, x_n$ are the independent variables (the features)

</font>
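
As a small worked example of this formula, the sketch below evaluates a prediction for one feature vector. The two-feature model and its numbers are hypothetical, chosen only to keep the arithmetic easy to follow; they are not the fitted coefficients.

```python
def predict(coefficients, intercept, features):
    """Evaluate y = a1*x1 + ... + an*xn + b for one feature vector."""
    return sum(a * x for a, x in zip(coefficients, features)) + intercept

# Hypothetical two-feature model.
coefficients = [4.0, -0.5]
intercept = 20.0
features = [6.0, 10.0]

# 4.0*6.0 + (-0.5)*10.0 + 20.0 = 39.0
print(predict(coefficients, intercept, features))
```

The fitted model applies the same arithmetic to all 13 features of each row.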

In [None]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 5.114079
r2: 0.691711


In [None]:
train_df.describe().show()

+-------+-----------------+
|summary|             medv|
+-------+-----------------+
|  count|              368|
|   mean|22.62690217391305|
| stddev|9.223146996202797|
|    min|              5.0|
|    max|             50.0|
+-------+-----------------+



<font size=5>

Test the model with test_df. Testing produces two metrics, RMSE and the R2 score, that evaluate the performance of the regressor.

  
    
</font>
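
For reference, both metrics can be computed by hand. The sketch below is a plain-Python illustration with hypothetical labels and predictions; the function names `rmse` and `r2` are assumptions, and the code is not Spark's evaluator.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: typical size of a prediction error, in label units."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(labels))

def r2(labels, predictions):
    """Share of the label variance explained by the predictions (1.0 is perfect)."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Hypothetical values, not taken from the test set.
labels = [20.0, 25.0, 30.0, 35.0]
predictions = [22.0, 24.0, 29.0, 37.0]

print(rmse(labels, predictions))
print(r2(labels, predictions))
```

A lower RMSE and a higher R2 both indicate a better fit, which is how the regressors are compared in the rest of the notebook.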

In [None]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","medv","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 30.79578189766023|32.2|[0.00906,90.0,2.9...|
| 32.10188130417794|32.7|[0.01301,35.0,1.5...|
|30.113937760762752|35.4|[0.01311,90.0,1.2...|
|34.717089031603834|44.0|[0.01538,90.0,3.7...|
|25.633973841160756|21.6|[0.02731,0.0,7.07...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.777433


In [None]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 4.30383


In [None]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show(2)

numIterations: 11
objectiveHistory: [0.5, 0.433284889386758, 0.24842120216691616, 0.22711926490905993, 0.19675487395842117, 0.19348544217889674, 0.19292689313998745, 0.1921582874243395, 0.19124798715184915, 0.19079822667085858, 0.19061607284127202]
+-------------------+
|          residuals|
+-------------------+
|-6.8394305915109825|
|-5.8370462871607245|
+-------------------+
only showing top 2 rows





<font size=5>
    
Now try the Gradient-Boosted Tree regressor (GBTRegressor) with the same train_df and test_df
    
    
</font>

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


In [None]:
gbt = GBTRegressor(featuresCol="features",labelCol='medv', maxIter=10)
gbt_model = gbt.fit(train_df)


In [None]:
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select("prediction","medv","features").show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|31.901881375197288|32.2|[0.00906,90.0,2.9...|
| 34.59554945652568|32.7|[0.01301,35.0,1.5...|
|  34.7920240155147|35.4|[0.01311,90.0,1.2...|
| 45.71246197955683|44.0|[0.01538,90.0,3.7...|
| 20.23234916276675|21.6|[0.02731,0.0,7.07...|
+------------------+----+--------------------+
only showing top 5 rows



<font size=5>

Test the model with test_df. Testing produces two metrics, RMSE and the R2 score, that evaluate the performance of the regressor.

The Gradient-Boosted Tree metrics are better than those of Linear Regression (test R2 0.825 vs 0.777, RMSE 3.82 vs 4.30)
    
    
</font>

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
gbt_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % gbt_evaluator.evaluate(gbt_predictions))

R Squared (R2) on test data = 0.824957


In [None]:
gbt_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="rmse")

In [None]:
print("RMSE on test data = %g" % gbt_evaluator.evaluate(gbt_predictions))

RMSE on test data = 3.81678


<font size=5>

Now try Random Forest Regressor with the same train_df and test_df
    
</font>

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


In [None]:

rf = RandomForestRegressor(featuresCol="features",labelCol='medv', maxDepth=3)
rf_model = rf.fit(train_df)

In [None]:
rf_predictions = rf_model.transform(test_df)
rf_predictions.select("prediction","medv","features").show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|29.282180897063693|32.2|[0.00906,90.0,2.9...|
|31.653408126980445|32.7|[0.01301,35.0,1.5...|
| 32.40155126739403|35.4|[0.01311,90.0,1.2...|
|41.573830793016235|44.0|[0.01538,90.0,3.7...|
| 23.32195042099955|21.6|[0.02731,0.0,7.07...|
+------------------+----+--------------------+
only showing top 5 rows



<font size=5>
    
Test the model with test_df. Testing produces two metrics, RMSE and the R2 score, that evaluate the performance of the regressor.

The Random Forest R2 (0.798) is better than that of Linear Regression (0.777) and somewhat lower than that of the Gradient-Boosted Tree (0.825)
    
</font>

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
rf_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % rf_evaluator.evaluate(rf_predictions))

R Squared (R2) on test data = 0.797658


In [None]:
rf_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="rmse")

In [None]:
# Evaluate the Random Forest predictions with the Random Forest evaluator
print("RMSE on test data = %g" % rf_evaluator.evaluate(rf_predictions))


<font size=5>

Finally, try Decision Tree regressor with the same train_df and test_df
    
    
</font>

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


In [None]:
dt = DecisionTreeRegressor(featuresCol="features",labelCol='medv', maxDepth=3)
dt_model = dt.fit(train_df)

In [None]:
dt_predictions = dt_model.transform(test_df)
dt_predictions.select("prediction","medv","features").show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|         32.071875|32.2|[0.00906,90.0,2.9...|
|         32.071875|32.7|[0.01301,35.0,1.5...|
|         32.071875|35.4|[0.01311,90.0,1.2...|
|46.731578947368426|44.0|[0.01538,90.0,3.7...|
|23.002923976608184|21.6|[0.02731,0.0,7.07...|
+------------------+----+--------------------+
only showing top 5 rows



<font size=5>
    
Test the model with test_df. Testing produces two metrics, RMSE and the R2 score, that evaluate the performance of the regressor.

The Decision Tree metrics are slightly worse than those of Linear Regression (test R2 0.752 vs 0.777, RMSE 4.55 vs 4.30), and its R2 is also lower than those of the Gradient-Boosted Tree (0.825) and Random Forest (0.798)

</font>

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
dt_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % dt_evaluator.evaluate(dt_predictions))

R Squared (R2) on test data = 0.751557


In [None]:
dt_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="rmse")

In [None]:
print("RMSE on test data = %g" % dt_evaluator.evaluate(dt_predictions))

RMSE on test data = 4.54714


<font size=5>

This concludes the testing of Spark ML regressors

</font>
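
To see the four regressors side by side, the test R2 values printed in the cells above can be collected and sorted. This is only a convenience sketch. The dictionary `test_r2` and its model labels are introduced here for illustration, while the numbers are copied from the notebook outputs.

```python
# Test-set R2 values as printed in the cells above.
test_r2 = {
    "Linear Regression": 0.777433,
    "Gradient-Boosted Trees": 0.824957,
    "Random Forest": 0.797658,
    "Decision Tree": 0.751557,
}

# Highest R2 first.
ranking = sorted(test_r2.items(), key=lambda item: item[1], reverse=True)
for name, score in ranking:
    print(f"{name}: {score:.3f}")
```

These values come from a single random split made without a fixed seed, so the ordering may differ from run to run.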