Import

In [1]:
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
import seaborn as sns
import contextily as ctx

from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.preprocessing import StandardScaler
import pandas as pd

import warnings
warnings.filterwarnings("ignore")

# Read Data

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("YellowTaxi").master("local[*]").config("spark.driver.memory","48G").getOrCreate()

# Path to the directory containing Parquet files
data_path = "../data/tlc_data/"

# Load all Parquet files at once
data = spark.read.parquet(
    f"{data_path}yellow_tripdata_2023-12.parquet",
    f"{data_path}yellow_tripdata_2024-01.parquet",
    f"{data_path}yellow_tripdata_2024-02.parquet",
    f"{data_path}yellow_tripdata_2024-03.parquet",
    f"{data_path}yellow_tripdata_2024-04.parquet",
    f"{data_path}yellow_tripdata_2024-05.parquet"
)


24/08/25 21:08:29 WARN Utils: Your hostname, lixianweideMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.29.173.161 instead (on interface en0)
24/08/25 21:08:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/25 21:08:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

# Data clean

In [3]:
from pyspark.sql import functions as F

# Drop columns that are not needed
data = data.drop('store_and_fwd_flag')
data = data.drop('Airport_fee')
data = data.drop('RatecodeID')
data = data.drop('congestion_surcharge')
data = data.drop('VendorID')
data = data.drop('extra')
data = data.drop('mta_tax')
data = data.drop('tolls_amount')
data = data.drop('improvement_surcharge')

# Fill missing values
fill_values = {
    'passenger_count': 0
}

data = data.fillna(fill_values)

# Filter rows with valid data
data = data.filter(
    (data['passenger_count'].between(1, 4)) &
    (data['trip_distance'].between(1, 50)) &
    (data['fare_amount'].between(4, 200)) &
    (data['tip_amount'] >= 0) &
    (data['total_amount'] > 0) &
    (data['PULocationID'].between(1, 263)) &
    (data['DOLocationID'].between(1, 263))
)

# Add derived features
data = data.withColumn('date_time', F.to_date(data['tpep_pickup_datetime']))
data = data.withColumn('period', (F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) / 60)
data = data.withColumn('hour', F.hour(data['tpep_pickup_datetime']))

# Filter for valid trip periods (0 < period <= 180 minutes)
data = data.filter((data['period'] > 0) & (data['period'] <= 180))

# Filter rows to only include data within specified date ranges
date_filter = (
    ((data['date_time'] <= '2023-12-31') & (data['date_time'] >= '2023-12-01')) |
    ((data['date_time'] <= '2024-01-31') & (data['date_time'] >= '2024-01-01')) |
    ((data['date_time'] <= '2024-02-29') & (data['date_time'] >= '2024-02-01')) |
    ((data['date_time'] <= '2024-03-31') & (data['date_time'] >= '2024-03-01')) |
    ((data['date_time'] <= '2024-04-30') & (data['date_time'] >= '2024-04-01')) |
    ((data['date_time'] <= '2024-05-31') & (data['date_time'] >= '2024-05-01'))
)
data = data.filter(date_filter)

# Load zone data and join to identify pickup and dropoff borough
zone_data = spark.read.csv('../data/taxi_zones/taxi+_zone_lookup.csv', header=True, inferSchema=True)
data = data.join(zone_data.select('LocationID', F.col('Borough').alias('pick_up_borough')),
                 data['PULocationID'] == zone_data['LocationID'], 'left').drop('LocationID')

data = data.join(zone_data.select('LocationID', F.col('Borough').alias('drop_off_borough')),
                 data['DOLocationID'] == zone_data['LocationID'], 'left').drop('LocationID')

# Identify pick up and dropoff location that's from the airport
data = data.withColumn('pick_up_borough', F.when(data['PULocationID'] == 138, 'LaGuardia')
                                         .when(data['PULocationID'] == 132, 'JFK')
                                         .otherwise(data['pick_up_borough']))

data = data.withColumn('drop_off_borough', F.when(data['DOLocationID'] == 138, 'LaGuardia')
                                          .when(data['DOLocationID'] == 132, 'JFK')
                                          .otherwise(data['drop_off_borough']))

# Show the result 
data.show()


+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+----------+------------+----------+------------------+----+---------------+----------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|fare_amount|tip_amount|total_amount| date_time|            period|hour|pick_up_borough|drop_off_borough|
+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+----------+------------+----------+------------------+----+---------------+----------------+
| 2023-12-01 00:59:44|  2023-12-01 01:13:22|              2|          2.2|         114|         186|           1|       13.5|       3.0|        21.5|2023-12-01|13.633333333333333|   0|      Manhattan|       Manhattan|
| 2023-12-01 00:18:16|  2023-12-01 00:25:32|              2|          2.2|         229|         263|           1|       11.4|   

                                                                                

In [4]:
data = data.toPandas()

                                                                                

In [9]:
# from https://www.visualcrossing.com/weather/weather-data-services#
weather_data = pd.read_csv('../data/New York City,USA 2023-12-01 to 2024-05-31.csv')
weather_data.head()

Unnamed: 0,name,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,...,solarenergy,uvindex,severerisk,sunrise,sunset,moonphase,conditions,description,icon,stations
0,"New York City,USA",2023-12-01,8.8,5.7,7.2,7.7,3.0,5.5,3.1,75.9,...,2.7,2,10,2023-12-01T07:00:36,2023-12-01T16:29:18,0.64,"Rain, Partially cloudy",Becoming cloudy in the afternoon with rain.,rain,"72505394728,72055399999,KLGA,KJRB,F1417,KNYC,7..."
1,"New York City,USA",2023-12-02,14.4,8.3,10.2,14.4,6.2,9.4,7.8,85.5,...,6.8,5,10,2023-12-02T07:01:37,2023-12-02T16:29:04,0.67,Partially cloudy,Clearing in the afternoon.,partly-cloudy-day,"72505394728,72055399999,KLGA,KJRB,F1417,KNYC,7..."
2,"New York City,USA",2023-12-03,10.6,8.9,9.5,10.6,5.8,8.3,8.3,92.2,...,1.0,1,10,2023-12-03T07:02:37,2023-12-03T16:28:51,0.7,"Rain, Overcast",Cloudy skies throughout the day with a chance ...,rain,"72505394728,72055399999,KLGA,KJRB,F1417,KNYC,7..."
3,"New York City,USA",2023-12-04,11.2,6.7,8.7,11.2,4.1,7.3,3.4,70.5,...,7.4,5,10,2023-12-04T07:03:36,2023-12-04T16:28:41,0.73,Clear,Clear conditions throughout the day.,clear-day,"72505394728,72055399999,KLGA,KJRB,F1417,KNYC,7..."
4,"New York City,USA",2023-12-05,6.7,4.3,5.7,5.6,2.4,3.9,-2.2,57.4,...,5.5,3,10,2023-12-05T07:04:34,2023-12-05T16:28:33,0.75,Partially cloudy,Partly cloudy throughout the day.,partly-cloudy-day,"72505394728,72055399999,KLGA,KJRB,F1417,KNYC,7..."


In [10]:
weather_data.columns

Index(['name', 'datetime', 'tempmax', 'tempmin', 'temp', 'feelslikemax',
       'feelslikemin', 'feelslike', 'dew', 'humidity', 'precip', 'precipprob',
       'precipcover', 'preciptype', 'snow', 'snowdepth', 'windgust',
       'windspeed', 'winddir', 'sealevelpressure', 'cloudcover', 'visibility',
       'solarradiation', 'solarenergy', 'uvindex', 'severerisk', 'sunrise',
       'sunset', 'moonphase', 'conditions', 'description', 'icon', 'stations'],
      dtype='object')

In [11]:
weather_data['datetime']=pd.to_datetime(weather_data['datetime'])
weather_data=weather_data.sort_values('datetime')
weather_data=weather_data.reset_index()
weather_data=weather_data.drop(['index', 'stations', 'name', 'description','sunrise',
       'sunset'],axis=1)
# weather_data['datetime']=weather_data['datetime'].astype('datetime64[us]')


In [12]:
weather_data.head()

Unnamed: 0,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,precip,...,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,severerisk,moonphase,conditions,icon
0,2023-12-01,8.8,5.7,7.2,7.7,3.0,5.5,3.1,75.9,0.625,...,1020.7,47.4,14.7,32.5,2.7,2,10,0.64,"Rain, Partially cloudy",rain
1,2023-12-02,14.4,8.3,10.2,14.4,6.2,9.4,7.8,85.5,0.0,...,1016.6,56.5,11.4,77.5,6.8,5,10,0.67,Partially cloudy,partly-cloudy-day
2,2023-12-03,10.6,8.9,9.5,10.6,5.8,8.3,8.3,92.2,0.904,...,1009.6,98.4,9.6,10.4,1.0,1,10,0.7,"Rain, Overcast",rain
3,2023-12-04,11.2,6.7,8.7,11.2,4.1,7.3,3.4,70.5,0.0,...,1008.7,17.5,16.0,85.6,7.4,5,10,0.73,Clear,clear-day
4,2023-12-05,6.7,4.3,5.7,5.6,2.4,3.9,-2.2,57.4,0.0,...,1016.3,27.8,16.0,63.0,5.5,3,10,0.75,Partially cloudy,partly-cloudy-day


In [13]:
weather_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 183 entries, 0 to 182
Data columns (total 28 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   datetime          183 non-null    datetime64[ns]
 1   tempmax           183 non-null    float64       
 2   tempmin           183 non-null    float64       
 3   temp              183 non-null    float64       
 4   feelslikemax      183 non-null    float64       
 5   feelslikemin      183 non-null    float64       
 6   feelslike         183 non-null    float64       
 7   dew               183 non-null    float64       
 8   humidity          183 non-null    float64       
 9   precip            183 non-null    float64       
 10  precipprob        183 non-null    int64         
 11  precipcover       183 non-null    float64       
 12  preciptype        76 non-null     object        
 13  snow              183 non-null    float64       
 14  snowdepth         183 non-

In [14]:
data.head()

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,PULocationID,DOLocationID,payment_type,fare_amount,tip_amount,total_amount,date_time,period,hour,pick_up_borough,drop_off_borough
0,2023-12-01 00:59:44,2023-12-01 01:13:22,2,2.2,114,186,1,13.5,3.0,21.5,2023-12-01,13.633333,0,Manhattan,Manhattan
1,2023-12-01 00:18:16,2023-12-01 00:25:32,2,2.2,229,263,1,11.4,2.0,18.4,2023-12-01,7.266667,0,Manhattan,Manhattan
2,2023-12-01 00:17:09,2023-12-01 00:33:31,1,5.33,45,162,1,24.7,3.0,32.7,2023-12-01,16.366667,0,Manhattan,Manhattan
3,2023-12-01 00:19:04,2023-12-01 00:34:36,1,3.33,186,209,1,17.7,3.4,26.1,2023-12-01,15.533333,0,Manhattan,Manhattan
4,2023-12-01 00:08:39,2023-12-01 00:16:18,1,2.1,163,262,1,12.1,3.42,20.52,2023-12-01,7.65,0,Manhattan,Manhattan


In [15]:
data['date_time'] = pd.to_datetime(data['date_time'])

In [16]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13240082 entries, 0 to 13240081
Data columns (total 15 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   tpep_pickup_datetime   datetime64[ns]
 1   tpep_dropoff_datetime  datetime64[ns]
 2   passenger_count        int64         
 3   trip_distance          float64       
 4   PULocationID           int32         
 5   DOLocationID           int32         
 6   payment_type           int64         
 7   fare_amount            float64       
 8   tip_amount             float64       
 9   total_amount           float64       
 10  date_time              datetime64[ns]
 11  period                 float64       
 12  hour                   int32         
 13  pick_up_borough        object        
 14  drop_off_borough       object        
dtypes: datetime64[ns](3), float64(5), int32(3), int64(2), object(2)
memory usage: 1.3+ GB


In [17]:
data.isna().sum()

tpep_pickup_datetime     0
tpep_dropoff_datetime    0
passenger_count          0
trip_distance            0
PULocationID             0
DOLocationID             0
payment_type             0
fare_amount              0
tip_amount               0
total_amount             0
date_time                0
period                   0
hour                     0
pick_up_borough          0
drop_off_borough         0
dtype: int64

## data merge

In [18]:

data = data.sort_values(by='date_time')

data = pd.merge_asof(data, weather_data, 
                     left_on='date_time',right_on='datetime',
                     tolerance=pd.Timedelta(days=1), direction='forward')



In [19]:
data.head()

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,PULocationID,DOLocationID,payment_type,fare_amount,tip_amount,total_amount,...,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,severerisk,moonphase,conditions,icon
0,2023-12-01 00:59:44,2023-12-01 01:13:22,2,2.2,114,186,1,13.5,3.0,21.5,...,1020.7,47.4,14.7,32.5,2.7,2,10,0.64,"Rain, Partially cloudy",rain
1,2023-12-01 18:17:38,2023-12-01 18:40:48,2,2.69,186,79,1,20.5,5.4,32.4,...,1020.7,47.4,14.7,32.5,2.7,2,10,0.64,"Rain, Partially cloudy",rain
2,2023-12-01 18:41:58,2023-12-01 18:52:44,1,1.29,158,234,1,11.4,1.5,19.4,...,1020.7,47.4,14.7,32.5,2.7,2,10,0.64,"Rain, Partially cloudy",rain
3,2023-12-01 18:52:21,2023-12-01 19:20:26,2,1.29,170,163,1,22.6,5.82,34.92,...,1020.7,47.4,14.7,32.5,2.7,2,10,0.64,"Rain, Partially cloudy",rain
4,2023-12-01 18:12:34,2023-12-01 18:30:47,1,1.45,137,161,1,16.3,3.42,26.22,...,1020.7,47.4,14.7,32.5,2.7,2,10,0.64,"Rain, Partially cloudy",rain


In [20]:
data.to_csv('data.csv')

In [21]:
data.columns

Index(['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',
       'trip_distance', 'PULocationID', 'DOLocationID', 'payment_type',
       'fare_amount', 'tip_amount', 'total_amount', 'date_time', 'period',
       'hour', 'pick_up_borough', 'drop_off_borough', 'datetime', 'tempmax',
       'tempmin', 'temp', 'feelslikemax', 'feelslikemin', 'feelslike', 'dew',
       'humidity', 'precip', 'precipprob', 'precipcover', 'preciptype', 'snow',
       'snowdepth', 'windgust', 'windspeed', 'winddir', 'sealevelpressure',
       'cloudcover', 'visibility', 'solarradiation', 'solarenergy', 'uvindex',
       'severerisk', 'moonphase', 'conditions', 'icon'],
      dtype='object')