### **Import Libraries**

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 62.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=2582178602a13fca1d58f0a915ac39f69763c1475bfea5ab592eeb76132d994d
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# getOrCreate() reuses the context that is already running, so re-executing this
# cell no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
# NOTE(review): SQLContext is deprecated in Spark 3.x in favour of SparkSession;
# it is kept here so that `sqlContext` stays available to the cells that use it.
sqlContext = SQLContext(sc)



In [3]:
# Read boston.csv with a header row and let Spark infer the column types.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('/content/sample_data/boston.csv')
)

In [4]:
# Peek at the first record; take(1) returns a one-element list of Row objects.
df.take(1)

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.309999943, CHAS=0, NOX=0.537999988, RM=6.574999809, AGE=65.19999695, DIS=4.090000153, RAD=1, TAX=296, PT=15.30000019, B=396.8999939, LSTAT=4.980000019, MV=24.0)]

In [5]:
# Print the first ten rows without shortening long cell values.
df.show(n=10, truncate=False)

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|CRIM       |ZN  |INDUS      |CHAS|NOX        |RM         |AGE        |DIS        |RAD|TAX|PT         |B          |LSTAT      |MV         |
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|0.00632    |18.0|2.309999943|0   |0.537999988|6.574999809|65.19999695|4.090000153|1  |296|15.30000019|396.8999939|4.980000019|24.0       |
|0.027310001|0.0 |7.070000172|0   |0.469000012|6.421000004|78.90000153|4.967100143|2  |242|17.79999924|396.8999939|9.140000343|21.60000038|
|0.02729    |0.0 |7.070000172|0   |0.469000012|7.184999943|61.09999847|4.967100143|2  |242|17.79999924|392.8299866|4.03000021 |34.70000076|
|0.032370001|0.0 |2.180000067|0   |0.458000004|6.998000145|45.79999924|6.062200069|3  |222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|0.0 |2.

In [6]:
# Show the inferred column names, types and nullability as a tree.
df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



### **Correlation between Independent and Dependent Variables**

In [7]:
import six  # no longer used by this cell; kept so the name stays defined for any other cell
from pyspark.sql.types import NumericType

# Report the correlation of every numeric column with the target column MV.
# Column types are read from the schema, so no Spark job is launched per column
# just to inspect a sample value, an empty DataFrame no longer raises IndexError,
# and non-numeric columns (which corr cannot handle) are skipped.
for field in df.schema.fields:
    if isinstance(field.dataType, NumericType):
        print("Correlation between MV and ", field.name, " ", df.stat.corr("MV", field.name))

Correlation between MV and  CRIM   -0.3883046116575088
Correlation between MV and  ZN   0.36044534463752903
Correlation between MV and  INDUS   -0.48372517128143383
Correlation between MV and  CHAS   0.17526017775291847
Correlation between MV and  NOX   -0.4273207763683772
Correlation between MV and  RM   0.695359937127267
Correlation between MV and  AGE   -0.37695456714288667
Correlation between MV and  DIS   0.24992873873512172
Correlation between MV and  RAD   -0.3816262315669168
Correlation between MV and  TAX   -0.46853593528654536
Correlation between MV and  PT   -0.5077867038116085
Correlation between MV and  B   0.3334608226834164
Correlation between MV and  LSTAT   -0.7376627294671615
Correlation between MV and  MV   1.0


In [8]:
from pyspark.ml.feature import VectorAssembler

# Columns that are packed into the single 'features' vector; MV is the label
# and is carried alongside unchanged.
predictor_cols = [
    'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
    'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT',
]
vectAssembler = VectorAssembler(inputCols=predictor_cols, outputCol='features')
vdf = vectAssembler.transform(df).select('features', 'MV')
vdf.show(n=5)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
+--------------------+-----------+
only showing top 5 rows



In [9]:
from pyspark.ml.feature import StandardScaler

# Scale the assembled features by their standard deviation only (withStd=True);
# centering is switched off (withMean=False).
scaler = StandardScaler(
    withMean=False,
    withStd=True,
    inputCol="features",
    outputCol="scaled_features",
)

# Fit on the assembled frame, then append the 'scaled_features' column to it.
scalerModel = scaler.fit(vdf)
scaledData = scalerModel.transform(vdf)
scaledData.show(n=5)

+--------------------+-----------+--------------------+
|            features|         MV|     scaled_features|
+--------------------+-----------+--------------------+
|[0.00632,18.0,2.3...|       24.0|[7.34751714521701...|
|[0.027310001,0.0,...|21.60000038|[0.00317501108518...|
|[0.02729,0.0,7.07...|34.70000076|[0.00317268580526...|
|[0.032370001,0.0,...|33.40000153|[0.00376327748952...|
|[0.069049999,0.0,...|36.20000076|[0.00802762739762...|
+--------------------+-----------+--------------------+
only showing top 5 rows



In [10]:
# Keep only the scaled feature vector and the label for the scaled-data model.
vdf2 = scaledData.select('scaled_features', 'MV')
vdf2.show(n=5)

+--------------------+-----------+
|     scaled_features|         MV|
+--------------------+-----------+
|[7.34751714521701...|       24.0|
|[0.00317501108518...|21.60000038|
|[0.00317268580526...|34.70000076|
|[0.00376327748952...|33.40000153|
|[0.00802762739762...|36.20000076|
+--------------------+-----------+
only showing top 5 rows



### **Splitting the Data into Train & Test Datasets**

Not Scaled Data

In [11]:
# 70/30 train/test split of the unscaled data. The seed is fixed so that a
# Restart & Run All draws the same split and the reported metrics are reproducible.
splits = vdf.randomSplit([0.7, 0.3], seed=42)
train_vdf, test_vdf = splits

Scaled Data

In [12]:
# 70/30 train/test split of the scaled data. The seed is fixed so that a
# Restart & Run All draws the same split and the reported metrics are reproducible.
splits = vdf2.randomSplit([0.7, 0.3], seed=42)
train_vdf2, test_vdf2 = splits

## **Scaled Data**

### **Linear Regression**

In [13]:
from pyspark.ml.regression import LinearRegression

# Elastic-net regularised linear regression on the scaled feature vector.
lreg = LinearRegression(
    featuresCol='scaled_features',
    labelCol='MV',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lreg_model = lreg.fit(train_vdf2)

# Show the fitted parameters.
print(f"LR Coefficients: {lreg_model.coefficients}")
print(f"LR Intercept: {lreg_model.intercept}")

LR Coefficients: [-0.2569573806330271,0.17171145807530963,0.0,0.6183055435226239,-0.5837152318473628,3.392186615361845,-0.06298555413115847,-1.392371519737105,0.0,-0.27395688018751685,-1.6713702674644804,1.166767290921547,-3.295040636448978]
LR Intercept: 13.59108932377271


### **Evaluation**

In [14]:
# Summary object of the fitted model; the RMSE and R-squared printed in the
# following cell are read from it.
lsummary = lreg_model.summary

In [15]:
# Pull both fit metrics out of the model summary in one step.
rmse, rsquare = lsummary.rootMeanSquaredError, lsummary.r2

print("Root Mean Squared Error: ", rmse)
print("R-Squared: ", rsquare)

Root Mean Squared Error:  5.009007752450976
R-Squared:  0.7059706691151533


In [16]:
# Score the held-out scaled rows; transform() appends a 'prediction' column.
pred = lreg_model.transform(test_vdf2)

In [17]:
# Compare a handful of actual MV values with their predictions.
pred.show(n=5)

+--------------------+-----------+------------------+
|     scaled_features|         MV|        prediction|
+--------------------+-----------+------------------+
|[0.00105329913505...|32.20000076| 30.72783270835537|
|[0.00127418968214...|       22.0|27.235205082779576|
|[0.00160552550277...|       50.0| 38.40374449311538|
|[0.00217402801606...|23.10000038|  25.4993662292038|
|[0.00226819702777...|       33.0|26.289938971306356|
+--------------------+-----------+------------------+
only showing top 5 rows



In [19]:
# Evaluate the fitted model on the held-out scaled rows.
result = lreg_model.evaluate(test_vdf2)

In [20]:
# Report the held-out metrics from the evaluation result.
test_rmse = result.rootMeanSquaredError
test_r2 = result.r2
print("Root Mean Squared Error Test Data: ", test_rmse)
print("R-Squared Test Data: ", test_r2)

Root Mean Squared Error Test Data:  4.676370921317646
R-Squared Test Data:  0.7342709117623074


## **Not Scaled Data**

### **Linear Regression**

In [22]:
# Same elastic-net configuration as the scaled run, but fitted on the raw
# 'features' vector. Note that lreg / lreg_model are rebound here.
lreg = LinearRegression(
    featuresCol='features',
    labelCol='MV',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lreg_model = lreg.fit(train_vdf)

# Show the fitted parameters.
print(f"LR Coefficients: {lreg_model.coefficients}")
print(f"LR Intercept: {lreg_model.intercept}")

LR Coefficients: [-0.0545401858034031,0.015967442495788964,0.0,1.8276461096379344,-3.4792783659551114,4.224345433907338,-0.001126314172838157,-0.4455836341123504,0.0,-0.0019115683915978037,-0.8136538931342597,0.006314286033431694,-0.45845639335862814]
LR Intercept: 18.772472354905137


In [23]:
# Summary object of the model fitted on the unscaled data; the following cell
# reads RMSE and R-squared from it.
lsummary = lreg_model.summary

In [24]:
# Pull both fit metrics out of the model summary in one step.
rmse, rsquare = lsummary.rootMeanSquaredError, lsummary.r2

print("Root Mean Squared Error: ", rmse)
print("R-Squared: ", rsquare)

Root Mean Squared Error:  4.76650165930665
R-Squared:  0.7239410963716459


In [26]:
# Score the held-out unscaled rows and compare a few predictions with the
# actual MV values.
pred = lreg_model.transform(test_vdf)
pred.show(n=5)

+--------------------+-----------+------------------+
|            features|         MV|        prediction|
+--------------------+-----------+------------------+
|[0.00632,18.0,2.3...|       24.0|30.275185792539613|
|[0.01096,55.0,2.2...|       22.0| 27.96130712185799|
|[0.01301,35.0,1.5...|32.70000076| 32.01082286806058|
|[0.01381,80.0,0.4...|       50.0|  38.2183026898455|
|[0.01439,60.0,2.9...|29.10000038|30.610580244065545|
+--------------------+-----------+------------------+
only showing top 5 rows



In [27]:
# Evaluate the fitted model on the held-out unscaled rows and report the metrics.
result = lreg_model.evaluate(test_vdf)

test_rmse = result.rootMeanSquaredError
test_r2 = result.r2
print("Root Mean Squared Error Test Data: ", test_rmse)
print("R-Squared Test Data: ", test_r2)

Root Mean Squared Error Test Data:  5.352522972080572
R-Squared Test Data:  0.6767562594454459
