In [1]:
# Create (or reuse) the SparkSession that every later cell relies on.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('lin_reg')
    .getOrCreate()
)

In [2]:
#import Linear Regression from spark's MLlib
from pyspark.ml.regression import LinearRegression

In [3]:
# Load Boston.csv: the first row supplies the column names and Spark infers
# each column's type from the data.
df = spark.read.csv('Boston.csv', header=True, inferSchema=True)

In [4]:
# Report the dataset shape as a (row count, column count) tuple.
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(506, 15)


In [5]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [6]:
# List the column names in schema order.
df.columns

['_c0',
 'crim',
 'zn',
 'indus',
 'chas',
 'nox',
 'rm',
 'age',
 'dis',
 'rad',
 'tax',
 'ptratio',
 'black',
 'lstat',
 'medv']

In [7]:
# Show only the first two summary rows, with cell values left untruncated.
df.describe().show(n=2, truncate=False)

+-------+-----+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|summary|_c0  |crim              |zn                |indus             |chas              |nox               |rm               |age              |dis              |rad              |tax              |ptratio           |black             |lstat             |medv              |
+-------+-----+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|count  |506  |506               |506               |506               |506               |506               |506              |506              |506              |506  

In [8]:
# Peek at the dataset: head(1) returns a list holding the first Row.
df.head(1)

[Row(_c0=1, crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0)]

In [9]:
# Descriptive statistics, converted to pandas and transposed so that each
# column of df becomes one row of the displayed table.
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
_c0,506,253.5,146.2138844296259,1,506
crim,506,3.6135235573122535,8.601545105332491,0.00632,88.9762
zn,506,11.363636363636363,23.32245299451514,0.0,100.0
indus,506,11.136778656126504,6.860352940897589,0.46,27.74
chas,506,0.0691699604743083,0.2539940413404101,0,1
nox,506,0.5546950592885372,0.11587767566755584,0.385,0.871
rm,506,6.284634387351787,0.7026171434153232,3.561,8.78
age,506,68.57490118577078,28.148861406903595,2.9,100.0
dis,506,3.795042687747034,2.10571012662761,1.1296,12.1265


In [10]:
# Scatter-matrix of the numeric columns, drawn from a sample of the rows
# (sampling fraction 0.8, without replacement).
import pandas as pd
# `pandas.tools.plotting` no longer exists in current pandas releases;
# scatter_matrix is exposed from `pandas.plotting`.
from pandas.plotting import scatter_matrix

# Keep only the columns whose Spark type is 'int' or 'double'.
numeric_features = [name for name, dtype in df.dtypes if dtype in ('int', 'double')]
# A fixed seed keeps the sampled rows, and therefore the figure, repeatable.
sampled_data = df.select(numeric_features).sample(False, 0.8, seed=42).toPandas()
axs = scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    # Left-most column of the grid: horizontal, right-aligned y labels and no ticks.
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    # Bottom row of the grid: vertical x labels and no ticks.
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

  """


In [11]:
# Print the correlation of every non-string column with the target 'medv'.
# The column type is read from the schema (df.dtypes) instead of fetching the
# first value of each column: that cost one Spark job per column and let a
# string column through whenever its first value was null.
for i, col_type in df.dtypes:
    if col_type != 'string':
        print( "Correlation to MV for ", i, df.stat.corr('medv',i))

Correlation to MV for  _c0 -0.22660364293533927
Correlation to MV for  crim -0.38830460858681154
Correlation to MV for  zn 0.3604453424505433
Correlation to MV for  indus -0.4837251600283728
Correlation to MV for  chas 0.1752601771902987
Correlation to MV for  nox -0.4273207723732821
Correlation to MV for  rm 0.6953599470715401
Correlation to MV for  age -0.3769545650045961
Correlation to MV for  dis 0.249928734085904
Correlation to MV for  rad -0.38162623063977735
Correlation to MV for  tax -0.46853593356776674
Correlation to MV for  ptratio -0.5077866855375622
Correlation to MV for  black 0.3334608196570661
Correlation to MV for  lstat -0.7376627261740145
Correlation to MV for  medv 1.0


In [12]:
# Prepare the data for machine learning: only two columns are needed, a
# 'features' vector assembled from the thirteen predictor columns and the
# label column 'medv'.
from pyspark.ml.feature import VectorAssembler

feature_cols = [
    'crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
    'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat',
]
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
vhouse_df = vectorAssembler.transform(df).select(['features', 'medv'])
vhouse_df.show(3)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows



In [13]:
# Split into training and test sets with weights 0.7 / 0.3.
# The fixed seed makes the split, and every metric computed from it below,
# repeatable across kernel restarts.
splits = vhouse_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

# Linear Regression

In [14]:
# Fit a regularised linear regression on the training split, then print the
# learned coefficients and intercept.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol='features',
    labelCol='medv',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0146203410189,0.0,1.93636675231,-9.01636826857,4.23505440856,0.0,-0.737245244497,0.0,0.0,-0.774296137511,0.00746733064124,-0.575307262624]
Intercept: 22.39341612745367


In [15]:
# Training-set fit quality taken from the fitted model's summary object.
trainingSummary = lr_model.summary
train_rmse = trainingSummary.rootMeanSquaredError
train_r2 = trainingSummary.r2
print("RMSE: %f" % train_rmse)
print("r2: %f" % train_r2)

RMSE: 5.035155
r2: 0.716358


In [16]:
# Summary statistics of the training split, for putting the RMSE above in
# context against the spread of the label.
train_df.describe().show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               350|
|   mean|22.956857142857153|
| stddev| 9.467800739302247|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [17]:
# Score the test split with the linear model, show a few predictions next to
# the true label, and report R2 on that split.
from pyspark.ml.evaluation import RegressionEvaluator

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "medv", "features").show(5)
lr_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|27.997696625728327|22.0|[0.01096,55.0,2.2...|
|  26.4826778294601|23.1|[0.0187,85.0,4.15...|
|26.447209452660488|24.7|[0.02055,85.0,0.7...|
|31.832680946584304|31.1|[0.02187,60.0,2.9...|
|28.171038860902346|23.9|[0.02543,55.0,3.7...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.714562


In [18]:
# RMSE of the linear model on the test split, via the model's own evaluate().
test_result = lr_model.evaluate(test_df)
test_rmse = test_result.rootMeanSquaredError
print("Root Mean Squared Error (RMSE) on test data = %g" % test_rmse)

Root Mean Squared Error (RMSE) on test data = 4.53259


In [19]:
# Optimiser diagnostics from the training summary: iteration count, the
# objective value recorded per iteration, and the per-row training residuals
# (show() prints the first 20 rows by default).
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.4999999999999991, 0.43297539031987375, 0.23739009270459582, 0.21361789511785848, 0.18487937390964793, 0.18209786032979755, 0.18102497632976344, 0.1799024378269275, 0.17850281343190819, 0.17760520859882295, 0.17741911877056407]
+--------------------+
|           residuals|
+--------------------+
|  -6.887948283298538|
|  0.8818951803336184|
|  0.5151089513755451|
|   4.706555752617437|
|  1.4555954015999788|
|   10.96974776345418|
|-0.24801335346561615|
| -2.1520796014231323|
|   -3.39908829291938|
|     8.1492220939151|
|   8.161951964180254|
|   3.415249648735916|
|  1.0300239280928984|
|   6.912829908809261|
| -0.6950886532249427|
|   9.840828511121288|
|   5.054887823056411|
|  -9.924194822300382|
|  2.9359335392339716|
|  0.9214620500839033|
+--------------------+
only showing top 20 rows



In [20]:
# Apply the linear model to the test split again and display the default
# number of rows. This is the same transform that produced `lr_predictions`
# earlier, kept under a separate name.
predictions = lr_model.transform(test_df)
predictions.select("prediction","medv","features").show()

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|27.997696625728327|22.0|[0.01096,55.0,2.2...|
|  26.4826778294601|23.1|[0.0187,85.0,4.15...|
|26.447209452660488|24.7|[0.02055,85.0,0.7...|
|31.832680946584304|31.1|[0.02187,60.0,2.9...|
|28.171038860902346|23.9|[0.02543,55.0,3.7...|
|25.619056816332908|21.6|[0.02731,0.0,7.07...|
|24.717428975698837|19.4|[0.03466,35.0,6.0...|
| 31.32328611193907|28.5|[0.03502,80.0,4.9...|
| 38.87424303402016|48.5|[0.0351,95.0,2.68...|
|37.541457741537926|45.4|[0.03578,20.0,3.3...|
|29.549654935039296|23.5|[0.03584,80.0,3.3...|
|26.011716375258466|24.8|[0.03659,25.0,4.8...|
| 27.26907668307364|22.0|[0.03932,0.0,3.41...|
| 21.61916635188563|21.1|[0.03961,0.0,5.19...|
|28.634303884446417|28.0|[0.04113,25.0,4.8...|
|27.361717268378932|22.9|[0.04203,28.0,15....|
|  25.1979941572093|20.6|[0.04294,28.0,15....|
|  24.1240918156084|20.5|[0.04337,21.0,5.6...|
| 27.05709148

# Decision Tree Regression

In [21]:
# Fit a decision-tree regressor on the training split and report its RMSE on
# the test split. RegressionEvaluator must already be imported by an earlier cell.
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol='medv', featuresCol='features')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="medv",
)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 3.4133


In [22]:
# Feature importances of the fitted decision tree, returned as a vector
# indexed by position in the 'features' vector.
# NOTE(review): positions presumably follow the VectorAssembler inputCols order - confirm.
dt_model.featureImportances

SparseVector(13, {0: 0.0187, 1: 0.006, 4: 0.0225, 5: 0.2367, 6: 0.0127, 7: 0.0893, 9: 0.0128, 10: 0.0253, 12: 0.5759})

In [23]:
# Show the first row of the original DataFrame, with column names, for reference.
df.take(1)

[Row(_c0=1, crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0)]

# Gradient-boosted tree regression

In [24]:
# Fit a gradient-boosted tree regressor (10 boosting iterations) on the
# training split and preview its predictions on the test split.
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(
    featuresCol='features',
    labelCol='medv',
    maxIter=10,
)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select(['prediction', 'medv', 'features']).show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 21.38790013287501|22.0|[0.01096,55.0,2.2...|
| 22.84235807802391|23.1|[0.0187,85.0,4.15...|
| 22.66031807875485|24.7|[0.02055,85.0,0.7...|
| 26.79643512584938|31.1|[0.02187,60.0,2.9...|
|25.718073035492335|23.9|[0.02543,55.0,3.7...|
+------------------+----+--------------------+
only showing top 5 rows



In [25]:
# RMSE of the gradient-boosted model on the test split.
gbt_evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="medv",
)
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 3.29571
