In [0]:
pip install xgboost

Python interpreter will be restarted.
Collecting xgboost
  Downloading xgboost-2.1.1-py3-none-manylinux_2_28_x86_64.whl (153.9 MB)
Collecting nvidia-nccl-cu12
  Downloading nvidia_nccl_cu12-2.23.4-py3-none-manylinux2014_x86_64.whl (199.0 MB)
Installing collected packages: nvidia-nccl-cu12, xgboost
Successfully installed nvidia-nccl-cu12-2.23.4 xgboost-2.1.1
Python interpreter will be restarted.


In [0]:
destination_path = "/dbfs/mnt/bde-assignment2/nyc_taxi_final.parquet"

nyc_taxi = spark.read.parquet(destination_path)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import avg, col
from pyspark.sql.types import IntegerType, StringType

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [0]:
green_taxi = nyc_taxi.filter(F.col("taxi_type") == "green")

In [0]:
green_taxi.printSchema()

root
 |-- DOLocationID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- trip_distance_km: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- speed_kmh: double (nullable = true)
 |-- taxi_typ

In [0]:
green_taxi.count()

Out[5]: 63421542

In [0]:
null_counts = green_taxi.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in green_taxi.columns]
)

null_counts.show()


+------------+------------+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------------+-------------+---------+---------+---------+---------+--------------+-----------+-------------------+---------------+------------+--------------------+
|DOLocationID|PULocationID|VendorID|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_distance_km|trip_duration|speed_kmh|taxi_type|trip_type|ehail_fee|pickup_borough|pickup_zone|pickup_service_zone|dropoff_borough|dropoff_zone|dropoff_service_zone|
+------------+------------+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+

Experiment 1 - Green taxi dataset with Linear Regression model

In [0]:
gt_cleaned = green_taxi

drop_columns = ["mta_tax", "tolls_amount", "store_and_fwd_flag", "improvement_surcharge", "RatecodeID" ,"congestion_surcharge", "airport_fee", "speed_kmh", "trip_type", "ehail_fee", "payment_type", "VendorID", "fare_amount", "taxi_type"]
                
drop_point_columns = ['pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone']


In [0]:
gt_cleaned = gt_cleaned.drop(*drop_columns)
gt_cleaned = gt_cleaned.drop(*drop_point_columns)

In [0]:
gt_cleaned = gt_cleaned.dropna()

In [0]:
null_counts = gt_cleaned.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in gt_cleaned.columns]
)

null_counts.show()

+------------+------------+---------------+----------------+---------------+-------------+-----+----------+------------+----------------+-------------+---------+
|DOLocationID|PULocationID|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|extra|tip_amount|total_amount|trip_distance_km|trip_duration|taxi_type|
+------------+------------+---------------+----------------+---------------+-------------+-----+----------+------------+----------------+-------------+---------+
|           0|           0|              0|               0|              0|            0|    0|         0|           0|               0|            0|        0|
+------------+------------+---------------+----------------+---------------+-------------+-----+----------+------------+----------------+-------------+---------+



In [0]:
from pyspark.sql.functions import year, month, hour, dayofweek, expr

gt_cleaned = gt_cleaned.withColumn("year", year("pickup_datetime")) \
       .withColumn("month", month("pickup_datetime")) \
       .withColumn("hour", hour("pickup_datetime")) \
       .withColumn("weekday", dayofweek("pickup_datetime"))

In [0]:
gt_cleaned = gt_cleaned.drop("pickup_datetime", "dropoff_datetime")

In [0]:
#cat_cols = ['store_and_fwd_flag']
num_cols = [field.name for field in gt_cleaned.schema.fields if field.dataType.typeName() in ["integer", "double", "long"]]

# Display the list
print(num_cols)


['DOLocationID', 'PULocationID', 'passenger_count', 'trip_distance', 'extra', 'tip_amount', 'total_amount', 'trip_distance_km', 'trip_duration', 'year', 'month', 'hour', 'weekday']


 * Add Pipeline

In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

stages = []

In [0]:
target_indexer = StringIndexer(inputCol='total_amount', outputCol='label')

In [0]:
assembler = VectorAssembler(inputCols=num_cols, outputCol="features")

In [0]:
stages += [target_indexer, assembler]

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(gt_cleaned)
gt_cleaned = pipeline_model.transform(gt_cleaned)

In [0]:
# Train and validation data (all data except October, November, December 2022)
gt_cleaned_filtered = gt_cleaned.filter((col("year") != 2022) | ((col("year") == 2022) & (col("month") < 10)))

# Test data (October, November, December 2022)
gt_cleaned_test = gt_cleaned.filter((col("year") == 2022) & (col("month").between(10, 12)))


In [0]:
gt_train, gt_val = gt_cleaned_filtered.randomSplit([0.8, 0.2], seed=8)

In [0]:
train_data = gt_train.sample(fraction=0.1)
val_data = gt_val.sample(fraction=0.1)

train_data = train_data.repartition(10)
val_data = val_data.repartition(10)


 - Build baseline model using lit function

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lit

# Calculate the baseline average total_amount
baseline_value = train_data.select(avg("total_amount")).first()[0]

# Create a DataFrame with baseline predictions
baseline_predictions = train_data.withColumn("prediction", lit(baseline_value))

# Calculate RMSE for baseline model
evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
baseline_rmse = evaluator.evaluate(baseline_predictions)

# Print the baseline RMSE
print(f"Baseline RMSE: {baseline_rmse}")


Baseline RMSE: 557.599952232893


In [0]:
from pyspark.ml.regression import LinearRegression 
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10) 

lr_model = lr.fit(train_data)
train_predictions = lr_model.transform(train_data)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")

# Evaluate train RMSE
train_rmse = evaluator.evaluate(train_predictions)
print(f"Train set RMSE: {train_rmse}")
val_predictions = lr_model.transform(val_data)

# Evaluate Val RMSE
val_rmse = evaluator.evaluate(val_predictions)
print(f"Validate set RMSE: {val_rmse}")

Train set RMSE: 443.9566476347632
Validate set RMSE: 440.0376251042335


Experiment 2 - Yellow taxi dataset (2021~2022.9) with Linear Regression model

In [0]:
yellow_taxi = nyc_taxi.filter(F.col("taxi_type") == "yellow")

In [0]:
yt_cleaned = yellow_taxi

drop_columns = ["mta_tax", "tolls_amount", "store_and_fwd_flag", "improvement_surcharge", "RatecodeID" ,"congestion_surcharge", "airport_fee", "speed_kmh", "trip_type", "ehail_fee", "payment_type", "VendorID", "fare_amount", "taxi_type"]
                
drop_point_columns = ['pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone']


In [0]:
yt_cleaned = yt_cleaned.drop(*drop_columns)
yt_cleaned = yt_cleaned.drop(*drop_point_columns)

In [0]:
yt_cleaned = yt_cleaned.dropna()

In [0]:
from pyspark.sql.functions import year, month, hour, dayofweek, expr

yt_cleaned = yt_cleaned.withColumn("year", year("pickup_datetime")) \
       .withColumn("month", month("pickup_datetime")) \
       .withColumn("hour", hour("pickup_datetime")) \
       .withColumn("weekday", dayofweek("pickup_datetime"))

In [0]:
yt_cleaned = yt_cleaned.drop("pickup_datetime", "dropoff_datetime")

In [0]:
yt_cleaned_2021 = yt_cleaned.filter((col("year") == 2021) | ((col("year") == 2022) & (col("month") < 10)))
yt_cleaned_2019 = yt_cleaned.filter((col("year") >= 2019) & ((col("year") <= 2020)))
yt_cleaned_2016 = yt_cleaned.filter((col("year") >= 2016) & ((col("year") <= 2018)))
yt_cleaned_2015 = yt_cleaned.filter((col("year") == 2015))

# Test data (October, November, December 2022)
yt_cleaned_test = yt_cleaned.filter((col("year") == 2022) & (col("month").between(10, 12)))


In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

num_cols = [field.name for field in yt_cleaned.schema.fields if field.dataType.typeName() in ["integer", "double", "long"]]

stages = []

target_indexer = StringIndexer(inputCol='total_amount', outputCol='label')

assembler = VectorAssembler(inputCols=num_cols, outputCol="features")

stages += [target_indexer, assembler]

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(yt_cleaned_2021)
yt_cleaned_2021 = pipeline_model.transform(yt_cleaned_2021)

In [0]:
pipeline_model = pipeline.fit(yt_cleaned_test)
yt_cleaned_test = pipeline_model.transform(yt_cleaned_test)

In [0]:
yt_train, yt_val = yt_cleaned_2021.randomSplit([0.8, 0.2], seed=8)

train_data_yt = yt_train.sample(fraction=0.1)
val_data_yt = yt_val.sample(fraction=0.1)

train_data_yt = train_data_yt.repartition(10)
val_data_yt = val_data_yt.repartition(10)


In [0]:
from pyspark.ml.regression import LinearRegression 
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10) 

lr_model = lr.fit(train_data_yt)
train_predictions_yt = lr_model.transform(train_data_yt)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")

# Evaluate train RMSE
train_rmse = evaluator.evaluate(train_predictions_yt)
print(f"Train set RMSE: {train_rmse}")
val_predictions_yt = lr_model.transform(val_data_yt)

# Evaluate Val RMSE
val_rmse = evaluator.evaluate(val_predictions_yt)
print(f"Validate set RMSE: {val_rmse}")

Train set RMSE: 490.18584634597516
Validate set RMSE: 486.0378436001615


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lit

# Calculate the baseline average total_amount
baseline_value = train_data_yt.select(avg("total_amount")).first()[0]

# Create a DataFrame with baseline predictions
baseline_predictions = train_data_yt.withColumn("prediction", lit(baseline_value))

# Calculate RMSE for baseline model
evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
baseline_rmse = evaluator.evaluate(baseline_predictions)

# Print the baseline RMSE
print(f"Baseline RMSE: {baseline_rmse}")


Baseline RMSE: 611.979028414324


In [0]:

test_predictions_yt = lr_model.transform(yt_cleaned_test)

test_rmse = evaluator.evaluate(test_predictions_yt)
print(f"Test set RMSE: {test_rmse}")


Test set RMSE: 641.5817413624575


Experiment 3 - Yellow taxi dataset (2021~2022.9) with Xgboosting model

In [0]:
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator

xgb_regressor = SparkXGBRegressor(features_col="features", label_col='label')
xgb_model = xgb_regressor.fit(train_data_yt)
xgb_predictions = xgb_model.transform(train_data_yt)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
xgb_rmse = evaluator.evaluate(xgb_predictions)
print(f"XGBoost RMSE: {xgb_rmse}")


2024-10-01 06:52:20,772 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-10-01 07:09:01,625 INFO XGBoost-PySpark: _fit Finished xgboost training!


[0;31m---------------------------------------------------------------------------[0m
[0;31mKeyError[0m                                  Traceback (most recent call last)
File [0;32m<command-1882788124453729>:6[0m
[1;32m      4[0m xgb_regressor [38;5;241m=[39m SparkXGBRegressor(features_col[38;5;241m=[39m[38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m, label_col[38;5;241m=[39m[38;5;124m'[39m[38;5;124mlabel[39m[38;5;124m'[39m)
[1;32m      5[0m xgb_model [38;5;241m=[39m xgb_regressor[38;5;241m.[39mfit(train_data_yt)
[0;32m----> 6[0m xgb_predictions [38;5;241m=[39m xgb_model[38;5;241m.[39mtransform(yt_cleaned_test)
[1;32m      8[0m evaluator [38;5;241m=[39m RegressionEvaluator(labelCol[38;5;241m=[39m[38;5;124m'[39m[38;5;124mlabel[39m[38;5;124m'[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124mrmse[39m[38;5;124m"[39m)
[1;32m      9

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

xgb_predictions = xgb_model.transform(train_data_yt)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
xgb_rmse = evaluator.evaluate(xgb_predictions)
print(f"XGBoost RMSE: {xgb_rmse}")

XGBoost RMSE: 349.23459891933686


In [0]:
val_predictions_yt = xgb_model.transform(val_data_yt)

val_rmse = evaluator.evaluate(val_predictions_yt)
print(f"Validate set RMSE: {val_rmse}")

Validate set RMSE: 359.77156179780553


In [0]:
pipeline_model = pipeline.fit(yt_cleaned_test)
yt_cleaned_test = pipeline_model.transform(yt_cleaned_test)

test_predictions_yt = xgb_model.transform(yt_cleaned_test)

test_rmse = evaluator.evaluate(test_predictions_yt)
print(f"Test set RMSE: {test_rmse}")

Test set RMSE: 520.5412869879067


In [0]:
train_data_yt.printSchema()

root
 |-- DOLocationID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_distance_km: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



Experiment 4 - Yellow taxi dataset (2021~2022.9) with GBTRegressor

In [0]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

gbt = GBTRegressor(featuresCol="features", labelCol='label', maxIter=50)

gbt_model = gbt.fit(train_data_yt)
gbt_predictions = gbt_model.transform(train_data_yt)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
gbt_rmse = evaluator.evaluate(gbt_predictions)
print(f"GBT Train RMSE: {gbt_rmse}")

val_predictions_yt = gbt_model.transform(val_data_yt)

val_rmse = evaluator.evaluate(val_predictions_yt)
print(f"GBT Validate set RMSE: {val_rmse}")


Experiment 5 - Yellow taxi dataset (2021~2022.9) with RamdomForestRegressor

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(featuresCol='features', labelCol='label', numTrees=5)

rf_model = rf.fit(train_data_yt)
rf_predictions = rf_model.transform(train_data_yt)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"rf Train RMSE: {rf_rmse}")

val_predictions_yt = rf_model.transform(val_data_yt)

val_rmse = evaluator.evaluate(val_predictions_yt)
print(f"rf Validate set RMSE: {val_rmse}")


rf Train RMSE: 467.53106056694156
rf Validate set RMSE: 470.41473282095524


In [0]:
test_predictions_yt = rf_model.transform(yt_cleaned_test)

test_rmse = evaluator.evaluate(test_predictions_yt)
print(f"Test set RMSE: {test_rmse}")

Test set RMSE: 623.1471469120612


Experiment 6 - NYC taxi dataset (2021~2022.9) with XGB

In [0]:
nyc_cleaned = nyc_taxi

drop_columns = ["mta_tax", "tolls_amount", "store_and_fwd_flag", "improvement_surcharge", "RatecodeID" ,"congestion_surcharge", "airport_fee", "speed_kmh", "trip_type", "ehail_fee", "payment_type", "VendorID", "fare_amount"]
                
drop_point_columns = ['pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone']

In [0]:
nyc_cleaned = nyc_cleaned.drop(*drop_columns)
nyc_cleaned = nyc_cleaned.drop(*drop_point_columns)
nyc_cleaned = nyc_cleaned.dropna()

In [0]:
from pyspark.sql.functions import year, month, hour, dayofweek, expr

nyc_cleaned = nyc_cleaned.withColumn("year", year("pickup_datetime")) \
       .withColumn("month", month("pickup_datetime")) \
       .withColumn("hour", hour("pickup_datetime")) \
       .withColumn("weekday", dayofweek("pickup_datetime"))

nyc_cleaned = nyc_cleaned.drop("pickup_datetime", "dropoff_datetime")

In [0]:
nyc_cleaned_2021 = nyc_cleaned.filter((col("year") == 2021) | ((col("year") == 2022) & (col("month") < 10)))
nyc_cleaned_2019 = nyc_cleaned.filter((col("year") >= 2019) & ((col("year") <= 2020)))
nyc_cleaned_2016 = nyc_cleaned.filter((col("year") >= 2016) & ((col("year") <= 2018)))
nyc_cleaned_2015 = nyc_cleaned.filter((col("year") == 2015))

# Test data (October, November, December 2022)
nyc_cleaned_test = nyc_cleaned.filter((col("year") == 2022) & (col("month").between(10, 12)))


In [0]:
nyc_cleaned_2021 = nyc_cleaned_2021.drop("year")
nyc_cleaned_2015 = nyc_cleaned_2015.drop("year")

In [0]:
nyc_cleaned_test = nyc_cleaned_test.drop("year")

In [0]:
nyc_cleaned_2021.printSchema()

root
 |-- DOLocationID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_distance_km: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- taxi_type: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [0]:
cat_cols = ['taxi_type']
num_cols2 = [field.name for field in nyc_cleaned_2021.schema.fields if field.dataType.typeName() in ["integer", "double", "long"]]


In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

stages = []

for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]
    
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

In [0]:
from pyspark.ml import Pipeline

target_indexer = StringIndexer(inputCol='total_amount', outputCol='label')

assembler = VectorAssembler(inputCols= cat_cols_ohe+num_cols2, outputCol="features")

stages += [target_indexer, assembler]

pipeline = Pipeline(stages=stages)

In [0]:
pipeline_model = pipeline.fit(nyc_cleaned_2021)
nyc_cleaned_2021 = pipeline_model.transform(nyc_cleaned_2021)

In [0]:
nt_train, nt_val = nyc_cleaned_2021.randomSplit([0.8, 0.2], seed=8)

train_data_nt = nt_train.sample(fraction=0.1)
val_data_nt = nt_val.sample(fraction=0.1)

train_data_nt = train_data_nt.repartition(10)
val_data_nt = val_data_nt.repartition(10)


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lit

# Calculate the baseline average total_amount
baseline_value = train_data_nt.select(avg("total_amount")).first()[0]

# Create a DataFrame with baseline predictions
baseline_predictions = train_data_nt.withColumn("prediction", lit(baseline_value))

# Calculate RMSE for baseline model
evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
baseline_rmse = evaluator.evaluate(baseline_predictions)

# Print the baseline RMSE
print(f"Baseline RMSE: {baseline_rmse}")


Baseline RMSE: 622.1433566463232


In [0]:
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator

xgb_regressor = SparkXGBRegressor(features_col="features", label_col='label')
xgb_model = xgb_regressor.fit(train_data_nt)
xgb_predictions = xgb_model.transform(train_data_nt)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
xgb_rmse = evaluator.evaluate(xgb_predictions)
print(f"XGBoost RMSE: {xgb_rmse}")

2024-10-01 23:23:16,256 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-10-01 23:40:45,101 INFO XGBoost-PySpark: _fit Finished xgboost training!


XGBoost RMSE: 355.40350778784403


In [0]:
pipeline_model = pipeline.fit(nyc_cleaned_test)
test_data_nt = pipeline_model.transform(nyc_cleaned_test)

test_predictions_yt = xgb_model.transform(test_data_nt)

test_rmse = evaluator.evaluate(test_predictions_yt)
print(f"Test set RMSE: {test_rmse}")

Test set RMSE: 547.1826172095356


Experiment 7 - NYC taxi dataset (2021~2022.9) with XGB (Add more train data)

In [0]:
nt_train, nt_val = nyc_cleaned_2021.randomSplit([0.8, 0.2], seed=8)

train_data_nt2 = nt_train.sample(fraction=0.5)
val_data_nt2 = nt_val.sample(fraction=0.5)

train_data_nt2 = train_data_nt2.repartition(10)
val_data_nt2 = val_data_nt2.repartition(10)


In [0]:
pipeline_model = pipeline.fit(train_data_nt2)
train_data_nt2 = pipeline_model.transform(train_data_nt2)

In [0]:
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator

xgb_regressor = SparkXGBRegressor(features_col="features", label_col='label')
xgb_model2 = xgb_regressor.fit(train_data_nt2)
xgb_predictions2 = xgb_model2.transform(train_data_nt2)

evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")
xgb_rmse2 = evaluator.evaluate(xgb_predictions2)
print(f"XGBoost RMSE: {xgb_rmse2}")

2024-10-02 05:16:34,589 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-10-02 05:36:34,642 INFO XGBoost-PySpark: _fit Finished xgboost training!


XGBoost RMSE: 354.62711950168637


In [0]:
pipeline_model = pipeline.fit(nyc_cleaned_test)
test_data_nt = pipeline_model.transform(nyc_cleaned_test)

test_predictions_nt = xgb_model2.transform(test_data_nt)

test_rmse = evaluator.evaluate(test_predictions_nt)
print(f"Test set RMSE: {test_rmse}")

Test set RMSE: 520.1583918296437


Experiment 8 - NYC taxi dataset (2015~2016) with XGB model above to validate model's performance 
for the past data


In [0]:
nyc_cleaned_2015 = nyc_cleaned_2015.drop("year")

In [0]:
nt_train3, nt_val3 = nyc_cleaned_2015.randomSplit([0.8, 0.2], seed=8)

train_data_nt3 = nt_train3.sample(fraction=0.2)
val_data_nt3 = nt_val3.sample(fraction=0.2)

train_data_nt3 = train_data_nt3.repartition(10)
val_data_nt3 = val_data_nt3.repartition(10)


In [0]:
# Validate 2015 NYC Taxi data with Trained model from 2021~2022 

pipeline_model = pipeline.fit(train_data_nt3)
train_data_nt_2015 = pipeline_model.transform(train_data_nt3)

train_predictions_nt_2015 = xgb_model2.transform(train_data_nt_2015)

train_rmse_2015 = evaluator.evaluate(train_predictions_nt_2015)
print(f"Train 2015 dataset RMSE: {train_rmse_2015}")

Train 2015 dataset RMSE: 338.9996428306595


In [0]:
# Validate 2015 NYC Taxi data with Trained XGB model from 2021~2022 (10% sample data)

pipeline_model = pipeline.fit(val_data_nt3)
val_data_nt_2015 = pipeline_model.transform(val_data_nt3)

val_predictions_nt_2015 = xgb_model2.transform(val_data_nt_2015)

val_rmse_2015 = evaluator.evaluate(val_predictions_nt_2015)
print(f"validate 2015 dataset RMSE: {val_rmse_2015}")

validate 2015 dataset RMSE: 331.43194364837166


Experiment 9 - XGB Hyperparameter tuning (max_depth, learning_rate)

In [0]:
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

xgb_regressor = SparkXGBRegressor(features_col="features", label_col='label')


param_grid = (ParamGridBuilder()
              .addGrid(xgb_regressor.max_depth, [3, 5, 7])  # Tree depth
              .addGrid(xgb_regressor.learning_rate, [0.1, 0.2, 0.3])  # Learning rate
              .build())

# Define an evaluator
evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="rmse")

# Set up cross-validation with 3 folds
crossval = CrossValidator(estimator=xgb_regressor,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)

# Fit the model with cross-validation
cv_model = crossval.fit(train_data_nt2)

# Get the best model from cross-validation
best_xgb_model = cv_model.bestModel

2024-10-03 13:13:37,320 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-10-03 13:32:35,662 INFO XGBoost-PySpark: _fit Finished xgboost training!
2024-10-03 13:53:16,628 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.2, 'max_depth': 3, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-10-03 13:57:00,969 INFO XGBoost-PySpark: _fit Finished xgboost training!
2024-10-03 13:57:58,352 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.3, 'max_depth': 3, 'objective': '

Best Parameters:


[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-457186220349137>:30[0m
[1;32m     28[0m [38;5;66;03m# Print the best parameters[39;00m
[1;32m     29[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mBest Parameters:[39m[38;5;124m"[39m)
[0;32m---> 30[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mMax Depth: [39m[38;5;132;01m{[39;00mbest_xgb_model[38;5;241m.[39m_java_obj[38;5;241m.[39mgetMaxDepth()[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m     31[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mLearning Rate (eta): [39m[38;5;132;01m{[39;00mbest_xgb_model[38;5;241m.[39m_java_obj[38;5;241m.[39mgetEta()[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m     32[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mSubsample: [39m[38;5;132;01m{[39;00mbest

In [0]:
xgb_predictions = best_xgb_model.transform(train_data_nt2)

# Evaluate the tuned model
xgb_rmse = evaluator.evaluate(xgb_predictions)
print(f"Tuned XGBoost RMSE: {xgb_rmse}")


Tuned XGBoost RMSE: 342.6183663171135


In [0]:
# Print the best parameters using the bestModel's params
print("Best Parameters:")
print(f"Max Depth: {cv_model.bestModel.getOrDefault('max_depth')}")
print(f"Learning Rate (eta): {cv_model.bestModel.getOrDefault('learning_rate')}")


Best Parameters:
Max Depth: 7
Learning Rate (eta): 0.3
