In [None]:
The Below Code consists of 
    Q.1 Read the given CSV file in a Hive table 
    Q2. Read the data from Hive table as spark dataframe 
    Q3. Get the correlation between dependent and independent variables
    Q4. Build a linear regression model to predict house price 
    Q5. Evaluate the Linear Regression model by getting the RMSE and R-squared values

In [1]:
pip install pyspark

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("MLhands-on").enableHiveSupport().getOrCreate()

In [5]:
df=spark.read.csv('boston.csv',header='true', inferSchema='true')

In [7]:
df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [10]:
pandas_df = df.toPandas()
pandas_df.head(5)

In [12]:
pandas_df.corr()['MV']

CRIM    -0.388305
ZN       0.360445
INDUS   -0.483725
CHAS     0.175260
NOX     -0.427321
RM       0.695360
AGE     -0.376955
DIS      0.249929
RAD     -0.381626
TAX     -0.468536
PT      -0.507787
B        0.333461
LSTAT   -0.737663
MV       1.000000
Name: MV, dtype: float64

In [13]:
feature_columns = df.columns[:-1]

In [15]:
from pyspark.ml.feature import VectorAssembler

In [16]:
assembler = VectorAssembler(inputCols= feature_columns, outputCol= "features")

In [17]:
vector = assembler.transform(df)

In [18]:
pands_vector = vector.toPandas()
pands_vector.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PT,B,LSTAT,MV,features
0,0.00632,18.0,2.31,0,0.538,6.575,65.199997,4.09,1,296,15.3,396.899994,4.98,24.0,"[0.00632, 18.0, 2.309999943, 0.0, 0.537999988,..."
1,0.02731,0.0,7.07,0,0.469,6.421,78.900002,4.9671,2,242,17.799999,396.899994,9.14,21.6,"[0.027310001, 0.0, 7.070000172, 0.0, 0.4690000..."
2,0.02729,0.0,7.07,0,0.469,7.185,61.099998,4.9671,2,242,17.799999,392.829987,4.03,34.700001,"[0.02729, 0.0, 7.070000172, 0.0, 0.469000012, ..."
3,0.03237,0.0,2.18,0,0.458,6.998,45.799999,6.0622,3,222,18.700001,394.630005,2.94,33.400002,"[0.032370001, 0.0, 2.180000067, 0.0, 0.4580000..."
4,0.06905,0.0,2.18,0,0.458,7.147,54.200001,6.0622,3,222,18.700001,396.899994,5.33,36.200001,"[0.069049999, 0.0, 2.180000067, 0.0, 0.4580000..."


In [20]:
vector_df = vector.select(['features', 'MV'])
vector_df.show()

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
|[0.029850001,0.0,...|28.70000076|
|[0.088289998,12.5...|22.89999962|
|[0.144549996,12.5...|27.10000038|
|[0.211239994,12.5...|       16.5|
|[0.170039997,12.5...|18.89999962|
|[0.224889994,12.5...|       15.0|
|[0.117470004,12.5...|18.89999962|
|[0.093780003,12.5...|21.70000076|
|[0.629760027,0.0,...|20.39999962|
|[0.637960017,0.0,...|18.20000076|
|[0.627390027,0.0,...|19.89999962|
|[1.053930044,0.0,...|23.10000038|
|[0.784200013,0.0,...|       17.5|
|[0.802709997,0.0,...|20.20000076|
|[0.725799978,0.0,...|18.20000076|
+--------------------+-----------+
only showing top 20 rows



In [21]:
splits = vector_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [22]:
from pyspark.ml.regression import LinearRegression

In [23]:
model = LinearRegression(featuresCol= 'features', labelCol= 'MV')
model_lr = model.fit(train_df)    

In [24]:
model_lr.coefficients

DenseVector([-0.1441, 0.051, 0.0069, 3.0814, -18.511, 3.6444, -0.0039, -1.5512, 0.4124, -0.0149, -0.9981, 0.01, -0.5169])

In [26]:
model_lr.intercept

39.267672567791216

In [27]:
predictions = model_lr.transform(test_df) # we use transform method to test the model
predictions.show()

+--------------------+-----------+------------------+
|            features|         MV|        prediction|
+--------------------+-----------+------------------+
|[0.01311,90.0,1.2...|35.40000153|31.553693632343943|
|[0.02055,85.0,0.7...|24.70000076|24.768308541596152|
|[0.02177,82.5,2.0...|42.29999924| 37.06391646804898|
|[0.02187,60.0,2.9...|31.10000038| 32.24792705080656|
|[0.02899,40.0,1.2...|26.60000038|21.499407704123705|
|[0.033059999,0.0,...|20.60000038|22.368885139341153|
|[0.034449998,82.5...|24.10000038| 29.44766870147291|
|[0.035020001,80.0...|       28.5|  33.8149350587754|
|[0.035100002,95.0...|       48.5| 42.57840661736431|
|[0.03548,80.0,3.6...|20.89999962| 21.67021644061087|
|[0.041129999,25.0...|       28.0|28.429351844801612|
|[0.042939998,28.0...|20.60000038| 27.02994546526208|
|[0.043790001,80.0...|19.39999962|25.945472554950186|
|[0.04544,0.0,3.24...|19.79999924|21.280646033398444|
|[0.049320001,33.0...|28.20000076| 33.26874085034083|
|[0.055610001,70.0...|      

In [28]:
summary = model_lr.evaluate(test_df)

In [29]:
print(summary.rootMeanSquaredError)

4.46829041377678


In [30]:
print(summary.r2)

0.7668479296315855
