In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc= SparkContext()
sqlContext = SQLContext(sc)

In [3]:
house_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('C:\\Users\\Lucky\\Downloads\\boston-housing\\boston_housing.csv')
house_df.take(1)

[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296.0, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0)]

In [4]:
house_df.cache()

DataFrame[crim: double, zn: double, indus: double, chas: int, nox: double, rm: double, age: double, dis: double, rad: int, tax: double, ptratio: double, black: double, lstat: double, medv: double]

In [6]:
house_df.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: double (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [11]:
house_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
crim,506,3.6135235573122535,8.601545105332491,0.00632,88.9762
zn,506,11.363636363636363,23.32245299451514,0.0,100.0
indus,506,11.136778656126504,6.860352940897589,0.46,27.74
chas,506,0.0691699604743083,0.2539940413404101,0,1
nox,506,0.5546950592885372,0.11587767566755584,0.385,0.871
rm,506,6.284634387351787,0.7026171434153232,3.561,8.78
age,506,68.57490118577078,28.148861406903595,2.9,100.0
dis,506,3.795042687747034,2.10571012662761,1.1296,12.1265
rad,506,9.549407114624506,8.707259384239366,1,24


In [14]:
from pandas.plotting import scatter_matrix

In [15]:
numeric_features = [t[0] for t in house_df.dtypes if t[1] == 'int' or t[1] == 'double']
sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()
axs = scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

In [19]:
import six
for i in house_df.columns:
    if not( isinstance(house_df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to medv for ", i, house_df.stat.corr('medv',i))

Correlation to medv for  crim -0.38830460858681154
Correlation to medv for  zn 0.3604453424505433
Correlation to medv for  indus -0.4837251600283728
Correlation to medv for  chas 0.1752601771902987
Correlation to medv for  nox -0.4273207723732821
Correlation to medv for  rm 0.6953599470715401
Correlation to medv for  age -0.3769545650045961
Correlation to medv for  dis 0.249928734085904
Correlation to medv for  rad -0.38162623063977735
Correlation to medv for  tax -0.46853593356776674
Correlation to medv for  ptratio -0.5077866855375622
Correlation to medv for  black 0.3334608196570661
Correlation to medv for  lstat -0.7376627261740145
Correlation to medv for  medv 1.0


In [21]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df = vhouse_df.select(['features', 'medv'])
vhouse_df.show(3)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows



In [22]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [24]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.0006875442326925336,0.01275700989404824,-0.031732117878271904,0.4867361949913643,-4.090042449314189,3.3042212351338414,-0.004312713199166634,-0.5166193799676568,0.0,-0.003133184974348571,-0.701247964289656,0.006438356036265741,-0.5166782955187311]
Intercept: 24.70655940920911


In [25]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.755510
r2: 0.694535


In [26]:
train_df.describe().show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               327|
|   mean|22.280428134556583|
| stddev| 8.617505144075226|
|    min|               7.0|
|    max|              50.0|
+-------+------------------+



In [27]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","medv","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|30.319338661415888|24.0|[0.00632,18.0,2.3...|
|30.543317289038605|32.2|[0.00905999999999...|
| 37.05790422466536|50.0|[0.01381,80.0,0.4...|
|27.992508797873477|24.5|[0.01500999999999...|
| 38.45145002369892|50.0|[0.01500999999999...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.694655


In [28]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 5.60978


In [29]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.5, 0.4297702797114969, 0.23822403668149839, 0.21919654955553028, 0.192676812105485, 0.190368079520636, 0.18968918826081413, 0.18919280123552548, 0.18895043627098246, 0.18879383242309675, 0.1885974613568025]
+--------------------+
|           residuals|
+--------------------+
|  -5.775271100494596|
|   1.379907262457614|
|  5.0638224513138255|
|  0.9293005287968441|
|  0.1966780443928826|
| -1.5582356379331443|
|   9.700788993926004|
|   2.120407102114072|
| -3.3943615248841397|
|   6.711432219399516|
| -1.7512643127865744|
|  12.087679387798332|
|  -2.042753083447355|
|   6.924541303959472|
|-0.01572750769847...|
|    -9.7118498225698|
|  -3.866495368995615|
|  1.3810665948589076|
| -1.9271794044477524|
|  2.0380533360286606|
+--------------------+
only showing top 20 rows



In [30]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","medv","features").show()

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|30.319338661415888|24.0|[0.00632,18.0,2.3...|
|30.543317289038605|32.2|[0.00905999999999...|
| 37.05790422466536|50.0|[0.01381,80.0,0.4...|
|27.992508797873477|24.5|[0.01500999999999...|
| 38.45145002369892|50.0|[0.01500999999999...|
|27.567105928263466|30.1|[0.01709,90.0,2.0...|
| 27.57355467315705|23.9|[0.02543,55.0,3.7...|
|30.681708682367034|34.7|[0.02729,0.0,7.07...|
| 26.26211588678883|26.6|[0.02899,40.0,1.2...|
|21.037796430898233|17.5|[0.03113,0.0,4.39...|
|25.049387028849527|19.4|[0.03466,35.0,6.0...|
|30.398819060924094|28.5|[0.03501999999999...|
|27.258127638352896|22.0|[0.03537,34.0,6.0...|
| 23.85575516747518|20.9|[0.03548000000000...|
|24.945195534212125|22.9|[0.03551,25.0,4.8...|
|35.605560970568256|45.4|[0.03578,20.0,3.3...|
|26.567900056927442|22.9|[0.04203,28.0,15....|
| 24.48271204092662|19.4|[0.04378999999999...|
| 30.22741962

In [31]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'medv')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.56846


In [32]:
dt_model.featureImportances

SparseVector(13, {0: 0.0773, 2: 0.0149, 4: 0.0116, 5: 0.2629, 6: 0.0022, 7: 0.0579, 9: 0.0056, 10: 0.0219, 11: 0.0079, 12: 0.538})

In [33]:
house_df.take(1)

[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296.0, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0)]

In [35]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'medv', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'medv', 'features').show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 27.01300415180857|24.0|[0.00632,18.0,2.3...|
|33.420843961508254|32.2|[0.00905999999999...|
|41.742092014972414|50.0|[0.01381,80.0,0.4...|
| 24.84246694702423|24.5|[0.01500999999999...|
| 42.49162102638479|50.0|[0.01500999999999...|
+------------------+----+--------------------+
only showing top 5 rows



In [36]:
gbt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.41186
