# MAIN TASK

Our main task is to predict the **tip_amount** column in the dataset. We find this interesting because an accurate tip prediction could help both drivers and passengers estimate trip costs.

Since this is a regression task, the algorithms we plan to implement are linear regression, decision tree regression, random forest regression and gradient-boosted tree regression. We measure the performance of each of them and evaluate the results with Root Mean Squared Error (RMSE), Mean Absolute Error (MAE) and R-squared (R²).
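To make these metrics concrete, here is a minimal pure-Python sketch of their standard definitions (MSE is included because RMSE is its square root). As far as we know, these match what Spark's RegressionEvaluator reports for `metricName` set to `"mae"`, `"mse"`, `"rmse"` and `"r2"`, but this is a plain-Python illustration, not Spark's implementation. The tip values are taken from sample rows shown later in the notebook; the predictions are made up.

```python
import math

def mae(y_true, y_pred):
    """Mean Absolute Error: average of |y - y_hat|."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def mse(y_true, y_pred):
    """Mean Squared Error: average of (y - y_hat)^2."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

def rmse(y_true, y_pred):
    """Root Mean Squared Error: square root of the MSE, in the same unit as the tips."""
    return math.sqrt(mse(y_true, y_pred))

def r2(y_true, y_pred):
    """R-squared: 1 - (residual sum of squares / total sum of squares around the mean)."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot

tips = [4.72, 8.46, 0.0, 4.12]   # actual tip_amount values (dollars)
preds = [4.0, 7.0, 1.0, 4.5]     # hypothetical model predictions

print(f"MAE={mae(tips, preds):.4f} RMSE={rmse(tips, preds):.4f} "
      f"MSE={mse(tips, preds):.4f} R2={r2(tips, preds):.4f}")
```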

In [1]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("tip prediction")
         .config("spark.executor.memory", "6g")
         .config("spark.driver.memory", "6g")
         .getOrCreate())

data = spark.read.parquet("src/yellow_tripdata_2024-12.parquet")
data.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-12-01 00:12:27|  2024-12-01 00:31:12|              1|         9.76|         1|                 N|         138|          33|           1|       38.0|  6.0|    0.5|      4.7

In [2]:
value_counts = data.groupBy("passenger_count").count()
value_counts.show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              0|  30397|
|              7|      7|
|              6|  14205|
|              9|      2|
|              5|  22283|
|              1|2531706|
|              3| 133727|
|              8|      9|
|              2| 507628|
|              4| 102116|
|           NULL| 326291|
+---------------+-------+



Looking at this, we observe a few things:
1. There are many null values (326,291 rows), which we drop because they are probably missing or incorrectly recorded values.
2. Over 30,000 trips have 0 passengers, which suggests data entry errors.
3. Values 6 to 9 are rare, and 7, 8 and 9 in particular appear only 18 times combined.

Conclusions:

Values 6, 7, 8 and 9 will be merged into a single category, since a standard car seats at most 5 passengers; we think that grouping them may capture the relationship with customers who request larger vehicles.
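As a quick sanity check of this conclusion, the sketch below applies the same rule in plain Python to the counts from the groupBy output above (the NULL row is left out, since those rows are dropped). It mirrors the `when(col("passenger_count") >= 6, 6).otherwise(...)` expression used in the cleaning step below; the dictionary and helper function are only for illustration. These are counts before cleaning, so they will not match the post-cleaning table exactly.

```python
# passenger_count frequencies copied from the groupBy output above (NULL excluded).
raw_counts = {0: 30397, 1: 2531706, 2: 507628, 3: 133727, 4: 102116,
              5: 22283, 6: 14205, 7: 7, 8: 9, 9: 2}

def group_passengers(count, cap=6):
    """Collapse every value >= cap into cap, like when(col >= 6, 6).otherwise(col)."""
    return cap if count >= cap else count

grouped = {}
for value, n in raw_counts.items():
    key = group_passengers(value)
    grouped[key] = grouped.get(key, 0) + n

for key in sorted(grouped):
    print(key, grouped[key])
```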

# DATA CLEANING + FEATURE ENGINEERING

We found that the data needs some preprocessing, as some columns do not have a suitable structure for the models we want to implement.

First, we need to understand the dataset to find out which columns are relevant for our predictions. After checking the dataset's data dictionary (https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf), we found these columns relevant for predicting tip_amount:

- passenger_count: number of passengers in the vehicle.
- trip_distance: trip distance in miles.
- PULocationID: pickup location of the trip. It is a numeric ID, where each value represents a different TLC taxi zone.
- DOLocationID: dropoff location of the trip. It uses the same zone IDs as PULocationID.
- fare_amount: the time-and-distance fare calculated by the meter.

### EXPLANATION:
- We derive a trip_duration column from tpep_pickup_datetime and tpep_dropoff_datetime: we subtract the pickup time from the dropoff time and divide by 60 to obtain minutes, which may help the models perform better.
- We also create a pickup_hour column taken from tpep_pickup_datetime, which we consider one of the most relevant features, since taxi fees vary with the hour of the day.
- A new column, day_of_week, is added by extracting its values from tpep_pickup_datetime.
- In the sample printed at the beginning of the notebook, we noticed a row with negative values in columns that must be non-negative, so we filter those cases out: we keep only trips with trip_distance > 0, fare_amount > 0 and tip_amount >= 0.
- For the passenger_count column, we merge the values 6, 7, 8 and 9, so that 6 now represents 6 or more passengers. This is due to the lack of samples with 7, 8 or 9 passengers, which could confuse our models.
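The date-based features can be checked by hand on the first trip printed above (pickup 2024-12-01 00:12:27, dropoff 00:31:12). The sketch below recomputes them with Python's standard `datetime` module; it is an illustration, not the Spark code. One detail worth noting: Spark's `dayofweek()` returns 1 for Sunday through 7 for Saturday, while Python's `isoweekday()` returns 1 for Monday through 7 for Sunday, so `isoweekday() % 7` reproduces the notebook's `dayofweek(...) - 1` encoding (0 = Sunday, 6 = Saturday).

```python
from datetime import datetime

def trip_features(pickup, dropoff):
    """Return (trip_duration in minutes, pickup_hour, day_of_week) for one trip."""
    duration_min = (dropoff - pickup).total_seconds() / 60
    # isoweekday(): Monday = 1 ... Sunday = 7, so % 7 maps Sunday to 0 and Saturday to 6,
    # matching Spark's dayofweek() (Sunday = 1 ... Saturday = 7) minus 1.
    day_of_week = pickup.isoweekday() % 7
    return duration_min, pickup.hour, day_of_week

# First trip of the sample output at the top of the notebook.
pickup = datetime(2024, 12, 1, 0, 12, 27)
dropoff = datetime(2024, 12, 1, 0, 31, 12)
print(trip_features(pickup, dropoff))  # (18.75, 0, 0)
```

The result agrees with the first row of the cleaned data (trip_duration 18.75, pickup_hour 0, day_of_week 0, since 2024-12-01 was a Sunday).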



In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import unix_timestamp, hour, col, dayofweek, when

data = data.dropna(subset=["PULocationID", "DOLocationID","passenger_count", "trip_distance", "fare_amount", "tip_amount", "tpep_pickup_datetime", "tpep_dropoff_datetime"])


dataClean = data.filter((col("trip_distance") > 0) & (col("fare_amount") > 0) & (col("tip_amount") >= 0))
dataFeat = dataClean.withColumn("trip_duration", (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))/60)

dataFeat = dataFeat.withColumn("passenger_count_grouped", when(col("passenger_count") >= 6, 6).otherwise(col("passenger_count")))

# Spark's dayofweek() returns 1 (Sunday) to 7 (Saturday); subtract 1 so the values run from 0 (Sunday) to 6 (Saturday).
dataFeat = dataFeat.withColumn("day_of_week", dayofweek("tpep_pickup_datetime")-1)
dataFeat = dataFeat.withColumn("pickup_hour",hour("tpep_pickup_datetime"))
dataFeat = dataFeat.dropna(subset=["trip_duration", "pickup_hour", "day_of_week"])

columns = ["passenger_count_grouped","trip_distance","PULocationID","DOLocationID","fare_amount","trip_duration", "pickup_hour", "tip_amount", "day_of_week"]
selectedData = dataFeat.select(columns)
selectedData.show()

+-----------------------+-------------+------------+------------+-----------+-------------------+-----------+----------+-----------+
|passenger_count_grouped|trip_distance|PULocationID|DOLocationID|fare_amount|      trip_duration|pickup_hour|tip_amount|day_of_week|
+-----------------------+-------------+------------+------------+-----------+-------------------+-----------+----------+-----------+
|                      1|         9.76|         138|          33|       38.0|              18.75|          0|      4.72|          0|
|                      1|         7.62|         158|          42|       37.3|  32.18333333333333|         23|      8.46|          6|
|                      4|        20.07|         132|         236|       70.0|  34.18333333333333|          0|       0.0|          0|
|                      3|         2.34|         142|         186|       15.6|               15.0|          0|      4.12|          0|
|                      1|         5.05|         107|          80|    

In [4]:
value_counts = selectedData.groupBy("passenger_count_grouped").count()
value_counts.show()



+-----------------------+-------+
|passenger_count_grouped|  count|
+-----------------------+-------+
|                      0|  29150|
|                      6|  14091|
|                      5|  22057|
|                      1|2450879|
|                      3| 128633|
|                      2| 489550|
|                      4|  95171|
+-----------------------+-------+


## CATEGORICAL COLUMNS (INDEXERS AND ENCODERS)

To handle categorical columns, we use StringIndexer followed by OneHotEncoder. Each value in the location columns represents a taxi zone in NYC, so treating the IDs as plain numbers would be misleading: zone 100 is not "twice" zone 50, they are just different places.

StringIndexer converts a column of labels into a column of label indices (by default, the most frequent label gets index 0), and OneHotEncoder then turns each index into a binary vector, so every zone becomes its own feature with no implied ordering.

The same methodology is applied to the day_of_week, pickup_hour and passenger_count_grouped columns.
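To illustrate what these two stages do, here is a minimal pure-Python sketch of the default behaviour as we understand it from the Spark documentation: StringIndexer casts numeric input to strings and orders labels by descending frequency (ties broken alphabetically), `handleInvalid="keep"` sends unseen labels to one extra index, and OneHotEncoder turns an index into a binary vector, with `dropLast=True` dropping the last category (which is then encoded as all zeros). Spark produces sparse vectors; dense lists are used here only for readability, and the location IDs are made-up toy values.

```python
from collections import Counter

def fit_string_indexer(values):
    """Most frequent label -> index 0; ties broken alphabetically on the string form."""
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def index_value(mapping, value):
    """handleInvalid='keep': labels unseen during fitting get the extra index len(mapping)."""
    return mapping.get(str(value), len(mapping))

def one_hot(index, num_categories, drop_last):
    """Dense one-hot vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

pickups = [132, 138, 132, 161, 132, 138]   # toy PULocationID values
mapping = fit_string_indexer(pickups)
print(mapping)                               # {'132': 0, '138': 1, '161': 2}
num_categories = len(mapping) + 1            # + 1 for the "unknown" bucket
print(one_hot(index_value(mapping, 138), num_categories, drop_last=False))
print(one_hot(index_value(mapping, 265), num_categories, drop_last=False))  # unseen zone
```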

In [5]:
indexer_pickup = StringIndexer(inputCol="PULocationID", outputCol="PULocationID_index", handleInvalid="keep")
indexer_dropoff = StringIndexer(inputCol="DOLocationID", outputCol="DOLocationID_index", handleInvalid="keep")

encoder_pickup = OneHotEncoder(inputCols=["PULocationID_index"], outputCols=["PULocationID_vec"], dropLast=False, handleInvalid="keep")
encoder_drop = OneHotEncoder(inputCols=["DOLocationID_index"], outputCols=["DOLocationID_vec"], dropLast=False, handleInvalid="keep")

indexer_day = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index", handleInvalid="keep")
indexer_hour = StringIndexer(inputCol="pickup_hour", outputCol="pickup_hour_index", handleInvalid="keep")
indexer_pass = StringIndexer(inputCol="passenger_count_grouped", outputCol="passenger_count_index", handleInvalid="keep")

encoder_day = OneHotEncoder(inputCols=["day_of_week_index"], outputCols=["day_of_week_vec"], dropLast=True)
encoder_hour = OneHotEncoder(inputCols=["pickup_hour_index"], outputCols=["pickup_hour_vec"], dropLast=True)
encoder_pass = OneHotEncoder(inputCols=["passenger_count_index"], outputCols=["passenger_count_vec"], dropLast=True)

## MODEL TRAINING (LINEAR REGRESSION)

The steps we follow to train the different models are:

1. VectorAssembler: we combine the ride details trip_distance, fare_amount and trip_duration with the one-hot encoded locations (PULocationID_vec, DOLocationID_vec), time (day_of_week_vec, pickup_hour_vec) and passenger count (passenger_count_vec) into a single 'features' input for the model.

2. LinearRegression (lr): we set up a linear regression model that learns to predict tip_amount from that combined features vector.

3. ParamGridBuilder (paramGrid): we define a set of tuning parameters (the regularization strength regParam, the elastic net mixing parameter elasticNetParam, and maxIter) so we can test which combination works best.

4. RegressionEvaluator (evaluator): it measures how good the model is by calculating the Mean Absolute Error (MAE) between its predictions and the actual tip_amount.

5. Pipeline: we chain all the data preparation steps (indexing, encoding, assembling) and the linear regression model into a single workflow.

6. CrossValidator (cv): it automatically trains and evaluates our pipeline with every parameter set from paramGrid using 3-fold cross-validation, and picks the best one based on MAE.

7. Data sampling and splitting: we sample 20% of the data (in a first attempt, training on the full dataset took a very long time) and then split it into dataTrain (for training and cross-validation) and dataTest (for the final evaluation on unseen data), which speeds things up while still allowing a proper assessment of performance.

8. Final evaluation: after cv.fit(dataTrain) finds the best model (cvModel), we use it to make predictions on dataTest. Then we calculate MAE, RMSE, MSE and R² to see how well this final model performs on completely new data.
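Steps 3 to 6 can be illustrated with a toy, dependency-free sketch of grid search with k-fold cross-validation. To keep it short, it swaps in a one-feature ridge regression through the origin (closed form w = Σxy / (Σx² + reg)) instead of Spark's LinearRegression, assigns rows to folds deterministically instead of randomly, and uses noise-free made-up data; Spark's regParam is also scaled differently, so the numbers are not comparable. What carries over is the procedure: each grid value is trained on k-1 folds and scored by MAE on the held-out fold, the average MAE decides the winner, and the winner is refitted on all the training data.

```python
def fit_ridge_slope(xs, ys, reg):
    """1-D ridge regression through the origin: w = sum(x*y) / (sum(x*x) + reg)."""
    return sum(x * y for x, y in zip(xs, ys)) / (sum(x * x for x in xs) + reg)

def mae(ys, preds):
    return sum(abs(y - p) for y, p in zip(ys, preds)) / len(ys)

def cross_validate(xs, ys, grid, k=3):
    """Average held-out MAE for every reg value, plus the best value (lowest MAE)."""
    n = len(xs)
    folds = [[i for i in range(n) if i % k == f] for f in range(k)]  # row i -> fold i % k
    scores = {}
    for reg in grid:
        fold_maes = []
        for test_idx in folds:
            train_idx = [i for i in range(n) if i not in test_idx]
            w = fit_ridge_slope([xs[i] for i in train_idx], [ys[i] for i in train_idx], reg)
            fold_maes.append(mae([ys[i] for i in test_idx], [w * xs[i] for i in test_idx]))
        scores[reg] = sum(fold_maes) / k
    best_reg = min(scores, key=scores.get)
    return best_reg, scores

xs = [float(x) for x in range(1, 10)]   # toy "fares"
ys = [2.0 * x for x in xs]              # toy "tips": exactly twice the fare
best_reg, scores = cross_validate(xs, ys, grid=[0.0, 0.1, 1.0])
final_w = fit_ridge_slope(xs, ys, best_reg)  # refit on all training data
print(best_reg, scores, final_w)
```

Because the toy data has no noise, any regularization only shrinks the slope away from 2, so the unregularized value wins; on real, noisy data a moderate regParam can win instead.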

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.pipeline import Pipeline

assemblerCols = ["trip_distance", "PULocationID_vec", "DOLocationID_vec", "fare_amount", "trip_duration",
                 "day_of_week_vec", "pickup_hour_vec", "passenger_count_vec"]

assembler = VectorAssembler(inputCols=assemblerCols, outputCol='features')

In [7]:
lr = LinearRegression(labelCol="tip_amount", featuresCol="features")
paramGrid = (ParamGridBuilder().addGrid(lr.regParam,[0.01,0.1,0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [50, 100])
             .build())

evaluator = RegressionEvaluator(labelCol='tip_amount', predictionCol='prediction', metricName='mae')
pipeline = Pipeline(stages=[
    indexer_pickup, indexer_dropoff,
    indexer_day, indexer_hour, indexer_pass,
    encoder_pickup, encoder_drop,
    encoder_day, encoder_hour, encoder_pass,
    assembler, lr
])
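It helps to count how much work this grid implies. The short sketch below enumerates the same parameter values with `itertools.product`: 3 × 3 × 2 = 18 combinations, and with 3 folds that is 54 pipeline fits, after which, according to the Spark documentation, CrossValidator refits the best combination once more on the whole training set. This is likely one reason why training on a 20% sample was needed.

```python
from itertools import product

# Values copied from the ParamGridBuilder above.
reg_params = [0.01, 0.1, 0.5]          # lr.regParam
elastic_net_params = [0.0, 0.5, 1.0]   # lr.elasticNetParam: 0 = L2 (ridge), 1 = L1 (lasso)
max_iters = [50, 100]                  # lr.maxIter
num_folds = 3

grid = list(product(reg_params, elastic_net_params, max_iters))
cv_fits = len(grid) * num_folds
total_fits = cv_fits + 1  # plus the final refit with the best parameters
print(len(grid), cv_fits, total_fits)  # 18 54 55
```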



In [8]:
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, parallelism=2)

sampledData = selectedData.sample(withReplacement=False, fraction=0.2, seed=42)
dataTrain, dataTest = sampledData.randomSplit([0.8,0.2], seed=42)
dataTrain.show()

+-----------------------+-------------+------------+------------+-----------+------------------+-----------+----------+-----------+
|passenger_count_grouped|trip_distance|PULocationID|DOLocationID|fare_amount|     trip_duration|pickup_hour|tip_amount|day_of_week|
+-----------------------+-------------+------------+------------+-----------+------------------+-----------+----------+-----------+
|                      0|          0.1|          48|          48|        3.0|0.6666666666666666|          6|       0.0|          4|
|                      0|          0.1|         186|         164|       10.0|              10.2|         18|       2.8|          6|
|                      0|          0.1|         262|         262|        3.0|              0.65|         18|       0.0|          5|
|                      0|          0.1|         265|         265|        3.0|0.5333333333333333|          0|       0.0|          5|
|                      0|          0.2|         100|         164|        6.5
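For reference, the expected size of each subset can be worked out from the cleaned counts in the passenger_count_grouped table above (3,229,531 rows in total). Both `sample()` and `randomSplit()` are random, so the real sizes only approximate these values, but on average the training set holds 0.2 × 0.8 = 16% of the cleaned data and the test set 0.2 × 0.2 = 4%.

```python
# Row counts per passenger_count_grouped value, copied from the cleaned table above.
cleaned_counts = [29150, 14091, 22057, 2450879, 128633, 489550, 95171]
total_rows = sum(cleaned_counts)

sample_fraction = 0.2                  # selectedData.sample(fraction=0.2)
train_weight, test_weight = 0.8, 0.2   # sampledData.randomSplit([0.8, 0.2])

expected_sample = total_rows * sample_fraction
expected_train = expected_sample * train_weight
expected_test = expected_sample * test_weight
print(total_rows, round(expected_sample), round(expected_train), round(expected_test))
# 3229531 645906 516725 129181
```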


In [9]:
cvModel = cv.fit(dataTrain)


## MODEL EVALUATION

In [10]:
predictions = cvModel.transform(dataTest)
mae = evaluator.evaluate(predictions)
print("MAE on Test data:", mae)

rmse_eval = RegressionEvaluator(labelCol='tip_amount', predictionCol='prediction', metricName='rmse')
rmse = rmse_eval.evaluate(predictions)
print("RMSE on Test Data:",rmse)

mse_eval = RegressionEvaluator(labelCol='tip_amount', predictionCol='prediction', metricName='mse')
mse = mse_eval.evaluate(predictions)
print("MSE on Test Data:", mse)

r2_evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print("R2:", r2)

MAE on Test data: 1.9790390686393609
RMSE on Test Data: 3.13747349613148
MSE on Test Data: 9.843739938927492
R2: 0.42887791081664683

### RESULTS:

- The MAE is about $1.98, meaning that, on average, our tip predictions are off by roughly that amount.
- The RMSE is higher, around $3.14. Because RMSE squares the errors before averaging, the gap between RMSE and MAE suggests that, while the typical error is moderate, some trips have much larger prediction errors that pull this value up.
- The MSE is 9.84: this is the average of the squared errors, and the RMSE is its square root (√9.84 ≈ 3.14).
- An R² of approximately 0.43 indicates that our model explains about 43% of the variability in taxi tips, which is a decent start.
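The relation between these numbers can be checked with a few lines of plain Python. The first part uses two made-up sets of errors with the same MAE to show why a handful of large misses raises RMSE above MAE; the second confirms that the reported RMSE is the square root of the reported MSE.

```python
import math

def mae(errors):
    return sum(abs(e) for e in errors) / len(errors)

def rmse(errors):
    return math.sqrt(sum(e * e for e in errors) / len(errors))

even_errors = [2.0, 2.0, 2.0, 2.0, 2.0]      # every prediction off by $2
skewed_errors = [1.0, 1.0, 1.0, 1.0, 6.0]    # same MAE, but one large miss

print(mae(even_errors), rmse(even_errors))      # 2.0 2.0
print(mae(skewed_errors), rmse(skewed_errors))  # 2.0 2.8284271247461903

# RMSE is the square root of MSE: check against the reported test metrics.
reported_mse = 9.843739938927492
reported_rmse = 3.13747349613148
print(math.isclose(math.sqrt(reported_mse), reported_rmse, rel_tol=1e-9))  # True
```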

### POSSIBLE IMPROVEMENTS:

We also tested how adding more columns (such as pickup_hour, day_of_week, is_night and is_weekend) affected model performance. The results were worse, so we decided to keep the features as they are now. For reference, the results with the extra columns were:

- MAE on Test data: 2.0467288596114304
- RMSE on Test Data: 3.2195547766976675
- MSE on Test Data: 10.365532960156768
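The notebook does not show how is_night and is_weekend were defined, so the sketch below uses hypothetical definitions: weekend means Saturday or Sunday under the notebook's 0 = Sunday encoding, and night is an assumed 20:00 to 05:59 window. It also illustrates one plausible reason why these flags did not help: both are deterministic functions of pickup_hour and day_of_week, which the model already receives as one-hot vectors, so for a linear model with an intercept they add no new information (is_night is just the sum of the night-hour indicators).

```python
# HYPOTHETICAL definitions: the notebook does not show how these flags were built.
NIGHT_START, NIGHT_END = 20, 6   # assumed night window: 20:00 to 05:59

def is_weekend(day_of_week):
    """day_of_week uses the notebook's encoding: 0 = Sunday ... 6 = Saturday."""
    return day_of_week in (0, 6)

def is_night(pickup_hour, start=NIGHT_START, end=NIGHT_END):
    """True for hours in a window that wraps around midnight."""
    return pickup_hour >= start or pickup_hour < end

def one_hot_hour(hour):
    """Full 24-slot one-hot vector for pickup_hour."""
    return [1 if h == hour else 0 for h in range(24)]

night_hours = [h for h in range(24) if is_night(h)]
# is_night equals the sum of the night-hour indicators, i.e. a linear
# combination of features the model already has.
redundant = all(int(is_night(hour)) == sum(one_hot_hour(hour)[h] for h in night_hours)
                for hour in range(24))
print(night_hours, redundant)
```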

## MODEL TRAINING (RANDOM FOREST)


In [11]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

rf = RandomForestRegressor(featuresCol="features", labelCol="tip_amount")

paramGridRF = (ParamGridBuilder()
               .addGrid(rf.numTrees, [10, 20])
               .addGrid(rf.maxDepth, [5, 10])
               .build())

pipeline = Pipeline(stages=[
    indexer_pickup, indexer_dropoff,
    indexer_day, indexer_hour, indexer_pass,
    encoder_pickup, encoder_drop,
    encoder_day, encoder_hour, encoder_pass,
    assembler, rf
])
evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="mae")

cvRF = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGridRF,
                      evaluator=evaluator,
                      numFolds=3,
                      parallelism=2)
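To make numTrees and the averaging step more concrete, here is a toy pure-Python sketch of a random forest regressor: each tree is trained on a bootstrap sample (rows drawn with replacement) and the forest predicts the average of its trees. It is heavily simplified under stated assumptions: trees are fixed at depth 1 (stumps) instead of growing up to maxDepth, there is a single feature, so Spark's per-node feature subsampling is omitted, and the fare/tip data are made up.

```python
import random

def fit_stump(xs, ys):
    """Depth-1 regression tree: pick the threshold with the lowest squared error."""
    best = None
    for threshold in sorted(set(xs)):
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        if not left or not right:
            continue
        left_mean, right_mean = sum(left) / len(left), sum(right) / len(right)
        sse = (sum((y - left_mean) ** 2 for y in left)
               + sum((y - right_mean) ** 2 for y in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:  # every x is identical: predict the mean
        mean = sum(ys) / len(ys)
        return lambda x: mean
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def fit_forest(xs, ys, num_trees, seed=42):
    """Bagging: each tree sees a bootstrap sample of the rows (drawn with replacement)."""
    rng = random.Random(seed)
    n = len(xs)
    trees = []
    for _ in range(num_trees):
        idx = [rng.randrange(n) for _ in range(n)]
        trees.append(fit_stump([xs[i] for i in idx], [ys[i] for i in idx]))
    return trees

def predict_forest(trees, x):
    """A regression forest predicts the average of its trees' predictions."""
    return sum(tree(x) for tree in trees) / len(trees)

fares = [5, 8, 10, 12, 15, 20, 25, 30, 40, 50]              # made-up fare_amount values
tips = [1.0, 1.5, 2.0, 2.4, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0]  # made-up tip_amount values
forest = fit_forest(fares, tips, num_trees=10)  # numTrees=10, the smaller grid value
print(round(predict_forest(forest, 7), 2), round(predict_forest(forest, 45), 2))
```

Averaging many trees trained on slightly different samples smooths out the step-like predictions of any single tree, which is what the numTrees parameter controls.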


In [12]:
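# Note: this re-sampled DataFrame is not used below; cvRF is fitted on the dataTrain/dataTest split
# created for linear regression, so all models are compared on the same data.
sampledData = selectedData.sample(withReplacement=False, fraction=0.2, seed=42)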

In [13]:
cvModelRF = cvRF.fit(dataTrain)
predRF = cvModelRF.transform(dataTest)

evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="mae")
print("Random Forest MAE:", evaluator.evaluate(predRF))

rmse_eval = RegressionEvaluator(labelCol='tip_amount', predictionCol='prediction', metricName='rmse')
rmse = rmse_eval.evaluate(predRF)
print("RMSE on Test Data:",rmse)

mse_eval = RegressionEvaluator(labelCol='tip_amount', predictionCol='prediction', metricName='mse')
mse = mse_eval.evaluate(predRF)
print("MSE on Test Data:", mse)

r2_evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predRF)
print("R2:", r2)


Random Forest MAE: 2.019929460938898
RMSE on Test Data: 3.172192155766229
MSE on Test Data: 10.062803073104794
R2: 0.4161681281902655

### RESULTS

- The MAE is about $2.02. This means that, on average, the Random Forest predictions are off by this amount, slightly more than the $1.98 MAE obtained with Linear Regression.
- The RMSE is around $3.17. As with Linear Regression ($3.14), it is higher than the MAE, still pointing to some larger prediction errors, and it is also slightly higher than the previous model's.
- The MSE is 10.06. This is the average of the squared errors, again slightly higher than Linear Regression's 9.84.
- An R² of approximately 0.416 shows that the Random Forest model explains about 41.6% of the variability in tips, a little less than the 42.9% explained by Linear Regression.

Overall, this particular Random Forest configuration performed slightly worse than the Linear Regression model on all of these metrics.

## MODEL TRAINING (GBT REGRESSOR)

In [14]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="tip_amount", featuresCol="features")  # maxIter is tuned in the grid below

paramGrid = (ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 7])
    .addGrid(gbt.maxIter, [50, 100])
    .addGrid(gbt.stepSize, [0.1, 0.2])
    .build())

evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="mae")

pipeline = Pipeline(stages=[
    indexer_pickup, indexer_dropoff,
    indexer_day, indexer_hour, indexer_pass,
    encoder_pickup, encoder_drop,
    encoder_day, encoder_hour, encoder_pass,
    assembler, gbt
])

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)
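As with the forest, a toy sketch can show what maxIter and stepSize control in gradient boosting with squared error: each new stump is fitted to the current residuals and added to the model scaled by the step size, so a smaller step needs more iterations. This is a simplified illustration under assumptions (depth-1 trees, a single made-up feature, and a start from the mean label, a common textbook choice that may differ from how Spark initialises its first tree), not Spark's GBTRegressor.

```python
def fit_stump(xs, ys):
    """Depth-1 regression tree minimising squared error."""
    best = None
    for threshold in sorted(set(xs)):
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        if not left or not right:
            continue
        left_mean, right_mean = sum(left) / len(left), sum(right) / len(right)
        sse = (sum((y - left_mean) ** 2 for y in left)
               + sum((y - right_mean) ** 2 for y in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:  # every x is identical: predict the mean
        mean = sum(ys) / len(ys)
        return lambda x: mean
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def fit_gbt(xs, ys, max_iter, step_size):
    """Boosting for squared error: fit each stump to the residuals, add it scaled by step_size."""
    base = sum(ys) / len(ys)
    preds = [base] * len(ys)
    stumps, train_mse = [], []
    for _ in range(max_iter):
        residuals = [y - p for y, p in zip(ys, preds)]  # negative gradient of squared error
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        preds = [p + step_size * stump(x) for p, x in zip(preds, xs)]
        train_mse.append(sum((y - p) ** 2 for y, p in zip(ys, preds)) / len(ys))

    def predict(x):
        return base + step_size * sum(s(x) for s in stumps)

    return predict, train_mse

fares = [5, 8, 10, 12, 15, 20, 25, 30, 40, 50]              # made-up fare_amount values
tips = [1.0, 1.5, 2.0, 2.4, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0]  # made-up tip_amount values
predict, train_mse = fit_gbt(fares, tips, max_iter=50, step_size=0.1)
print(round(train_mse[0], 3), round(train_mse[-1], 3), round(predict(45), 2))
```

The training error keeps shrinking as iterations are added, which is why GBT models can overfit when maxIter and maxDepth are large; cross-validation on held-out folds is what guards against that.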


In [15]:
cvModel = cv.fit(dataTrain)


## MODEL EVALUATION

In [16]:
predictions = cvModel.transform(dataTest)

mae = evaluator.evaluate(predictions)
print("MAE on Test data:", mae)

rmse_eval = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="rmse")
rmse = rmse_eval.evaluate(predictions)
print("RMSE on Test Data:", rmse)

mse_eval = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="mse")
mse = mse_eval.evaluate(predictions)
print("MSE on Test Data:", mse)

r2_eval = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="r2")
r2 = r2_eval.evaluate(predictions)
print("R2 on Test Data:", r2)

MAE on Test data: 1.9671959983206897
RMSE on Test Data: 3.2530875945279334
MSE on Test Data: 10.582578897671537
R2 on Test Data: 0.38601135274970066

### RESULTS

- MAE: 1.97. Slightly better than both Random Forest (2.02) and Linear Regression (1.98), indicating a marginally lower average error.

- RMSE: 3.25. Higher than both Random Forest (3.17) and Linear Regression (3.14). Since RMSE penalizes large errors more heavily than MAE, this suggests that the GBT model makes larger errors on some trips.

- MSE: 10.58. Also higher, consistent with larger squared errors on average.

- R²: 0.39. Lower than Random Forest (0.42) and Linear Regression (0.43), meaning it explains less of the variance in tips.

Overall, while the gradient-boosted trees model performs slightly better in terms of MAE, it falls behind on RMSE, MSE and R² compared to the other models, especially Linear Regression, which appears to have the best overall performance here.
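The comparison can be summarised programmatically from the reported test metrics. The small sketch below simply stores the printed values and picks the best model per metric (lower is better for MAE and RMSE, higher for R²); it also shows how small GBT's MAE advantage is: about 0.012 dollars, or roughly 0.6% relative to Linear Regression.

```python
# Test-set metrics copied from the outputs above.
results = {
    "Linear Regression": {"mae": 1.9790390686393609, "rmse": 3.13747349613148, "r2": 0.42887791081664683},
    "Random Forest": {"mae": 2.019929460938898, "rmse": 3.172192155766229, "r2": 0.4161681281902655},
    "GBT": {"mae": 1.9671959983206897, "rmse": 3.2530875945279334, "r2": 0.38601135274970066},
}
higher_is_better = {"mae": False, "rmse": False, "r2": True}

best = {}
for metric, higher in higher_is_better.items():
    choose = max if higher else min
    best[metric] = choose(results, key=lambda name, m=metric: results[name][m])
    print(f"best {metric}: {best[metric]}")

lr_mae, gbt_mae = results["Linear Regression"]["mae"], results["GBT"]["mae"]
print(f"GBT MAE advantage: {lr_mae - gbt_mae:.4f} ({(lr_mae - gbt_mae) / lr_mae:.1%})")
```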

# FINAL CONCLUSIONS
