In [2]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Obtain the Spark session for this notebook: getOrCreate() returns the
# already-running session if there is one, otherwise it starts a new one
# registered under the application name 'EXAM'.
spark = (
    SparkSession.builder
    .appName('EXAM')
    .getOrCreate()
)

In [6]:
# Bare expression: lets the notebook render the session object as this cell's output.
spark

In [39]:
# Load the dataset from 'boston.csv'. The first row is treated as the header
# and column types are inferred from the file contents rather than read as strings.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('boston.csv')
)

In [40]:
# Print the column names, inferred types and nullability of the loaded DataFrame.
data.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [41]:
# Display the per-column summary statistics that describe() computes for the DataFrame.
data.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|              CRIM|                ZN|             INDUS|              CHAS|               NOX|                RM|               AGE|               DIS|              RAD|               TAX|               PT|                 B|             LSTAT|               MV|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|               506|               506|               506|               506|               506|               506|               506|               506|              506|  

In [42]:
from pyspark.sql.functions import isnan,when,count,col
# Count missing values per column, one output column per input column.
# A value counts as missing when it is NaN *or* SQL NULL: isnan() alone
# returns false for NULL, so NULL cells would otherwise be silently ignored
# even though every column in the schema is nullable.
# when(..., 1) yields 1 for a missing cell and NULL otherwise, and count()
# only counts non-NULL values, so the result is the number of missing cells.
data.select([
    count(when(isnan(c) | col(c).isNull(), 1)).alias(c)
    for c in data.columns
]).show()

+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|CRIM| ZN|INDUS|CHAS|NOX| RM|AGE|DIS|RAD|TAX| PT|  B|LSTAT| MV|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|  0|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+



In [43]:
import six
# Print the correlation between the target column 'MV' and every non-string
# column (including 'MV' itself, which acts as a sanity check of 1.0).
# Column types are read from the DataFrame schema via data.dtypes instead of
# sampling the first row of each column: this avoids one Spark job per column,
# does not fail on an empty DataFrame, and does not misclassify a string
# column whose first value happens to be NULL.
for column_name, column_type in data.dtypes:
    if column_type != 'string':
        print( "Correlation to MV for ", column_name, data.stat.corr('MV', column_name))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [45]:
from pyspark.ml.feature import VectorAssembler
# Pack the 13 predictor columns into a single vector column named 'features',
# which is the input layout the regression estimator below is configured to read.
vectorAssembler = VectorAssembler(
    inputCols=[
        'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
        'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT',
    ],
    outputCol='features',
)
# Keep only the assembled feature vector and the target column 'MV'.
data_new = vectorAssembler.transform(data).select(['features', 'MV'])
data_new.show()

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
|[0.029850001,0.0,...|28.70000076|
|[0.088289998,12.5...|22.89999962|
|[0.144549996,12.5...|27.10000038|
|[0.211239994,12.5...|       16.5|
|[0.170039997,12.5...|18.89999962|
|[0.224889994,12.5...|       15.0|
|[0.117470004,12.5...|18.89999962|
|[0.093780003,12.5...|21.70000076|
|[0.629760027,0.0,...|20.39999962|
|[0.637960017,0.0,...|18.20000076|
|[0.627390027,0.0,...|19.89999962|
|[1.053930044,0.0,...|23.10000038|
|[0.784200013,0.0,...|       17.5|
|[0.802709997,0.0,...|20.20000076|
|[0.725799978,0.0,...|18.20000076|
+--------------------+-----------+
only showing top 20 rows



In [46]:
# Split the assembled data roughly 70% / 30% into training and test sets.
# A fixed seed is passed so that re-running the notebook samples the rows the
# same way; without it every run produces a different split and therefore
# different coefficients and metrics downstream.
splits = data_new.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [47]:
from pyspark.ml.regression import LinearRegression
# Configure a regularised linear regression that reads the 'features' vector
# and predicts 'MV', limited to 10 solver iterations, with regularisation
# strength 0.3 and an elastic-net mixing parameter of 0.8.
lr = LinearRegression(
    featuresCol='features',
    labelCol='MV',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
# Fit on the training split only, then report the learned parameters.
lr_model = lr.fit(train_df)
print("Coefficients: ", lr_model.coefficients, sep="")
print("Intercept: ", lr_model.intercept, sep="")

Coefficients: [-0.06127590642427279,0.013719752673313683,-0.010894030030047117,2.372625324467242,-1.0149803862092803,3.8513312090443312,0.0,-0.6178579201083955,0.0034091366529530732,0.0,-0.7588518659582126,0.007392527453483071,-0.6233278305733613]
Intercept: 20.606896656202945


In [48]:
# Report goodness-of-fit figures taken from the fitted model's summary object
# (root mean squared error and R^2), one per output line.
trainingSummary = lr_model.summary
print(
    "RMSE: %f" % trainingSummary.rootMeanSquaredError,
    "r2: %f" % trainingSummary.r2,
    sep="\n",
)

RMSE: 5.101659
r2: 0.703701
