In [None]:
#download from Kaggle
! pip install -q kaggle
from google.colab import files
files.upload()
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download -d jeffsinsel/nyc-fhvhv-data

Saving kaggle.json to kaggle.json
Dataset URL: https://www.kaggle.com/datasets/jeffsinsel/nyc-fhvhv-data
License(s): CC0-1.0
Downloading nyc-fhvhv-data.zip to /content
100% 17.8G/17.8G [11:13<00:00, 29.2MB/s]
100% 17.8G/17.8G [11:13<00:00, 28.5MB/s]


In [None]:
#unzip files in nyc-fhvhv-data from Kaggle
! unzip nyc-fhvhv-data.zip

Archive:  nyc-fhvhv-data.zip
  inflating: data_dictionary_trip_records_hvfhs.pdf  
  inflating: fhvhv_tripdata_2019-02.parquet  
  inflating: fhvhv_tripdata_2019-03.parquet  
  inflating: fhvhv_tripdata_2019-04.parquet  
  inflating: fhvhv_tripdata_2019-05.parquet  
  inflating: fhvhv_tripdata_2019-06.parquet  
  inflating: fhvhv_tripdata_2019-07.parquet  
  inflating: fhvhv_tripdata_2019-08.parquet  
  inflating: fhvhv_tripdata_2019-09.parquet  
  inflating: fhvhv_tripdata_2019-10.parquet  
  inflating: fhvhv_tripdata_2019-11.parquet  
  inflating: fhvhv_tripdata_2019-12.parquet  
  inflating: fhvhv_tripdata_2020-01.parquet  
  inflating: fhvhv_tripdata_2020-02.parquet  
  inflating: fhvhv_tripdata_2020-03.parquet  
  inflating: fhvhv_tripdata_2020-04.parquet  
  inflating: fhvhv_tripdata_2020-05.parquet  
  inflating: fhvhv_tripdata_2020-06.parquet  
  inflating: fhvhv_tripdata_2020-07.parquet  
  inflating: fhvhv_tripdata_2020-08.parquet  
  inflating: fhvhv_tripdata_2020-09.parquet

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import glob

# initialize spark
spark = SparkSession.builder.appName("NYC_Rides").getOrCreate()

# list of all parquet files from content folder
parquet_files = glob.glob('/content/*.parquet')

removed_summary = {}
combined_df = None

for file in parquet_files:
    # read parquet files get features
    df = spark.read.parquet(file).select(
        "Pickup_datetime", "DropOff_datetime", "PULocationID", "DOLocationID",
        "base_passenger_fare", "trip_miles", "tips", "driver_pay", "trip_time",
        "Hvfhs_license_num"
    )

    initial_count = df.count()

    # Clean data by filtering out null and invalid values
    df_clean = df.filter(
        (F.col("Pickup_datetime").isNotNull()) &
        (F.col("DropOff_datetime").isNotNull()) &
        (F.col("PULocationID").isNotNull()) &
        (F.col("DOLocationID").isNotNull()) &
        (F.col("base_passenger_fare").isNotNull()) &
        (F.col("trip_miles").isNotNull()) &
        (F.col("tips").isNotNull()) &
        (F.col("driver_pay").isNotNull()) &
        (F.col("trip_time").isNotNull()) &
        (F.col("Hvfhs_license_num").isNotNull()) &

        # Ensure numeric columns have valid positive values
        (F.col("base_passenger_fare") > 0) &
        (F.col("trip_miles") > 0) &
        (F.col("driver_pay") > 0) &
        (F.col("trip_time") > 0)
    )

    clean_count = df_clean.count()
    removed_summary[file] = initial_count - clean_count

    # combine clean DFs into one big DF
    if combined_df is None:
        combined_df = df_clean
    else:
        combined_df = combined_df.union(df_clean)

# check how many of which file were removed
print("Removal Summary:", removed_summary)

Removal Summary: {'/content/fhvhv_tripdata_2021-11.parquet': 97481, '/content/fhvhv_tripdata_2019-02.parquet': 2269891, '/content/fhvhv_tripdata_2021-06.parquet': 155107, '/content/fhvhv_tripdata_2020-12.parquet': 104215, '/content/fhvhv_tripdata_2022-06.parquet': 87529, '/content/fhvhv_tripdata_2022-08.parquet': 83152, '/content/fhvhv_tripdata_2020-06.parquet': 110413, '/content/fhvhv_tripdata_2020-05.parquet': 78979, '/content/fhvhv_tripdata_2020-10.parquet': 130980, '/content/fhvhv_tripdata_2022-03.parquet': 84930, '/content/fhvhv_tripdata_2020-03.parquet': 949378, '/content/fhvhv_tripdata_2019-10.parquet': 2486406, '/content/fhvhv_tripdata_2019-05.parquet': 2197902, '/content/fhvhv_tripdata_2021-12.parquet': 94795, '/content/fhvhv_tripdata_2021-09.parquet': 134529, '/content/fhvhv_tripdata_2021-10.parquet': 104918, '/content/fhvhv_tripdata_2019-09.parquet': 2193661, '/content/fhvhv_tripdata_2019-08.parquet': 4465149, '/content/fhvhv_tripdata_2019-12.parquet': 2247128, '/content/fhv

In [None]:
# calculate how many total rows of data were removed
tot = 0
for key,val in removed_summary.items():
  tot += int(val)

count = combined_df.count()
percent_removed = tot/count * 100

print("{}% was removed from a total of {} rides in 46 months".format(percent_removed, count))

5.159562385472075% was removed from a total of 708720164 rides in 46 months


In [None]:
!pip install category_encoders


Collecting category_encoders
  Downloading category_encoders-2.8.1-py3-none-any.whl.metadata (7.9 kB)
Downloading category_encoders-2.8.1-py3-none-any.whl (85 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.7/85.7 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: category_encoders
Successfully installed category_encoders-2.8.1


In [None]:
from pyspark.sql.functions import hour

# Extract Hour from Pickup_datetime
combined_df = combined_df.withColumn("hour", hour("Pickup_datetime"))


In [None]:
combined_df.show(5)
combined_df.printSchema()

+-------------------+-------------------+------------+------------+-------------------+----------+----+----------+---------+-----------------+----+
|    Pickup_datetime|   DropOff_datetime|PULocationID|DOLocationID|base_passenger_fare|trip_miles|tips|driver_pay|trip_time|Hvfhs_license_num|hour|
+-------------------+-------------------+------------+------------+-------------------+----------+----+----------+---------+-----------------+----+
|2021-11-01 00:00:52|2021-11-01 00:07:05|         225|          37|               7.91|      1.11| 0.0|      8.41|      373|           HV0003|   0|
|2021-11-01 00:12:43|2021-11-01 00:33:16|          36|         124|              25.73|      5.52| 0.0|     19.43|     1233|           HV0003|   0|
|2021-11-01 00:41:42|2021-11-01 00:48:36|         132|         216|               7.91|      1.62| 0.0|      8.42|      414|           HV0003|   0|
|2021-11-01 00:55:56|2021-11-01 01:04:54|         216|         258|              11.17|      2.54| 0.0|     10.3

In [None]:
from pyspark.sql.functions import col, sum, min

# Group by Pickup Zone, Hour, and Ride-Hailing Company to Aggregate Driver Pay
df_grouped = combined_df.groupBy("PULocationID", "hour", "Hvfhs_license_num").agg(
    sum("driver_pay").alias("total_driver_pay"),
    sum("tips").alias("total_tips"),
    sum("trip_miles").alias("total_miles"),
    sum("trip_time").alias("total_time_seconds"),
    sum("base_passenger_fare").alias("total_base_fare"),
    min("Pickup_datetime").alias("earliest_pickup_time")
)


In [None]:
import pandas as pd

# Convert to Pandas dataframe

df = df_grouped.toPandas()



In [None]:
import numpy as np

df_pandas = df.copy()

# Add relevant variables to dataframe
df_pandas["earliest_pickup_time"] = pd.to_datetime(df_pandas["earliest_pickup_time"])
df_pandas["day_of_week"] = df_pandas["earliest_pickup_time"].dt.dayofweek
df_pandas["is_weekend"] = df_pandas["day_of_week"].isin([5, 6]).astype(int)

# Convert trip time from seconds to hours
df_pandas["total_time_hours"] = df_pandas["total_time_seconds"] / 3600

# Handle division by zero
df_pandas["total_time_hours"].replace(0, np.nan, inplace=True)

# Compute total driver earnings (including tips)
df_pandas["total_driver_earnings"] = df_pandas["total_driver_pay"] + df_pandas["total_tips"]

# Compute earnings per hour
df_pandas["earnings_per_hour"] = df_pandas["total_driver_earnings"] / df_pandas["total_time_hours"]

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_pandas["total_time_hours"].replace(0, np.nan, inplace=True)


In [None]:
df_pandas.columns.tolist()

['PULocationID',
 'hour',
 'Hvfhs_license_num',
 'total_driver_pay',
 'total_tips',
 'total_miles',
 'total_time_seconds',
 'total_base_fare',
 'earliest_pickup_time',
 'day_of_week',
 'is_weekend',
 'total_time_hours',
 'total_driver_earnings',
 'earnings_per_hour']

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

# Set X and y variable to predict driver earnings
X = df_pandas.drop(columns=["total_driver_earnings", "earnings_per_hour", "earliest_pickup_time", "total_driver_pay", "total_time_seconds"])
y = df_pandas["earnings_per_hour"]

# Encode PULocationID using target mean encoding
pulo_mean = df_pandas.groupby("PULocationID")["earnings_per_hour"].mean()
df_pandas["PULocationID_encoded"] = df_pandas["PULocationID"].map(pulo_mean)
X = X.drop(columns=["PULocationID"])

# One-hot encode Hvfhs_license_num
X = pd.get_dummies(X, columns=["Hvfhs_license_num"], drop_first=True)
X = X.astype(float)

# Fix extreme outliers in target variable
y = y.clip(upper=y.quantile(0.99))

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from xgboost import XGBRegressor
from sklearn.metrics import r2_score, mean_absolute_error

# Train different models to choose the best performing one

models = {
    "RandomForest": RandomForestRegressor(n_estimators=100, max_depth=10, random_state=42),
    "XGBoost": XGBRegressor(n_estimators=100, learning_rate=0.1, max_depth=6, random_state=42),
    "GradientBoosting": GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, max_depth=6, random_state=42)
}

results = {}
for name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    r2 = r2_score(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    results[name] = {"R² Score": r2, "MAE": mae}

# Convert results dictionary to DataFrame
results_df = pd.DataFrame(results).T

# Display the results
print(results_df)


                  R² Score       MAE
RandomForest      0.717148  2.783542
XGBoost           0.787632  2.339800
GradientBoosting  0.797635  2.308395


In [None]:
#GradientBoosting is the best model, therefore I'll use it to predict driver earnings
X = df.drop(columns=["earnings_per_hour"])
y = df["earnings_per_hour"]

# Train GradientBoosting with the whole dataset
best_model = GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, max_depth=6, random_state=42)
best_model.fit(X, y)

# Predict earnings_per_hour
y_pred = best_model.predict(X)

# Attach Predictions to PULocationID
df_pandas["predicted_earnings_per_hour"] = y_pred


In [None]:
# Create a dataframe with PULocationID, hour and predicted_earnings_per_hour for heatmap

# Group by PULocationID and Hour to Get Mean Earnings Per Hour
earnings_by_location_time = (
    df_pandas.groupby(["PULocationID", "hour"])["predicted_earnings_per_hour"]
    .mean()
    .reset_index()
)

# Display first few rows of Results
print(earnings_by_location_time.head())

   PULocationID  hour  predicted_earnings_per_hour
0             1     0                    58.836892
1             1     1                    64.492939
2             1     2                    68.058126
3             1     3                    69.057160
4             1     4                    71.637152


In [None]:
#download it as a CSV

earnings_by_location_time.to_csv('earnings_by_location_time_with_pred.csv', index=False)
files.download('earnings_by_location_time_with_pred.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>