# Big Data Hadoop & Spark Exam 

In [None]:
# Installing Pyspark
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Importing Necessary Libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Spark application settings consumed by the session builder in the next cell.
# "local" runs Spark in-process on this machine with a single worker thread.
appName, master = "hive_pyspark", "local"

In [None]:
# Create (or reuse, via getOrCreate) the Spark session. Hive support is enabled
# so that tables written with saveAsTable are registered in the metastore.
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Sanity check: list the databases the Hive-enabled session can see.
df = spark.sql("show databases")
df.show(truncate=True)

+---------+
|namespace|
+---------+
|  default|
+---------+



### Q.1 Read the given CSV file into a Hive table

In [None]:
# Read the given CSV file (first row = column names) and persist it as a Hive table.
# No schema is supplied, so every column is loaded as a string; the types are
# fixed later, before modelling.
datafile = spark.read.csv("boston.csv", header=True)
datafile.show(5)
# mode("overwrite") keeps this cell re-runnable: the default save mode raises an
# error when `boston_table` already exists in the metastore from a previous run.
datafile.write.mode("overwrite").saveAsTable("boston_table")

+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM| ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632| 18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|  0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|  0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|  0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|  0|2.18000006

### Q.2 Read the data from the Hive table as a Spark DataFrame

In [None]:
# Q.2: pull the whole Hive table back into a Spark DataFrame and preview it
# (first 20 rows). All columns are still strings at this point.
df = spark.sql("select * from boston_table")
df.show(20)

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|  18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|   0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|   0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|   0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|   0|2.

### Q.3 Get the correlation between the dependent variable (MV) and the independent variables


In [None]:
# Get all column names and their types.
# Each dtypes entry is a (name, type) pair. Unpack it into two names rather than
# looping with `col`, because a loop variable called `col` would overwrite
# pyspark.sql.functions.col imported at the top of the notebook.
for column_name, column_type in df.dtypes:
    print(column_name + " , " + column_type)

CRIM , string
ZN , string
INDUS , string
CHAS , string
NOX , string
RM , string
AGE , string
DIS , string
RAD , string
TAX , string
PT , string
B , string
LSTAT , string
MV , string


In [None]:
# Peek at the distinct INDUS values (first 20). truncate=False prints each value in full.
(
    df.select("INDUS")
    .distinct()
    .show(n=20, truncate=False)
)

+-----------+
|INDUS      |
+-----------+
|2.970000029|
|3.440000057|
|2.460000038|
|2.930000067|
|4.389999866|
|4.949999809|
|1.470000029|
|5.130000114|
|25.64999962|
|3.75       |
|2.680000067|
|4.150000095|
|4.050000191|
|2.890000105|
|2.950000048|
|0.460000008|
|6.409999847|
|11.93000031|
|10.01000023|
|27.73999977|
+-----------+
only showing top 20 rows



In [None]:
# Every column was loaded from the CSV as a string. Cast all of them to double so
# VectorAssembler, Correlation and LinearRegression can consume them.
# `col` is re-imported because the dtypes loop in an earlier cell rebinds that name.
# NOTE(review): depending on spark.sql.ansi.enabled, strings that don't parse as
# numbers either become null or raise an error.
from pyspark.sql.functions import col
df = df.select(*(col(name).cast('double') for name in df.columns))

In [None]:
# Get all column names and their types (they should all be double now).
# Each dtypes entry is a (name, type) pair. Unpack it into two names rather than
# looping with `col`, because a loop variable called `col` would overwrite
# pyspark.sql.functions.col.
for column_name, column_type in df.dtypes:
    print(column_name + " , " + column_type)

CRIM , double
ZN , double
INDUS , double
CHAS , double
NOX , double
RM , double
AGE , double
DIS , double
RAD , double
TAX , double
PT , double
B , double
LSTAT , double
MV , double


In [None]:
# Correlation.corr operates on a single vector column. Pack every (now double)
# column, the features and MV alike, into one vector named `corr_features`.
vector_col = "corr_features"
assembler = VectorAssembler(outputCol=vector_col, inputCols=df.columns)
df_vector = assembler.transform(df).select(vector_col)

In [None]:
# Pearson correlation matrix over all assembled columns. Pearson is also the
# default method; it is spelled out here for clarity. The result is a one-row
# DataFrame holding the matrix.
matrix = Correlation.corr(df_vector, vector_col, method="pearson")



In [None]:
# Printing the correlations relevant to Q.3.
# The single result row holds the matrix in the column "pearson(<vector_col>)".
# `.values` would flatten it into an unlabeled 196-element (14 x 14) array.
# Instead, convert it to a 2-D array and take MV's row, which pairs the dependent
# variable with every input column (MV with itself gives 1.0). Names come from
# the assembler so they line up with the vector's element order.
corr_matrix = matrix.collect()[0]["pearson({})".format(vector_col)].toArray()
corr_columns = assembler.getInputCols()
mv_index = corr_columns.index("MV")
mv_correlations = {name: float(value) for name, value in zip(corr_columns, corr_matrix[mv_index])}
mv_correlations

array([ 1.        , -0.20046922,  0.40658343, -0.05589158,  0.42097173,
       -0.2192467 ,  0.35273425, -0.37967009,  0.62550515,  0.58276431,
        0.28994564, -0.38506395,  0.45562148, -0.38830461, -0.20046922,
        1.        , -0.53382819, -0.04269672, -0.51660371,  0.31199059,
       -0.56953734,  0.66440822, -0.31194783, -0.31456332, -0.39167853,
        0.17552031, -0.41299458,  0.36044534,  0.40658343, -0.53382819,
        1.        ,  0.06293803,  0.76365146, -0.39167586,  0.64477851,
       -0.70802699,  0.5951293 ,  0.7207602 ,  0.38324764, -0.35697654,
        0.60379972, -0.48372517, -0.05589158, -0.04269672,  0.06293803,
        1.        ,  0.0912028 ,  0.09125123,  0.08651777, -0.09917578,
       -0.00736824, -0.03558652, -0.12151517,  0.04878849, -0.0539293 ,
        0.17526018,  0.42097173, -0.51660371,  0.76365146,  0.0912028 ,
        1.        , -0.30218816,  0.73147011, -0.76923012,  0.61144056,
        0.66802321,  0.18893272, -0.38005064,  0.59087892, -0.42

### Q.4 Build a linear regression model to predict the house price (MV)

In [None]:
# Q.4 setup: pack the 13 predictor columns into one `features` vector and keep MV
# (the house price) alongside it as the label.
vectorAssembler = VectorAssembler(
    inputCols=[
        'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
        'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT',
    ],
    outputCol='features',
)
vhouse_df = vectorAssembler.transform(df).select('features', 'MV')
vhouse_df.show(n=3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



In [None]:
# Split the dataset into 70% training / 30% test rows.
# Without a seed, randomSplit draws a fresh random seed on every run, so the split
# and every metric below would change on each Restart & Run All. A fixed seed makes
# the split reproducible (for the same input partitioning).
SPLIT_SEED = 42
splits = vhouse_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train_df, test_df = splits

In [None]:
# Elastic-net regularised linear regression predicting MV from `features`.
# regParam sets the overall penalty strength; elasticNetParam=0.8 mixes it as
# 80% L1 / 20% L2.
# NOTE(review): the training summary reports totalIterations == maxIter (10), so
# the optimiser may stop before converging. Consider raising maxIter.
lr = LinearRegression(
    featuresCol='features',
    labelCol='MV',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

Coefficients: [-0.028395854681958883,0.02930461876789947,0.0,3.066308621998608,-6.61937643464388,4.378778328923477,0.0,-0.6737944552172304,0.0,0.0,-0.8144944626041648,0.008190891682473573,-0.4670282611821067]
Intercept: 19.060284096681574


### Q.5 Evaluate the linear regression model using its RMSE and R-squared values


In [None]:
# RMSE and R2 from the summary attached to the fitted model. These are computed on
# the TRAINING data. They are labelled that way so they aren't mistaken for
# held-out performance; the test-set R2 and RMSE are computed from test_df below.
trainingSummary = lr_model.summary
print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("Training r2: %f" % trainingSummary.r2)

RMSE: 4.974147
r2: 0.717410


In [None]:
# Compare predicted vs. actual MV on the held-out test rows (first 5 shown),
# then score the test predictions with R2.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "MV", "features").show(5)
lr_evaluator = RegressionEvaluator(
    labelCol="MV",
    predictionCol="prediction",
    metricName="r2",
)
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|28.357129025153956|       22.0|[0.01096,55.0,2.2...|
|27.061294656153194|24.70000076|[0.02055,85.0,0.7...|
| 31.91531648842608|31.10000038|[0.02187,60.0,2.9...|
| 28.48817994084186|23.89999962|[0.025429999,55.0...|
|30.907242119279495|34.70000076|[0.02729,0.0,7.07...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.705648


In [None]:
# Test-set RMSE: evaluate() scores test_df with the fitted model and returns a summary.
test_result = lr_model.evaluate(test_df)
print(
    "Root Mean Squared Error (RMSE) on test data = %g"
    % test_result.rootMeanSquaredError
)

Root Mean Squared Error (RMSE) on test data = 4.7291


In [None]:
# Optimiser diagnostics from the training summary (iteration count and objective
# value per iteration), followed by the training residuals
# (label minus prediction, first 20 rows).
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % (str(trainingSummary.objectiveHistory),))
trainingSummary.residuals.show(20)

numIterations: 10
objectiveHistory: [0.5000000000000004, 0.43282548495909523, 0.23128639178979948, 0.20752552512685604, 0.1824042012619024, 0.18029326775735582, 0.1798083075900671, 0.17895128600710877, 0.17834582557881745, 0.17800162498800387, 0.1779481047172851]




+--------------------+
|           residuals|
+--------------------+
|  -6.524409258343688|
|-0.06802440968493784|
|  0.5309288795108742|
|  4.0710759045414875|
|  0.3528128429231998|
|  10.598795073681913|
| -0.9808973620652957|
| -2.1243139137715445|
|  -4.014095159352905|
|   6.476281116815855|
|   7.779037114296727|
|  2.6260403177277674|
|  0.4150482418684476|
|  -4.056614777358806|
|   6.977447813010258|
| -1.8467051234294303|
|   9.299200548530358|
|   4.658942486905211|
|  -9.749209443383936|
| -3.6086773073277953|
+--------------------+
only showing top 20 rows



In [None]:
# Longer side-by-side view of test-set predictions vs. actual MV (first 20 rows).
predictions = lr_model.transform(test_df)
predictions.select(["prediction", "MV", "features"]).show(20)

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|28.357129025153956|       22.0|[0.01096,55.0,2.2...|
|27.061294656153194|24.70000076|[0.02055,85.0,0.7...|
| 31.91531648842608|31.10000038|[0.02187,60.0,2.9...|
| 28.48817994084186|23.89999962|[0.025429999,55.0...|
|30.907242119279495|34.70000076|[0.02729,0.0,7.07...|
| 29.98081591464804|30.79999924|[0.027629999,75.0...|
| 19.99652170388329|       17.5|[0.031129999,0.0,...|
|21.756491372777685|20.60000038|[0.033059999,0.0,...|
|20.045994844578853|       19.5|[0.03427,0.0,5.19...|
| 31.33575603506175|       28.5|[0.035020001,80.0...|
|23.885121157185814|20.89999962|[0.03548,80.0,3.6...|
|24.553913159936528|22.89999962|[0.03551,25.0,4.8...|
|32.624166492954885|35.40000153|[0.037050001,20.0...|
| 26.57015088220502|23.20000076|[0.038710002,52.5...|
|28.093948955447303|       28.0|[0.041129999,25.0...|
|27.146989711980428|22.89999