## install libraries

In [1]:
#libraries installation

!pip install pyspark
!pip install findspark
!pip install pandas matplotlib numpy scikit-learn



# import necessary libraries

In [2]:
# import necessary libraries
import findspark # this will be used to find the and use Apache Spark
findspark.init() # Initiate the findspark to locate and utilise the Spark
from pyspark.sql import SparkSession 

from pyspark.sql.functions import lower, regexp_replace, col, trim
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
#from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
#from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
#from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
#from pyspark.ml import Pipeline

import pandas as pd 



In [3]:
#start spark session & check it
spark = SparkSession \
    .builder \
    .appName("Eslam Canada Vehicles CO2 emissions Spark Session") \
    .getOrCreate()

# Check if my spark session started
if "spark" in locals() and isinstance(spark, SparkSession):
    print("The spark session", spark.sparkContext.appName, "is active and ready to use")
else:
    print("No sparksessions is active, please create a one to proceed")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/26 21:40:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/26 21:40:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


The spark session Eslam Canada Vehicles CO2 emissions Spark Session is active and ready to use


## Loading the data

In [4]:
# Use Spark to read the my csv file.
raw_co2_df = spark.read.csv(r"/home/eslamabuelella/Desktop/MY1995_2023_Fuel_Consumption_Ratings.csv",inferSchema=True,header=True)

                                                                                

# 3. EDA & Data wrangling

In [5]:
# Print the Schema of the Spark DataFrame
raw_co2_df.printSchema()

root
 |-- ModelYear: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- VehicleClass: string (nullable = true)
 |-- EngineSize_L: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- FuelType: string (nullable = true)
 |-- FuelConsCity_L100km: double (nullable = true)
 |-- FuelConsHwy_L100km: double (nullable = true)
 |-- Comb_L100km: double (nullable = true)
 |-- Comb_mpg: integer (nullable = true)
 |-- CO2Emission_g_km: integer (nullable = true)
 |-- CO2Rating: integer (nullable = true)
 |-- SmogRating: integer (nullable = true)



In [6]:
# to standerdise the column names 
co2_df=raw_co2_df
rename_dict = {"ModelYear": "model_year",  "Make":"make", "Model":"model", "VehicleClass":"vehicle_class", "EngineSize_L":"engine_size_l",
    "Cylinders":"cylinders", "Transmission":"transmission", "FuelType":"fuel_type", "FuelConsCity_L100km":"fuel_cons_city_L100km",
               "FuelConsHwy_L100km":"fuel_cons_hwy_L100km", "Comb_L100km":"fuel_cons_comb_L100km", "Comb_mpg":"fuel_cons_comb_MPG",
               "CO2Emission_g_km":"CO2_emission_gkm", "CO2Rating":"CO2_rating", "SmogRating":"smog_rating"}
for old_col, new_col in rename_dict.items():
    co2_df = co2_df.withColumnRenamed(old_col,new_col)

co2_df.printSchema()

root
 |-- model_year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- vehicle_class: string (nullable = true)
 |-- engine_size_l: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- transmission: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- fuel_cons_city_L100km: double (nullable = true)
 |-- fuel_cons_hwy_L100km: double (nullable = true)
 |-- fuel_cons_comb_L100km: double (nullable = true)
 |-- fuel_cons_comb_MPG: integer (nullable = true)
 |-- CO2_emission_gkm: integer (nullable = true)
 |-- CO2_rating: integer (nullable = true)
 |-- smog_rating: integer (nullable = true)



In [7]:
# Inspecting the categorical variables: vehicle classes
for column_name in ["make","model","vehicle_class","transmission","fuel_type"]:
    print("column:", column_name)
    print("number of unqie categories:", co2_df.select(column_name).distinct().count())

column: make


                                                                                

number of unqie categories: 90
column: model
number of unqie categories: 4815
column: vehicle_class
number of unqie categories: 33
column: transmission
number of unqie categories: 30
column: fuel_type
number of unqie categories: 5


In [8]:
print(co2_df.select("fuel_type").distinct().orderBy(column_name).show())

+---------+
|fuel_type|
+---------+
|        D|
|        E|
|        N|
|        X|
|        Z|
+---------+

None


In [9]:
print(co2_df.select("transmission").distinct().orderBy(column_name).show(30))

                                                                                

+------------+
|transmission|
+------------+
|          A3|
|          A4|
|          A9|
|          AV|
|         AV1|
|         AV6|
|          M4|
|          M5|
|         A10|
|          A5|
|          A6|
|          A7|
|          A8|
|         AM5|
|         AM6|
|         AM7|
|         AM8|
|         AM9|
|        AS10|
|         AS4|
|         AS5|
|         AS6|
|         AS7|
|         AS8|
|         AS9|
|        AV10|
|         AV7|
|         AV8|
|          M6|
|          M7|
+------------+

None


In [10]:
print(co2_df.select("vehicle_class").distinct().orderBy(column_name).show(33))


+--------------------+
|       vehicle_class|
+--------------------+
|             MINIVAN|
|             Minivan|
|PICKUP TRUCK - SMALL|
|PICKUP TRUCK - ST...|
| Pickup truck: Small|
|Pickup truck: Sta...|
|SPECIAL PURPOSE V...|
|STATION WAGON - M...|
|          SUBCOMPACT|
|                 SUV|
|Special purpose v...|
|                  UL|
|      Van: Passenger|
|             COMPACT|
|             Compact|
|           FULL-SIZE|
|           Full-size|
|            MID-SIZE|
|         MINICOMPACT|
|            Mid-size|
|         Minicompact|
|STATION WAGON - S...|
|         SUV - SMALL|
|      SUV - STANDARD|
|          SUV: Small|
|       SUV: Standard|
|Station wagon: Mi...|
|Station wagon: Small|
|          Subcompact|
|          TWO-SEATER|
|          Two-seater|
|         VAN - CARGO|
|     VAN - PASSENGER|
+--------------------+

None


In [11]:
# we can see clear redundancy in categories due to none standardised data entry
# so we need to clean this mess

# from pyspark.sql.functions import lower, regexp_replace, col, trim

# lowercase and trim white spaces
co2_df = co2_df.withColumn("vehicle_class", lower(trim(col("vehicle_class"))))

# replace non-alphanumeric character with "-"
co2_df = co2_df.withColumn("vehicle_class", regexp_replace(col("vehicle_class"), "[^a-zA-Z0-9]+", "-"))

# remove "-" from begings or endings
co2_df = co2_df.withColumn("vehicle_class", regexp_replace(col("vehicle_class"), "^-+|-+$", ""))

# 4. merge multiple "-" into one
co2_df = co2_df.withColumn("vehicle_class", regexp_replace(col("vehicle_class"), "-+", "-"))

print("Clean classes counts:", co2_df.select("vehicle_class").distinct().count())
co2_df.select("vehicle_class").distinct().orderBy("vehicle_class").show(truncate=False)


                                                                                

Clean classes counts: 18
+-----------------------+
|vehicle_class          |
+-----------------------+
|compact                |
|full-size              |
|mid-size               |
|minicompact            |
|minivan                |
|pickup-truck-small     |
|pickup-truck-standard  |
|special-purpose-vehicle|
|station-wagon-mid-size |
|station-wagon-small    |
|subcompact             |
|suv                    |
|suv-small              |
|suv-standard           |
|two-seater             |
|ul                     |
|van-cargo              |
|van-passenger          |
+-----------------------+



In [12]:
# checking the count of nulls in my dataset
co2_df.toPandas().info()  

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27001 entries, 0 to 27000
Data columns (total 15 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   model_year             27001 non-null  int32  
 1   make                   27001 non-null  object 
 2   model                  27001 non-null  object 
 3   vehicle_class          27001 non-null  object 
 4   engine_size_l          27001 non-null  float64
 5   cylinders              27001 non-null  int32  
 6   transmission           27001 non-null  object 
 7   fuel_type              27001 non-null  object 
 8   fuel_cons_city_L100km  27001 non-null  float64
 9   fuel_cons_hwy_L100km   27001 non-null  float64
 10  fuel_cons_comb_L100km  27001 non-null  float64
 11  fuel_cons_comb_MPG     27001 non-null  int32  
 12  CO2_emission_gkm       27001 non-null  int32  
 13  CO2_rating             8010 non-null   float64
 14  smog_rating            6900 non-null   float64
dtypes:

In [13]:
# Since CO2_rating & smog_rating are +70% nulls we shouldn't rely on them for finding or predicting CO2 in this data set
co2_df=co2_df.drop("co2_rating", "smog_rating")
co2_df.printSchema()

root
 |-- model_year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- vehicle_class: string (nullable = true)
 |-- engine_size_l: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- transmission: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- fuel_cons_city_L100km: double (nullable = true)
 |-- fuel_cons_hwy_L100km: double (nullable = true)
 |-- fuel_cons_comb_L100km: double (nullable = true)
 |-- fuel_cons_comb_MPG: integer (nullable = true)
 |-- CO2_emission_gkm: integer (nullable = true)



In [14]:
co2_df.toPandas().info()  

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27001 entries, 0 to 27000
Data columns (total 13 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   model_year             27001 non-null  int32  
 1   make                   27001 non-null  object 
 2   model                  27001 non-null  object 
 3   vehicle_class          27001 non-null  object 
 4   engine_size_l          27001 non-null  float64
 5   cylinders              27001 non-null  int32  
 6   transmission           27001 non-null  object 
 7   fuel_type              27001 non-null  object 
 8   fuel_cons_city_L100km  27001 non-null  float64
 9   fuel_cons_hwy_L100km   27001 non-null  float64
 10  fuel_cons_comb_L100km  27001 non-null  float64
 11  fuel_cons_comb_MPG     27001 non-null  int32  
 12  CO2_emission_gkm       27001 non-null  int32  
dtypes: float64(4), int32(4), object(5)
memory usage: 2.3+ MB


In [15]:
# creating lists of predictors names according to their nature

numeric_predictors = ["engine_size_l", "cylinders", "model_year", 
                      "fuel_cons_city_L100km", "fuel_cons_hwy_L100km", 
                      "fuel_cons_comb_L100km", "fuel_cons_comb_MPG"]

categorical_predictors = ["vehicle_class", "make", "transmission", "fuel_type"]

target = "CO2_emission_gkm"


In [16]:
## Building base model using all numeric predictors 

In [17]:
# defining linear reg modeel
lr1 = LinearRegression(featuresCol="features", labelCol="CO2_emission_gkm")
# building the linear regresion evaluator
evaluator = RegressionEvaluator(labelCol="CO2_emission_gkm", predictionCol="prediction", metricName="r2")

In [18]:
# building grid of regularisation parameters
paramGrid = (ParamGridBuilder() .addGrid(lr1.regParam, [0.0, 0.01,0.05, 0.1, 0.3, 0.5, 0.7, 1.0, 1.5]).build())

In [19]:
# building vector assembler to be used in pyspark model
# from pyspark.ml.feature import VectorAssembler
assembler_lr1_all_num = VectorAssembler(inputCols= numeric_predictors, outputCol="features")
output_lr1_all_num = assembler_lr1_all_num.transform(co2_df)
# prepare training and testing data stes
lr1_train_full, lr1_test_full = output_lr1_all_num.randomSplit([0.8, 0.2], seed=42)
lr1_train_full.select("features").show(3)

+--------------------+
|            features|
+--------------------+
|[1.8,4.0,1995.0,1...|
|[1.8,4.0,1995.0,1...|
|[3.2,6.0,1995.0,1...|
+--------------------+
only showing top 3 rows


                                                                                

In [20]:
# building  regression cross validator with 5 folds to validate mmmodels against  regularisation paramters
cv_lr1 = CrossValidator(estimator=lr1, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel_lr1 = cv_lr1.fit(lr1_train_full)

25/11/26 21:41:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/11/26 21:41:41 WARN Instrumentation: [c8d649a3] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:41:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/26 21:41:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/11/26 21:41:49 WARN Instrumentation: [06bfef02] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:41:54 WARN Instrumentation: [eea9bee5] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:41:58 WARN Instrumentation: [b970317e] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:42:02 WARN Instrumentation: [4539a1d7] regParam is zero, which might cause nu

In [21]:
# selecting best model
bestModel = cvModel_lr1.bestModel
# show R2 per regulrisation parameter
print("Cross-validated R2 for each regParam:")
for params, metric in zip(paramGrid, cvModel_lr1.avgMetrics):
    print(f"regParam={params[lr1.regParam]} -> avg R2={metric}")

Cross-validated R2 for each regParam:
regParam=0.0 -> avg R2=0.9056995255592645
regParam=0.01 -> avg R2=0.905704071957645
regParam=0.05 -> avg R2=0.9057056973121286
regParam=0.1 -> avg R2=0.9057051280929519
regParam=0.3 -> avg R2=0.9056929787486249
regParam=0.5 -> avg R2=0.9056702361908832
regParam=0.7 -> avg R2=0.9056407762344676
regParam=1.0 -> avg R2=0.905589100077334
regParam=1.5 -> avg R2=0.905493707196116


In [22]:
best_predictions = bestModel.transform(lr1_test_full)
test_rmse = evaluator.evaluate(best_predictions, {evaluator.metricName: "rmse"})
test_r2   = evaluator.evaluate(best_predictions, {evaluator.metricName: "r2"})
test_mae  = evaluator.evaluate(best_predictions, {evaluator.metricName: "mae"})
print("Test RMSE:", test_rmse)
print("Test MAE :", test_mae)
print("Test R²  :", test_r2)

Test RMSE: 20.079597283294728
Test MAE : 12.010871828718859
Test R²  : 0.9048717846747811


In [23]:
#print("Best regParam:", best_regParam)
print("Intercept:", bestModel.intercept)
for name, coef in zip(numeric_predictors, bestModel.coefficients):
    print(f"{name:25s}: {coef:.4f}")

Intercept: 1402.098663289689
engine_size_l            : 5.7576
cylinders                : 5.7583
model_year               : -0.6085
fuel_cons_city_L100km    : 0.7464
fuel_cons_hwy_L100km     : 6.7660
fuel_cons_comb_L100km    : 2.8548
fuel_cons_comb_MPG       : -2.7680


# Build Models using all variables (numeric + categorical)

In [24]:
# defining the common processes to deal with any categorical var
# from pyspark.ml.feature import StringIndexer, OneHotEncoder

# building indexer to convert to categories into numbers

all_cat_indexers = [StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_idx", handleInvalid="keep")
                    for cat_col in categorical_predictors]

# building encoder to convert to categories indexes into dummy variables

all_cat_encoders = [OneHotEncoder(inputCol=f"{cat_col}_idx", outputCol=f"{cat_col}_vec")
                    for cat_col in categorical_predictors]


In [25]:
# from pyspark.ml.feature import VectorAssembler
# building assembler for all predictors (cat + numbers)
assembler_all = VectorAssembler(inputCols=[f"{cat_col}_vec" for cat_col in categorical_predictors] + numeric_predictors, outputCol="features")



## Build random forest non linear model using all variables (categorical + numrical)

In [26]:
# Build random forest non linear model using all variables (categorical + numrical)
# from pyspark.ml.regression import RandomForestRegressor
# from pyspark.ml import Pipeline


In [27]:
#feature_pipeline = Pipeline(stages=indexers + [encoder, assembler])
rf1_pipeline = Pipeline(stages=all_cat_indexers + all_cat_encoders + [assembler_all])
rf = RandomForestRegressor(
    labelCol=target,
    featuresCol="features",
    predictionCol="prediction",
    seed=42)

In [28]:
# apply random forestt piplibe to engineer features & 
rf_pipeline = Pipeline(stages=rf1_pipeline.getStages() + [rf])
# definition of random forest grid
rf_paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [25, 50])#, 75])
    .addGrid(rf.maxDepth, [ 9, 12])#, 15])
    .addGrid(rf.minInfoGain, [0.01])#, 0.1, 0.5])
    .build())

In [29]:
# random forest evaluator
rf_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

In [30]:
# cross validation step for randmon forest
rf_cv = CrossValidator(estimator=rf_pipeline, estimatorParamMaps=rf_paramGrid, evaluator=rf_evaluator, numFolds=5)

In [31]:
rf_train_full, rf_test_full = co2_df.randomSplit([0.8, 0.2], seed=42)

In [32]:
# random forest model fitting to training
rf_cv_model = rf_cv.fit(rf_train_full)

25/11/26 21:42:13 WARN DAGScheduler: Broadcasting large task binary with size 1544.6 KiB
25/11/26 21:42:14 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/11/26 21:42:21 WARN DAGScheduler: Broadcasting large task binary with size 1544.5 KiB
25/11/26 21:42:21 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/11/26 21:42:25 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
25/11/26 21:42:30 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
25/11/26 21:42:32 WARN DAGScheduler: Broadcasting large task binary with size 8.0 MiB
25/11/26 21:42:41 WARN DAGScheduler: Broadcasting large task binary with size 1655.6 KiB
25/11/26 21:42:43 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/11/26 21:42:45 WARN DAGScheduler: Broadcasting large task binary with size 4.8 MiB
25/11/26 21:42:54 WARN DAGScheduler: Broadcasting large task binary with size 1655.6 KiB
25/11/26 21:42:55 WARN DAGScheduler: Broad

In [33]:
# identifcation of best rf model
rf_best_model = rf_cv_model.bestModel
# defining the best parameters
best_rf_model = rf_best_model.stages[-1]

In [34]:
# Extract key hyperparameters
best_num_trees = best_rf_model.getNumTrees
best_max_depth = best_rf_model.getMaxDepth()
best_min_gain = best_rf_model.getMinInfoGain()
print("\nBest Random Forest hyperparameters found by CrossValidator:")
print(f"number of trees: {best_num_trees}")
print(f"maximum depth: {best_max_depth}")
print(f"minimum gain: {best_min_gain}")


Best Random Forest hyperparameters found by CrossValidator:
number of trees: 50
maximum depth: 12
minimum gain: 0.01


In [35]:
# predicting using training data
rf_train_predictions = rf_best_model.transform(rf_train_full)
# predict using random forest model best model
rf_predictions = rf_best_model.transform(rf_test_full)
# build model evaluator
rf_evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
rf_evaluator_mae  = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="mae")
rf_evaluator_r2   = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
# evaluating model's performance on training data & test data to ensure that the model is not overfitting 
rf_best_model_rmse = rf_evaluator_rmse.evaluate(rf_train_predictions)
rf_best_model_mae  = rf_evaluator_mae.evaluate(rf_train_predictions)
rf_best_model_r2   = rf_evaluator_r2.evaluate(rf_train_predictions)
rf_test_rmse = rf_evaluator_rmse.evaluate(rf_predictions)
rf_test_mae  = rf_evaluator_mae.evaluate(rf_predictions)
rf_test_r2   = rf_evaluator_r2.evaluate(rf_predictions)
print("\n\n Random Forest Regression metric scores on trainindata\n\n")
print(f"Random Forest on training dataset RMSE: {rf_best_model_rmse}")
print(f"Random Forest on training dataset MAE : {rf_best_model_mae}")
print(f"Random Forest on training dataset R2  : {rf_best_model_r2}")
print("\n\n Random Forest Regression metric scores on test set\n\n")
print(f"Random Forest on test set RMSE: {rf_test_rmse}")
print(f"Random Forest on test set MAE : {rf_test_mae}")
print(f"Random Forest on test set R2  : {rf_test_r2}")

                                                                                



 Random Forest Regression metric scores on trainindata


Random Forest on training dataset RMSE: 2.2726680931209327
Random Forest on training dataset MAE : 1.394926207057329
Random Forest on training dataset R2  : 0.9988165017955898


 Random Forest Regression metric scores on test set


Random Forest on test set RMSE: 2.6470084693463787
Random Forest on test set MAE : 1.5619379816619603
Random Forest on test set R2  : 0.9983468585541942


                                                                                

In [36]:
# feature importance 
predictors_importances = best_rf_model.featureImportances.toArray()
# converting to numpy to python floats
predictors_importances = [float(importance_value) for importance_value in predictors_importances]  

predictors_names = assembler_all.getInputCols()
# fill feature importance in spark df
predictors_importance_df = (spark.createDataFrame(list(zip(predictors_names, predictors_importances)),["feature", "importance"]).orderBy("importance", ascending=False))
# export to feature importance to csv
predictors_importance_df.toPandas().to_csv(r"/home/eslamabuelella/Desktop/RF_predictors_importance_df.csv", index=True)

predictors_importance_df.show()



+--------------------+--------------------+
|             feature|          importance|
+--------------------+--------------------+
|       fuel_type_vec| 0.00774787484741296|
|fuel_cons_city_L1...| 5.48752648765268E-4|
|          model_year|4.077014468023422E-4|
|   vehicle_class_vec|1.961810212128047...|
|            make_vec|1.867136417190141...|
|fuel_cons_hwy_L100km|1.341651210060466...|
|    transmission_vec|1.097703427575755...|
|           cylinders|5.622327803230733E-5|
|       engine_size_l|4.206116347974807E-5|
|fuel_cons_comb_L1...|7.228328083821132E-6|
|  fuel_cons_comb_MPG|6.122643982879223E-6|
+--------------------+--------------------+



                                                                                

In [37]:
lr2_cat_num = LinearRegression(labelCol=target, featuresCol="features")
# buiilding indexer, encoder, assembeler pipe line
lr2_pipeline_full = Pipeline(stages=all_cat_indexers + all_cat_encoders + [assembler_all, lr2_cat_num])

In [38]:
# train test splitting
all_cat_train_full, all_cat_test_full = co2_df.randomSplit([0.8, 0.2], seed=42)

In [39]:
lr2_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
lr2_paramGrid = (ParamGridBuilder().addGrid(lr2_cat_num.regParam, [0.0, 0.01, 0.05, 0.1, 0.3, 0.5, 0.7, 1.0, 1.5]).build())
lr2_cv_full = CrossValidator(estimator=lr2_pipeline_full, estimatorParamMaps=lr2_paramGrid, evaluator=lr2_evaluator, numFolds=5)
lr2_cvModel_full = lr2_cv_full.fit(all_cat_train_full)

25/11/26 21:48:11 WARN Instrumentation: [43b3e720] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:48:11 WARN Instrumentation: [43b3e720] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/11/26 21:48:28 WARN Instrumentation: [850c0396] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:48:28 WARN Instrumentation: [850c0396] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/11/26 21:48:42 WARN Instrumentation: [4c07197a] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:48:42 WARN Instrumentation: [4c07197a] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/11/26 21:48:55 WARN Instrumentation: [3ad86e24] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:48:55 WARN Instrumentation: [3ad86e24] Cholesky solv

In [40]:
# best linear model selection 
bestModel_lr2_full = lr2_cvModel_full.bestModel
best_lr2_stage = bestModel_lr2_full.stages[-1]
print("Cross-validated RMSE for each regParam:")
for params, metric in zip(lr2_paramGrid, lr2_cvModel_full.avgMetrics):
    print(f"for regularisation parameter = {params[lr2_cat_num.regParam]} has avg R2={metric}")
best_regParam_full_lr2 = best_lr2_stage._java_obj.parent().getRegParam()
print("\nBest regularisation Parameter selected by CV:", best_regParam_full_lr2)

Cross-validated RMSE for each regParam:
for regularisation parameter = 0.0 has avg R2=0.994907519040782
for regularisation parameter = 0.01 has avg R2=0.9948993469405375
for regularisation parameter = 0.05 has avg R2=0.9948889132379117
for regularisation parameter = 0.1 has avg R2=0.9948805628136936
for regularisation parameter = 0.3 has avg R2=0.9948207835514937
for regularisation parameter = 0.5 has avg R2=0.9947241735856549
for regularisation parameter = 0.7 has avg R2=0.9946011903802956
for regularisation parameter = 1.0 has avg R2=0.9943811960980591
for regularisation parameter = 1.5 has avg R2=0.9939500161785004

Best regularisation Parameter selected by CV: 0.0


In [41]:
lr2_pred_full = bestModel_lr2_full.transform(all_cat_test_full)
lr2_test_rmse_full = lr2_evaluator.evaluate(lr2_pred_full, {lr2_evaluator.metricName: "rmse"})
lr2_test_mae_full  = lr2_evaluator.evaluate(lr2_pred_full, {lr2_evaluator.metricName: "mae"})
lr2_test_r2_full   = lr2_evaluator.evaluate(lr2_pred_full, {lr2_evaluator.metricName: "r2"})
print("Test RMSE:", lr2_test_rmse_full)
print("Test MAE :", lr2_test_mae_full)
print("Test R²  :", lr2_test_r2_full)

Test RMSE: 4.595858077981154
Test MAE : 2.490036328170884
Test R²  : 0.9950165217115171


In [42]:
lr2_coeffs = best_lr2_stage.coefficients
lr2_intercept = best_lr2_stage.intercept
print("Intercept:", lr2_intercept)
print("Number of coefficients:", len(lr2_coeffs))

Intercept: -23.77869130346107
Number of coefficients: 150


In [43]:
print("coefficients of best linear regression 'lr2' model:", lr2_coeffs)

coefficients of best linear regression 'lr2' model: [-0.1641687383122296,-0.14289092531507464,0.4039027483765898,1.080955440845166,-0.6584058676737291,-0.01889697366412193,0.327645602973472,-0.4400131681511327,1.2369133727272152,-0.8416396069220249,-0.5243789662505579,-0.11869295935242871,0.03443885152787105,-0.439908337683742,-1.0447862989887484,-2.2254210420608223,1.9149973153533888,3.648939042136534,-0.5322245906853331,-0.8963174909633589,0.1428224716781134,-0.14201159219866671,0.3645593888214647,0.2163641309709331,-0.7395261069973066,-2.050454199897383,-1.198615273609069,-0.9890689752904869,-1.1918434756545118,-0.9948362264007505,4.626146559546684,-1.4834517070265005,-1.5476177528697013,-0.5430246983013223,2.8662147657500308,-1.2383689322479756,0.4362333804358753,-0.2879676701146987,-0.6910230770919293,0.393449914227359,-1.6443959513994957,0.3256552941928155,-1.621942113002703,-1.7912729712482094,1.510614385318558,-1.1063606009087013,-1.7489041838131292,3.059917471780421,-0.4497492

In [44]:
sample_lr2_df = bestModel_lr2_full.transform(all_cat_train_full.limit(1))
sample_lr2_df

DataFrame[model_year: int, make: string, model: string, vehicle_class: string, engine_size_l: double, cylinders: int, transmission: string, fuel_type: string, fuel_cons_city_L100km: double, fuel_cons_hwy_L100km: double, fuel_cons_comb_L100km: double, fuel_cons_comb_MPG: int, CO2_emission_gkm: int, vehicle_class_idx: double, make_idx: double, transmission_idx: double, fuel_type_idx: double, vehicle_class_vec: vector, make_vec: vector, transmission_vec: vector, fuel_type_vec: vector, features: vector, prediction: double]

In [45]:
# Extract metadata from the 'features' column
lr2_feature_metadata = sample_lr2_df.schema["features"].metadata["ml_attr"]["attrs"]
lr2_feature_metadata

{'binary': [{'name': 'vehicle_class_vec_compact', 'idx': 0},
  {'name': 'vehicle_class_vec_mid-size', 'idx': 1},
  {'name': 'vehicle_class_vec_suv', 'idx': 2},
  {'name': 'vehicle_class_vec_pickup-truck-standard', 'idx': 3},
  {'name': 'vehicle_class_vec_subcompact', 'idx': 4},
  {'name': 'vehicle_class_vec_suv-small', 'idx': 5},
  {'name': 'vehicle_class_vec_full-size', 'idx': 6},
  {'name': 'vehicle_class_vec_two-seater', 'idx': 7},
  {'name': 'vehicle_class_vec_suv-standard', 'idx': 8},
  {'name': 'vehicle_class_vec_minicompact', 'idx': 9},
  {'name': 'vehicle_class_vec_station-wagon-small', 'idx': 10},
  {'name': 'vehicle_class_vec_minivan', 'idx': 11},
  {'name': 'vehicle_class_vec_pickup-truck-small', 'idx': 12},
  {'name': 'vehicle_class_vec_station-wagon-mid-size', 'idx': 13},
  {'name': 'vehicle_class_vec_van-cargo', 'idx': 14},
  {'name': 'vehicle_class_vec_van-passenger', 'idx': 15},
  {'name': 'vehicle_class_vec_special-purpose-vehicle', 'idx': 16},
  {'name': 'vehicle_clas

In [46]:
lr2_attrs = []
for lr2_attr_type in ["binary", "numeric"]:
    if lr2_attr_type in lr2_feature_metadata:
        lr2_attrs.extend(lr2_feature_metadata[lr2_attr_type])
# sort index to be in correct order
lr2_attrs = sorted(lr2_attrs, key=lambda variable: variable["idx"])
# Extract feature names in order
lr2_feature_names = [variable_name["name"] for variable_name in lr2_attrs]

In [47]:
# import pandas as pd 
coef_values = best_lr2_stage.coefficients.toArray()
coef_df = pd.DataFrame({"feature": lr2_feature_names, "coefficient": coef_values})
coef_df["abs_coefficient"] = coef_df["coefficient"].abs()
# Sort by absolute size (strongest effects first)
coef_df_sorted = coef_df.sort_values("abs_coefficient", ascending=False)
print("Intercept:", best_lr2_stage.intercept)
coef_df_sorted.head(50)

Intercept: -23.77869130346107


Unnamed: 0,feature,coefficient,abs_coefficient
140,fuel_type_vec_E,-112.48985,112.48985
142,fuel_type_vec_N,-63.597055,63.597055
141,fuel_type_vec_D,43.688277,43.688277
103,make_vec_Bugatti,21.805115,21.805115
148,fuel_cons_comb_L100km,15.200116,15.200116
106,make_vec_BUGATTI,13.490865,13.490865
71,make_vec_FERRARI,9.812368,9.812368
139,fuel_type_vec_Z,7.72528,7.72528
88,make_vec_Lamborghini,7.636292,7.636292
138,fuel_type_vec_X,7.526978,7.526978


In [48]:
coef_df_sorted.to_csv(r"/home/eslamabuelella/Desktop/full_linear_model_coefficients.csv", index=False)

In [49]:
coef_df = pd.DataFrame({"feature": lr2_feature_names, "coefficient": coef_values})
coef_df["abs_coefficient"] = coef_df["coefficient"].abs()

In [53]:
# the idea behind creating a function to use it in any model later
def get_field_name(vectorised_feature: str):
    # to handle one hot encoded categorical similar to "fuel_type_vec_E"
    if "_vec_" in vectorised_feature:
        return vectorised_feature.split("_vec_")[0]
    # to handle encoded but without category (rare)
    elif vectorised_feature.endswith("_vec"):
        return vectorised_feature[:-4]
    # numeric feature
    else:
        return vectorised_feature
coef_df["field"] = coef_df["feature"].apply(get_field_name)

In [54]:
coef_df[["feature", "field"]].head(20)

Unnamed: 0,feature,field
0,vehicle_class_vec_compact,vehicle_class
1,vehicle_class_vec_mid-size,vehicle_class
2,vehicle_class_vec_suv,vehicle_class
3,vehicle_class_vec_pickup-truck-standard,vehicle_class
4,vehicle_class_vec_subcompact,vehicle_class
5,vehicle_class_vec_suv-small,vehicle_class
6,vehicle_class_vec_full-size,vehicle_class
7,vehicle_class_vec_two-seater,vehicle_class
8,vehicle_class_vec_suv-standard,vehicle_class
9,vehicle_class_vec_minicompact,vehicle_class


In [55]:
avg_abs_by_field = coef_df.groupby("field")["abs_coefficient"].agg(avg_abs_coefficient_LR = "mean", 
                                                                    sum_abs_coefficient_LR = "sum", 
                                                                    median_abs_coefficient_LR = "median").reset_index()
avg_abs_by_field = avg_abs_by_field.rename(columns={"abs_coefficient": "avg_abs_coefficient_LR"}).sort_values("avg_abs_coefficient_LR", ascending=False)
avg_abs_by_field

Unnamed: 0,field,avg_abs_coefficient_LR,sum_abs_coefficient_LR,median_abs_coefficient_LR
6,fuel_type,47.005488,235.02744,43.688277
3,fuel_cons_comb_L100km,15.200116,15.200116,15.200116
2,fuel_cons_city_L100km,2.852271,2.852271,2.852271
5,fuel_cons_hwy_L100km,2.306729,2.306729,2.306729
7,make,2.165584,194.902594,1.375276
4,fuel_cons_comb_MPG,0.885826,0.885826,0.885826
10,vehicle_class,0.848166,15.266995,0.482196
9,transmission,0.799963,23.9989,0.698122
1,engine_size_l,0.79665,0.79665,0.79665
0,cylinders,0.147984,0.147984,0.147984


## inspect the colinearity between the numeric variables

In [56]:
#co2_df[numeric_predictors+[target]].toPandas.corr()
numeric_co2_df_corr = co2_df.select(numeric_predictors + [target]).toPandas().corr()
# export correlation matrix to be visualised in Tabluea
numeric_co2_df_corr.to_csv(r"/home/eslamabuelella/Desktop/numeric_co2_df_corr.csv", index=True)
numeric_co2_df_corr

Unnamed: 0,engine_size_l,cylinders,model_year,fuel_cons_city_L100km,fuel_cons_hwy_L100km,fuel_cons_comb_L100km,fuel_cons_comb_MPG,CO2_emission_gkm
engine_size_l,1.0,0.910301,-0.058541,0.819401,0.759076,0.806844,-0.755171,0.824808
cylinders,0.910301,1.0,-0.045877,0.786549,0.698313,0.763336,-0.712212,0.791384
model_year,-0.058541,-0.045877,1.0,-0.25195,-0.251898,-0.254957,0.274564,-0.276442
fuel_cons_city_L100km,0.819401,0.786549,-0.25195,1.0,0.950006,0.993385,-0.926304,0.930649
fuel_cons_hwy_L100km,0.759076,0.698313,-0.251898,0.950006,1.0,0.979384,-0.889478,0.908668
fuel_cons_comb_L100km,0.806844,0.763336,-0.254957,0.993385,0.979384,1.0,-0.923746,0.933518
fuel_cons_comb_MPG,-0.755171,-0.712212,0.274564,-0.926304,-0.889478,-0.923746,1.0,-0.90532
CO2_emission_gkm,0.824808,0.791384,-0.276442,0.930649,0.908668,0.933518,-0.90532,1.0


* Since our Target variable is CO2_emission_gkm we found that the stronest correlated predictor:
    1. fuel_cons_comb_L100km : 0.933518
    2. fuel_cons_city_L100km : 0.930649
    3. fuel_cons_hwy_L100km : 0.908668
    4. fuel_cons_comb_MPG : -0.905320
    5. engine_size_l : 0.824808
    6. cylinders : 0.791384
* We found that the stronget predictor **fuel_cons_comb_L100km** is strongly correlated with all the above predictors which indicate clear colinearity between all the strongly correlated predictor.
    1. fuel_cons_city_L100km : 0.993385
    2. fuel_cons_hwy_L100km : 0.979384
    3. fuel_cons_comb_MPG : -0.923746
    4. engine_size_l : 0.806844
    5. cylinders : 0.763336

## Build Model using strong predictors (feult_comb_cons_L100km + feul_type)

In [57]:
numeric_predictors_lr3 = ["fuel_cons_comb_L100km"]
categorical_predictors_lr3 = ["fuel_type"]
target_lr3 = "CO2_emission_gkm"
lr3_indexers = [StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_idx", handleInvalid="keep")
                for cat_col in categorical_predictors_lr3]
lr3_encoders = [OneHotEncoder(inputCol=f"{cat_col}_idx", outputCol=f"{cat_col}_vec")
                for cat_col in categorical_predictors_lr3
assembler_all = VectorAssembler(inputCols=[f"{cat_col}_vec" for cat_col in categorical_predictors_lr3] + numeric_predictors_lr3, outputCol="features")
lr3_cat_num = LinearRegression( labelCol=target, featuresCol="features")
lr3_pipeline_full = Pipeline(stages=lr3_indexers + lr3_encoders + [assembler_all, lr3_cat_num])
lr3_train_full, lr3_test_full = co2_df.randomSplit([0.8, 0.2], seed=42)
lr3_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
lr3_paramGrid = (ParamGridBuilder().addGrid(lr3_cat_num.regParam, [0.0, 0.01, 0.05, 0.1, 0.3, 0.5, 0.7, 1.0, 1.5]).build())
lr3_cv_full = CrossValidator(estimator=lr3_pipeline_full, estimatorParamMaps=lr3_paramGrid, evaluator=lr3_evaluator, numFolds=5)
lr3_cvModel_full = lr3_cv_full.fit(lr3_train_full)
bestModel_lr3_full = lr3_cvModel_full.bestModel
best_lr3_stage = bestModel_lr3_full.stages[-1]
print("Cross-validated RMSE for each regParam:")
for params, metric in zip(lr3_paramGrid, lr3_cvModel_full.avgMetrics):
    print(f"Reglusation parameter = {params[lr3_cat_num.regParam]} has avg R2 = {metric}")
best_regParam_full_lr3 = best_lr3_stage._java_obj.parent().getRegParam()
print("\nBest regParam selected by CV:", best_regParam_full_lr3)
lr3_pred_full = bestModel_lr3_full.transform(lr3_test_full)
lr3_test_rmse_full = lr3_evaluator.evaluate(lr3_pred_full, {lr3_evaluator.metricName: "rmse"})
lr3_test_mae_full  = lr3_evaluator.evaluate(lr3_pred_full, {lr3_evaluator.metricName: "mae"})
lr3_test_r2_full   = lr3_evaluator.evaluate(lr3_pred_full, {lr3_evaluator.metricName: "r2"})
print("Test RMSE:", lr3_test_rmse_full)
print("Test MAE :", lr3_test_mae_full)
print("Test R²  :", lr3_test_r2_full)
lr3_coeffs = best_lr3_stage.coefficients
lr3_intercept = best_lr3_stage.intercept
print("Intercept:", lr3_intercept)
print("Number of coefficients:", len(lr3_coeffs))

25/11/26 21:52:56 WARN Instrumentation: [546fa17f] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:52:56 WARN Instrumentation: [546fa17f] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/11/26 21:52:57 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
25/11/26 21:53:06 WARN Instrumentation: [128878b4] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:53:13 WARN Instrumentation: [1341f66b] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:53:13 WARN Instrumentation: [1341f66b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/11/26 21:53:20 WARN Instrumentation: [afd7762a] regParam is zero, which might cause numerical instability and overfitting.
25/11/26 21:53:20 WARN Instrumentation: [afd7762a] Cholesky solver failed due to singula

Cross-validated RMSE for each regParam:
Reglusation parameter = 0.0 has avg R2 = 0.9930173118237029
Reglusation parameter = 0.01 has avg R2 = 0.9930172968482373
Reglusation parameter = 0.05 has avg R2 = 0.993016311912878
Reglusation parameter = 0.1 has avg R2 = 0.9930130101975296
Reglusation parameter = 0.3 has avg R2 = 0.992977119821297
Reglusation parameter = 0.5 has avg R2 = 0.9929057221125003
Reglusation parameter = 0.7 has avg R2 = 0.9927998403450975
Reglusation parameter = 1.0 has avg R2 = 0.99257852480265
Reglusation parameter = 1.5 has avg R2 = 0.9920509070711155

Best regParam selected by CV: 0.0
Test RMSE: 5.400657314107092
Test MAE : 2.7471596201370936
Test R²  : 0.9931183497807101
Intercept: -4076.341617543456
Number of coefficients: 6


In [66]:
# use the pipeline to transform a small sample to infer schema

sample_lr3_df = bestModel_lr3_full.transform(lr3_train_full.limit(1))
# Extract metadata from the 'features' column
lr3_feature_metadata = sample_lr3_df.schema["features"].metadata["ml_attr"]["attrs"]
# collect attributes one hot encodes and numeric values
lr3_attrs = []
for lr3_attr_type in ["binary", "numeric"]:
    if lr3_attr_type in lr3_feature_metadata:
        lr3_attrs.extend(lr3_feature_metadata[lr3_attr_type])
# sort index to be in correct order
lr3_attrs = sorted(lr3_attrs, key=lambda lr3_variable: lr3_variable["idx"])
# Extract feature names in order
lr3_feature_names = [variable_name["name"] for variable_name in lr3_attrs]

In [67]:
lr3_coef_values = best_lr3_stage.coefficients.toArray()
lr3_coef_df = pd.DataFrame({"feature": lr3_feature_names, "coefficient": lr3_coef_values})
lr3_coef_df["abs_coefficient"] = lr3_coef_df["coefficient"].abs()
# Sort by absolute size (strongest effects first)
coef_df_sorted = lr3_coef_df.sort_values("abs_coefficient", ascending=False)
print("Intercept:", best_lr3_stage.intercept)
coef_df_sorted.head(len(coef_df_sorted))

Intercept: -4076.341617543456


Unnamed: 0,feature,coefficient,abs_coefficient
3,fuel_type_vec_D,4118.131978,4118.131978
1,fuel_type_vec_Z,4083.24345,4083.24345
0,fuel_type_vec_X,4082.489569,4082.489569
4,fuel_type_vec_N,4005.777418,4005.777418
2,fuel_type_vec_E,3958.000123,3958.000123
5,fuel_cons_comb_L100km,22.57699,22.57699
