In [1]:
import importlib
import subprocess
import sys
import gc

def check_and_install_package(package_name, version=None):
    """Make sure ``package_name`` is available, installing it with pip if needed.

    Args:
        package_name: Name used both to locate the module and as the pip
            requirement name.
        version: Optional exact version pin. When given, an installed copy
            that reports a different version is replaced by the pinned one.

    Raises:
        subprocess.CalledProcessError: If the pip command exits non-zero.
    """
    # Local imports: a bare `import importlib` does not guarantee that these
    # submodules are loaded.
    import importlib.metadata
    import importlib.util

    requirement = f"{package_name}=={version}" if version else package_name
    pip_command = [sys.executable, "-m", "pip", "install", requirement]

    # Probe for the module without importing it, so that a package replaced by
    # pip below is not already loaded into the kernel.
    try:
        is_installed = importlib.util.find_spec(package_name) is not None
    except (ImportError, ValueError):
        is_installed = False

    if is_installed and version:
        try:
            installed_version = importlib.metadata.version(package_name)
        except importlib.metadata.PackageNotFoundError:
            # No distribution metadata under this name: the pin cannot be
            # verified, so fall back to treating the package as installed.
            installed_version = None
        if installed_version is not None and installed_version != version:
            print(f"\n{package_name} {installed_version} is installed but {version} is required. Installing now...")
            subprocess.check_call(pip_command)
            print(f"{package_name} installation completed.")
            return

    if is_installed:
        print(f"\n{package_name} is already installed.")
        return

    print(f"\n{package_name} is NOT installed. Installing now...")
    subprocess.check_call(pip_command)
    print(f"{package_name} installation completed.")

# Dependencies of this notebook; "version" is an exact pin, or None for any version
packages = [
    dict(name="tqdm", version=None),
    dict(name="pyspark", version="3.1.1"),
    dict(name="gdown", version=None),
    dict(name="numpy", version="1.22.4"),
    dict(name="xgboost", version=None),
    dict(name="sparkxgb", version=None),
]

# Run the check/install helper once per dependency, in list order
for package in packages:
    check_and_install_package(package_name=package["name"], version=package["version"])


tqdm is already installed.

pyspark is already installed.

gdown is already installed.

numpy is already installed.

xgboost is already installed.

sparkxgb is already installed.


In [2]:
!pip install numpy==1.22.4



In [3]:
# Report the NumPy version that the kernel actually imports
import numpy

installed_numpy_version = numpy.__version__
print(installed_numpy_version)

1.22.4


In [4]:
!pip install sparkxgb



In [5]:
# Attach Google Drive; later cells read the JARs and datasets from under /content/drive
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [6]:
import os
import shutil

# Directory on the instance's local file system that will hold the XGBoost JARs
local_resources_path = "/resources"
os.makedirs(local_resources_path, exist_ok=True)

# JAR locations on the mounted Google Drive
xgboost4j_source = "/content/drive/MyDrive/Big Data Analytics - Project/resources/xgboost4j_2.12-1.7.6.jar"
xgboost4j_spark_source = "/content/drive/MyDrive/Big Data Analytics - Project/resources/xgboost4j-spark_2.12-1.7.6.jar"

# Matching destinations inside the local resources directory
xgboost4j_dest = os.path.join(local_resources_path, "xgboost4j_2.12-1.7.6.jar")
xgboost4j_spark_dest = os.path.join(local_resources_path, "xgboost4j-spark_2.12-1.7.6.jar")

# Copy each JAR from Drive to its local destination
jar_copy_plan = (
    (xgboost4j_source, xgboost4j_dest),
    (xgboost4j_spark_source, xgboost4j_spark_dest),
)
for jar_source, jar_dest in jar_copy_plan:
    shutil.copyfile(jar_source, jar_dest)

# List the directory so the copied files can be checked by eye
print(f"Jar Files copied to: {local_resources_path}")
copied_entries = os.listdir(local_resources_path)
print(copied_entries)


Jar Files copied to: /resources
['xgboost4j-spark_2.12-1.7.6.jar', 'xgboost4j_2.12-1.7.6.jar']


**Testing if spark XGB works on dummy data**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import Row
from pyspark.ml.evaluation import RegressionEvaluator
from sparkxgb import XGBoostRegressor

# Path to the copied JAR files
xgboost4j_jar = "/resources/xgboost4j_2.12-1.7.6.jar"
xgboost4j_spark_jar = "/resources/xgboost4j-spark_2.12-1.7.6.jar"

# Initializing a throwaway Spark session for this smoke test
spark = SparkSession.builder \
    .appName("Test_SparkXGB") \
    .config("spark.jars", f"{xgboost4j_jar},{xgboost4j_spark_jar}") \
    .getOrCreate()

# The whole test runs inside try/finally so the session is stopped even when a
# step fails; a session left running would be returned by the next
# getOrCreate() call in place of a freshly configured one.
try:
    # Creating some dummy data for testing
    data = [
        Row(price=30000, feature1=4.0, feature2=1.2),
        Row(price=25000, feature1=5.0, feature2=1.5),
        Row(price=22000, feature1=6.0, feature2=1.7),
        Row(price=35000, feature1=3.0, feature2=1.1),
        Row(price=28000, feature1=7.0, feature2=1.9),
        Row(price=32000, feature1=8.0, feature2=2.0),
        Row(price=27000, feature1=5.5, feature2=1.8),
    ]

    # Creating DataFrame
    df = spark.createDataFrame(data)

    # Assembling features
    assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
    df_assembled = assembler.transform(df)

    # Scaling features
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler_model = scaler.fit(df_assembled)
    df_scaled = scaler_model.transform(df_assembled)

    # Splitting data
    train_df, test_df = df_scaled.randomSplit([0.8, 0.2], seed=42)

    # Defining XGBoost Regressor (from sparkxgb)
    xgb_regressor = XGBoostRegressor(
        featuresCol="scaled_features",
        labelCol="price",
        maxDepth=6,
        numRound=100,
        objective="reg:squarederror",
    )

    # Training the model
    model = xgb_regressor.fit(train_df)

    # Making predictions
    predictions = model.transform(test_df)

    # Evaluating the model. With only seven rows the score itself carries no
    # meaning; this cell only checks that the pipeline runs end to end.
    evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
    r2 = evaluator.evaluate(predictions)
    print(f"R-Squared Score: {r2}")

    # Showing predictions
    predictions.select("price", "prediction").show()
finally:
    spark.catalog.clearCache()
    spark.stop()
    print("Test_SparkXGB Stopped !")

R-Squared Score: -9.000014062505493
+-----+---------------+
|price|     prediction|
+-----+---------------+
|30000|24999.994140625|
|35000|24999.994140625|
+-----+---------------+

Test_SparkXGB Stopped !


In [7]:
from pyspark.sql import SparkSession

# Comma-separated list of the JAR files copied to the local instance
jar_files = "/resources/xgboost4j_2.12-1.7.6.jar,/resources/xgboost4j-spark_2.12-1.7.6.jar"

# Session settings, applied to the builder in the order listed
spark_settings = {
    "spark.driver.memory": "120g",
    "spark.executor.memory": "120g",
    "spark.driver.maxResultSize": "40g",
    "spark.executor.memoryOverhead": "40g",
    "spark.executor.cores": "5",
    "spark.kryoserializer.buffer.max": "2047m",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.dynamicAllocation.enabled": "true",
    "spark.sql.shuffle.partitions": "400",
    "spark.hadoop.fs.file.impl": "org.apache.hadoop.fs.LocalFileSystem",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=4 -XX:ParallelGCThreads=4",
    "spark.jars": jar_files,
}

# Build the session with the XGBoost JARs on its classpath
session_builder = SparkSession.builder.appName("XGBoostRegressor")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

# Confirm the session is up by printing its Spark version
print(f"Spark session started with version: {spark.version}")

Spark session started with version: 3.1.1


In [8]:
# Check that sparkxgb can be imported and that an estimator can be constructed
try:
    from sparkxgb import XGBoostRegressor

    # Constructing the estimator exercises the Python-to-JVM wrapper
    model = XGBoostRegressor()
except Exception as e:
    print(f"Error loading sparkxgb: {e}")
else:
    print("sparkxgb loaded successfully!")


sparkxgb loaded successfully!


# **Predictions BEFORE Feature Engineering**

In [None]:
# Load the processed dataset: copy the parquet from Google Drive to /content/,
# then read the local copy with Spark.

!cp '/content/drive/MyDrive/Big Data Analytics - Project/Datasets/Processed_DF.parquet' /content/

output_path = '/content/Processed_DF.parquet'
df = spark.read.parquet(output_path)
print("The Processed DataFrame has been loaded successfully.")


The Processed DataFrame has been loaded successfully.


In [None]:
# Spread the rows over 100 partitions for parallelism
df = df.repartition(numPartitions=100)

In [None]:
# Shape of the DataFrame: column count from the schema, row count from a Spark job
total_columns = len(df.columns)
total_rows = df.count()

print(f"The shape of the loaded DataFrame is: ({total_rows}, {total_columns})")

The shape of the loaded DataFrame is: (3000040, 42)


### **Handling Categorical Columns**

In [None]:
# Remove three columns; ['exterior_color','dealer_zip','interior_color'] are kept
columns_to_drop = ['description', 'major_options', 'mileage']
df = df.drop(*columns_to_drop)

In [None]:
# Number of distinct raw values in each of the two color columns
exterior_colors_count, interior_colors_count = (
    df.select(color_column).distinct().count()
    for color_column in ('exterior_color', 'interior_color')
)

print(f"Unique exterior colors: {exterior_colors_count}")
print(f"Unique interior colors: {interior_colors_count}")

Unique exterior colors: 23036
Unique interior colors: 38528


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# Listing top colors for both exterior and interior
colors = ['White', 'Black', 'Gray', 'Silver', 'Red', 'Blue', 'Brown', 'Green', 'Beige', 'Orange', 'Gold', 'Yellow', 'Purple']

# Python UDF form of the color matcher. It is no longer applied below (the
# native column expression replaces it) but stays defined in case another cell
# refers to it.
@F.udf(returnType=ArrayType(StringType()))
def find_colors(color_string):
    """Return the known colors contained in ``color_string``, or ["Other"] if there are none."""
    if color_string is None or color_string.strip() == "":
        return ["Other"]  # Handle empty or null values
    found_colors = [c for c in colors if c.lower() in color_string.lower()]
    return found_colors if found_colors else ["Other"]  # Label non-matching colors as "Other"


def _normalized_color(column_name):
    """Build a Column that maps raw color text to a single label.

    The label is the one known color found in the text (case-insensitive
    substring match), "Mixed Colors" when more than one known color is found,
    and "Other" when none is found (including NULL or blank text).
    """
    lowered = F.lower(F.col(column_name))
    # One boolean Column per known color; a NULL input gives NULL, counted as "no match".
    matches = [(c, F.coalesce(lowered.contains(c.lower()), F.lit(False))) for c in colors]
    match_count = sum(is_match.cast("int") for _, is_match in matches)
    # First known color that matched; only used when exactly one matched.
    first_match = F.coalesce(*[F.when(is_match, F.lit(c)) for c, is_match in matches])
    return (
        F.when(match_count > 1, "Mixed Colors")
         .when(match_count == 1, first_match)
         .otherwise("Other")
    )


# Replacing both color columns in place using native Spark expressions, which
# avoids a row-at-a-time Python UDF and the temporary array/count columns.
df = df.withColumn("exterior_color", _normalized_color("exterior_color"))
df = df.withColumn("interior_color", _normalized_color("interior_color"))


In [None]:
# Counting the rows once; the same total is the denominator for both percentage columns
total_row_count = df.count()

# Counting the occurrences of each exterior and interior color and calculating percentages
exterior_color_counts = df.groupBy("exterior_color").count().withColumn(
    "percentage", F.round((F.col("count") / total_row_count) * 100, 2))

interior_color_counts = df.groupBy("interior_color").count().withColumn(
    "percentage", F.round((F.col("count") / total_row_count) * 100, 2))

# Showing the results
print("Exterior Color Distribution:")
exterior_color_counts.orderBy(F.desc("count")).show(truncate=False)

Exterior Color Distribution:
+--------------+------+----------+
|exterior_color|count |percentage|
+--------------+------+----------+
|White         |675979|22.53     |
|Black         |580148|19.34     |
|Other         |543638|18.12     |
|Silver        |384540|12.82     |
|Blue          |253263|8.44      |
|Red           |242331|8.08      |
|Gray          |231172|7.71      |
|Green         |23026 |0.77      |
|Mixed Colors  |19728 |0.66      |
|Brown         |12905 |0.43      |
|Orange        |11638 |0.39      |
|Gold          |10544 |0.35      |
|Beige         |5065  |0.17      |
|Yellow        |4855  |0.16      |
|Purple        |1208  |0.04      |
+--------------+------+----------+



In [None]:
# Most frequent interior colors first
interior_ranked = interior_color_counts.orderBy(F.col("count").desc())
print("Interior Color Distribution:")
interior_ranked.show(truncate=False)

Interior Color Distribution:
+--------------+-------+----------+
|interior_color|count  |percentage|
+--------------+-------+----------+
|Black         |1624033|54.13     |
|Other         |577578 |19.25     |
|Gray          |383966 |12.8      |
|Mixed Colors  |171212 |5.71      |
|White         |91545  |3.05      |
|Brown         |65943  |2.2       |
|Red           |34117  |1.14      |
|Silver        |24124  |0.8       |
|Blue          |22828  |0.76      |
|Green         |2048   |0.07      |
|Gold          |1193   |0.04      |
|Orange        |1133   |0.04      |
|Yellow        |134    |0.0       |
|Purple        |121    |0.0       |
|Beige         |65     |0.0       |
+--------------+-------+----------+



In [None]:
# Row count (runs a Spark job) and column count of the DataFrame after the color handling above
print(f"Final processed DataFrame used for the model has {df.count()} rows and {len(df.columns)} columns.")

Final processed DataFrame used for the model has 3000040 rows and 39 columns.


In [None]:
# Mean of the price column over the whole DataFrame
price_summary = df.agg({"price": "avg"})
avg_price = price_summary.first()[0]
print(f"Average price of a car: {round(avg_price)}")

Average price of a car: 29933


In [None]:
import pandas as pd
from IPython.display import display
import pyspark.sql.functions as F

# Converting 5 randomly chosen rows to a Pandas DataFrame and displaying all columns.
# The random ordering is seeded (42, as in the sampling and splitting cells) so that
# a re-run shows the same rows.
pd.set_option('display.max_columns', None)
pandas_df = df.orderBy(F.rand(seed=42)).limit(5).toPandas()
display(pandas_df)


Unnamed: 0,fuel_type,body_type,city,city_fuel_economy,daysonmarket,dealer_zip,engine_displacement,engine_type,exterior_color,franchise_dealer,fuel_tank_volume,height,highway_fuel_economy,horsepower,interior_color,is_new,latitude,length,listed_date,listing_color,longitude,make_name,maximum_seating,model_name,price,savings_amount,seller_rating,sp_name,torque,transmission,transmission_display,wheel_system_display,wheelbase,width,year,combined_fuel_economy,legroom,log_mileage,major_options_count
0,Gasoline,Sedan,Fort Worth,15.0,3,76108,5000.0,V8,White,True,21.700001,57.0,21.0,470.0,Other,False,32.754902,206.6,2020-09-08,WHITE,-97.4795,Jaguar,5.0,XJ-Series,16995.0,540,3.66,Alfa Romeo of Fort Worth,424.0,A,6-Speed Automatic,Rear-Wheel Drive,124.3,83.1,2011,18.0,85.6,11.45,8
1,Gasoline,Coupe,Broken Arrow,22.0,49,74012,2000.0,I4,White,True,13.2,55.1,29.0,250.0,Black,True,36.060902,168.0,2020-07-24,WHITE,-95.82,Hyundai,4.0,Veloster N,29241.0,0,4.0,Regional Hyundai,260.0,M,6-Speed Manual,Front-Wheel Drive,104.3,71.3,2020,25.5,76.7,2.71,5
2,Gasoline,SUV / Crossover,Temecula,22.0,316,92591,2400.0,I4,Black,True,15.9,65.7,31.0,180.0,Black,True,33.513199,182.0,2019-11-01,BLACK,-117.153999,Jeep,5.0,Cherokee,27430.0,0,4.142857,DCH Chrysler Dodge Jeep Ram FIAT of Temecula,239.0,A,9-Speed Automatic,Front-Wheel Drive,106.5,73.2,2020,26.5,81.4,1.61,4
3,Gasoline,SUV / Crossover,Bloomington,19.0,40,55437,3500.0,V6,Red,True,19.200001,70.1,26.0,295.0,Other,False,44.857601,192.5,2020-08-01,RED,-93.337601,Toyota,8.0,Highlander,34998.0,430,4.64,Walser Toyota,263.0,A,Automatic,All-Wheel Drive,109.8,75.8,2018,22.5,82.6,9.77,10
4,Gasoline,Minivan,Raleigh,19.0,238,27616,3500.0,V6,Silver,True,19.5,69.6,28.0,280.0,Gray,True,35.861301,203.2,2020-01-16,SILVER,-78.581703,Honda,8.0,Odyssey,40830.0,0,4.354839,Leith Honda,262.0,A,Automatic,Front-Wheel Drive,118.1,92.3,2019,23.5,81.8,2.71,7




---



# **XGB**

In [None]:
import warnings
from tqdm import tqdm
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean as sql_mean
import pyspark.sql.functions as F

# Ignore warnings
warnings.filterwarnings('ignore')

print("Processing the data...")
with tqdm(total=6, desc="Progress") as pbar:

    # Randomly sampling 10% of the data. Rows without a label are dropped rather
    # than mean-imputed: a made-up target would distort training and evaluation.
    df_sample = df.sample(fraction=0.1, seed=42).na.drop(subset=["price"])
    pbar.update(1)

    # Filling in missing values BEFORE the features are assembled, so the filled
    # values actually reach the feature vector. DataType.typeName() reports
    # IntegerType as "integer" (not "int"). All means come from one aggregation
    # instead of one Spark job per column.
    numeric_type_names = {"byte", "short", "integer", "long", "float", "double"}
    impute_columns = [
        field.name for field in df_sample.schema.fields
        if field.name != "price" and field.dataType.typeName() in numeric_type_names
    ]
    if impute_columns:
        column_means = df_sample.agg(
            *[sql_mean(F.col(name)).alias(name) for name in impute_columns]
        ).first()
        # A column that is entirely NULL has no mean; leave it untouched.
        fill_values = {
            name: column_means[name] for name in impute_columns
            if column_means[name] is not None
        }
        if fill_values:
            df_sample = df_sample.na.fill(fill_values)
    pbar.update(1)

    # Handling categorical columns: index each string column, then one-hot encode it
    cat_columns = [field for (field, dtype) in df_sample.dtypes if dtype == "string"]
    stages = []
    for col_name in cat_columns:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed", handleInvalid="keep")
        encoder = OneHotEncoder(inputCol=f"{col_name}_indexed", outputCol=f"{col_name}_encoded")
        stages += [indexer, encoder]
    pbar.update(1)

    # Assembling features: non-string columns first, then the encoded vectors
    num_columns = [name for name in df_sample.columns if name != 'price' and name not in cat_columns]
    encoded_columns = [f"{name}_encoded" for name in cat_columns]
    feature_columns = num_columns + encoded_columns
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    stages += [assembler]
    pbar.update(1)

    # Adding scaling to the pipeline
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    stages += [scaler]

    # Creating and applying the pipeline.
    # NOTE(review): the pipeline is fitted on the whole sample before the
    # train/test split, so the category indices and scaler statistics also see
    # the test rows; fit on train_df only if a strict hold-out is required.
    pipeline = Pipeline(stages=stages)
    pipeline_model = pipeline.fit(df_sample)
    df_sample = pipeline_model.transform(df_sample)
    pbar.update(1)

    # Splitting the data
    train_df, test_df = df_sample.randomSplit([0.8, 0.2], seed=42)
    pbar.update(1)

print("\n\nData preprocessing and splitting completed!\n")


Processing the data...


Progress: 100%|██████████| 6/6 [00:51<00:00,  8.64s/it]



Data preprocessing and splitting completed!






In [None]:
# Size of the training split; count() runs a Spark job over the preprocessing lineage
print(f"Train_DF has {train_df.count()} rows and {len(train_df.columns)} columns")

Train_DF has 240412 rows and 71 columns


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from sparkxgb import XGBoostRegressor
import time

# Model training
print("Training XGBoost model...")

xgb_regressor = XGBoostRegressor(
    featuresCol="scaled_features",  # Use scaled features
    labelCol="price",               # Target column
    maxDepth=6,
    numRound=100,
    objective="reg:squarederror",   # Regression task
    treeMethod="hist",
)


# Start the clock; the reported runtime covers training, prediction and evaluation
start_time = time.time()

# Training the model
model = xgb_regressor.fit(train_df)

# Making predictions
print("Making predictions...")
predictions = model.transform(test_df)

# Evaluating the model
print("Evaluating the model...")
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print(f"\nTrain size: {train_df.count()} samples")
print(f"Test size: {test_df.count()} samples")
# Two decimals, matching the weighted-loss cell, so before/after scores can be
# compared at the precision quoted in the write-up.
print(f"\n\nR-Squared Score (Accuracy): {r2 * 100:.2f}%\n")

# Calculating total runtime
end_time = time.time()
total_runtime = (end_time - start_time) / 60
print(f"\nOverall runtime: {round(total_runtime)} minutes.")

Training XGBoost model...
Making predictions...
Evaluating the model...

Train size: 240,412 samples
Test size: 60,072 samples


R-Squared Score (Accuracy): 85.36%


Overall runtime: 119 minutes.


In [None]:
# Calculate additional metrics.
# Each evaluate() call runs its own Spark job over `predictions`, which is not
# cached. Caching just the two columns the evaluators read lets the three
# metrics share a single computation of the predictions.
scored_pairs = predictions.select("price", "prediction").cache()

mae_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
mse_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
rmse_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

try:
    mae = mae_evaluator.evaluate(scored_pairs)
    mse = mse_evaluator.evaluate(scored_pairs)
    rmse = rmse_evaluator.evaluate(scored_pairs)
finally:
    # Release the cached projection whether or not evaluation succeeded
    scored_pairs.unpersist()

print("Additional Metrics:")
print(f"Mean Absolute Error: {round(mae)}")
print(f"Mean Squared Error: {round(mse)}")
print(f"Root Mean Squared Error: {round(rmse)}")

Additional Metrics:
Mean Absolute Error: 3066
Mean Squared Error: 56468542
Root Mean Squared Error: 7515


The Mean Absolute Error (`MAE`) of **\$3,066** means that, on average, the predicted car prices deviate from the actual prices by this amount. Given that the mean car price is **\$29,933**, this error represents about `10.2% of the mean price`, which suggests that the model performs well.

In [None]:
# Write the trained model to /tmp, replacing any copy saved by an earlier run
model_writer = model.write().overwrite()
model_writer.save("/tmp/xgboost_model")

In [None]:
# Show what the model writer produced in the save directory
import os

saved_entries = os.listdir("/tmp/xgboost_model")
print(saved_entries)

['data', 'metadata']


In [None]:
import xgboost as xgb

# Open the booster file written by the Spark model with the native XGBoost API
native_model_file = "/tmp/xgboost_model/data/XGBoostRegressionModel"
native_model = xgb.Booster()
native_model.load_model(native_model_file)


In [None]:
# Getting feature importances from the loaded native XGBoost model
importance_dict = native_model.get_score(importance_type='weight')

features_list = pipeline_model.stages[-2].getInputCols()  # Get the input column names from the VectorAssembler

# The booster's feature indices (f0, f1, ...) are positions in the assembled
# vector, not positions in the assembler's input-column list: every one-hot
# encoded column occupies many vector slots. The assembled "features" column
# carries ML attribute metadata naming each slot, so build the index -> name
# map from that metadata.
ml_attrs = train_df.schema["features"].metadata.get("ml_attr", {}).get("attrs", {})
slot_names = {
    attr["idx"]: attr.get("name", f"f{attr['idx']}")
    for attr_group in ml_attrs.values()
    for attr in attr_group
}
if not slot_names:
    # No metadata available: fall back to the input-column names, which is only
    # accurate for the leading single-slot columns.
    slot_names = dict(enumerate(features_list))

# Mapping the feature indices to slot names and sorting by importance
sorted_importance = sorted(
    (
        (slot_names[int(f[1:])], importance)
        for f, importance in importance_dict.items()
        if int(f[1:]) in slot_names
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

# Printing the top 10 features with their actual names (sorted by importance)
print("Top 10 Features Ranked by Importance (Highest to Lowest)")
for rank, (feature, importance) in enumerate(sorted_importance[:10], 1):
    print(f"{rank}. {feature}")

Top 10 Features Ranked by Importance (Highest to Lowest)
1. log_mileage
2. daysonmarket
3. year
4. major_options_count
5. city_fuel_economy
6. horsepower
7. latitude
8. fuel_tank_volume
9. longitude
10. savings_amount




---



# **Predictions AFTER Feature Engineering**

In [9]:
# Load the feature-engineered dataset: copy the parquet from Google Drive to
# /content/, then read the local copy with Spark. This rebinds `df`.
!cp '/content/drive/MyDrive/Big Data Analytics - Project/Datasets/Feature_Engineered_DF.parquet' /content/

output_path = '/content/Feature_Engineered_DF.parquet'
df = spark.read.parquet(output_path)
print("The Feature Engineered DataFrame has been loaded successfully.")


The Feature Engineered DataFrame has been loaded successfully.


In [10]:
# Shape of the DataFrame: column count from the schema, row count from a Spark job
total_columns = len(df.columns)
total_rows = df.count()

print(f"The shape of the loaded DataFrame is: ({total_rows}, {total_columns})")

The shape of the loaded DataFrame is: (3000040, 47)


In [11]:
# Mean of the price column over the whole DataFrame
price_summary = df.agg({"price": "avg"})
avg_price = price_summary.first()[0]
print(f"Average price of a car: {round(avg_price)}")

Average price of a car: 29933


In [12]:
import pandas as pd
from IPython.display import display
import pyspark.sql.functions as F

# Converting 5 randomly chosen rows to a Pandas DataFrame and displaying all columns.
# The random ordering is seeded (42, as in the sampling and splitting cells) so that
# a re-run shows the same rows.
pd.set_option('display.max_columns', None)
pandas_df = df.orderBy(F.rand(seed=42)).limit(5).toPandas()
display(pandas_df)


Unnamed: 0,fuel_type,body_type,city,city_fuel_economy,days_in_market,dealer_zip,engine_displacement,engine_type,exterior_color,franchise_dealer,fuel_tank_volume,height,highway_fuel_economy,horsepower,interior_color,is_new,latitude,length,listing_color,longitude,make_name,maximum_seating,model_name,price,savings_amount,seller_rating,sp_name,torque,transmission,transmission_display,wheel_system_display,wheelbase,width,manufactured_year,combined_fuel_economy,legroom,log_mileage,major_options_count,hp_x_engine_disp,hp_x_torque,listed_day,listed_month,listed_year,age,resale_value_score,maintenance_cost,luxury_score
0,Hybrid,Sedan,Aurora,53.0,79,80012,1800.0,I4,Red,True,11.4,56.5,52.0,121.0,Gray,True,39.708401,182.3,RED,-104.865997,Toyota,5.0,Corolla Hybrid,25163.0,0,3.933333,Stevinson Toyota East,265.22,CVT,Continuously Variable Transmission,Front-Wheel Drive,106.3,70.1,2020,52.5,76.8,2.3,4,1.25,-1e-05,25,6,2020,0,27,37,30
1,Hybrid,Hatchback,Beverly,51.0,92,1915,1800.0,I4,Gray,False,11.9,58.7,48.0,134.0,Gray,False,42.552601,176.4,GRAY,-70.8825,Toyota,5.0,Prius,9495.0,447,4.5,CJ Motors,265.22,A,Automatic,Front-Wheel Drive,106.3,68.7,2013,49.5,78.5,11.59,3,1.12,-1e-05,9,6,2020,7,13,29,27
2,Gasoline,SUV / Crossover,Burlington,17.0,176,66839,3500.0,V6,Other,True,18.6,68.0,24.0,262.0,Other,False,38.181599,201.8,UNKNOWN,-95.738701,Ford,7.0,Flex,10775.0,989,4.75,Crow-Moddie Chevrolet Ford,248.0,A,Automatic,Front-Wheel Drive,117.9,88.8,2009,20.5,85.1,11.39,3,0.07,-0.02595,19,3,2020,11,12,32,32
3,Gasoline,SUV / Crossover,Rochester,17.0,7,55901,3500.0,V6,Black,True,21.0,72.7,24.0,250.0,Other,False,44.036098,191.4,BLACK,-92.512299,Honda,8.0,Pilot,18750.0,503,4.028572,Tom Kadlec Honda,253.0,A,5-Speed Automatic,Four-Wheel Drive,109.2,78.5,2015,20.5,79.9,11.61,6,0.01,-0.00308,3,9,2020,5,23,36,31
4,Flex Fuel Vehicle,Hatchback,Kenner,26.0,44,70065,2000.0,I4,White,False,12.4,57.8,38.0,160.0,Brown,False,30.014299,171.7,WHITE,-90.243401,Ford,5.0,Focus,11998.0,932,4.181818,CarMax Kenner - Now offering Curbside Pickup,148.0,A,Automatic,Front-Wheel Drive,104.3,80.5,2016,32.0,76.3,10.9,3,0.71,1.07428,29,7,2020,4,20,35,31


In [13]:
import warnings
from tqdm import tqdm
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean as sql_mean
import pyspark.sql.functions as F

# Ignore warnings
warnings.filterwarnings('ignore')

print("Processing the data...")
with tqdm(total=6, desc="Progress") as pbar:

    # Randomly sampling 10% of the data. Rows without a label are dropped rather
    # than mean-imputed: a made-up target would distort training and evaluation.
    df_sample = df.sample(fraction=0.1, seed=42).na.drop(subset=["price"])
    pbar.update(1)

    # Filling in missing values BEFORE the features are assembled, so the filled
    # values actually reach the feature vector. DataType.typeName() reports
    # IntegerType as "integer" (not "int"). All means come from one aggregation
    # instead of one Spark job per column.
    numeric_type_names = {"byte", "short", "integer", "long", "float", "double"}
    impute_columns = [
        field.name for field in df_sample.schema.fields
        if field.name != "price" and field.dataType.typeName() in numeric_type_names
    ]
    if impute_columns:
        column_means = df_sample.agg(
            *[sql_mean(F.col(name)).alias(name) for name in impute_columns]
        ).first()
        # A column that is entirely NULL has no mean; leave it untouched.
        fill_values = {
            name: column_means[name] for name in impute_columns
            if column_means[name] is not None
        }
        if fill_values:
            df_sample = df_sample.na.fill(fill_values)
    pbar.update(1)

    # Handling categorical columns: index each string column, then one-hot encode it
    cat_columns = [field for (field, dtype) in df_sample.dtypes if dtype == "string"]
    stages = []
    for col_name in cat_columns:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed", handleInvalid="keep")
        encoder = OneHotEncoder(inputCol=f"{col_name}_indexed", outputCol=f"{col_name}_encoded")
        stages += [indexer, encoder]
    pbar.update(1)

    # Assembling features: non-string columns first, then the encoded vectors
    num_columns = [name for name in df_sample.columns if name != 'price' and name not in cat_columns]
    encoded_columns = [f"{name}_encoded" for name in cat_columns]
    feature_columns = num_columns + encoded_columns
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    stages += [assembler]
    pbar.update(1)

    # Adding scaling to the pipeline
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    stages += [scaler]

    # Creating and applying the pipeline.
    # NOTE(review): the pipeline is fitted on the whole sample before the
    # train/test split, so the category indices and scaler statistics also see
    # the test rows; fit on train_df only if a strict hold-out is required.
    pipeline = Pipeline(stages=stages)
    pipeline_model = pipeline.fit(df_sample)
    df_sample = pipeline_model.transform(df_sample)
    pbar.update(1)

    # Splitting the data
    train_df, test_df = df_sample.randomSplit([0.8, 0.2], seed=42)
    pbar.update(1)

print("\n\nData preprocessing and splitting completed!")


Processing the data...


Progress: 100%|██████████| 6/6 [00:34<00:00,  5.76s/it]



Data preprocessing and splitting completed!





In [14]:
from pyspark.ml.evaluation import RegressionEvaluator
from sparkxgb import XGBoostRegressor
import time

# Model training
print("Training XGBoost model...")

xgb_regressor = XGBoostRegressor(
    featuresCol="scaled_features",  # Use scaled features
    labelCol="price",               # Target column
    maxDepth=6,
    numRound=100,
    objective="reg:squarederror",   # Regression task
    treeMethod="hist",
)


# Start the clock; the reported runtime covers training, prediction and evaluation
start_time = time.time()

# Training the model
model = xgb_regressor.fit(train_df)

# Making predictions
print("Making predictions...")
predictions = model.transform(test_df)

# Evaluating the model
print("Evaluating the model...")
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print(f"\nTrain size: {train_df.count()} samples")
print(f"Test size: {test_df.count()} samples")
# Two decimals, matching the weighted-loss cell, so before/after scores can be
# compared at the precision quoted in the write-up.
print(f"\n\nR-Squared Score (Accuracy): {r2 * 100:.2f}%\n")

# Calculating total runtime
end_time = time.time()
total_runtime = (end_time - start_time) / 60
print(f"\nOverall runtime: {round(total_runtime)} minutes.")

Training XGBoost model...
Making predictions...
Evaluating the model...

Train size: 240048 samples
Test size: 59933 samples


R-Squared Score (Accuracy): 92%


Overall runtime: 76 minutes.


In [15]:
new_model_path = "/content/drive/MyDrive/Big Data Analytics - Project/models/XGB_Regression_model_FE"

# Saving the trained model to the new path. A plain save() raises when the
# path already exists, so go through the writer with overwrite(), as the /tmp
# save cells do; this keeps the cell re-runnable.
model.write().overwrite().save(new_model_path)

print(f"Model saved successfully at {new_model_path}")

Model saved successfully at /content/drive/MyDrive/Big Data Analytics - Project/models/XGB_Regression_model_FE


In [16]:
# Calculating additional metrics.
# Each evaluate() call runs its own Spark job over `predictions`, which is not
# cached. Caching just the two columns the evaluators read lets the three
# metrics share a single computation of the predictions.
scored_pairs = predictions.select("price", "prediction").cache()

mae_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
mse_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
rmse_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

try:
    mae = mae_evaluator.evaluate(scored_pairs)
    mse = mse_evaluator.evaluate(scored_pairs)
    rmse = rmse_evaluator.evaluate(scored_pairs)
finally:
    # Release the cached projection whether or not evaluation succeeded
    scored_pairs.unpersist()

print("Additional Metrics:")
print(f"Mean Absolute Error: {round(mae)}")
print(f"Mean Squared Error: {round(mse)}")
print(f"Root Mean Squared Error: {round(rmse)}")

Additional Metrics:
Mean Absolute Error: 3018
Mean Squared Error: 27751838
Root Mean Squared Error: 5268


With feature engineering, the **XGBoost** model's **R² rose from 85.36% to about 92%**, the Mean Absolute Error **(MAE) fell from \$3,066 to \$3,018 (about 1.6%)**, and the Root Mean Squared Error **(RMSE) fell from \$7,515 to \$5,268 (about 29.9%)**, demonstrating a notable improvement in prediction accuracy, most visibly on large errors.

In [None]:
# Write the trained model to /tmp, replacing any copy saved by an earlier run
model_writer = model.write().overwrite()
model_writer.save("/tmp/xgboost_model")

In [None]:
# Show what the model writer produced in the save directory
import os

saved_entries = os.listdir("/tmp/xgboost_model")
print(saved_entries)

['data', 'metadata']


In [None]:
import xgboost as xgb

# Open the booster file written by the Spark model with the native XGBoost API
native_model_file = "/tmp/xgboost_model/data/XGBoostRegressionModel"
native_model = xgb.Booster()
native_model.load_model(native_model_file)


In [None]:
# Getting feature importances from the loaded native XGBoost model
importance_dict = native_model.get_score(importance_type='weight')

features_list = pipeline_model.stages[-2].getInputCols()  # Getting the input column names from the VectorAssembler

# The booster's feature indices (f0, f1, ...) are positions in the assembled
# vector, not positions in the assembler's input-column list: every one-hot
# encoded column occupies many vector slots. The assembled "features" column
# carries ML attribute metadata naming each slot, so build the index -> name
# map from that metadata.
ml_attrs = train_df.schema["features"].metadata.get("ml_attr", {}).get("attrs", {})
slot_names = {
    attr["idx"]: attr.get("name", f"f{attr['idx']}")
    for attr_group in ml_attrs.values()
    for attr in attr_group
}
if not slot_names:
    # No metadata available: fall back to the input-column names, which is only
    # accurate for the leading single-slot columns.
    slot_names = dict(enumerate(features_list))

# Mapping the feature indices to slot names and sorting by importance
sorted_importance = sorted(
    (
        (slot_names[int(f[1:])], importance)
        for f, importance in importance_dict.items()
        if int(f[1:]) in slot_names
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

# Printing the top 10 features with their actual names (sorted by importance)
print("Top 10 Features Ranked by Importance (Highest to Lowest)")
for rank, (feature, importance) in enumerate(sorted_importance[:10], 1):
    print(f"{rank}. {feature}")

Top 10 Features Ranked by Importance (Highest to Lowest)
1. log_mileage
2. days_in_market
3. maintenance_cost
4. city_fuel_economy
5. major_options_count
6. latitude
7. manufactured_year
8. luxury_score
9. seller_rating
10. longitude




---



### **Penalizing Large Errors**

In [None]:
import warnings
import numpy as np
from tqdm import tqdm
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean as sql_mean
import pyspark.sql.functions as F
from pyspark.sql.functions import when, lit

# Ignore warnings
warnings.filterwarnings('ignore')

print("Processing the data...")
# total matches the six update(1) calls below
with tqdm(total=6, desc="Progress") as pbar:

    # Randomly sample 10% of the data. Rows without a label are dropped rather
    # than mean-imputed: a made-up target would distort training and evaluation.
    df_sample = df.sample(fraction=0.1, seed=42).na.drop(subset=["price"])
    pbar.update(1)

    # Fill missing values BEFORE the features are assembled, so the filled
    # values actually reach the feature vector. DataType.typeName() reports
    # IntegerType as "integer" (not "int"). All means come from one aggregation
    # instead of one Spark job per column.
    numeric_type_names = {"byte", "short", "integer", "long", "float", "double"}
    impute_columns = [
        field.name for field in df_sample.schema.fields
        if field.name != "price" and field.dataType.typeName() in numeric_type_names
    ]
    if impute_columns:
        column_means = df_sample.agg(
            *[sql_mean(F.col(name)).alias(name) for name in impute_columns]
        ).first()
        # A column that is entirely NULL has no mean; leave it untouched.
        fill_values = {
            name: column_means[name] for name in impute_columns
            if column_means[name] is not None
        }
        if fill_values:
            df_sample = df_sample.na.fill(fill_values)
    pbar.update(1)

    # Handle categorical columns: index each string column, then one-hot encode it
    cat_columns = [field for (field, dtype) in df_sample.dtypes if dtype == "string"]
    stages = []
    for col_name in cat_columns:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed", handleInvalid="keep")
        encoder = OneHotEncoder(inputCol=f"{col_name}_indexed", outputCol=f"{col_name}_encoded")
        stages += [indexer, encoder]
    pbar.update(1)

    # Assemble features: non-string columns first, then the encoded vectors
    num_columns = [name for name in df_sample.columns if name != 'price' and name not in cat_columns]
    encoded_columns = [f"{name}_encoded" for name in cat_columns]
    feature_columns = num_columns + encoded_columns
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    stages += [assembler]
    pbar.update(1)

    # Add scaling to the pipeline
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    stages += [scaler]

    # Create and apply the pipeline.
    # NOTE(review): the pipeline is fitted on the whole sample before the
    # train/test split, so the category indices and scaler statistics also see
    # the test rows; fit on train_df only if a strict hold-out is required.
    pipeline = Pipeline(stages=stages)
    pipeline_model = pipeline.fit(df_sample)
    df_sample = pipeline_model.transform(df_sample)
    pbar.update(1)

    # Split the data
    train_df, test_df = df_sample.randomSplit([0.8, 0.2], seed=42)
    pbar.update(1)

print("\n\nData preprocessing and splitting completed!")

print(f"Train_DF has {train_df.count()} rows and {len(train_df.columns)} columns")

from pyspark.ml.evaluation import RegressionEvaluator
from sparkxgb import XGBoostRegressor
import time
# Visual divider followed by the announcement that the training stage begins
print(
    "-------------------------------------------------------------------------------------------------------------------------------",
    "Training XGBoost model...",
    sep="\n",
)

# Row-wise weighted squared error used to annotate the predictions DataFrame.
# It is not an XGBoost training objective: the regressor in this notebook is
# configured with objective="reg:squarederror" and a weight column instead.
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

@udf(returnType=FloatType())
def custom_weighted_loss_udf(preds, labels):
    """Return the weighted squared error for a single row.

    Spark calls this once per row with scalar values, so plain Python
    arithmetic is used rather than vectorised numpy calls.

    Args:
        preds: Predicted price for the row (or None).
        labels: Actual price for the row (or None).

    Returns:
        ``(preds - labels) ** 2`` multiplied by 2.0 when the actual price is
        above 38,210 (described in this notebook as the 75th percentile) and
        by 1.0 otherwise. Returns None (a null cell) when either input is
        null, instead of failing the whole Spark job with a TypeError.
    """
    if preds is None or labels is None:
        return None
    residual = preds - labels
    # Errors on cars priced above 38,210 are penalised twice as heavily
    weight = 2.0 if labels > 38210 else 1.0
    return float((residual ** 2) * weight)  # plain float for Spark's FloatType


# Training weights: rows priced above the threshold count double so the model
# is pushed harder to fit expensive cars.
HIGH_PRICE_THRESHOLD = 38210
HIGH_PRICE_WEIGHT = 2.0
train_df = train_df.withColumn(
    "weight",
    when(train_df["price"] > HIGH_PRICE_THRESHOLD, HIGH_PRICE_WEIGHT).otherwise(1.0),
)

# Initialize the XGBoostRegressor with weight column
xgb_regressor = XGBoostRegressor(
    featuresCol="scaled_features",
    labelCol="price",
    weightCol="weight",  # Use the weight column
    maxDepth=6,
    numRound=100,
    objective="reg:squarederror",
    treeMethod="hist",
)

# Timer covers training, prediction, the R2 evaluation and the row counts below
start_time = time.time()

# Train the model
model = xgb_regressor.fit(train_df)

# Make predictions
print("Making predictions...")
# Cache the model output: it is consumed by four separate evaluator actions
# below (and by later preview cells). Without caching, each action re-runs
# the upstream transforms and model inference over the test set.
predictions = model.transform(test_df).cache()

# Attach the per-row weighted squared error (lazy; only computed if selected)
predictions = predictions.withColumn("custom_loss", custom_weighted_loss_udf(predictions["prediction"], predictions["price"]))

# Evaluate the model
print("Evaluating the model...")
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print(f"\nTrain size: {train_df.count()} samples")
print(f"Test size: {test_df.count()} samples")
print(f"\n\nR-Squared Score (Accuracy): {r2 * 100:.2f}%")

# Calculate total runtime
end_time = time.time()
total_runtime = (end_time - start_time) / 60  # Convert seconds to minutes

print(f"\n\nOverall runtime: {round(total_runtime)} minutes.")
print("-------------------------------------------------------------------------------------------------------------------------------")

# Additional (unweighted) error metrics on the cached test predictions
mae_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(predictions)

mse_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
mse = mse_evaluator.evaluate(predictions)

rmse_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)

print("\nAdditional Metrics:")
print(f"Mean Absolute Error: {round(mae)}")
print(f"Mean Squared Error: {round(mse)}")
print(f"Root Mean Squared Error: {round(rmse)}")

Processing the data...


Progress: 6it [00:30,  5.04s/it]                       




Data preprocessing and splitting completed!
Train_DF has 240048 rows and 77 columns
-------------------------------------------------------------------------------------------------------------------------------
Training XGBoost model...
Making predictions...
Evaluating the model...

Train size: 240048 samples
Test size: 59933 samples


R-Squared Score (Accuracy): 91.70%


Overall runtime: 76 minutes.
-------------------------------------------------------------------------------------------------------------------------------

Additional Metrics:
Mean Absolute Error: 3090
Mean Squared Error: 28202433
Root Mean Squared Error: 5311


### **No substantial improvement in performance**

In [None]:
# Side-by-side preview of actual vs. predicted price (first 10 rows)
preview_columns = ["price", "prediction"]
predictions.select(*preview_columns).show(n=10)

+-------+--------------+
|  price|    prediction|
+-------+--------------+
|56270.0| 63611.0078125|
|60637.0| 62643.0078125|
|48365.0| 57950.5546875|
|59428.0|63419.37109375|
|59939.0|61276.08203125|
|60055.0| 67952.7265625|
|46976.0|     59387.875|
|46600.0| 43039.3984375|
|63470.0| 64744.4765625|
|71998.0| 70182.4296875|
+-------+--------------+
only showing top 10 rows

