**Importing Libraries**

In [23]:
import pandas as pandas_obj

from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_regression

import os
import pyspark
import findspark

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor

from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.linear_model import BayesianRidge

from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedKFold
from sklearn.linear_model import Lasso

from pyspark.sql.functions import *

**Python 3.8.10 64 bit Interpreter on Microsoft VS Code**

**Importing Data and Creating DataFrames**

**Convert nc4 to CSV**

In [None]:
import xarray as xarray_obj

nc = xarray_obj.open_dataset('../dataset/csv/data_2021.nc4')
nc.to_dataframe().to_csv('../dataset/nc4/2021.csv')

In [12]:
file_2018_path = "../dataset/data_2018.csv"
file_2019_path = "../dataset/data_2019.csv"
file_2020_path = "../dataset/data_2020.csv"
file_2021_path = "../dataset/data_2021.csv"

data_2018 = pandas_obj.read_csv(file_2018_path)
data_2019 = pandas_obj.read_csv(file_2019_path)
data_2020 = pandas_obj.read_csv(file_2020_path)
data_2021 = pandas_obj.read_csv(file_2021_path)

data = pandas_obj.concat( [ data_2018, data_2019, data_2020, data_2021  ] )
drop_cols = ['latitude', 'longitude','speedlml']
data_kbest = data.drop( drop_cols, axis = 1 )

**Feature Selection using SelectKBest**

In [28]:
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_regression


drop_cols = ['tlml']

features = data.drop(drop_cols, axis = 1)
label = data['tlml']

best_features = SelectKBest( score_func = f_regression, k = 3 )
fit = best_features.fit( features , label )
df_scores = pandas_obj.DataFrame( fit.scores_ )
df_columns = pandas_obj.DataFrame( features.columns)
feature_scores = pandas_obj.concat( [ df_columns, df_scores ] , axis = 1 )
feature_scores.columns = [ 'Feature_Name' , 'Score' ]

print("KBest features")
print(feature_scores.nlargest( 20, 'Score' ) )

KBest features
  Feature_Name         Score
2         hlml  4.811379e+09
5         qlml  7.596272e+06
3           ps  3.998960e+06
1     latitude  6.446894e+05
4     speedlml  4.545948e+05
0    longitude  2.175042e+04


**Start PySpark Session**

In [14]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

21/12/10 15:06:32 WARN Utils: Your hostname, Abhinav-OMEN resolves to a loopback address: 127.0.1.1; using 192.168.1.187 instead (on interface wlo1)
21/12/10 15:06:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/10 15:06:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Upload DataFrame with all 4 years data into spark**

In [15]:
dataset = spark.read.csv('../dataset/dataset.csv',inferSchema=True, header =True)

                                                                                

**Schema of DataFrame with all data in Spark**

In [6]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- hlml: double (nullable = true)
 |-- tlml: double (nullable = true)
 |-- ps: double (nullable = true)
 |-- speedlml: double (nullable = true)
 |-- qlml: double (nullable = true)



**Separation of SelectKBest Features and Label with the help of VectorAssembler**

In [16]:
assembler_kbest = VectorAssembler( inputCols=[ 'qlml', 'hlml' , 'ps'], outputCol = 'Attributes')
 
output_kbest = assembler_kbest.transform(dataset)
 
finalized_data_kbest = output_kbest.select("Attributes", 'tlml')

**Linear Regression with SelectKBest features in PySpark**

In [18]:

train_data,test_data = finalized_data_kbest.randomSplit([0.75,0.25])
 
 
regressor_kbest = LinearRegression( featuresCol = 'Attributes', labelCol = 'tlml')
 
regressor_kbest = regressor_kbest.fit(test_data)
 
pred_kbest = regressor_kbest.evaluate(test_data)


21/12/10 15:07:22 WARN Instrumentation: [acbd6a58] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

**Random Forrest Regression without SelectKBest Features in PySpark**

In [19]:
(trainingData, testData) = finalized_data_kbest.randomSplit([0.75, 0.25] )


random_forest_all = RandomForestRegressor(featuresCol="Attributes", labelCol= 'tlml')

random_forest_all = random_forest_all.fit(trainingData)

predictions_rf_all = random_forest_all.transform(testData)

predictions_rf_all.select("prediction", "tlml", "Attributes").show()


21/12/10 15:07:59 WARN MemoryStore: Not enough space to cache rdd_66_2 in memory! (computed 17.8 MiB so far)
21/12/10 15:07:59 WARN BlockManager: Persisting block rdd_66_2 to disk instead.
21/12/10 15:07:59 WARN MemoryStore: Not enough space to cache rdd_66_1 in memory! (computed 17.8 MiB so far)
21/12/10 15:07:59 WARN MemoryStore: Not enough space to cache rdd_66_0 in memory! (computed 17.8 MiB so far)
21/12/10 15:07:59 WARN BlockManager: Persisting block rdd_66_0 to disk instead.
21/12/10 15:07:59 WARN BlockManager: Persisting block rdd_66_1 to disk instead.
21/12/10 15:07:59 WARN MemoryStore: Not enough space to cache rdd_66_5 in memory! (computed 11.5 MiB so far)
21/12/10 15:07:59 WARN BlockManager: Persisting block rdd_66_5 to disk instead.
21/12/10 15:07:59 WARN MemoryStore: Not enough space to cache rdd_66_6 in memory! (computed 17.8 MiB so far)
21/12/10 15:07:59 WARN BlockManager: Persisting block rdd_66_6 to disk instead.
21/12/10 15:07:59 WARN MemoryStore: Not enough space to

+------------------+---------+--------------------+
|        prediction|     tlml|          Attributes|
+------------------+---------+--------------------+
|218.29830029560586|211.05702|[4.52E-6,46.62352...|
|218.29830029560586|210.92506|[4.6E-6,46.594906...|
|218.29830029560586|210.36833|[4.68E-6,46.46727...|
|218.29830029560586|210.13583|[4.71E-6,46.41733...|
|218.29830029560586|211.05963|[4.86E-6,46.62463...|
|218.29830029560586|210.32397|[4.94E-6,46.45919...|
|218.29830029560586|210.74211|[4.97E-6,46.54875...|
|218.29830029560586| 210.5935|[5.01E-6,46.51677...|
|218.29830029560586|212.14053|[5.07E-6,46.86019...|
|218.29830029560586| 210.3484|[5.11E-6,46.46166...|
|218.29830029560586|210.34984|[5.33E-6,46.46378...|
|218.29830029560586|211.74455|[5.33E-6,46.77300...|
|218.29830029560586|212.08739|[5.33E-6,46.85158...|
|218.29830029560586| 210.9599|[5.36E-6,46.60118...|
|218.29830029560586|211.47034|[5.53E-6,46.71006...|
|218.29830029560586|211.95544|[5.66E-6,46.81931...|
|218.2983002



Root Mean Squared Error (RMSE) on test data = 2.18548


                                                                                

**Evaluation of Random Forest Regression**

In [29]:
evaluator_rf_all = RegressionEvaluator( labelCol = "tlml", predictionCol = "prediction", metricName = "rmse" )
rmse_rf_all = evaluator_rf_all.evaluate( predictions_rf_all )
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_rf_all)



Root Mean Squared Error (RMSE) on test data = 2.18548


                                                                                

**Gradient Boosted Tree Regression using KBest features**

In [25]:
train_data,test_data = finalized_data_kbest.randomSplit([0.75,0.25])

target= 'tlml'

gbt = GBTRegressor(featuresCol = 'Attributes', labelCol=target)
model = gbt.fit(train_data)
yhat = model.transform(test_data)
yhat.show()


21/12/10 15:17:29 WARN MemoryStore: Not enough space to cache rdd_117_2 in memory! (computed 12.4 MiB so far)
21/12/10 15:17:29 WARN MemoryStore: Not enough space to cache rdd_117_1 in memory! (computed 19.6 MiB so far)
21/12/10 15:17:29 WARN MemoryStore: Not enough space to cache rdd_117_5 in memory! (computed 19.6 MiB so far)
21/12/10 15:17:29 WARN BlockManager: Persisting block rdd_117_1 to disk instead.
21/12/10 15:17:29 WARN BlockManager: Persisting block rdd_117_5 to disk instead.
21/12/10 15:17:29 WARN BlockManager: Persisting block rdd_117_2 to disk instead.
21/12/10 15:17:29 WARN MemoryStore: Not enough space to cache rdd_117_0 in memory! (computed 19.6 MiB so far)
21/12/10 15:17:29 WARN BlockManager: Persisting block rdd_117_0 to disk instead.
21/12/10 15:17:32 WARN MemoryStore: Not enough space to cache rdd_140_4 in memory! (computed 2.7 MiB so far)
21/12/10 15:17:32 WARN MemoryStore: Not enough space to cache rdd_140_7 in memory! (computed 4.0 MiB so far)
21/12/10 15:17:32 

+--------------------+---------+------------------+
|          Attributes|     tlml|        prediction|
+--------------------+---------+------------------+
|[4.69E-6,46.45361...| 210.3062|218.10996159488855|
|[4.71E-6,46.51388...|210.57622|218.10996159488855|
|[4.75E-6,46.43315...|210.21051|218.10996159488855|
|[4.86E-6,46.62463...|211.05963|218.10996159488855|
|[5.0E-6,46.63298,...|211.09738|218.10996159488855|
|[5.07E-6,46.49131...|210.46935|218.10996159488855|
|[5.07E-6,46.86019...|212.14053|218.10996159488855|
|[5.09E-6,46.75339...|211.65768|218.10996159488855|
|[5.11E-6,46.49336...|210.48387|218.10996159488855|
|[5.21E-6,46.62378...|211.08138|218.10996159488855|
|[5.26E-6,46.47335...|210.39044|218.10996159488855|
|[5.4E-6,46.67138,...|211.27113|218.10996159488855|
|[5.44E-6,46.96546...|212.61775|218.10996159488855|
|[5.56E-6,46.46236...|210.34253|218.10996159488855|
|[5.61E-6,46.89381...|212.29187|218.10996159488855|
|[5.68E-6,46.47651...|210.40468|218.10996159488855|
|[5.7E-6,47.

                                                                                

**Evaluation of Gradient Boosted Tree Regression**

In [26]:
evaluator = RegressionEvaluator(
    labelCol="tlml", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(yhat)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)



Root Mean Squared Error (RMSE) on test data = 1.17847


                                                                                

**Calculation of Coefficient and Intercept**

In [17]:
def coefficient_and_intercept(regressor):
    coeff = regressor.coefficients
    intr = regressor.intercept
    print ("The coefficient of the model is : %a" %coeff)
    print ("The Intercept of the model is : %f" %intr)

**Linear Regression with kbest attributes as features**

In [19]:
coefficient_and_intercept(regressor_kbest)

The coefficient of the model is : DenseVector([-186.4341, 4.5252, -0.0])
The Intercept of the model is : 0.875425


**Function for Evaluation of Regression Models in Pyspark**

In [21]:
def EvaluationMetrics( trainingSummary):
    print("MSE: %f" % trainingSummary.meanSquaredError)
    print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
    print("r2: %f" % trainingSummary.r2)
    print("MAE: %f" % trainingSummary.meanAbsoluteError)
    print("Explained variance = %s" % trainingSummary.explainedVariance)

**Linear Regression with kbest attributes as features**

In [23]:
EvaluationMetrics( regressor_kbest.summary )

MSE: 0.000752
RMSE: 0.027418
r2: 0.999998
MAE: 0.019373
Explained variance = 399.184981679027


**Bayesian Ridge Regression using sklearn**

In [27]:

#training = pandas_obj.concat( [ data_2018 , data_2019, data_2020 ] , ignore_index = True)

training = data_kbest

features_cols = [ 'hlml', 'ps','qlml']
label_cols    = ['tlml']
label_training = training.drop( features_cols, axis = 1)
feature_testing = data_2021[features_cols]

features_training = training.drop( label_cols , axis = 1)
label_testing = data_2021[ label_cols ]

model = BayesianRidge()

model.fit( features_training, label_training )

prediction_BR = model.predict( feature_testing )

print("Evaluation for Bayesian Ridge Regression")
print(f"r2 Score Of Test Set : { r2_score( label_testing , prediction_BR)}")
print(f"MSE Of Test Set : {mean_squared_error( label_testing , prediction_BR)}")
print(f"RMSE Of Test Set : {mean_squared_error( label_testing , prediction_BR , squared= False)}")
print(f"MAE Of Test Set : {mean_absolute_error( label_testing , prediction_BR)}")

  y = column_or_1d(y, warn=True)


Evaluation for Bayesian Ridge Regression
r2 Score Of Test Set : 0.9999983412386868
MSE Of Test Set : 0.0007042312240204884
RMSE Of Test Set : 0.02653735525670349
MAE Of Test Set : 0.018604120782833623


**Lasso Regression using sklearn and K fold cross validation**

In [21]:
X = data_kbest
y = data_kbest['tlml']
model = Lasso(alpha=1.0)
cv = RepeatedKFold(n_splits=10,n_repeats=3 , random_state = False )

scores_r2= cross_val_score(model,X,y,scoring='r2',cv=cv,n_jobs=-1)
scores_rmse= cross_val_score(model,X,y,scoring='neg_root_mean_squared_error',cv=cv,n_jobs=-1)
scores_mse= cross_val_score(model,X,y,scoring='neg_mean_squared_error',cv=cv,n_jobs=-1)
scores_mae= cross_val_score(model,X,y,scoring='neg_mean_absolute_error',cv=cv,n_jobs=-1)

**Evaluation of Lasso Regression**

In [22]:
print("r2:",scores_r2.mean())
print("RMSE:",-1*scores_rmse.mean())
print("MSE:",-1*scores_mse.mean())
print("MAE:",-1*scores_mae.mean())

r2: 0.9999876059102674
RMSE: 0.07037267181890253
MSE: 0.0049523223210262545
MAE: 0.05801063076446411
