Importing the data

In [None]:
import pandas as pd
# Load the raw dataset from the working directory.
csv_path = 'Final_df.csv'
data = pd.read_csv(csv_path)

Data Pre-Processing

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

# Work on an explicit copy: plain assignment (`data_copy = data`) would only
# create a second name for the same frame, so any in-place operation on
# `data_copy` would silently modify the raw `data` as well.
data_copy = data.copy()
data_types = data_copy.dtypes  # column dtypes, kept for interactive inspection

# Peek at the first two rows (rendered as a rich table as the cell's last expression).
data.head(2)

In [None]:
# Rows missing any of these key fields are unusable, so drop them outright.
data_copy = data_copy.dropna(how='any', subset=['odometer', 'manufacturer', 'model'])

In [None]:
# Replace missing values with the placeholder 'unknown', but only in
# non-numeric columns. Writing a string into a numeric column would turn it
# into object dtype. That breaks the numeric quantile/comparison filtering
# further down and cannot be loaded into an XGBoost DMatrix. Missing numeric
# values are therefore left as NaN.
non_numeric_cols = data_copy.select_dtypes(exclude='number').columns
data_copy = data_copy.fillna({col: 'unknown' for col in non_numeric_cols})

In [None]:
# Remove exact duplicate rows; the first occurrence of each is kept.
data_copy = data_copy.drop_duplicates(keep='first')

In [None]:
# Collapse rare manufacturers: keep the 20 most frequent names and label all
# others 'others'. This is vectorised with isin()/where() instead of a per-row
# apply that re-sliced the value_counts Series on every row. Values are still
# compared via their string form, exactly as before.
manufacturer_values = data_copy['manufacturer'].value_counts()
top_manufacturers = manufacturer_values.index[:20]
data_copy['manufacturer'] = data_copy['manufacturer'].where(
    data_copy['manufacturer'].astype(str).isin(top_manufacturers), 'others'
)

In [None]:
# Collapse rare regions and models: keep the 50 most frequent values of each
# column and label everything else 'others'. This is vectorised with
# isin()/where() instead of a per-row apply that re-sliced value_counts for
# every row. Values are compared via their string form, as before.
region_values = data_copy['region'].value_counts()
data_copy['region'] = data_copy['region'].where(
    data_copy['region'].astype(str).isin(region_values.index[:50]), 'others'
)
model_values = data_copy['model'].value_counts()
data_copy['model'] = data_copy['model'].where(
    data_copy['model'].astype(str).isin(model_values.index[:50]), 'others'
)

In [None]:
def iqr_upper_fence(series, k=1.5):
    """Return Tukey's upper fence ``Q3 + k * (Q3 - Q1)`` of a numeric Series."""
    q1, q3 = series.quantile([0.25, 0.75])
    return q3 + k * (q3 - q1)


# Outlier filtering on price and odometer.
# Upper cut-offs use the 1.5 * IQR rule. Lower cut-offs are empirical
# percentiles: 15% for price, 5% for odometer.
# NOTE(review): document why these particular quantiles were chosen.
# All limits are computed on the full data_copy, so the odometer limits do
# not depend on the price filter.
price_upper_limit = iqr_upper_fence(data_copy['price'])
price_lower_limit = data_copy['price'].quantile(0.15)
odometer_upper_limit = iqr_upper_fence(data_copy['odometer'])
odometer_lower_limit = data_copy['odometer'].quantile(0.05)

# Strict inequalities on both sides: rows exactly at a limit are dropped.
in_price_range = (data_copy['price'] > price_lower_limit) & (data_copy['price'] < price_upper_limit)
in_odometer_range = (data_copy['odometer'] > odometer_lower_limit) & (data_copy['odometer'] < odometer_upper_limit)

# .copy() makes new_df an independent frame rather than a slice of data_copy,
# so later column assignments on it do not trigger SettingWithCopyWarning.
new_df = data_copy[in_price_range & in_odometer_range].copy()

In [None]:
# Store odometer readings as integers (any fractional part is truncated).
# assign() returns a new frame, which avoids a chained column assignment into
# a frame that may still be a slice of data_copy.
new_df = new_df.assign(odometer=new_df['odometer'].astype(int))

In [None]:
# new_df = new_df[new_df['year'] > 1996]
# new_df.shape
# new_df['car_age'] = 2022 - new_df['year']
# new_df.drop(['year'], axis = 1, inplace = True)

In [None]:
# Snapshot the cleaned data under its own name; a deep copy means nothing done
# to final_df can leak back into new_df.
final_df = new_df.copy(deep=True)

One-hot encode the categorical columns, then split the data into training and test sets

In [None]:
from sklearn.model_selection import train_test_split
# One-hot encode every categorical feature, then separate features from the target.
categorical_cols = ['region', 'manufacturer', 'model', 'condition', 'cylinders',
                    'fuel', 'title_status', 'transmission', 'drive', 'type', 'paint_color']
df_encoded = pd.get_dummies(final_df, columns=categorical_cols)

y = df_encoded['price']
X = df_encoded.drop(columns=['price'])

# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:

# X_train, X_test, y_train, y_test = train_test_split(final_df.drop(['price'], axis = 1), final_df['price'], random_state = 42, test_size = .2)

### Baseline: single-machine XGBoost (no Spark, no distributed training)

In [None]:
import xgboost as xgb
from sklearn.metrics import mean_squared_error
import numpy as np
import time

# Convert the splits into DMatrix, XGBoost's optimized data container.
# The categorical columns were already one-hot encoded above;
# enable_categorical=True only matters for pandas 'category' columns.
dtrain = xgb.DMatrix(X_train, label=y_train, enable_categorical=True)
dtest = xgb.DMatrix(X_test, label=y_test, enable_categorical=True)

# Booster parameters (many more tuning options exist).
# The number of boosting rounds is NOT a booster parameter: it has to be passed
# to xgb.train() directly. Inside this dict it is reported as unused.
params = {
    'max_depth': 3,                   # maximum depth of each tree
    'eta': 0.3,                       # learning rate: shrinkage applied to each new tree
    'objective': 'reg:squarederror',  # regression with squared loss
}
num_boost_round = 1000  # number of boosting rounds (trees) to build

# Train the model; perf_counter is the appropriate clock for measuring intervals.
start = time.perf_counter()
bst = xgb.train(params, dtrain, num_boost_round=num_boost_round)
end = time.perf_counter()
print("Training time in seconds: ")
print(end - start)

# Predict the labels of the test set
y_pred = bst.predict(dtest)

# Report R^2, MSE and RMSE (RMSE is the error metric recorded for the Spark runs).
# NOTE: the Spark section uses DataFrame.randomSplit, not this sklearn split,
# so the two test sets differ and the metrics are only roughly comparable.
r2 = r2_score(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"R-squared Score: {r2}")
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")


In [None]:
!pip install pyspark


### Distributed XGBoost with Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import time

# Initialize a local Spark session.
# Each xgboost.spark worker runs as one Spark barrier task, so the session
# needs at least as many task slots as the largest worker count benchmarked
# below (max_worker = 4). In local mode the N in "local[N]" sets the number of
# task threads; "spark.executor.cores" does not control it, so that setting is
# not used here.
# NOTE(review): with fewer than 4 physical cores, the 3- and 4-worker runs
# oversubscribe the CPU.
SPARK_LOCAL_SLOTS = 4
spark = (
    SparkSession.builder
    .appName("XGBoostRegressor")
    .master(f"local[{SPARK_LOCAL_SLOTS}]")
    .getOrCreate()
)

# Load the one-hot-encoded pandas frame (df_encoded) into a Spark DataFrame.
sdf = spark.createDataFrame(df_encoded)

# Pack every non-target column into a single vector column, as Spark ML
# expects, and expose the target under the name "label".
assembler = VectorAssembler(inputCols=[c for c in sdf.columns if c != 'price'], outputCol="features")
sdf_transformed = assembler.transform(sdf).withColumnRenamed("price", "label")

# 80/20 split. NOTE: this does not reproduce the sklearn train_test_split rows
# used by the single-machine baseline.
train, test = sdf_transformed.randomSplit([0.8, 0.2], seed=42)

# Cache and materialise both splits. Otherwise every timed fit would re-run
# createDataFrame / VectorAssembler / randomSplit as part of "training time".
train = train.cache()
test = test.cache()
n_train_rows = train.count()
n_test_rows = test.count()
print(f"train rows: {n_train_rows}, test rows: {n_test_rows}")



In [None]:
# Allow core Spark settings to be set at runtime, then request 1 executor core.
# NOTE(review): spark.executor.cores is read when the SparkContext starts, so
# changing it on a running session most likely has no effect -- confirm.
for conf_key, conf_value in (
    ("spark.sql.legacy.setCommandRejectsSparkCoreConfs", "false"),
    ("spark.executor.cores", "1"),
):
    spark.conf.set(conf_key, conf_value)

In [None]:
# Benchmark SparkXGBRegressor: for each worker count, fit `num_repeat` times and
# record the wall-clock fit time, test RMSE and test R^2.
# Local import so that a later cell rebinding the name `time` cannot break this one.
from time import perf_counter

num_repeat = 10

# Each XGBoost worker is one Spark barrier task, and a barrier stage cannot
# use more tasks than the session has slots. Cap the requested worker count
# at defaultParallelism, which in local mode is the N of "local[N]" unless
# spark.default.parallelism overrides it.
MAX_WORKERS_REQUESTED = 4
max_worker = min(MAX_WORKERS_REQUESTED, spark.sparkContext.defaultParallelism)
if max_worker < MAX_WORKERS_REQUESTED:
    print(f"Only {max_worker} task slot(s) available; benchmarking 1..{max_worker} workers.")

# The evaluators do not depend on the loop variables, so build them once.
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

run_time, R2, RMSE = {}, {}, {}
for num_worker in range(1, max_worker + 1):
    print(f"num_worker={num_worker}")
    # Parallelism is controlled by num_workers below. Setting
    # spark.executor.cores on the running session does not resize it, so the
    # former per-iteration spark.conf.set calls are dropped.
    run_time[num_worker] = []
    R2[num_worker] = []
    RMSE[num_worker] = []

    for _ in range(num_repeat):
        # xgboost.spark estimators take snake_case names (num_workers,
        # max_depth, n_estimators, learning_rate). The previous camelCase
        # kwargs (numWorkers, maxDepth, numRound) are not estimator params, so
        # the intended worker count, depth and round count were not applied.
        # NOTE(review): confirm against the installed xgboost version.
        spark_reg_estimator = SparkXGBRegressor(
            features_col="features",
            label_col="label",
            num_workers=num_worker,
            max_depth=3,
            learning_rate=0.3,
            objective='reg:squarederror',
            n_estimators=1000,
        )
        # Time only the fit.
        start = perf_counter()
        spark_model = spark_reg_estimator.fit(train)
        end = perf_counter()
        run_time[num_worker].append(end - start)

        # Score the test split.
        predictions = spark_model.transform(test)
        RMSE[num_worker].append(evaluator_rmse.evaluate(predictions))
        R2[num_worker].append(evaluator_r2.evaluate(predictions))


In [None]:
# Raw benchmark results: {num_workers: [fit seconds for each repeat, ...]}.
print(f"{run_time}")

In [None]:
!pip3 install seaborn

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Reshape {num_workers: [seconds, ...]} into long-form records for seaborn.
# The names `records` and `seconds` replace the former `data` and `time`:
# `data = []` overwrote the raw DataFrame loaded at the top of the notebook,
# and `for time in ...` shadowed the `time` module used by the timing cells,
# breaking them on re-run.
records = [
    {'number_workers': num_workers, 'run_time': seconds}
    for num_workers, times in run_time.items()
    for seconds in times
]
df = pd.DataFrame(records)

# Boxplot of fit time per worker count.
fig, ax = plt.subplots()
sns.boxplot(x='number_workers', y='run_time', data=df, palette='RdPu', ax=ax)
ax.set_title('SparkXGBRegressor training time by number of workers')
# The benchmark varies XGBoost's num_workers; executor cores are not changed.
ax.set_xlabel('Number of Workers')
ax.set_ylabel('Run Time (seconds)')
fig.savefig('temp2.jpeg', format='jpeg', bbox_inches='tight')