### Big Data Hadoop & Spark Exam

### Problem Statement:
### Perform the following tasks using PySpark
### Q1. Read the given CSV file into a Hive table
### Q2. Read the data from the Hive table as a Spark DataFrame
### Q3. Get the correlation between the dependent and independent variables
### Q4. Build a linear regression model to predict house price
### Q5. Evaluate the linear regression model by computing the RMSE and R-squared values

### Ignoring Warnings

In [1]:
from warnings import filterwarnings
filterwarnings('ignore')

### Creating a Spark Session

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Start a SparkContext and wrap it in a SQLContext for DataFrame operations
sc = SparkContext()
sqlContext = SQLContext(sc)

### Reading the dataset and converting it into a Hive table

In [3]:
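# Load the CSV with a header row and let Spark infer the column types.
# 'com.databricks.spark.csv' is the legacy spark-csv source name; Spark 2.0+ also accepts the built-in 'csv'.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('boston.csv')
df.take(1)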

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.309999943, CHAS=0, NOX=0.537999988, RM=6.574999809, AGE=65.19999695, DIS=4.090000153, RAD=1, TAX=296, PT=15.30000019, B=396.8999939, LSTAT=4.980000019, MV=24.0)]
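The cell above loads the CSV into a DataFrame. Persisting it to Hive (Q1) and reading it back (Q2) could look roughly like the sketch below. The helper names `hive_session`, `csv_to_hive`, and `hive_to_dataframe`, and the table name `boston`, are assumptions made for illustration; the sketch also assumes a Spark 2.0+ build with Hive support available.

```python
def hive_session(app_name="boston-housing"):
    """Build a SparkSession with Hive support (hypothetical helper)."""
    # Imported lazily so the other helpers can be defined without pyspark installed.
    from pyspark.sql import SparkSession
    return SparkSession.builder.appName(app_name).enableHiveSupport().getOrCreate()


def csv_to_hive(spark, csv_path, table_name):
    """Read a CSV with a header row and save it as a table, replacing any existing one."""
    df = spark.read.csv(csv_path, header=True, inferSchema=True)
    df.write.mode("overwrite").saveAsTable(table_name)
    return df


def hive_to_dataframe(spark, table_name):
    """Load a saved table back as a Spark DataFrame."""
    return spark.table(table_name)


# Usage (needs a running Spark installation, so it is left commented out):
# spark = hive_session()
# csv_to_hive(spark, "boston.csv", "boston")   # "boston" is an assumed table name
# df = hive_to_dataframe(spark, "boston")
```

With this approach the rest of the notebook could work on the DataFrame returned by `hive_to_dataframe` instead of the one read directly from the file.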

import pandas as pd
# Keep only the numeric columns and pull an 80% sample into pandas for plotting
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int' or t[1] == 'double']
sampled_data = df.select(numeric_features).sample(False, 0.8).toPandas()
axs = pd.plotting.scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    # Left-hand column: horizontal, right-aligned y labels and no tick marks
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    # Bottom row: vertical x labels and no tick marks
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

### Correlations between Variables

In [4]:
import six
# Correlate every non-string column with the target MV
for i in df.columns:
    if not isinstance(df.select(i).take(1)[0][0], six.string_types):
        print("Correlation to MV for ", i, df.stat.corr('MV', i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0
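`df.stat.corr` computes the Pearson correlation coefficient by default. As a rough illustration of what each number above measures, here is a minimal pure-Python sketch; the function name `pearson` and the toy inputs are assumptions for illustration, not part of the notebook.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of the same length (at least 2 values)")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Co-variation of the two variables around their means
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    # Raises ZeroDivisionError if either sequence is constant
    return cov / math.sqrt(var_x * var_y)


# Toy data: a perfectly increasing and a perfectly decreasing relationship
print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))
print(pearson([1, 2, 3], [3, 2, 1]))
```

Values near +1 or -1 indicate a strong linear relationship, which is why RM (0.70) and LSTAT (-0.74) stand out as the strongest single predictors of MV in the output above.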


In [5]:
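from pyspark.ml.feature import VectorAssembler
# Combine the 13 predictor columns into a single vector column for Spark ML
feature_cols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
                'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT']
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
vdf = vectorAssembler.transform(df)
# Keep only the assembled features and the target column MV
vdf = vdf.select(['features', 'MV'])
vdf.show(3)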

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



### Train Test Split

In [6]:
splits = vdf.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
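`randomSplit` draws a different split on every run unless a seed is given, so the metrics reported further down will vary between runs. A minimal sketch of a seeded split follows; the helper name `split_train_test` and the seed value are assumptions for illustration.

```python
def split_train_test(df, train_fraction=0.7, seed=42):
    """Split a Spark DataFrame into (train, test) with a fixed seed."""
    if not 0.0 < train_fraction < 1.0:
        raise ValueError("train_fraction must be strictly between 0 and 1")
    weights = [train_fraction, 1.0 - train_fraction]
    # The same seed gives the same split for the same input data and partitioning
    train_df, test_df = df.randomSplit(weights, seed=seed)
    return train_df, test_df


# Usage with the notebook's assembled DataFrame (left commented out here):
# train_df, test_df = split_train_test(vdf, train_fraction=0.7, seed=42)
```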

### Linear Regression Model

In [7]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.04448710389959217,0.009971542914437687,-0.0335038036498445,2.9165655866061764,-2.9443542801818454,3.6290521011439023,0.0,-0.714993622931146,0.026216352820383728,0.0,-0.7866654549852938,0.010919602503659176,-0.6181908570025751]
Intercept: 22.593611583368922
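For context, `regParam` and `elasticNetParam` control an elastic-net penalty. As documented for Spark ML linear models, the fitted coefficients minimize, up to Spark's internal feature standardization, an objective of roughly this form, where $\lambda$ is `regParam` (0.3 here) and $\alpha$ is `elasticNetParam` (0.8 here):

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(y_i - w^{\top}x_i - b\right)^2
+ \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

With $\alpha = 0.8$ most of the penalty is the L1 term, which can drive individual coefficients exactly to zero. That is consistent with the two `0.0` entries in the output above, which fall in the AGE and TAX positions.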


### Evaluation of the model using RMSE and R2

In [8]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.881559
r2: 0.712035
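The two metrics printed above follow standard definitions: RMSE is the square root of the mean squared error, and R2 is one minus the ratio of residual to total sum of squares. A minimal pure-Python sketch, with function names and toy inputs chosen for illustration rather than taken from the notebook:

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between true labels and predictions."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)


def r_squared(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# Toy data: predicting the mean everywhere gives R2 = 0
labels = [1.0, 2.0, 3.0]
predictions = [2.0, 2.0, 2.0]
print(rmse(labels, predictions), r_squared(labels, predictions))
```

RMSE is in the units of the target (MV here), while R2 is unitless, so the two are usually read together.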


### Predicting the Test Data

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
# Score the held-out test set and preview a few predictions
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "MV", "features").show(5)
# R-squared on the test predictions
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="MV", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|  31.2936891196482|       24.0|[0.00632,18.0,2.3...|
|31.585057653339263|32.70000076|[0.01301,35.0,1.5...|
|29.751673813545395|35.40000153|[0.01311,90.0,1.2...|
| 30.59167105337913|29.10000038|[0.01439,60.0,2.9...|
| 38.74669131239574|       50.0|[0.020090001,95.0...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.724054


### RMSE on the Test Data

In [10]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 4.93718


In [11]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.5, 0.4354732271596991, 0.24732703438595374, 0.2239625484138751, 0.19292632169256632, 0.18800192683188915, 0.1866413088184445, 0.1851980568153613, 0.1846183570636647, 0.18398027249446722, 0.18334831937070936]
+-------------------+
|          residuals|
+-------------------+
|  2.040931630960319|
| -5.327131773349777|
| 2.1012346165965923|
| 12.081301831438381|
| 0.3798948082596887|
| -2.952884346324197|
|  8.576031950676871|
|  9.407891103839006|
|  3.964619815953057|
|  2.023764052809625|
| -2.859448851911935|
|  7.553404751823816|
| 0.3459752069061217|
|-1.3136132497002109|
| 6.1572769419371625|
|  3.354056880425915|
| -3.858834526286202|
| 1.4147156918201311|
|-2.1967597888027264|
|  0.873357539968449|
+-------------------+
only showing top 20 rows
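Spark defines each residual as the label minus the predicted value. The residuals shown above come from the training data, whose individual rows are not printed, so the sketch below applies the same definition to the five test-set rows displayed earlier in the notebook. The variable names are assumptions for illustration.

```python
# (prediction, MV) pairs copied from the first five rows of the test predictions
rows = [
    (31.2936891196482, 24.0),
    (31.585057653339263, 32.70000076),
    (29.751673813545395, 35.40000153),
    (30.59167105337913, 29.10000038),
    (38.74669131239574, 50.0),
]

# Residual = label - prediction; a negative value means the model over-predicted
residuals = [label - prediction for prediction, label in rows]

for (prediction, label), residual in zip(rows, residuals):
    print(f"MV={label:<12} prediction={prediction:.4f} residual={residual:+.4f}")
```

The large positive residual on the last row (MV of 50.0 against a prediction of about 38.7) shows the model under-predicting the most expensive houses in this sample.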



In [12]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|  31.2936891196482|       24.0|[0.00632,18.0,2.3...|
|31.585057653339263|32.70000076|[0.01301,35.0,1.5...|
|29.751673813545395|35.40000153|[0.01311,90.0,1.2...|
| 30.59167105337913|29.10000038|[0.01439,60.0,2.9...|
| 38.74669131239574|       50.0|[0.020090001,95.0...|
|31.082838331555124|31.10000038|[0.02187,60.0,2.9...|
| 26.68630293182535|       16.5|[0.024979999,0.0,...|
|27.972070950008696|23.89999962|[0.025429999,55.0...|
| 26.62196482199333|28.70000076|[0.029850001,0.0,...|
|20.304116117133706|       18.5|[0.030409999,0.0,...|
|29.584750390794248|31.20000076|[0.03049,55.0,3.7...|
| 30.09201779628995|33.40000153|[0.032370001,0.0,...|
| 22.75497419359761|20.60000038|[0.033059999,0.0,...|
|29.091127273078243|       23.5|[0.035840001,80.0...|
|28.845342524393196|27.89999962|[0.036150001,80.0...|
| 33.65174348996041|34.59999