### PROBLEM STATEMENTS

Q1. Read the given CSV file into a Hive table.
Perform the following tasks using PySpark:
Q2. Read the data from the Hive table as a Spark DataFrame.
Q3. Get the correlation between the dependent and independent variables.

Q4. Build a linear regression model to predict house price.
Q5. Evaluate the linear regression model by getting the RMSE and R-squared values.

### IMPORT NECESSARY PACKAGES AND LIBRARIES

In [4]:
import numpy as np
import pandas as pd
import pyspark

In [5]:
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf  # decorator for Python UDFs, e.g. @udf("integer")
from pyspark.sql import functions as F  # built-in column functions: stddev, format_number, date_format, dayofyear, when, ...
from pyspark.sql.types import StructField, StringType, IntegerType, StructType


In [6]:
print([(x.__name__, x.__version__) for x in [np, pd, pyspark]])

[('numpy', '1.21.6'), ('pandas', '1.3.5'), ('pyspark', '3.2.1')]

In [8]:
# Create one Hive-enabled session. If a session without Hive support already
# exists in this process, getOrCreate() returns it and enableHiveSupport() is effectively ignored.
appName = "boston_hive_pyspark"
master = "local"
spark = SparkSession.builder \
    .master(master).appName(appName).enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("INFO")

In [9]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)


## Q1. Read the given CSV file into a Hive table

In [11]:
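df = spark.read.csv('/content/boston.csv', header=True, inferSchema=True)
print(df.count())
df.show()
# Persist as a managed table; "overwrite" lets the cell be re-run without an "already exists" error
df.write.mode("overwrite").saveAsTable("boston_tb")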

506
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.

## Q2. Read the data from the Hive table as a Spark DataFrame

In [12]:
df = spark.sql("SELECT * FROM boston_tb")  # later cells work on this Hive-backed DataFrame

In [13]:
df.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.0|2.

### PRINT THE SCHEMA

In [14]:
df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



### CHECKING FOR MISSING VALUES

In [15]:
from pyspark.sql import functions as F
# Count nulls per column; F.sum avoids shadowing Python's built-in sum()
df.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|CRIM| ZN|INDUS|CHAS|NOX| RM|AGE|DIS|RAD|TAX| PT|  B|LSTAT| MV|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|  0|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
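For a small sample pulled to the driver (for example with `df.limit(n).toPandas()`), the same per-column null count can be cross-checked in pandas. A minimal sketch on a hypothetical three-column frame; the values are illustrative, not taken from the file:

```python
import numpy as np
import pandas as pd

# Hypothetical sample with two missing entries (illustrative values only)
sample = pd.DataFrame({
    "CRIM": [0.00632, np.nan, 0.02729],
    "RM": [6.575, 6.421, np.nan],
    "MV": [24.0, 21.6, 34.7],
})

# isna() flags missing cells and sum() counts them per column,
# mirroring F.sum(F.col(c).isNull().cast("int")) in the Spark cell
null_counts = sample.isna().sum()
print(null_counts.to_dict())
```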



In [16]:
print(df.columns)

['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT', 'MV']


## Q3. Get the correlation between dependent and independent variables

In [19]:
# Pearson correlation of every numeric column with the target MV
for c, dtype in df.dtypes:
    if dtype != "string":
        print("Correlation to MV for ", c, df.stat.corr('MV', c))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0
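`df.stat.corr` computes the Pearson correlation coefficient, the only method it currently supports. As a sketch of what that number is, here is the formula written out in NumPy on made-up values and checked against `np.corrcoef`:

```python
import numpy as np

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    dx = x - x.mean()
    dy = y - y.mean()
    return float((dx * dy).sum() / np.sqrt((dx ** 2).sum() * (dy ** 2).sum()))

# Illustrative values only (not the Boston data)
rooms = [5.0, 6.0, 6.5, 7.0, 8.0]
price = [15.0, 20.0, 22.0, 28.0, 35.0]

r = pearson(rooms, price)
print(round(r, 4))
assert np.isclose(r, np.corrcoef(rooms, price)[0, 1])
```

A value near +1 or -1 (like RM at 0.70 or LSTAT at -0.74 above) indicates a strong linear relationship with MV; values near 0 indicate a weak one.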


In [22]:
# Creating correlation matrix
from pyspark.ml.stat import Correlation
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)
matrix = Correlation.corr(df_vector, vector_col)



In [23]:
# Printing correlation matrix
matrix.collect()[0]["pearson({})".format(vector_col)].values

array([ 1.        , -0.20046922,  0.40658343, -0.05589158,  0.42097173,
       -0.2192467 ,  0.35273425, -0.37967009,  0.62550515,  0.58276431,
        0.28994564, -0.38506395,  0.45562148, -0.38830461, -0.20046922,
        1.        , -0.53382819, -0.04269672, -0.51660371,  0.31199059,
       -0.56953734,  0.66440822, -0.31194783, -0.31456332, -0.39167853,
        0.17552031, -0.41299458,  0.36044534,  0.40658343, -0.53382819,
        1.        ,  0.06293803,  0.76365146, -0.39167586,  0.64477851,
       -0.70802699,  0.5951293 ,  0.7207602 ,  0.38324764, -0.35697654,
        0.60379972, -0.48372517, -0.05589158, -0.04269672,  0.06293803,
        1.        ,  0.0912028 ,  0.09125123,  0.08651777, -0.09917578,
       -0.00736824, -0.03558652, -0.12151517,  0.04878849, -0.0539293 ,
        0.17526018,  0.42097173, -0.51660371,  0.76365146,  0.0912028 ,
        1.        , -0.30218816,  0.73147011, -0.76923012,  0.61144056,
        0.66802321,  0.18893272, -0.38005064,  0.59087892, -0.42
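The `.values` array above is the 14×14 correlation matrix flattened in column-major order, which is hard to read. A sketch that reshapes such a flat array into a labelled pandas table; in the notebook the flat array would be the `.values` printed above and the labels would be `df.columns` (alternatively, `DenseMatrix.toArray()` returns the 2-D array directly). The 3×3 input here is made up for illustration:

```python
import numpy as np
import pandas as pd

def corr_table(flat_values, labels):
    """Rebuild a square matrix from a flat, column-major array and label its axes."""
    n = len(labels)
    # order="F" reads the flat array column by column, matching Spark's DenseMatrix layout
    square = np.asarray(flat_values, dtype=float).reshape((n, n), order="F")
    return pd.DataFrame(square, index=labels, columns=labels)

# Hypothetical 3x3 example: symmetric, with 1.0 on the diagonal
flat = [1.0, -0.2, 0.4,
        -0.2, 1.0, -0.5,
        0.4, -0.5, 1.0]
table = corr_table(flat, ["CRIM", "ZN", "INDUS"])
print(table.round(2))

# Correlations of the other columns with CRIM, strongest (in absolute value) first
print(table["CRIM"].drop("CRIM").abs().sort_values(ascending=False))
```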

In [24]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
df2 = vectorAssembler.transform(df)
df2 = df2.select(['features', 'MV'])
df2.show(5)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
+--------------------+-----------+
only showing top 5 rows
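Conceptually, `VectorAssembler` concatenates the listed input columns, row by row, into a single vector column, while the label `MV` stays in its own column. A NumPy sketch of the same reshaping on two hypothetical rows and a subset of the inputs (the notebook itself keeps everything in Spark):

```python
import numpy as np

# Two illustrative rows, stored column by column as in a table
columns = {
    "CRIM": [0.00632, 0.02731],
    "ZN": [18.0, 0.0],
    "RM": [6.575, 6.421],
    "MV": [24.0, 21.6],
}
feature_cols = ["CRIM", "ZN", "RM"]  # a subset of the 13 inputs, for brevity

# Stack the chosen columns side by side: one feature vector per row
features = np.column_stack([columns[c] for c in feature_cols])
label = np.asarray(columns["MV"])

print(features.shape)  # (2, 3)
print(features[0])
```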



### SPLITTING THE DATA 

In [25]:
splits = df2.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
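`randomSplit([0.7, 0.3])` assigns each row independently, so the split sizes are only approximately 70/30, and without a `seed` argument (for example `df2.randomSplit([0.7, 0.3], seed=42)`) the split, and therefore every metric below, changes on each run. A NumPy sketch of weighted per-row assignment with a fixed seed; it illustrates the idea and is not Spark's exact sampling algorithm:

```python
import numpy as np

def random_split(n_rows, weights, seed):
    """Assign each row index to a split using normalized cumulative weights."""
    weights = np.asarray(weights, dtype=float)
    bounds = np.cumsum(weights / weights.sum())  # e.g. [0.7, 1.0]
    rng = np.random.default_rng(seed)
    u = rng.random(n_rows)  # one uniform draw in [0, 1) per row
    # Compare against all but the last bound so every row lands in some split
    split_id = np.searchsorted(bounds[:-1], u, side="right")
    return [np.flatnonzero(split_id == k) for k in range(len(weights))]

train_idx, test_idx = random_split(506, [0.7, 0.3], seed=42)
print(len(train_idx), len(test_idx))  # close to, but usually not exactly, 70% / 30% of 506

# The same seed reproduces the same split
again_train, _ = random_split(506, [0.7, 0.3], seed=42)
assert np.array_equal(train_idx, again_train)
```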

## Q4. Build a linear regression model to predict house price

In [26]:

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.06380798758521199,0.015051712074533218,0.0,1.0734481926495187,-6.907659781462847,5.264771622660271,-0.0016028877107280137,-0.519298700938574,0.0,-0.002153312116481696,-0.7955418052727771,0.0057563377948915,-0.3810391070951662]
Intercept: 13.480804950666327
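With `regParam=0.3` and `elasticNetParam=0.8`, the fit minimizes squared error plus an elastic-net penalty that is 80% L1 and 20% L2. The L1 part is what lets coefficients become exactly 0.0, as happened here for INDUS and RAD. A sketch of that objective as described in Spark's linear-methods documentation, simplified by ignoring that Spark applies the penalty to standardized features by default (`standardization=True`):

```python
import numpy as np

def elastic_net_objective(w, b, X, y, reg_param, elastic_net_param):
    """Squared-error loss / (2n) plus a mixed L1/L2 penalty on the weights (not the intercept)."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    w = np.asarray(w, dtype=float)
    residuals = X @ w + b - y
    data_term = (residuals ** 2).sum() / (2 * len(y))
    l1 = np.abs(w).sum()
    l2 = (w ** 2).sum()
    penalty = reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2)
    return data_term + penalty

# Tiny illustrative problem (not the Boston data)
X = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
y = np.array([2.0, 1.0, 3.0])
w = np.array([2.0, 1.0])  # fits y exactly, so only the penalty remains

loss = elastic_net_objective(w, 0.0, X, y, reg_param=0.3, elastic_net_param=0.8)
print(loss)  # penalty only: 0.3 * (0.8 * 3 + 0.2 / 2 * 5)
```

Raising `regParam` strengthens the penalty and pushes more coefficients toward zero; `elasticNetParam=0` gives pure ridge (L2) and `1` gives pure lasso (L1).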


## Q5. Evaluate the Linear Regression model by getting the RMSE and R-squared values

In [27]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.283744
r2: 0.771884
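`rootMeanSquaredError` and `r2` follow the usual definitions, sketched below in NumPy on made-up labels and predictions. Note that `lr_model.summary` reports them on the training split; the test-split values computed in the next cells (R² 0.569, RMSE 6.36) are the better guide to how the model generalizes.

```python
import numpy as np

def rmse(y_true, y_pred):
    """Root mean squared error, in the same units as the label (MV)."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    return float(1.0 - ss_res / ss_tot)

# Illustrative labels and predictions (not model output)
y_true = [24.0, 22.0, 35.0, 33.0]
y_pred = [25.0, 21.0, 34.0, 34.0]  # every prediction is off by exactly 1.0

print(rmse(y_true, y_pred), r2(y_true, y_pred))
```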


In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "MV", "features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="MV", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
| 31.81995659667394|32.90000153|[0.01778,95.0,1.4...|
| 26.46364203550829|23.10000038|[0.0187,85.0,4.15...|
|27.318877934302243|       33.0|[0.019509999,17.5...|
| 37.28534019463267|42.29999924|[0.02177,82.5,2.0...|
|31.433339186432768|34.70000076|[0.02729,0.0,7.07...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.569139


In [29]:
test_result = lr_model.evaluate(test_df)
print("RMSE on test data = %g" % test_result.rootMeanSquaredError)

RMSE on test data = 6.36461


In [30]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
| 31.81995659667394|32.90000153|[0.01778,95.0,1.4...|
| 26.46364203550829|23.10000038|[0.0187,85.0,4.15...|
|27.318877934302243|       33.0|[0.019509999,17.5...|
| 37.28534019463267|42.29999924|[0.02177,82.5,2.0...|
|31.433339186432768|34.70000076|[0.02729,0.0,7.07...|
|  26.6351172340554|26.60000038|[0.02899,40.0,1.2...|
| 29.42935621711289|31.20000076|[0.03049,55.0,3.7...|
| 31.00508748038974|34.90000153|[0.031500001,95.0...|
| 27.96939977049282|24.10000038|[0.034449998,82.5...|
| 30.47730670977407|       28.5|[0.035020001,80.0...|
|  27.9190820801373|       22.0|[0.03537,34.0,6.0...|
|24.167260198926222|22.89999962|[0.03551,25.0,4.8...|
|28.746262102743863|27.89999962|[0.036150001,80.0...|
|26.037088806268564|23.20000076|[0.038710002,52.5...|
| 26.72473969886656|22.89999962|[0.042029999,28.0...|
| 26.87653228798501|24.79999

### COMPLETED