In [11]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import six
from pyspark.sql.functions import isnan,when,count,col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [5]:
Boston = pd.read_csv('C:/Users/venky kishore/Downloads/BigDataHadoop_SparkExam/Big Data Hadoop _ Spark Exam/Dataset/boston.csv')

In [6]:
Boston.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PT,B,LSTAT,MV
0,0.00632,18.0,2.31,0,0.538,6.575,65.199997,4.09,1,296,15.3,396.899994,4.98,24.0
1,0.02731,0.0,7.07,0,0.469,6.421,78.900002,4.9671,2,242,17.799999,396.899994,9.14,21.6
2,0.02729,0.0,7.07,0,0.469,7.185,61.099998,4.9671,2,242,17.799999,392.829987,4.03,34.700001
3,0.03237,0.0,2.18,0,0.458,6.998,45.799999,6.0622,3,222,18.700001,394.630005,2.94,33.400002
4,0.06905,0.0,2.18,0,0.458,7.147,54.200001,6.0622,3,222,18.700001,396.899994,5.33,36.200001


In [None]:
spark=SparkSession.builder.appName('Pyspark').getOrCreate()

In [6]:
spark

In [39]:
data_boston=spark.read.csv('boston.csv',header=True,inferSchema=True)

In [40]:
data_boston.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [41]:
data_boston.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|              CRIM|                ZN|             INDUS|              CHAS|               NOX|                RM|               AGE|               DIS|              RAD|               TAX|               PT|                 B|             LSTAT|               MV|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|               506|               506|               506|               506|               506|               506|               506|               506|              506|  

In [42]:
data_boston.select([count(when(isnan(c),c)).alias(c) for c in data.columns]).show()

+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|CRIM| ZN|INDUS|CHAS|NOX| RM|AGE|DIS|RAD|TAX| PT|  B|LSTAT| MV|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|  0|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+



In [43]:
for i in data_boston.columns:
    if not( isinstance(data_boston.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to MV for ", i, data_boston.stat.corr('MV',i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [None]:
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
new_data= vectorAssembler.transform(data)
new_data = new_data.select(['features', 'MV'])
new_data.show()

In [46]:
splits = new_data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]

In [47]:
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.06127590642427279,0.013719752673313683,-0.010894030030047117,2.372625324467242,-1.0149803862092803,3.8513312090443312,0.0,-0.6178579201083955,0.0034091366529530732,0.0,-0.7588518659582126,0.007392527453483071,-0.6233278305733613]
Intercept: 20.606896656202945


In [48]:
training = lr_model.summary
print("RMSE: %f" % training.rootMeanSquaredError)
print("r2: %f" % training.r2)

RMSE: 5.101659
r2: 0.703701
