In [1]:
pip install pyspark



In [2]:
import os
os.environ["JAVA_HOME"]="/lib/jvm/java-11-openjdk-amd64"

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [None]:
conf = SparkConf().set('spark.ui.port', '4050').setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
#sc.stop()

In [None]:
house_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('Boston.csv')
house_df.show()

In [None]:
## Printing schema
house_df.cache()
house_df.printSchema()


In [None]:
## Descriptive analysis
house_df.toPandas()

In [None]:
import pandas as pd
from matplotlib import cm
from pandas.plotting import scatter_matrix
numeric_features = [t[0] for t in house_df.dtypes if t[1] == 'int' or t[1] == 'double']
sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()
axs = scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

In [None]:
|-- _c0: integer (nullable = true)
 |-- : double (nullable = true)
 |-- zn: double (nullable = true)
 |-- : double (nullable = true)
 |-- : integer (nullable = true)
 |-- : double (nullable = true)
 |-- : double (nullable = true)
 |-- : double (nullable = true)
 |-- : double (nullable = true)
 |-- : integer (nullable = true)
 |-- : integer (nullable = true)
 |-- : double (nullable = true)
 |-- : double (nullable = true)
 |-- : double (nullable = true)
 |-- : double (nullable = true)

In [9]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df = vhouse_df.select(['features', 'medv'])
vhouse_df.show(3)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows



In [None]:
import six
for i in house_df.columns:
    if not( isinstance(house_df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to medv for ", i, house_df.stat.corr('medv',i))

Correlation to medv for  _c0 -0.22660364293533927
Correlation to medv for  crim -0.38830460858681154
Correlation to medv for  zn 0.3604453424505433
Correlation to medv for  indus -0.4837251600283728
Correlation to medv for  chas 0.1752601771902987
Correlation to medv for  nox -0.4273207723732821
Correlation to medv for  rm 0.6953599470715401
Correlation to medv for  age -0.3769545650045961
Correlation to medv for  dis 0.249928734085904
Correlation to medv for  rad -0.38162623063977735
Correlation to medv for  tax -0.46853593356776674
Correlation to medv for  ptratio -0.5077866855375622
Correlation to medv for  black 0.3334608196570661
Correlation to medv for  lstat -0.7376627261740145
Correlation to medv for  medv 1.0


In [10]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
#train_df,test_df=vhouse_df.randomSplit([0.7,0.3])

In [11]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.1085578791106107,0.051169847021377966,0.031557988116536526,4.562893458905863,-16.96667508863869,4.478570783193395,-0.018941061670907946,-1.4637131347304269,0.3010781854607575,-0.011076070153995934,-0.8507452878584548,0.009404491275523787,-0.42652349288265506]
Intercept: 29.184511991670288


In [None]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [None]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","medv","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

In [None]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

In [None]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","medv","features").show()

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 27.73619480353222|22.0|[0.01096,55.0,2.2...|
|25.918153891765243|30.1|[0.01709,90.0,2.0...|
|31.014350767064446|32.9|[0.01778,95.0,1.4...|
| 31.35915379210034|31.1|[0.02187,60.0,2.9...|
|27.789719126915646|23.9|[0.02543,55.0,3.7...|
| 29.26232640785991|30.8|[0.02763,75.0,2.9...|
|27.043208341573887|25.0|[0.02875,28.0,15....|
|25.409928237665433|26.6|[0.02899,40.0,1.2...|
| 19.76077301272168|18.5|[0.03041,0.0,5.19...|
| 29.58499242407223|33.4|[0.03237,0.0,2.18...|
| 32.18142236719568|34.9|[0.03359,75.0,2.9...|
|20.395766998830684|19.5|[0.03427,0.0,5.19...|
|28.599404967980284|24.1|[0.03445,82.5,2.0...|
| 37.96912723168561|48.5|[0.0351,95.0,2.68...|
|22.822289706863266|20.9|[0.03548,80.0,3.6...|
| 28.72242512495242|27.9|[0.03615,80.0,4.9...|
|22.775049889511184|20.7|[0.03738,0.0,5.19...|
| 26.07529515906109|23.2|[0.03871,52.5,5.3...|
|27.249209499

## Decision tree regression

In [14]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'medv')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.72301


In [15]:
lr_evaluator.evaluate(dt_predictions)

0.7412971779552553

In [None]:
dt_model.featureImportances

SparseVector(13, {0: 0.0148, 2: 0.0027, 3: 0.0094, 4: 0.0089, 5: 0.6354, 7: 0.0388, 9: 0.0072, 10: 0.038, 11: 0.0029, 12: 0.242})

In [None]:
house_df.take(1)

[Row(_c0=1, crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0)]

## Gradient-boosted tree regression

In [None]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'medv', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'medv', 'features').show(5)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 20.66696084176724|22.0|[0.01096,55.0,2.2...|
|  33.0721830507196|30.1|[0.01709,90.0,2.0...|
|34.625351588448396|32.9|[0.01778,95.0,1.4...|
|24.114463807082725|31.1|[0.02187,60.0,2.9...|
| 26.69383363147453|23.9|[0.02543,55.0,3.7...|
+------------------+----+--------------------+
only showing top 5 rows



In [None]:
gbt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.78316
