# **Big Data Hadoop and Spark**

---



In [1]:
pip install pyspark   #installing pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

print([(x.__name__,x.__version__) for x in [np, pd, pyspark]])

spark = pyspark.sql.SparkSession.builder.appName('boston').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sc.setLogLevel("INFO")

[('numpy', '1.21.6'), ('pandas', '1.3.5'), ('pyspark', '3.2.1')]




## **Reading the CSV file into a Hive table**

---



In [3]:
df = spark.read.csv('boston.csv',header=True,inferSchema=True)
print(df.count())
df.show()

506
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.

In [5]:
df.write.saveAsTable('bostonhive_table')  # save the DataFrame as the table 'bostonhive_table'

In [6]:
df.printSchema()   # printing the schema 

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



## **Reading the rows from the Hive table 'bostonhive_table' with PySpark and storing them in a DataFrame**

---



In [8]:
df1=spark.sql("select * from bostonhive_table")
df1.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.0|2.

In [9]:
from pyspark.sql.functions import isnull, when, count, col

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|CRIM| ZN|INDUS|CHAS|NOX| RM|AGE|DIS|RAD|TAX| PT|  B|LSTAT| MV|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|  0|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+



In [10]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer)

In [11]:
print(df.columns)

['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT', 'MV']


## **Displaying the correlation between the dependent and independent variables**

---

The independent variables are: ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT']

The dependent (target) variable is: ['MV']


In [12]:
for i in df.columns:
    print( "Correlation to MV for  ", i,":", df.stat.corr('MV',i))


Correlation to MV for   CRIM : -0.3883046116575088
Correlation to MV for   ZN : 0.36044534463752903
Correlation to MV for   INDUS : -0.48372517128143383
Correlation to MV for   CHAS : 0.17526017775291847
Correlation to MV for   NOX : -0.4273207763683772
Correlation to MV for   RM : 0.695359937127267
Correlation to MV for   AGE : -0.37695456714288667
Correlation to MV for   DIS : 0.24992873873512172
Correlation to MV for   RAD : -0.3816262315669168
Correlation to MV for   TAX : -0.46853593528654536
Correlation to MV for   PT : -0.5077867038116085
Correlation to MV for   B : 0.3334608226834164
Correlation to MV for   LSTAT : -0.7376627294671615
Correlation to MV for   MV : 1.0


***From the above, RM has the strongest positive correlation with MV (about 0.70).***

***Similarly, LSTAT has the strongest negative correlation with MV (about -0.74).***
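
`df.stat.corr` computes the Pearson correlation coefficient by default. The following is a minimal plain-Python sketch of that formula, intended only to show what the numbers above measure. The function name `pearson_corr` and the toy inputs are hypothetical and are not part of the notebook or of PySpark.

```python
import math

def pearson_corr(xs, ys):
    """Pearson correlation between two equal-length numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (unnormalised covariance)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (unnormalised variances)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Toy data (hypothetical): a perfectly increasing and a perfectly decreasing pair
print(pearson_corr([1, 2, 3], [2, 4, 6]))
print(pearson_corr([1, 2, 3], [6, 4, 2]))
```

A value near +1 or -1 indicates a strong linear relationship, which is why RM and LSTAT stand out in the list above.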

In [13]:
from pyspark.ml.feature import VectorAssembler 
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
boston_df = vectorAssembler.transform(df)
boston_df = boston_df.select(['features', 'MV'])
boston_df.show()

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
|[0.029850001,0.0,...|28.70000076|
|[0.088289998,12.5...|22.89999962|
|[0.144549996,12.5...|27.10000038|
|[0.211239994,12.5...|       16.5|
|[0.170039997,12.5...|18.89999962|
|[0.224889994,12.5...|       15.0|
|[0.117470004,12.5...|18.89999962|
|[0.093780003,12.5...|21.70000076|
|[0.629760027,0.0,...|20.39999962|
|[0.637960017,0.0,...|18.20000076|
|[0.627390027,0.0,...|19.89999962|
|[1.053930044,0.0,...|23.10000038|
|[0.784200013,0.0,...|       17.5|
|[0.802709997,0.0,...|20.20000076|
|[0.725799978,0.0,...|18.20000076|
+--------------------+-----------+
only showing top 20 rows



## **Building Linear Regression model**

---



In [14]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
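
The `regParam` and `elasticNetParam` arguments above configure an elastic-net penalty. According to the Spark ML documentation for linear methods, the penalty added to the loss is `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`, so `elasticNetParam=0.8` weights the L1 (lasso) term more heavily than the L2 (ridge) term. The sketch below evaluates that expression in plain Python. The function name `elastic_net_penalty` and the toy coefficients are hypothetical, and because Spark standardizes features by default during fitting, this illustrates the formula rather than reproducing Spark's internal computation.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Elastic-net penalty: reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2)."""
    l1_norm = sum(abs(w) for w in coefficients)
    l2_norm_sq = sum(w * w for w in coefficients)
    return reg_param * (
        elastic_net_param * l1_norm
        + (1.0 - elastic_net_param) / 2.0 * l2_norm_sq
    )

# Toy coefficient vector (hypothetical), with the notebook's hyperparameters
toy_coefficients = [1.0, -2.0]
print(elastic_net_penalty(toy_coefficients, reg_param=0.3, elastic_net_param=0.8))
```

Setting `elastic_net_param` to 1.0 gives a pure lasso penalty and 0.0 gives a pure ridge penalty.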

In [15]:
train, test = boston_df.randomSplit([0.7, 0.3])  #splitting the train and test data

In [16]:
lr_model = lr.fit(train)  # fitting the regression model 

In [17]:
results = lr_model.transform(train)   # model predictions alongside the actual values on the train data
results.select('MV', 'prediction').show(5)

+-----------+------------------+
|         MV|        prediction|
+-----------+------------------+
|32.20000076| 30.03863948750255|
|       22.0|27.215511941085605|
|32.70000076| 31.68736533051774|
|       50.0| 37.45116579509236|
|31.60000038| 30.49350030635258|
+-----------+------------------+
only showing top 5 rows



In [18]:
results1 = lr_model.transform(test)   # model predictions alongside the actual values on the test data
results1.select('MV', 'prediction').show(5)

+-----------+------------------+
|         MV|        prediction|
+-----------+------------------+
|       24.0| 30.43248782915068|
|35.40000153| 29.58007460266638|
|18.89999962|16.828862159892076|
|29.10000038|  30.0861746640571|
|       24.5| 27.04314280398002|
+-----------+------------------+
only showing top 5 rows



In [19]:
print("Coefficients: " + str(lr_model.coefficients)) 
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.07186338380062557,0.0007675323041936928,-0.02846077136835298,1.8379800972094489,-5.120529809021485,4.208996973965753,0.0,-0.5605417304877379,0.0,-0.00022744214235077335,-0.7632050624255082,0.00391231429257904,-0.5179513593119595]
Intercept: 20.629137978468176
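
The coefficient vector follows the order of `inputCols` passed to `VectorAssembler`, so each value can be paired with its feature name. The sketch below does this in plain Python using the coefficients printed above, copied verbatim. The variable names are illustrative. Two coefficients are exactly zero, which is consistent with the L1 part of the elastic-net penalty removing those features from the model.

```python
# Feature order as passed to VectorAssembler(inputCols=...)
feature_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
                 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT']

# Coefficients copied from the printed output of lr_model.coefficients
coefficients = [-0.07186338380062557, 0.0007675323041936928,
                -0.02846077136835298, 1.8379800972094489,
                -5.120529809021485, 4.208996973965753, 0.0,
                -0.5605417304877379, 0.0, -0.00022744214235077335,
                -0.7632050624255082, 0.00391231429257904,
                -0.5179513593119595]

by_name = dict(zip(feature_names, coefficients))

# Features whose coefficient was shrunk to exactly zero
zeroed = [name for name, w in by_name.items() if w == 0.0]

# Features sorted by absolute coefficient size, largest first
ranked = sorted(by_name.items(), key=lambda kv: abs(kv[1]), reverse=True)

print("Zeroed features:", zeroed)
for name, w in ranked:
    print(f"{name:>5}: {w: .6f}")
```

The features are on different scales, so the raw coefficient magnitudes are not a direct measure of feature importance.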


In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
my_eval = RegressionEvaluator( labelCol='MV', predictionCol="prediction", metricName="r2")

**Model R2 on train data**

In [21]:
train_r2 = my_eval.evaluate(results)
train_r2

0.7150962165314472

## **Evaluating Linear Regression Model**

---



**RMSE and R2 values on Train data**

In [22]:
dt_evaluator = RegressionEvaluator(labelCol="MV", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(results)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)

Root Mean Squared Error (RMSE) on train data = 4.85625


In [23]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="MV",metricName="r2")
print("R Squared (R2) on train data = %g" % lr_evaluator.evaluate(results))

R Squared (R2) on train data = 0.715096


**RMSE and R2 values on Test data**

In [24]:
dt_evaluator = RegressionEvaluator(labelCol="MV", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(results1)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 5.16856


In [25]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(results1))

R Squared (R2) on test data = 0.691724
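
For reference, the two metrics reported above can be written out in plain Python. This is a minimal sketch of the standard definitions (RMSE as the square root of the mean squared error, R2 as one minus the ratio of residual to total sum of squares). The function names `rmse` and `r_squared` and the toy inputs are hypothetical and are not the `RegressionEvaluator` implementation.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between actual and predicted values."""
    n = len(actual)
    squared_errors = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    return math.sqrt(squared_errors / n)

def r_squared(actual, predicted):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot

# Toy values (hypothetical): predicting the mean everywhere gives R2 = 0
actual = [1.0, 2.0, 3.0]
print(rmse(actual, [2.0, 2.0, 2.0]))
print(r_squared(actual, [2.0, 2.0, 2.0]))
```

RMSE is expressed in the units of MV, while R2 is unitless. In this run the test scores are slightly worse than the train scores on both metrics.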
