# Data Preprocessing

**Objective**  
Unify raw trip datasets, clean and standardize columns, engineer time features, map pickup and dropoff coordinates to taxi zones, remove implausible records, and export compact artifacts for downstream ML modeling.

**What this notebook does**  
- Loads multiple monthly files (Parquet) and optionally an extra CSV of “new” trips  
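- Converts dtypes (numeric `SR_Flag`, categorical base numbers) and handles missing values (fill, or drop trips without zone IDs)  
- Engineers features (trip duration in minutes; hour, day-of-week, day-of-month, month, year, quarter, weekend flag)  
- Performs spatial joins from pickup and dropoff lon/lat → taxi zones (WGS84 / EPSG:4326) for the CSV trips  
- Filters invalid trips (non-positive durations, durations over 240 minutes, trips without a matching zone)  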
- Saves cleaned data as fast-loading `.npz` artifacts

---

## Inputs

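- **Monthly Parquet files**: `fhv_tripdata_YYYY-MM.parquet`  
  - Columns used (names exactly as in the raw files):  
    - `dispatching_base_num` `str`  
    - `pickup_datetime` `datetime64`  
    - `dropOff_datetime` `datetime64` (capital `O` in the raw FHV schema)  
    - `PUlocationID`, `DOlocationID` zone IDs (read as `float64`; many values are missing)  
    - `SR_Flag` (mostly missing; filled with 0)  
    - `Affiliated_base_number` `category/str`
- **Earlier trips CSV**: `gkne-dk5s.csv`, with `pickup_datetime`/`dropoff_datetime` as ISO strings and `pickup_longitude`, `pickup_latitude`, `dropoff_longitude`, `dropoff_latitude` as `float` (WGS84), but no zone IDs
- **Taxi zones layer**: polygons in GeoPackage/GeoJSON/Shapefile loaded as a `GeoDataFrame` named `taxi_zones` with a `LocationID` column and a defined CRS (reprojected to EPSG:4326 if it is not already)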
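Because the raw FHV files spell the dropoff column `dropOff_datetime` (see the missing-value summary further down), a column check on each monthly file can catch schema differences before concatenation. A minimal sketch; `REQUIRED_FHV_COLUMNS` and `missing_columns` are hypothetical names, not part of any library:

```python
import pandas as pd

# Hypothetical list mirroring the FHV columns this notebook relies on
# (note the capital "O" in dropOff_datetime).
REQUIRED_FHV_COLUMNS = (
    "dispatching_base_num",
    "pickup_datetime",
    "dropOff_datetime",
    "PUlocationID",
    "DOlocationID",
)

def missing_columns(df: pd.DataFrame, required=REQUIRED_FHV_COLUMNS) -> list:
    """Return the required columns absent from df (empty list if all are present)."""
    return [col for col in required if col not in df.columns]

# A frame using the lowercase spelling is flagged, which is exactly the mismatch to catch
toy = pd.DataFrame(columns=["dispatching_base_num", "pickup_datetime", "dropoff_datetime"])
print(missing_columns(toy))  # ['dropOff_datetime', 'PUlocationID', 'DOlocationID']
```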

---

## Outputs

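- **`npz/combined_trips_clean.npz`**  
  - Cleaned, consolidated FHV trips as a table-equivalent NumPy `.npz` archive, containing:  
    - `pickup_datetime`, `dropOff_datetime` `datetime64[us]`  
    - `trip_duration_minutes` `float64` (minutes, kept in the range (0, 240])  
    - `PUlocationID`, `DOlocationID` `float64` (zone IDs, no missing values)  
    - `dispatching_base_num`, `Affiliated_base_number` `category`; `SR_Flag` `int64`  
    - Engineered: `hour_of_day` (0–23), `day_of_week` (0–6, Monday = 0), `day_of_month` (1–31), `month`, `year`, `quarter` (`int32`); `is_weekend` `bool`  
- **`npz/new_trips_with_zones.npz`**
  - Earlier CSV trips with `PUlocationID` and `DOlocationID` appended from the spatial join; trips outside every zone are dropped. As saved here, the datetime columns are still strings (`object`) and no time features have been added.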

## Import Data

In [1]:
import pyarrow.parquet as pq
import pandas as pd
import os
data_directory = '/Users/zuminchen/Desktop/Summer Research Project/FHV_Data'

all_months_dfs = []

# Loop through files in the specified directory
for filename in os.listdir(data_directory):
    # Keep files named fhv_tripdata_*.parquet (prefix/suffix check only)
    if filename.startswith('fhv_tripdata_') and filename.endswith('.parquet'):
        file_path = os.path.join(data_directory, filename)
        print(f"Reading {filename}...")
        try:
            # Read the parquet file into a PyArrow Table, then convert to Pandas
            table = pq.read_table(file_path)
            df = table.to_pandas()
            all_months_dfs.append(df)
            print(f"Successfully read {filename}. Shape: {df.shape}")
        except Exception as e:
            print(f"Error reading {filename}: {e}")

# Concatenate all DataFrames into one
if all_months_dfs:
    combined_trips_df = pd.concat(all_months_dfs, ignore_index=True)
    print("\n--- All files combined ---")
    print(f"Total rows in combined DataFrame: {combined_trips_df.shape[0]}")
    print(f"Total columns in combined DataFrame: {combined_trips_df.shape[1]}")
    print("\n--- First 5 rows of combined data ---")
    print(combined_trips_df.head())
    print("\n--- Info of combined data ---")
    combined_trips_df.info()

else:
    print("No Parquet files found or read successfully in the specified directory.")


Reading fhv_tripdata_2025-02.parquet...
Successfully read fhv_tripdata_2025-02.parquet. Shape: (1578722, 7)
Reading fhv_tripdata_2021-05.parquet...
Successfully read fhv_tripdata_2021-05.parquet. Shape: (1263660, 7)
Reading fhv_tripdata_2022-09.parquet...
Successfully read fhv_tripdata_2022-09.parquet. Shape: (1160493, 7)
Reading fhv_tripdata_2024-04.parquet...
Successfully read fhv_tripdata_2024-04.parquet. Shape: (1444626, 7)
Reading fhv_tripdata_2023-06.parquet...
Successfully read fhv_tripdata_2023-06.parquet. Shape: (1219445, 7)
Reading fhv_tripdata_2022-10.parquet...
Successfully read fhv_tripdata_2022-10.parquet. Shape: (1174988, 7)
Reading fhv_tripdata_2023-07.parquet...
Successfully read fhv_tripdata_2023-07.parquet. Shape: (1370843, 7)
Reading fhv_tripdata_2022-11.parquet...
Successfully read fhv_tripdata_2022-11.parquet. Shape: (1106084, 7)
Reading fhv_tripdata_2022-01.parquet...
Successfully read fhv_tripdata_2022-01.parquet. Shape: (1143691, 7)
Reading fhv_tripdata_2025-03
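The loop above only checks the prefix and suffix, and it reads files in whatever order `os.listdir` returns them (the log shows 2025-02 before 2021-05). A minimal sketch of a stricter, chronological listing; `FHV_PATTERN` and `list_monthly_files` are hypothetical names:

```python
import os
import re

# Matches names such as fhv_tripdata_2022-09.parquet and captures year and month
FHV_PATTERN = re.compile(r"^fhv_tripdata_(\d{4})-(\d{2})\.parquet$")

def list_monthly_files(directory: str) -> list:
    """Return matching Parquet paths sorted chronologically by (year, month)."""
    matches = []
    for name in os.listdir(directory):
        m = FHV_PATTERN.match(name)
        if m:
            key = (int(m.group(1)), int(m.group(2)))
            matches.append((key, os.path.join(directory, name)))
    # Sorting on the (year, month) tuple gives a reproducible concatenation order
    return [path for _, path in sorted(matches)]
```

In the notebook this could replace the `os.listdir` loop, e.g. `all_months_dfs = [pd.read_parquet(p) for p in list_monthly_files(data_directory)]`.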

## Missing Value Handling

In [2]:
combined_trips_df.isnull().sum()

dispatching_base_num             0
pickup_datetime                  0
dropOff_datetime                 0
PUlocationID              55565871
DOlocationID              11452809
SR_Flag                   70165194
Affiliated_base_number         885
dtype: int64

In [3]:
# Convert SR_Flag to numeric (unparseable values become NaN), fill missing with 0, then cast to int
combined_trips_df['SR_Flag'] = pd.to_numeric(combined_trips_df['SR_Flag'], errors='coerce').fillna(0).astype(int)
# Drop rows missing either the pickup or the dropoff zone ID
combined_trips_df.dropna(subset=['PUlocationID', 'DOlocationID'], inplace=True)
# Fill missing Affiliated_base_number values with 'UNKNOWN'
combined_trips_df['Affiliated_base_number'] = combined_trips_df['Affiliated_base_number'].fillna('UNKNOWN')
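To make the `SR_Flag` conversion concrete, here is a toy Series (made-up values) run through the same `pd.to_numeric(..., errors='coerce')`, `fillna(0)`, `astype(int)` chain:

```python
import numpy as np
import pandas as pd

# Toy SR_Flag values: numeric flags, missing values, and a stray string
sr_flag = pd.Series([1.0, None, np.nan, "1", "x"], dtype="object")

# 1) parseable entries become numbers; "x" and missing values become NaN
numeric = pd.to_numeric(sr_flag, errors="coerce")
# 2) treat "no flag" as 0, then 3) store as a plain integer column
cleaned = numeric.fillna(0).astype(int)

print(cleaned.tolist())  # [1, 0, 0, 1, 0]
```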

## Type Conversion

In [4]:
# Convert base numbers to categorical type for efficiency
combined_trips_df['dispatching_base_num'] = combined_trips_df['dispatching_base_num'].astype('category')
combined_trips_df['Affiliated_base_number'] = combined_trips_df['Affiliated_base_number'].astype('category')
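Categorical storage pays off when a column has few distinct values relative to its length, as base numbers do. A quick, self-contained way to measure the saving (the base numbers below are made up):

```python
import pandas as pd

# Made-up base numbers: 3 distinct labels repeated 100,000 times each
bases = pd.Series(["B00001", "B00002", "B00003"] * 100_000)

# deep=True counts the Python string objects, not just the pointers
as_object = bases.memory_usage(deep=True)
as_category = bases.astype("category").memory_usage(deep=True)

print(f"object: {as_object:,} bytes, category: {as_category:,} bytes")
```

With only a handful of categories, pandas stores each row as a small integer code (here `int8`) plus one copy of each label.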

## Time Feature Engineering

In [5]:
# Trip Duration Feature Engineering
time_difference = combined_trips_df['dropOff_datetime'] - combined_trips_df['pickup_datetime']
combined_trips_df['trip_duration_minutes'] = time_difference.dt.total_seconds() / 60
# Extracting Time-Based Features
combined_trips_df['hour_of_day'] = combined_trips_df['pickup_datetime'].dt.hour
combined_trips_df['day_of_week'] = combined_trips_df['pickup_datetime'].dt.dayofweek # Monday=0, Sunday=6
combined_trips_df['day_of_month'] = combined_trips_df['pickup_datetime'].dt.day
combined_trips_df['month'] = combined_trips_df['pickup_datetime'].dt.month
combined_trips_df['year'] = combined_trips_df['pickup_datetime'].dt.year
combined_trips_df['is_weekend'] = combined_trips_df['pickup_datetime'].dt.weekday >= 5 # Boolean: True for Sat/Sun
combined_trips_df['quarter'] = combined_trips_df['pickup_datetime'].dt.quarter 
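The same feature steps on two made-up trips show what the columns contain: the first trip crosses midnight (the timestamp subtraction handles the date rollover), and the second has zero duration, so the duration filter in the next cell would drop it.

```python
import pandas as pd

# Two made-up trips; column names match the FHV schema used above
trips = pd.DataFrame({
    "pickup_datetime": pd.to_datetime(["2022-09-03 23:50:00", "2022-09-05 08:15:00"]),
    "dropOff_datetime": pd.to_datetime(["2022-09-04 00:20:00", "2022-09-05 08:15:00"]),
})

duration = trips["dropOff_datetime"] - trips["pickup_datetime"]
trips["trip_duration_minutes"] = duration.dt.total_seconds() / 60

pickup = trips["pickup_datetime"].dt
trips["hour_of_day"] = pickup.hour
trips["day_of_week"] = pickup.dayofweek      # Monday=0 ... Sunday=6
trips["is_weekend"] = pickup.dayofweek >= 5  # Saturday or Sunday
trips["quarter"] = pickup.quarter

print(trips[["trip_duration_minutes", "hour_of_day", "day_of_week", "is_weekend", "quarter"]])
```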

In [6]:
# Keep trips longer than 0 minutes and at most 240 minutes (4 hours)
combined_trips_df = combined_trips_df[
    (combined_trips_df['trip_duration_minutes'] > 0) &
    (combined_trips_df['trip_duration_minutes'] <= 240)
].copy()  # .copy() to avoid SettingWithCopyWarning
print(f"DataFrame shape after filtering durations: {combined_trips_df.shape}")

DataFrame shape after filtering durations: (14100256, 15)
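The bounds are asymmetric: a duration of exactly 0 is dropped, while exactly 240 minutes is kept. `Series.between` with `inclusive="right"` expresses the same mask and makes the closed end explicit; a check on made-up durations:

```python
import pandas as pd

durations = pd.Series([-5.0, 0.0, 0.5, 240.0, 240.5, 900.0])

# Equivalent to (d > 0) & (d <= 240): left end excluded, right end included
keep = durations.between(0, 240, inclusive="right")

print(durations[keep].tolist())  # [0.5, 240.0]
```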


## Introduce Earlier Data

In [8]:
import pandas as pd
import os

data_directory = '/Users/zuminchen/Desktop/Summer Research Project/FHV_Data'
new_csv_file = 'gkne-dk5s.csv'
file_path = os.path.join(data_directory, new_csv_file)

# Load the new data into a DataFrame
try:
    print(f"Reading {new_csv_file} into a DataFrame...")
    # Using low_memory=False to avoid DtypeWarning for mixed-type columns
    new_trips_df = pd.read_csv(file_path, low_memory=False)
    print(f"Successfully read {new_csv_file}. Shape: {new_trips_df.shape}")
    
    print("\n--- First 5 rows of the new DataFrame (first 10 columns) ---")
    # Use .iloc[:, :10] to select all rows and the first 10 columns
    print(new_trips_df.iloc[:5, :10])

    print("\n--- Info of the new DataFrame (first 10 columns) ---")
    new_trips_df.iloc[:, :10].info()

except FileNotFoundError:
    print(f"Error: The file '{new_csv_file}' was not found at '{file_path}'.")
    print("Please ensure the file exists and the path is correct.")
except Exception as e:
    print(f"An error occurred while reading the file: {e}")

Reading gkne-dk5s.csv into a DataFrame...
Successfully read gkne-dk5s.csv. Shape: (1000000, 19)

--- First 5 rows of the new DataFrame (first 10 columns) ---
  vendor_id          pickup_datetime         dropoff_datetime  \
0       CMT  2014-10-04T23:04:31.000  2014-10-04T23:13:49.000   
1       CMT  2014-03-25T23:15:28.000  2014-03-25T23:23:42.000   
2       VTS  2014-12-09T20:39:00.000  2014-12-09T20:39:00.000   
3       CMT  2014-06-08T10:10:21.000  2014-06-08T10:11:56.000   
4       CMT  2014-07-28T00:02:14.000  2014-07-28T00:07:44.000   

   passenger_count  trip_distance  pickup_longitude  pickup_latitude  \
0                1            1.3        -73.991092        40.717533   
1                1            1.3        -73.967118        40.756929   
2                1            0.0        -73.991015        40.760652   
3                1            0.5        -73.978248        40.783308   
4                1            1.1        -73.984075        40.725214   

  store_and_fwd_fl

In [9]:
import geopandas as gpd

# Work on a WGS84 (lon/lat) copy of the zones so they match the trip coordinates.
if taxi_zones.crs is None:
    # to_crs() cannot reproject geometries that have no CRS, so stop with a clear message
    raise ValueError("taxi_zones has no CRS; assign the correct one with set_crs() first.")
elif taxi_zones.crs.to_epsg() != 4326:
    zones_latlon = taxi_zones.to_crs(epsg=4326)
else:
    zones_latlon = taxi_zones.copy()
print("Taxi zones are ready with WGS84 CRS.")

# Build point geometries; points_from_xy takes x (longitude) first, then y (latitude)
pickup_geometry = gpd.points_from_xy(new_trips_df['pickup_longitude'], new_trips_df['pickup_latitude'])
pickup_locations_gdf = gpd.GeoDataFrame(new_trips_df, geometry=pickup_geometry, crs="EPSG:4326")
print("Pickup locations GeoDataFrame created.")

dropoff_geometry = gpd.points_from_xy(new_trips_df['dropoff_longitude'], new_trips_df['dropoff_latitude'])
dropoff_locations_gdf = gpd.GeoDataFrame(new_trips_df, geometry=dropoff_geometry, crs="EPSG:4326")
print("Dropoff locations GeoDataFrame created.")

Taxi zones are ready with WGS84 CRS.
Pickup locations GeoDataFrame created.
Dropoff locations GeoDataFrame created.


In [10]:
# Use a 'left' join to keep every trip, even if its point falls outside all zones.
# predicate='within' matches a point to the zone polygon that contains it.
new_trips_with_pu = gpd.sjoin(pickup_locations_gdf, zones_latlon[['LocationID', 'geometry']], how="left", predicate='within')
# Rename the joined LocationID column to mark it as the pickup zone
new_trips_with_pu = new_trips_with_pu.rename(columns={'LocationID': 'PUlocationID'})
print("PUlocationID added to trips.")

# Join the dropoff points with the taxi zones in the same way
new_trips_with_do = gpd.sjoin(dropoff_locations_gdf, zones_latlon[['LocationID', 'geometry']], how="left", predicate='within')
# Rename the joined LocationID column to mark it as the dropoff zone
new_trips_with_do = new_trips_with_do.rename(columns={'LocationID': 'DOlocationID'})
print("DOlocationID added to trips.")

# sjoin keeps the left index, so assign by index rather than by position.
# If a point ever matched two polygons it would appear twice; keep the first match.
new_trips_df['PUlocationID'] = new_trips_with_pu.loc[~new_trips_with_pu.index.duplicated(keep='first'), 'PUlocationID']
new_trips_df['DOlocationID'] = new_trips_with_do.loc[~new_trips_with_do.index.duplicated(keep='first'), 'DOlocationID']

# Display the head of the new DataFrame to confirm the new columns exist.
print(new_trips_df.head())

# The 'PUlocationID' and 'DOlocationID' columns will have NaN values if a point fell outside a zone.
print(f"\nTrips with no matching pickup zone: {new_trips_df['PUlocationID'].isnull().sum()}")
print(f"Trips with no matching dropoff zone: {new_trips_df['DOlocationID'].isnull().sum()}")


PUlocationID added to trips.
DOlocationID added to trips.
  vendor_id          pickup_datetime         dropoff_datetime  \
0       CMT  2014-10-04T23:04:31.000  2014-10-04T23:13:49.000   
1       CMT  2014-03-25T23:15:28.000  2014-03-25T23:23:42.000   
2       VTS  2014-12-09T20:39:00.000  2014-12-09T20:39:00.000   
3       CMT  2014-06-08T10:10:21.000  2014-06-08T10:11:56.000   
4       CMT  2014-07-28T00:02:14.000  2014-07-28T00:07:44.000   

   passenger_count  trip_distance  pickup_longitude  pickup_latitude  \
0                1            1.3        -73.991092        40.717533   
1                1            1.3        -73.967118        40.756929   
2                1            0.0        -73.991015        40.760652   
3                1            0.5        -73.978248        40.783308   
4                1            1.1        -73.984075        40.725214   

  store_and_fwd_flag  dropoff_longitude  dropoff_latitude  ... fare_amount  \
0                  N         -74.003639 
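A toy version of the left spatial join, with made-up square zones and points, shows why unmatched trips end up with a missing zone ID (and why the ID column becomes float once NaN appears). It needs `geopandas` and `shapely`, which this notebook already uses.

```python
import geopandas as gpd
from shapely.geometry import box

# Two made-up square "zones" side by side, with IDs 1 and 2
zones = gpd.GeoDataFrame(
    {"LocationID": [1, 2]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)

# Three made-up pickups: inside zone 1, inside zone 2, and outside both
trips = gpd.GeoDataFrame(
    {"trip_id": [10, 11, 12]},
    geometry=gpd.points_from_xy([0.5, 1.5, 5.0], [0.5, 0.5, 5.0]),
    crs="EPSG:4326",
)

# Left join keeps all three trips; the unmatched one gets NaN for LocationID
joined = gpd.sjoin(trips, zones[["LocationID", "geometry"]], how="left", predicate="within")
print(joined[["trip_id", "LocationID"]])
```

A point lying exactly on the shared edge at x = 1 is "within" neither square under this predicate, so it would also get NaN; `predicate="intersects"` would instead match both squares and produce two rows for that trip.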

In [11]:
# Drop rows with no matching zones
initial_rows = new_trips_df.shape[0]
new_trips_df.dropna(subset=['PUlocationID', 'DOlocationID'], inplace=True)
final_rows = new_trips_df.shape[0]

print(f"\nDropped {initial_rows - final_rows} trips that had no matching taxi zone.")
print(f"Remaining trips in the DataFrame: {final_rows}")


Dropped 24964 trips that had no matching taxi zone.
Remaining trips in the DataFrame: 975036


In [12]:
combined_trips_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 14100256 entries, 40 to 70165175
Data columns (total 15 columns):
 #   Column                  Dtype         
---  ------                  -----         
 0   dispatching_base_num    category      
 1   pickup_datetime         datetime64[us]
 2   dropOff_datetime        datetime64[us]
 3   PUlocationID            float64       
 4   DOlocationID            float64       
 5   SR_Flag                 int64         
 6   Affiliated_base_number  category      
 7   trip_duration_minutes   float64       
 8   hour_of_day             int32         
 9   day_of_week             int32         
 10  day_of_month            int32         
 11  month                   int32         
 12  year                    int32         
 13  is_weekend              bool          
 14  quarter                 int32         
dtypes: bool(1), category(2), datetime64[us](2), float64(3), int32(6), int64(1)
memory usage: 1.1 GB


In [13]:
new_trips_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 975036 entries, 0 to 999999
Data columns (total 21 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   vendor_id           975036 non-null  object 
 1   pickup_datetime     975036 non-null  object 
 2   dropoff_datetime    975036 non-null  object 
 3   passenger_count     975036 non-null  int64  
 4   trip_distance       975036 non-null  float64
 5   pickup_longitude    975036 non-null  float64
 6   pickup_latitude     975036 non-null  float64
 7   store_and_fwd_flag  474866 non-null  object 
 8   dropoff_longitude   975036 non-null  float64
 9   dropoff_latitude    975036 non-null  float64
 10  payment_type        975036 non-null  object 
 11  fare_amount         975036 non-null  float64
 12  mta_tax             975036 non-null  float64
 13  tip_amount          975036 non-null  float64
 14  tolls_amount        975036 non-null  float64
 15  total_amount        975036 non-null  fl
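The `info()` output shows that `pickup_datetime` and `dropoff_datetime` in `new_trips_df` are still `object` strings such as `2014-10-04T23:04:31.000`, so the time features built for the FHV data do not exist for this table yet. A minimal sketch of applying the same steps, written as a hypothetical helper `add_time_features` and checked on two rows copied from the `head()` output above:

```python
import pandas as pd

def add_time_features(df: pd.DataFrame, pickup_col: str, dropoff_col: str) -> pd.DataFrame:
    """Hypothetical helper: parse pickup/dropoff strings and add duration and calendar features."""
    out = df.copy()
    out[pickup_col] = pd.to_datetime(out[pickup_col])
    out[dropoff_col] = pd.to_datetime(out[dropoff_col])
    out["trip_duration_minutes"] = (out[dropoff_col] - out[pickup_col]).dt.total_seconds() / 60
    pickup = out[pickup_col].dt
    out["hour_of_day"] = pickup.hour
    out["day_of_week"] = pickup.dayofweek  # Monday=0, Sunday=6
    out["day_of_month"] = pickup.day
    out["month"] = pickup.month
    out["year"] = pickup.year
    out["is_weekend"] = pickup.dayofweek >= 5
    out["quarter"] = pickup.quarter
    return out

# Two rows in the CSV's ISO-8601 string format
sample = pd.DataFrame({
    "pickup_datetime": ["2014-10-04T23:04:31.000", "2014-03-25T23:15:28.000"],
    "dropoff_datetime": ["2014-10-04T23:13:49.000", "2014-03-25T23:23:42.000"],
})
features = add_time_features(sample, "pickup_datetime", "dropoff_datetime")
print(features[["trip_duration_minutes", "hour_of_day", "day_of_week", "is_weekend"]])
```

Applied to the real table this would be `new_trips_df = add_time_features(new_trips_df, 'pickup_datetime', 'dropoff_datetime')`, after which the same `(0, 240]` minute duration filter could be reused.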

## Decide Which Zone to Analyze

In [14]:
# Group by PUlocationID and count the number of trips for each zone
busiest_pickup_zones = new_trips_df.groupby('PUlocationID').size().reset_index(name='trip_count')

# Sort in descending order to find the top 10 busiest zones
top_10_pickup_zones = busiest_pickup_zones.sort_values(by='trip_count', ascending=False).head(10)

print("Top 10 Busiest Pickup Zones:")
print(top_10_pickup_zones)

Top 10 Busiest Pickup Zones:
     PUlocationID  trip_count
215         237.0       35670
145         161.0       33698
71           79.0       33442
146         162.0       32798
154         170.0       32383
208         230.0       32005
212         234.0       31790
214         236.0       31773
41           48.0       31430
168         186.0       30091


## Save

In [16]:
from pathlib import Path
import npz_io  # helper module providing save_df_npz (its source is not shown in this notebook)

outdir = Path("npz"); outdir.mkdir(parents=True, exist_ok=True)

# Uncompressed .npz
npz_io.save_df_npz(outdir / "combined_trips_clean.npz", combined_trips_df, compress=False)
npz_io.save_df_npz(outdir / "new_trips_with_zones.npz", new_trips_df, compress=False)

'npz/new_trips_with_zones.npz'
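`npz_io.save_df_npz` is not defined in this notebook, so its exact format is unknown. As a hedged illustration of one way a DataFrame can be stored in an `.npz` archive, here is a hypothetical pair of functions (`save_df_npz_sketch`, `load_df_npz_sketch`) that write one array per column, keep categorical columns as integer codes plus string labels, and avoid pickled object arrays so the file loads with `allow_pickle=False`:

```python
import json
import numpy as np
import pandas as pd

def save_df_npz_sketch(path, df: pd.DataFrame, compress: bool = False) -> None:
    """Hypothetical: one array per column; categoricals as codes + labels."""
    arrays, meta = {}, {"columns": [], "categorical": []}
    for i, col in enumerate(df.columns):
        key = f"c{i}"
        meta["columns"].append(col)
        series = df[col]
        if isinstance(series.dtype, pd.CategoricalDtype):
            arrays[key] = series.cat.codes.to_numpy()
            # Labels are stored as strings (fine for base numbers)
            arrays[key + "_labels"] = series.cat.categories.to_numpy().astype(str)
            meta["categorical"].append(col)
        elif pd.api.types.is_string_dtype(series.dtype):
            # Fixed-width unicode instead of object; missing values become "nan"
            arrays[key] = series.to_numpy(dtype=str)
        else:
            # Numeric, bool and datetime64 arrays are stored natively
            arrays[key] = series.to_numpy()
    arrays["__meta__"] = np.array(json.dumps(meta))
    (np.savez_compressed if compress else np.savez)(path, **arrays)

def load_df_npz_sketch(path) -> pd.DataFrame:
    """Hypothetical inverse of save_df_npz_sketch."""
    with np.load(path, allow_pickle=False) as data:
        meta = json.loads(data["__meta__"].item())
        cols = {}
        for i, col in enumerate(meta["columns"]):
            key = f"c{i}"
            if col in meta["categorical"]:
                cols[col] = pd.Categorical.from_codes(data[key], categories=data[key + "_labels"])
            else:
                cols[col] = data[key]
    return pd.DataFrame(cols)
```

The index is not stored, so the loaded frame gets a fresh `RangeIndex`; call `reset_index(drop=True)` on the in-memory DataFrame before comparing the two.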