# Read the given CSV file in a Hive table

/gs://boston_mkce/data/boston.csv/

/Create External table for boston/

CREATE EXTERNAL TABLE boston2 (crim float, zn float,indus float,chas float,nox float,rm float,age float,dis float,rad float,tax float,pt float,b float,lstate float,mv float)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY ','

LINES TERMINATED BY '\n'

STORED AS TEXTFILE

LOCATION 'gs://boston_mkce/data/';

The above code is used to import the dataset to hive using cloud.

In [16]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import copy
import seaborn as sns
# pyspark 
import pyspark 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

# import plotly
import plotly.express as px

# import linear model
from sklearn import linear_model
import scipy.stats as stats

sns.set()

In [17]:
#create instance of spark class 
spark=SparkSession.builder.appName('boston').getOrCreate()

In [18]:
spark

In [19]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "false")

# Read the data from Hive table as spark dataframe

In [43]:
#create spark dataframe of input csv file 
df=spark.read.csv("C:/Users/USER/Downloads/BigDataHadoop_SparkExam (2)/boston.csv",inferSchema=True,header=True) 
df.head(2) 

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.309999943, CHAS=0, NOX=0.537999988, RM=6.574999809, AGE=65.19999695, DIS=4.090000153, RAD=1, TAX=296, PT=15.30000019, B=396.8999939, LSTAT=4.980000019, MV=24.0),
 Row(CRIM=0.027310001, ZN=0.0, INDUS=7.070000172, CHAS=0, NOX=0.469000012, RM=6.421000004, AGE=78.90000153, DIS=4.967100143, RAD=2, TAX=242, PT=17.79999924, B=396.8999939, LSTAT=9.140000343, MV=21.60000038)]

In [21]:
type(df)

pyspark.sql.dataframe.DataFrame

In [22]:
df.describe()

DataFrame[summary: string, CRIM: string, ZN: string, INDUS: string, CHAS: string, NOX: string, RM: string, AGE: string, DIS: string, RAD: string, TAX: string, PT: string, B: string, LSTAT: string, MV: string]

In [23]:
# print schema
df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [24]:
# Column Names
print("\nColumn names :", df.columns)
# Row Count
print("\nNumber of Rows :",df.count()) 
# Column Count
print("\nNumber of features :", len(df.columns))


Column names : ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT', 'MV']

Number of Rows : 506

Number of features : 14


In [25]:
data = df.drop('NOX')

# Get the correlation between dependent and independent variables

In [28]:
import six
for i in df.columns:
    if not( isinstance(df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to MV for ", i, df.stat.corr('MV',i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [29]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')

In [30]:
vhouse_df = vectorAssembler.transform(df)

In [31]:
vhouse_df =vhouse_df.select(['features', 'MV'])
vhouse_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



# Build a linear regression model to predict house price

In [32]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [33]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.057884278993582644,0.024627352989761615,-0.03297376922363685,1.6041807383501399,-10.046504003008234,4.9155996062835055,0.0,-0.8189660256118617,0.0,-0.001352806748156607,-0.7136391819689502,0.00585861576108996,-0.3809046130693966]
Intercept: 16.971596906851424


# Evaluate the Linear Regression model by getting the RMSE and R-squared values

In [35]:
trainingSummary = lr_model.summary
print("RMSE Value : %f" % trainingSummary.rootMeanSquaredError)
print("R-Squared Value : %f" % trainingSummary.r2)

RMSE Value : 4.826398
R-Squared Value : 0.728430


In [36]:
train_df.describe().show()

+-------+------------------+
|summary|                MV|
+-------+------------------+
|  count|               349|
|   mean|22.594842436659025|
| stddev| 9.274809981969723|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [37]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
| 30.01309945866215|       24.0|[0.00632,18.0,2.3...|
| 41.98969616280249|       50.0|[0.01501,90.0,1.2...|
| 26.29585081205599|30.10000038|[0.01709,90.0,2.0...|
|26.284540202946197|23.10000038|[0.0187,85.0,4.15...|
|26.633572811896702|       25.0|[0.028750001,28.0...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.678908


In [38]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 5.11172


In [39]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.49999999999999956, 0.43011532360272786, 0.22967057742208366, 0.2072437505762283, 0.17935380550381416, 0.17637624857836237, 0.17595774340477083, 0.17530356904605493, 0.1748087682376762, 0.1742553053995719, 0.17423947292439113]
+--------------------+
|           residuals|
+--------------------+
|  0.2534795558068481|
|  -5.932266351735606|
|  0.7519585969684357|
|   4.383133687931185|
|  0.3889921931520455|
|  10.672321655050354|
|-0.02286953150970...|
| -1.6394356470214646|
| -3.2840586053362877|
|   8.041120291657705|
|  1.0013719105655028|
|   6.785565344286184|
|  -1.857095524134742|
|   9.260673945570232|
|  -1.430282982550935|
|   5.090851352243739|
| -0.4525356715836075|
|  -9.291438215789999|
|  -4.187742369687125|
|  3.6879837886163784|
+--------------------+
only showing top 20 rows





In [40]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
| 30.01309945866215|       24.0|[0.00632,18.0,2.3...|
| 41.98969616280249|       50.0|[0.01501,90.0,1.2...|
| 26.29585081205599|30.10000038|[0.01709,90.0,2.0...|
|26.284540202946197|23.10000038|[0.0187,85.0,4.15...|
|26.633572811896702|       25.0|[0.028750001,28.0...|
| 29.28848507768263|31.20000076|[0.03049,55.0,3.7...|
|19.739821781947786|       17.5|[0.031129999,0.0,...|
| 27.91750820477462|       22.0|[0.03537,34.0,6.0...|
|22.344271222752344|20.70000076|[0.037379999,0.0,...|
|27.010437382698406|       22.0|[0.03932,0.0,3.41...|
| 35.99897442192811|33.29999924|[0.040109999,80.0...|
| 27.01781019277925|22.89999962|[0.042029999,28.0...|
|25.196207302052464|20.60000038|[0.042939998,28.0...|
|24.340936714568013|19.39999962|[0.043790001,80.0...|
|30.896516408606033|24.79999924|[0.04417,70.0,2.2...|
|33.138769632421024|30.29999