# Big Data Hadoop & Spark Exam

## Import libraries

In [None]:
!pip install pyspark



In [None]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

In [None]:
# Create (or reuse an already running) Spark session named 'Boston'
spark = (
    SparkSession.builder
    .appName('Boston')
    .getOrCreate()
)

# Q2) Read the data from hive table as a spark dataframe

In [None]:
# Load the Boston housing data into a Spark DataFrame.
# NOTE: Q2 asks for a Hive table; this notebook reads a CSV copy of the data instead.
# header=True takes column names from the first row; inferSchema=True lets Spark
# detect the numeric column types (see the schema printed below).
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/boston.csv")
)

## Exploratory Data Analysis

In [None]:
# Inspect the column names and the types Spark inferred from the CSV
data.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



### Inference
* There are 14 columns
* All the columns are numeric
* The dependent variable is MV

In [None]:
# Number of rows (observations) in the dataset
row_count = data.count()
row_count

506

In [None]:
# Summary statistics for every column, printed as a text table
summary_df = data.describe()
summary_df.show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|              CRIM|                ZN|             INDUS|              CHAS|               NOX|                RM|               AGE|               DIS|              RAD|               TAX|               PT|                 B|             LSTAT|               MV|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|               506|               506|               506|               506|               506|               506|               506|               506|              506|  

In [None]:
# (column name, Spark type) pairs for every column
column_types = data.dtypes
column_types

[('CRIM', 'double'),
 ('ZN', 'double'),
 ('INDUS', 'double'),
 ('CHAS', 'int'),
 ('NOX', 'double'),
 ('RM', 'double'),
 ('AGE', 'double'),
 ('DIS', 'double'),
 ('RAD', 'int'),
 ('TAX', 'int'),
 ('PT', 'double'),
 ('B', 'double'),
 ('LSTAT', 'double'),
 ('MV', 'double')]

## Missing value analysis

In [None]:
def null_value_count(df):
    """Return the columns of ``df`` that contain nulls, with their null counts.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        List of ``(column_name, null_count)`` tuples, in the DataFrame's column
        order, for columns with at least one null. An empty list means no
        column contains nulls.

    Note:
        Only SQL nulls are counted; ``isNull`` does not match NaN values in
        floating-point columns.
    """
    if not df.columns:
        return []
    # Count the nulls of every column in a single aggregation (one Spark job)
    # instead of a separate filter + count per column.
    # F.col is used rather than the bare `col` import so this function keeps
    # working even if the name `col` is rebound elsewhere in the notebook.
    null_counts = df.select(
        [F.sum(F.col(name).isNull().cast("int")).alias(name) for name in df.columns]
    ).first()
    # On an empty DataFrame the sums are null (None); the truthiness check
    # treats those the same as a zero count.
    return [(name, count) for name, count in zip(df.columns, null_counts) if count]

In [None]:
# Columns containing nulls; an empty list means no missing values
missing_counts = null_value_count(data)
missing_counts

[]

### Inference
* There are no missing values in the dataset

# Q3) Get the correlation between dependent and independent variables

In [None]:
independent_var = ['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PT','B','LSTAT']
dependent_var = 'MV'  # DataFrame.stat.corr takes single column names (strings)

# Pearson correlation of each predictor with the dependent variable.
# The loop variable is deliberately not called `col`: that would shadow
# pyspark.sql.functions.col imported at the top of the notebook and break any
# later (or re-run) cell that calls col(...).
for feature in independent_var:
    correlation = data.stat.corr(feature, dependent_var)
    print(f"Correlation between {feature} and {dependent_var} is {correlation}")

Correlation between CRIM and MV is -0.3883046116575089
Correlation between ZN and MV is 0.360445344637529
Correlation between INDUS and MV is -0.48372517128143366
Correlation between CHAS and MV is 0.1752601777529185
Correlation between NOX and MV is -0.4273207763683772
Correlation between RM and MV is 0.6953599371272672
Correlation between AGE and MV is -0.3769545671428867
Correlation between DIS and MV is 0.24992873873512172
Correlation between RAD and MV is -0.38162623156691683
Correlation between TAX and MV is -0.46853593528654536
Correlation between PT and MV is -0.5077867038116086
Correlation between B and MV is 0.3334608226834165
Correlation between LSTAT and MV is -0.7376627294671615


### Inference
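* Correlation values close to -1 or 1 indicate a strong (negative or positive) linear relationship
* RM has a correlation of 0.6953599371272672, a strong positive correlation with MV
* LSTAT has the strongest correlation overall (-0.7376627294671615): MV tends to decrease as LSTAT increases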

In [None]:
# Re-check the schema before selecting feature columns (data has not been modified since it was loaded)
data.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [None]:
# Numeric (int/double) predictor columns, excluding the dependent variable MV.
# MV is filtered out by name instead of slicing off the last column, so the
# result does not silently depend on MV being the final column of the CSV.
numerical_cols = [
    name
    for name, dtype in data.dtypes
    if dtype.startswith(('int', 'double')) and name != 'MV'
]
print(numerical_cols)

['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT']


In [None]:
# How many predictor columns will go into the feature vector
print(f"{len(numerical_cols)} numerical features")

13 numerical features


## Data Preparation
* Combine the numerical predictor columns into a single `features` vector column with VectorAssembler

In [None]:
from pyspark.ml.feature import VectorAssembler

# Assemble all numerical predictor columns into one "features" vector column,
# the input format expected by pyspark.ml estimators such as LinearRegression.
# A single assembler covers every column; no per-column loop is needed.
Vectassembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")

# The pipeline has a single stage: the VectorAssembler
stages = [Vectassembler]

In [None]:
import pandas as pd

## Pipeline

In [None]:
from pyspark.ml import Pipeline

# Original input columns. A "features" column left over from a previous run of
# this cell is excluded: `data` is overwritten below, and fitting the
# VectorAssembler again would fail because its output column already exists.
cols = [c for c in data.columns if c != "features"]
raw_input = data.select(cols)

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(raw_input)

# Put the assembled feature vector first, followed by the original columns
selectedCols = ['features'] + cols
data = pipelineModel.transform(raw_input).select(selectedCols)

# Preview the first 5 rows as a pandas DataFrame
pd.DataFrame(data.take(5), columns=data.columns)


Unnamed: 0,features,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PT,B,LSTAT,MV
0,"[0.00632, 18.0, 2.309999943, 0.0, 0.537999988,...",0.00632,18.0,2.31,0,0.538,6.575,65.199997,4.09,1,296,15.3,396.899994,4.98,24.0
1,"[0.027310001, 0.0, 7.070000172, 0.0, 0.4690000...",0.02731,0.0,7.07,0,0.469,6.421,78.900002,4.9671,2,242,17.799999,396.899994,9.14,21.6
2,"[0.02729, 0.0, 7.070000172, 0.0, 0.469000012, ...",0.02729,0.0,7.07,0,0.469,7.185,61.099998,4.9671,2,242,17.799999,392.829987,4.03,34.700001
3,"[0.032370001, 0.0, 2.180000067, 0.0, 0.4580000...",0.03237,0.0,2.18,0,0.458,6.998,45.799999,6.0622,3,222,18.700001,394.630005,2.94,33.400002
4,"[0.069049999, 0.0, 2.180000067, 0.0, 0.4580000...",0.06905,0.0,2.18,0,0.458,7.147,54.200001,6.0622,3,222,18.700001,396.899994,5.33,36.200001


In [None]:
# First 20 rows, with long values truncated (the show() defaults, spelled out)
data.show(20, truncate=True)

+--------------------+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|            features|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+--------------------+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|[0.00632,18.0,2.3...|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|[0.027310001,0.0,...|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|[0.02729,0.0,7.07...|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|[0.032370001,0.0,...|0.032370001|

In [None]:
# Keep only what the model needs: the assembled feature vector and the label MV
model_columns = ["features", "MV"]
finalized_data = data.select(model_columns)
finalized_data.show()

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
|[0.029850001,0.0,...|28.70000076|
|[0.088289998,12.5...|22.89999962|
|[0.144549996,12.5...|27.10000038|
|[0.211239994,12.5...|       16.5|
|[0.170039997,12.5...|18.89999962|
|[0.224889994,12.5...|       15.0|
|[0.117470004,12.5...|18.89999962|
|[0.093780003,12.5...|21.70000076|
|[0.629760027,0.0,...|20.39999962|
|[0.637960017,0.0,...|18.20000076|
|[0.627390027,0.0,...|19.89999962|
|[1.053930044,0.0,...|23.10000038|
|[0.784200013,0.0,...|       17.5|
|[0.802709997,0.0,...|20.20000076|
|[0.725799978,0.0,...|18.20000076|
+--------------------+-----------+
only showing top 20 rows



## Split the dataset into train and test

In [None]:
# 80/20 train/test split. A fixed seed makes the split, and therefore every
# metric reported below, reproducible across kernel restarts (given the same
# input data and partitioning).
SPLIT_SEED = 42
train_dataset, test_dataset = finalized_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Q4) Build a linear regression model to predict the House price

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Multiple linear regression: predict MV from the assembled "features" vector
MLR = LinearRegression().setFeaturesCol("features").setLabelCol("MV")

In [None]:
# Fit the regression on the training split only
model = MLR.fit(dataset=train_dataset)

In [None]:
# Score the held-out test split; the returned summary exposes a `predictions` DataFrame
pred = model.evaluate(dataset=test_dataset)

In [None]:
# Actual MV next to the model's prediction for each test row
test_predictions = pred.predictions
test_predictions.show()

+--------------------+-----------+------------------+
|            features|         MV|        prediction|
+--------------------+-----------+------------------+
|[0.00906,90.0,2.9...|32.20000076| 31.56663165871838|
|[0.01311,90.0,1.2...|35.40000153| 31.12965117518516|
|[0.031500001,95.0...|34.90000153|30.211661949221018|
|[0.03537,34.0,6.0...|       22.0|29.100248837142345|
|[0.03932,0.0,3.41...|       22.0|27.463855407701097|
|[0.04301,80.0,1.9...|18.20000076|14.431687400936987|
|[0.04417,70.0,2.2...|24.79999924|30.950639991639516|
|[0.04462,25.0,4.8...|23.89999962|26.994082788970115|
|[0.046659999,80.0...|30.29999924| 32.82950856702982|
|[0.05023,35.0,6.0...|17.10000038| 20.06270232905217|
|[0.057799999,0.0,...|37.20000076| 32.90861757162588|
|[0.057890002,12.5...|       22.0|21.291566519361613|
|[0.06617,0.0,3.24...|19.29999924| 21.60857061053922|
|[0.06724,0.0,3.24...|22.60000038|24.676634232860724|
|[0.068599999,0.0,...|33.20000076| 32.33934825948499|
|[0.068879999,0.0,...|36.200

In [None]:
# One coefficient per feature, in the same order as numerical_cols
coefficient = model.coefficients
print(f"The coefficients of the model are : {coefficient!a}")

The coefficients of the model are : DenseVector([-0.1222, 0.0428, -0.0092, 3.175, -17.6575, 3.8324, -0.0007, -1.4478, 0.3228, -0.0111, -0.9684, 0.0091, -0.5118])


In [None]:
# Fitted intercept (bias term), shown with 6 decimal places
intercept = model.intercept
print("The Intercept of the model is : " + format(intercept, "f"))

The Intercept of the model is : 36.328940


# Q5) Evaluate the Linear Regression model by getting RMSE and R-squared values

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator comparing the true MV with the model's predictions on the test split
evaluation = RegressionEvaluator(labelCol="MV", predictionCol="prediction")

# Override the evaluator's metric for this call only
r2_params = {evaluation.metricName: "r2"}
r2 = evaluation.evaluate(pred.predictions, r2_params)
print(f"r2: {r2:.3f}")

r2: 0.817


## Inference
* The r2 score of the model on the test set is 0.817, i.e. the model explains about 81.7% of the variance in MV on the held-out data

In [None]:
def _test_metric(metric_name):
    """Score the test-split predictions with the given RegressionEvaluator metric."""
    return evaluation.evaluate(pred.predictions, {evaluation.metricName: metric_name})

# Root Mean Square Error
rmse = _test_metric("rmse")
print(f"rmse: {rmse:.3f}")

# Mean Square Error
mse = _test_metric("mse")
print(f"mse: {mse:.3f}")

rmse: 3.520
mse: 12.387


## Inference
* The root mean squared error is 3.520 (in the same units as MV)
* The mean squared error is 12.387