# BDA - Project SPark MLlib Vs Mahout performance comparision

### Submitted by:
#### Name      : Muhammad Amin Ghias
#### ERP ID    : 25366

Date : 2nd June 2022

(on full dataset)

In [1]:
!pip install pyspark



## Import Libraries

In [2]:
import os
import time
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
# Visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [5]:
rnd_seed=23
np.random.seed=rnd_seed
np.random.set_state=rnd_seed

In [6]:
spark = SparkSession.builder.master("local[2]").appName("Linear-Regression-Gas-Sensor").getOrCreate()

In [7]:
spark

In [8]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext



<pyspark.sql.context.SQLContext at 0x7fe21c18d9a0>

## Read Dataset

In [9]:
df = spark.read.csv('gas_sensor_data_f.csv',header=True)

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
df.count()

3843160

In [12]:
df.describe().show()

+-------+------------------+-------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|          Time (s)|humidity_percentage|   Temperature (C)|        flow_rate| Heater voltage (V)|         R1 (MOhm)|         R2 (MOhm)|         R3 (MOhm)|        R4 (MOhm)|        R5 (MOhm)|         R6 (MOhm)|        R7 (MOhm)|         R8 (MOhm)|         R9 (MOhm)|        R10 (MOhm)|        R11 (MOhm)|        R12 (MOhm)|        R13 (MOhm)|       R14 (MOhm)|         CO (ppm)|
+-------+------------------+-------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------

In [13]:
df.show(2)

+--------+-------------------+---------------+---------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+--------+
|Time (s)|humidity_percentage|Temperature (C)|flow_rate|Heater voltage (V)|R1 (MOhm)|R2 (MOhm)|R3 (MOhm)|R4 (MOhm)|R5 (MOhm)|R6 (MOhm)|R7 (MOhm)|R8 (MOhm)|R9 (MOhm)|R10 (MOhm)|R11 (MOhm)|R12 (MOhm)|R13 (MOhm)|R14 (MOhm)|CO (ppm)|
+--------+-------------------+---------------+---------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+--------+
|     0.0|            49.7534|        23.7184| 233.2737|            0.8993|   0.2231|   0.6365|   1.1493|   0.8483|   1.2534|   1.4449|   1.9906|   1.3303|    1.448|    1.9148|    3.4651|    5.2144|    6.5806|    8.6385|     0.0|
|   0.309|              55.84|          26.62| 241.6323|            0.2112|   2.

In [14]:
df.printSchema()

root
 |-- Time (s): string (nullable = true)
 |-- humidity_percentage: string (nullable = true)
 |-- Temperature (C): string (nullable = true)
 |-- flow_rate: string (nullable = true)
 |-- Heater voltage (V): string (nullable = true)
 |-- R1 (MOhm): string (nullable = true)
 |-- R2 (MOhm): string (nullable = true)
 |-- R3 (MOhm): string (nullable = true)
 |-- R4 (MOhm): string (nullable = true)
 |-- R5 (MOhm): string (nullable = true)
 |-- R6 (MOhm): string (nullable = true)
 |-- R7 (MOhm): string (nullable = true)
 |-- R8 (MOhm): string (nullable = true)
 |-- R9 (MOhm): string (nullable = true)
 |-- R10 (MOhm): string (nullable = true)
 |-- R11 (MOhm): string (nullable = true)
 |-- R12 (MOhm): string (nullable = true)
 |-- R13 (MOhm): string (nullable = true)
 |-- R14 (MOhm): string (nullable = true)
 |-- CO (ppm): string (nullable = true)



In [15]:
df.columns

['Time (s)',
 'humidity_percentage',
 'Temperature (C)',
 'flow_rate',
 'Heater voltage (V)',
 'R1 (MOhm)',
 'R2 (MOhm)',
 'R3 (MOhm)',
 'R4 (MOhm)',
 'R5 (MOhm)',
 'R6 (MOhm)',
 'R7 (MOhm)',
 'R8 (MOhm)',
 'R9 (MOhm)',
 'R10 (MOhm)',
 'R11 (MOhm)',
 'R12 (MOhm)',
 'R13 (MOhm)',
 'R14 (MOhm)',
 'CO (ppm)']

## Change data type from string double

In [16]:
df = df.withColumn("Time (s)",col("Time (s)").cast( DoubleType()))
df = df.withColumn("CO (ppm)",col("CO (ppm)").cast( DoubleType()))
df = df.withColumn("humidity_percentage",col("humidity_percentage").cast( DoubleType()))
df = df.withColumn("Temperature (C)",col("Temperature (C)").cast( DoubleType()))
df = df.withColumn("Temperature (C)",col("Temperature (C)").cast( DoubleType()))
df = df.withColumn("flow_rate",col("flow_rate").cast( DoubleType()))
df = df.withColumn("Heater voltage (V)",col("Heater voltage (V)").cast( DoubleType()))
df = df.withColumn("R1 (MOhm)",col("R1 (MOhm)").cast( DoubleType()))
df = df.withColumn("R2 (MOhm)",col("R2 (MOhm)").cast( DoubleType()))
df = df.withColumn("R3 (MOhm)",col("R3 (MOhm)").cast( DoubleType()))
df = df.withColumn("R4 (MOhm)",col("R4 (MOhm)").cast( DoubleType()))
df = df.withColumn("R5 (MOhm)",col("R5 (MOhm)").cast( DoubleType()))
df = df.withColumn("R6 (MOhm)",col("R6 (MOhm)").cast( DoubleType()))
df = df.withColumn("R7 (MOhm)",col("R7 (MOhm)").cast( DoubleType()))
df = df.withColumn("R8 (MOhm)",col("R8 (MOhm)").cast( DoubleType()))
df = df.withColumn("R9 (MOhm)",col("R9 (MOhm)").cast( DoubleType()))
df = df.withColumn("R10 (MOhm)",col("R10 (MOhm)").cast( DoubleType()))
df = df.withColumn("R11 (MOhm)",col("R11 (MOhm)").cast( DoubleType()))
df = df.withColumn("R12 (MOhm)",col("R12 (MOhm)").cast( DoubleType()))
df = df.withColumn("R13 (MOhm)",col("R13 (MOhm)").cast( DoubleType()))
df = df.withColumn("R14 (MOhm)",col("R14 (MOhm)").cast( DoubleType()))



In [17]:
df.printSchema()

root
 |-- Time (s): double (nullable = true)
 |-- humidity_percentage: double (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- flow_rate: double (nullable = true)
 |-- Heater voltage (V): double (nullable = true)
 |-- R1 (MOhm): double (nullable = true)
 |-- R2 (MOhm): double (nullable = true)
 |-- R3 (MOhm): double (nullable = true)
 |-- R4 (MOhm): double (nullable = true)
 |-- R5 (MOhm): double (nullable = true)
 |-- R6 (MOhm): double (nullable = true)
 |-- R7 (MOhm): double (nullable = true)
 |-- R8 (MOhm): double (nullable = true)
 |-- R9 (MOhm): double (nullable = true)
 |-- R10 (MOhm): double (nullable = true)
 |-- R11 (MOhm): double (nullable = true)
 |-- R12 (MOhm): double (nullable = true)
 |-- R13 (MOhm): double (nullable = true)
 |-- R14 (MOhm): double (nullable = true)
 |-- CO (ppm): double (nullable = true)



In [18]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['Time (s)', 'humidity_percentage', 'Temperature (C)', 'flow_rate', 'Heater voltage (V)','R1 (MOhm)','R2 (MOhm)',
                                                'R3 (MOhm)', 'R4 (MOhm)', 'R5 (MOhm)', 'R6 (MOhm)', 'R7 (MOhm)', 'R8 (MOhm)', 'R9 (MOhm)','R10 (MOhm)','R11 (MOhm)',
                                                'R12 (MOhm)', 'R13 (MOhm)', 'R14 (MOhm)'], outputCol = 'features')
data = vectorAssembler.transform(df)

data = data.withColumn('label', col('CO (ppm)'))

data = data.select(['features', 'label'])
data.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,49.7534,23.7...|  0.0|
|[0.309,55.84,26.6...|  0.0|
|[0.618,55.84,26.6...|  0.0|
+--------------------+-----+
only showing top 3 rows



## Splitting Data

In [19]:
splits = data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [20]:
train_df.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,49.21,26.38,...|  0.0|
|[0.0,50.25,26.54,...|  0.0|
|[0.0,54.6258,25.3...|  0.0|
+--------------------+-----+
only showing top 3 rows



In [21]:
test_df.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,49.7534,23.7...|  0.0|
|[0.31,52.63,25.3,...|  0.0|
|[0.618,55.84,26.6...|  0.0|
+--------------------+-----+
only showing top 3 rows



# Linear Regression

In [22]:
from pyspark.ml.regression import LinearRegression

lr_f = LinearRegression(featuresCol = 'features', predictionCol='pred_CO (ppm)', 
                        maxIter=10, regParam=0.0, solver="normal", standardization=False)

start = time.perf_counter()
lr_modelf = lr_f.fit(train_df)
end = time.perf_counter()
duration_fit = format((end-start),'.4f')
print("Model Fitting Time Duration - {}".format(duration_fit))

print("Coefficients: " + str(lr_modelf.coefficients))
print("Intercept: " + str(lr_modelf.intercept))

start = time.perf_counter()
yfpredictions = lr_modelf.transform(test_df)
end = time.perf_counter()
duration_pred = format((end-start),'.4f')

trainingSummary = lr_modelf.summary

print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("Model Fitting Time Duration - {}".format(duration_fit))
print("Model predicting Time Duration - {}".format(duration_pred))

Model Fitting Time Duration - 41.8281
Coefficients: [-2.747324384838938e-06,-0.015039206926646202,-0.04810147820691096,0.012800961867793345,-9.942679670257316,0.06636701391223498,-0.0866525537733854,0.05639783368486679,0.05927557608421093,-0.06383846539883631,-0.07507315442883519,0.13450215478096225,0.029271455644799143,0.2683168462757321,-0.43185033121518396,0.1146963715738742,0.23492645152281474,-0.49981070230506913,-0.04288485054873854]
Intercept: 17.828974407878604
numIterations: 0
objectiveHistory: [0.0]




+-------------------+
|          residuals|
+-------------------+
| -9.283122084388022|
|-10.114362975269774|
|-1.9950856786005744|
| -9.981096984804712|
| -6.300524369647359|
|-11.766964215558097|
| -9.944733258182723|
| -4.235942048632857|
| 1.4518447234436422|
| -2.038987575892529|
|  7.046021567298087|
|-4.2406062972835095|
| -9.943431738433233|
| 0.4424688477849408|
|   -9.9055289019664|
| -2.276182517065781|
| -9.905706654958923|
| -9.932312781041258|
|   -9.9058483709346|
| 0.9875593926127628|
+-------------------+
only showing top 20 rows

RMSE: 4.186850
r2: 0.575813
Model Fitting Time Duration - 41.8281
Model predicting Time Duration - 0.0338


In [23]:
yfpredandlabels = yfpredictions.select('pred_CO (ppm)', 'label')

y_pred_f = yfpredandlabels.toPandas()

y_pred_f.to_csv("ylRf_Mllib.csv",index=False)

### Using the RegressionEvaluator from pyspark.ml package:

In [24]:
evaluator = RegressionEvaluator(predictionCol='pred_CO (ppm)', labelCol='label', metricName='rmse')
print("RMSE: {0}".format(evaluator.evaluate(yfpredandlabels)))

RMSE: 4.180058211093116


In [25]:
evaluator = RegressionEvaluator(predictionCol='pred_CO (ppm)', labelCol='label', metricName='mae')
print("MAE: {0}".format(evaluator.evaluate(yfpredandlabels)))

MAE: 3.0061282691657776


In [26]:
evaluator = RegressionEvaluator(predictionCol='pred_CO (ppm)', labelCol='label', metricName='r2')
print("R2: {0}".format(evaluator.evaluate(yfpredandlabels)))

R2: 0.5767692947753741


### Using the RegressionMetrics from pyspark.mllib package:

In [27]:
# mllib is old so the methods are available in rdd
metrics = RegressionMetrics(yfpredandlabels.rdd)



In [28]:
print("RMSE: {0}".format(metrics.rootMeanSquaredError))

RMSE: 4.180058211093116


In [29]:
print("MAE: {0}".format(metrics.meanAbsoluteError))

MAE: 3.0061282691657776


In [30]:
print("R2: {0}".format(metrics.r2))

R2: 0.5767692947753741


# Decission Tree regressor

In [31]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor()

start = time.perf_counter()
model_dt = dt.fit(train_df)
end = time.perf_counter()
duration_fit = format((end-start),'.4f')
print("Model Fitting Time Duration - {}".format(duration_fit))

start = time.perf_counter()
ypred_dt = model_dt.transform(test_df)
end = time.perf_counter()
duration_pred = format((end-start),'.4f')

print("Model Fitting Time Duration - {}".format(duration_fit))
print("Model predicting Time Duration - {}".format(duration_pred))



Model Fitting Time Duration - 77.9203
Model Fitting Time Duration - 77.9203
Model predicting Time Duration - 0.0411


In [32]:
evaluator = RegressionEvaluator()
print("R2 :",evaluator.evaluate(ypred_dt,
{evaluator.metricName: "r2"}))

print("MSE :",evaluator.evaluate(ypred_dt,
{evaluator.metricName: "mse"}))

print("RMSE :",evaluator.evaluate(ypred_dt,
{evaluator.metricName: "rmse"}))

print("MAE :",evaluator.evaluate(ypred_dt,
{evaluator.metricName: "mae"}))

R2 : 0.7484396758793779
MSE : 10.385553255624137
RMSE : 3.2226624482908752
MAE : 2.1642999933938785


In [33]:
ypred_dt.select("features","label", "prediction").show(5)

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[0.0,49.7534,23.7...|  0.0|11.165923002809174|
|[0.31,52.63,25.3,...|  0.0|3.1480610121624246|
|[0.618,55.84,26.6...|  0.0|2.3264769198412116|
|[0.62,49.21,26.38...|  0.0|3.3276835139892635|
|[0.928,50.25,26.5...|  0.0|3.3276835139892635|
+--------------------+-----+------------------+
only showing top 5 rows



# Random Forrest Regressor

In [34]:
from pyspark.ml.regression import RandomForestRegressor

# Define LinearRegression algorithm
rf = RandomForestRegressor() # featuresCol="eatures",numTrees=2, maxDepth=2, seed=42

start = time.perf_counter()
model_rf = rf.fit(train_df)
end = time.perf_counter()
duration_fit = format((end-start),'.4f')
print("Model Fitting Time Duration - {}".format(duration_fit))

start = time.perf_counter()
ypred_rf = model_rf.transform(test_df)
end = time.perf_counter()
duration_pred = format((end-start),'.4f')

print("Model Fitting Time Duration - {}".format(duration_fit))
print("Model predicting Time Duration - {}".format(duration_pred))


Model Fitting Time Duration - 104.6386
Model Fitting Time Duration - 104.6386
Model predicting Time Duration - 0.1611


In [35]:
evaluator = RegressionEvaluator()
print("R2 :",evaluator.evaluate(ypred_rf,
{evaluator.metricName: "r2"}))

print("MSE :",evaluator.evaluate(ypred_rf,
{evaluator.metricName: "mse"}))

print("RMSE :",evaluator.evaluate(ypred_rf,
{evaluator.metricName: "rmse"}))

print("MAE :",evaluator.evaluate(ypred_rf,
{evaluator.metricName: "mae"}))

R2 : 0.7633407364675332
MSE : 9.770369764966192
RMSE : 3.1257590702045785
MAE : 2.06678014299914


In [36]:
ypred_rf.select("features","label", "prediction").show(5)

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[0.0,49.7534,23.7...|  0.0| 10.20420090568429|
|[0.31,52.63,25.3,...|  0.0| 2.255308009144843|
|[0.618,55.84,26.6...|  0.0| 2.493680318817966|
|[0.62,49.21,26.38...|  0.0|3.5645754187718572|
|[0.928,50.25,26.5...|  0.0|3.8712724131240157|
+--------------------+-----+------------------+
only showing top 5 rows

