In [1]:
# pip install pyspark

In [2]:
# Import Required Libraries

import numpy as np
import pandas as pd
import pyspark
import six
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Creating Spark Session

spark = SparkSession.builder.appName('boston').getOrCreate()
db = spark.sql("show databases")
db.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [4]:
# Conversion of csv file to hive table

df = spark.read.csv("/content/boston.csv",header=True)
df.show(5)
df.write.saveAsTable("boston")

+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM| ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632| 18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|  0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|  0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|  0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|  0|2.18000006

In [5]:
# Creating df from hive table

new_df = spark.sql("select * from boston")
new_df.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|  18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|   0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|   0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|   0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|   0|2.

In [6]:
new_df.printSchema()

root
 |-- CRIM: string (nullable = true)
 |-- ZN: string (nullable = true)
 |-- INDUS: string (nullable = true)
 |-- CHAS: string (nullable = true)
 |-- NOX: string (nullable = true)
 |-- RM: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- DIS: string (nullable = true)
 |-- RAD: string (nullable = true)
 |-- TAX: string (nullable = true)
 |-- PT: string (nullable = true)
 |-- B: string (nullable = true)
 |-- LSTAT: string (nullable = true)
 |-- MV: string (nullable = true)



In [7]:
# Converting Datatypes to numeric

table = spark.sql("select * from boston")
final_df = table.select([col(c).cast("double") for c in table.columns])

In [8]:
final_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [9]:
# Correlation between all Independent and Dependent variables

for i in final_df.columns:
    if i != "MV":
      if not( isinstance(final_df.select(i).take(1)[0][0], six.string_types)):
          print( "Correlation to MV for ", i, final_df.stat.corr('MV', i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615


In [10]:
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS','CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
v_df = vectorAssembler.transform(final_df)
v_df = v_df.select(['features', 'MV'])
v_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



In [11]:
splits = v_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [12]:
# Building Linear Regression Model

lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [13]:
train_score = lr_model.summary
print("Train Summary")
print("RMSE: %f" % train_score.rootMeanSquaredError)
print("r2: %f" % train_score.r2)

Train Summary
RMSE: 4.871809
r2: 0.731936


In [14]:
test_score = lr_model.evaluate(test_df)
print("Test Summary")
print("RMSE: %f" % test_score.rootMeanSquaredError)
print("r2: %f" % test_score.r2)

Test Summary
RMSE: 4.872162
r2: 0.681092
