In [2]:
import duckdb

parquet_path = "data/raw/ride_data_parquet/*.parquet"
jan_parquet = "data/raw/ride_data_parquet/fhvhv_tripdata_2024-01.parquet"
taxi_zone = "data/processed/taxi_zones.parquet"

In [8]:
query = f"SELECT * FROM '{taxi_zone}'"
df = duckdb.query(query).to_df()
print(df.head(100))

    OBJECTID  Shape_Leng  Shape_Area                       zone  LocationID  \
0          1    0.116357    0.000782             Newark Airport           1   
1          2    0.433470    0.004866                Jamaica Bay           2   
2          3    0.084341    0.000314    Allerton/Pelham Gardens           3   
3          4    0.043567    0.000112              Alphabet City           4   
4          5    0.092146    0.000498              Arden Heights           5   
..       ...         ...         ...                        ...         ...   
95        96    0.185180    0.000548  Forest Park/Highland Park          96   
96        97    0.062476    0.000163                Fort Greene          97   
97        98    0.121661    0.000486              Fresh Meadows          98   
98        99    0.183371    0.001210            Freshkills Park          99   
99       100    0.024813    0.000037           Garment District         100   

          borough                                  

In [3]:
query = f"DESCRIBE '{jan_parquet}'"
df = duckdb.query(query).to_df()
print(df)

             column_name column_type null   key default extra
0      hvfhs_license_num     VARCHAR  YES  None    None  None
1   dispatching_base_num     VARCHAR  YES  None    None  None
2   originating_base_num     VARCHAR  YES  None    None  None
3       request_datetime   TIMESTAMP  YES  None    None  None
4      on_scene_datetime   TIMESTAMP  YES  None    None  None
5        pickup_datetime   TIMESTAMP  YES  None    None  None
6       dropoff_datetime   TIMESTAMP  YES  None    None  None
7           PULocationID     INTEGER  YES  None    None  None
8           DOLocationID     INTEGER  YES  None    None  None
9             trip_miles      DOUBLE  YES  None    None  None
10             trip_time      BIGINT  YES  None    None  None
11   base_passenger_fare      DOUBLE  YES  None    None  None
12                 tolls      DOUBLE  YES  None    None  None
13                   bcf      DOUBLE  YES  None    None  None
14             sales_tax      DOUBLE  YES  None    None  None
15  cong

In [4]:
# Drop the existing table before creating a new one
duckdb.query("DROP TABLE IF EXISTS cleaned_ride_data")

# Create the cleaned table
query = f"""
CREATE TABLE cleaned_ride_data AS 
SELECT request_datetime, pickup_datetime, dropoff_datetime, PULocationID, 
DOLocationID, trip_miles, trip_time, base_passenger_fare, 
congestion_surcharge, driver_pay
FROM '{jan_parquet}';
"""
duckdb.query(query)

# Save cleaned data as a new Parquet file
duckdb.query("COPY cleaned_ride_data TO 'data/processed/cleaned_ride_data.parquet' (FORMAT PARQUET)")

print("Successfully cleaned data and saved to 'data/processed/cleaned_ride_data.parquet'")

Successfully cleaned data and saved to 'data/processed/cleaned_ride_data.parquet'


In [5]:
test_parquet = "data/processed/cleaned_ride_data.parquet"
query = f"SELECT * FROM '{test_parquet}' WHERE trip_miles > 5"
df = duckdb.query(query).to_df()
print(df.head())

     request_datetime     pickup_datetime    dropoff_datetime  PULocationID  \
0 2024-01-01 00:03:47 2024-01-01 00:06:15 2024-01-01 00:27:53           255   
1 2024-01-01 00:22:51 2024-01-01 00:29:47 2024-01-01 00:50:08            95   
2 2024-01-01 00:29:43 2024-01-01 00:37:50 2024-01-01 00:58:44           195   
3 2024-01-01 00:08:37 2024-01-01 00:11:17 2024-01-01 00:27:44           229   
4 2024-01-01 00:42:04 2024-01-01 00:53:16 2024-01-01 01:41:05           249   

   DOLocationID  trip_miles  trip_time  base_passenger_fare  \
0            95       7.020       1298                32.16   
1           212      11.330       1221                45.83   
2           112       7.370       1254                37.68   
3            87       5.477        987                22.78   
4           188       7.643       2869                35.10   

   congestion_surcharge  driver_pay  
0                  0.00       24.35  
1                  0.00       30.98  
2                  0.00       50

In [6]:
import geopandas as gpd
import duckdb

duckdb.query("DROP TABLE IF EXISTS enriched_ride_data")

create_table_query = """
CREATE TABLE enriched_ride_data (
    request_datetime TIMESTAMP,
    pickup_datetime TIMESTAMP,
    dropoff_datetime TIMESTAMP,
    PULocationID INT,
    DOLocationID INT,
    trip_miles DOUBLE,
    trip_time DOUBLE,
    base_passenger_fare DOUBLE,
    congestion_surcharge DOUBLE,
    driver_pay DOUBLE,
    pickup_lat DOUBLE,
    pickup_lon DOUBLE,
    dropoff_lat DOUBLE,
    dropoff_lon DOUBLE,
    trip_duration DOUBLE,  -- dropoff_datetime - pickup_datetime in seconds
    wait_time DOUBLE,  -- pickup_datetime - request_datetime in seconds
    avg_speed DOUBLE,  -- miles per hour
);
"""
duckdb.query(create_table_query)

# Define file paths
ride_data_parquet = "data/processed/ride_data/cleaned_ride_data.parquet"
shapefile_path = "data/raw/taxi_zones/taxi_zones.shp"
output_parquet = "data/processed/ride_data/enriched_ride_data.parquet"

# Step 1: Load Taxi Zones Shapefile
taxi_zones = gpd.read_file(shapefile_path)

# Step 2: Convert CRS (Projected to WGS84)
if taxi_zones.crs != "EPSG:4326":
    taxi_zones = taxi_zones.to_crs(epsg=2263)  # Convert to projected CRS first

# Step 3: Compute Centroids in EPSG:2263 for accurate location mapping
taxi_zones["pickup_lat"] = taxi_zones.geometry.centroid.y
taxi_zones["pickup_lon"] = taxi_zones.geometry.centroid.x
taxi_zones["dropoff_lat"] = taxi_zones.geometry.centroid.y
taxi_zones["dropoff_lon"] = taxi_zones.geometry.centroid.x

# Step 4: Convert back to WGS84 (EPSG:4326) for final latitude/longitude
taxi_zones = taxi_zones.to_crs(epsg=4326)

# Extract final geospatial coordinates
taxi_zones["pickup_lat"] = taxi_zones.geometry.centroid.y
taxi_zones["pickup_lon"] = taxi_zones.geometry.centroid.x
taxi_zones["dropoff_lat"] = taxi_zones.geometry.centroid.y
taxi_zones["dropoff_lon"] = taxi_zones.geometry.centroid.x

# Remove geometry column before DuckDB processing
taxi_zones = taxi_zones.drop(columns=["geometry"])  

# Step 5: Store Taxi Zones in DuckDB
conn = duckdb.connect(database=':memory:', read_only=False)
conn.register('taxi_zones', taxi_zones)

# Step 6: Load Ride Data
duckdb.query(f"CREATE OR REPLACE TABLE ride_data AS SELECT * FROM '{ride_data_parquet}'")

# Step 7: Process Data Efficiently in Batches
query = f"""
INSERT INTO enriched_ride_data
SELECT ride.*, 
    tz_pickup.pickup_lat, tz_pickup.pickup_lon, 
    tz_dropoff.dropoff_lat, tz_dropoff.dropoff_lon,
    date_part('epoch', dropoff_datetime - pickup_datetime) AS trip_duration,
    date_part('epoch', pickup_datetime - request_datetime) AS wait_time,
    trip_miles / (date_part('epoch', dropoff_datetime - pickup_datetime) / 3600) AS avg_speed,
FROM cleaned_ride_data AS ride
LEFT JOIN taxi_zones AS tz_pickup ON ride.PULocationID = tz_pickup.LocationID
LEFT JOIN taxi_zones AS tz_dropoff ON ride.DOLocationID = tz_dropoff.LocationID
WHERE tz_pickup.pickup_lat IS NOT NULL 
AND tz_dropoff.dropoff_lat IS NOT NULL 
AND date_part('epoch', dropoff_datetime - pickup_datetime) > 0
"""

duckdb.query(query)

# Step 8: Save Processed Data in Parquet Format
duckdb.query(f"COPY enriched_ride_data TO '{output_parquet}' (FORMAT 'parquet')")

print(f"✅ Successfully processed ride data with lat/lon mapping. Data saved to {output_parquet}")



  taxi_zones["pickup_lat"] = taxi_zones.geometry.centroid.y

  taxi_zones["pickup_lon"] = taxi_zones.geometry.centroid.x

  taxi_zones["dropoff_lat"] = taxi_zones.geometry.centroid.y

  taxi_zones["dropoff_lon"] = taxi_zones.geometry.centroid.x


✅ Successfully processed ride data with lat/lon mapping. Data saved to data/processed/ride_data/enriched_ride_data.parquet


In [7]:
test_parquet = "data/processed/ride_data/enriched_ride_data.parquet"
query = f"SELECT * FROM '{test_parquet}' WHERE request_datetime = '2024-01-01 00:03:47'"
df = duckdb.query(query).to_df()
print(df.head())

     request_datetime     pickup_datetime    dropoff_datetime  PULocationID  \
0 2024-01-01 00:03:47 2024-01-01 00:06:15 2024-01-01 00:27:53           255   
1 2024-01-01 00:03:47 2024-01-01 00:05:36 2024-01-01 00:08:30           174   
2 2024-01-01 00:03:47 2024-01-01 00:06:18 2024-01-01 00:21:04            80   
3 2024-01-01 00:03:47 2024-01-01 00:08:51 2024-01-01 00:23:25           122   
4 2024-01-01 00:03:47 2024-01-01 00:06:39 2024-01-01 00:22:40            25   

   DOLocationID  trip_miles  trip_time  base_passenger_fare  \
0            95       7.020     1298.0                32.16   
1           240       0.472      174.0                 8.05   
2           170       5.590      886.0                22.79   
3            38       3.020      874.0                15.15   
4           227       4.859      961.0                19.62   

   congestion_surcharge  driver_pay  pickup_lat  pickup_lon  dropoff_lat  \
0                  0.00       24.35   40.718804  -73.957418    40.7214

In [8]:
import geopandas as gpd

# Load Taxi Zones shapefile
taxi_zones = gpd.read_file("data/raw/taxi_zones/taxi_zones.shp")

# Convert from EPSG:4326 (geographic CRS) to EPSG:2263 (projected CRS)
taxi_zones = taxi_zones.to_crs(epsg=2263)

# Compute centroids (correctly using projected coordinates)
taxi_zones["pickup_lat_proj"] = taxi_zones.geometry.centroid.y
taxi_zones["pickup_lon_proj"] = taxi_zones.geometry.centroid.x
taxi_zones["dropoff_lat_proj"] = taxi_zones.geometry.centroid.y
taxi_zones["dropoff_lon_proj"] = taxi_zones.geometry.centroid.x

# Convert back to EPSG:4326 for correct latitude & longitude
taxi_zones = taxi_zones.to_crs(epsg=4326)

# Extract final lat/lon values
taxi_zones["pickup_lat"] = taxi_zones.geometry.centroid.y
taxi_zones["pickup_lon"] = taxi_zones.geometry.centroid.x
taxi_zones["dropoff_lat"] = taxi_zones.geometry.centroid.y
taxi_zones["dropoff_lon"] = taxi_zones.geometry.centroid.x

# Validate the output
print(taxi_zones[["pickup_lat", "pickup_lon", "dropoff_lat", "dropoff_lon"]].head())


   pickup_lat  pickup_lon  dropoff_lat  dropoff_lon
0   40.691831  -74.174000    40.691831   -74.174000
1   40.616745  -73.831299    40.616745   -73.831299
2   40.864474  -73.847422    40.864474   -73.847422
3   40.723752  -73.976968    40.723752   -73.976968
4   40.552659  -74.188484    40.552659   -74.188484



  taxi_zones["pickup_lat"] = taxi_zones.geometry.centroid.y

  taxi_zones["pickup_lon"] = taxi_zones.geometry.centroid.x

  taxi_zones["dropoff_lat"] = taxi_zones.geometry.centroid.y

  taxi_zones["dropoff_lon"] = taxi_zones.geometry.centroid.x


In [9]:
import networkx as nx

# Load the road network from a GraphML file
G = nx.read_graphml("data/processed/road_graph/new_york_processed_network.graphml")

# Print basic information about the graph
print(f"Graph Loaded: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")

# Extract and display sample maxspeed values from edges
sample_maxspeeds = []

for u, v, data in list(G.edges(data=True))[:100]:  # Limit to first 100 edges
    sample_maxspeeds.append(data.get('maxspeed', 'Unknown'))  # Retrieve maxspeed or default to 'Unknown'

print("Sample Maxspeed Values:", sample_maxspeeds)


Graph Loaded: 55242 nodes, 139297 edges
Sample Maxspeed Values: ['50 mph', 'Unknown', 'Unknown', '50 mph', 'Unknown', '50 mph', '25 mph', 'Unknown', '25 mph', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', '25 mph', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', '25 mph', 'Unknown', 'Unknown', '25 mph', 'Unknown', 'Unknown', '25 mph', 'Unknown', 'Unknown', 'Unknown', '25 mph', 'Unknown', '25 mph', '25 mph', 'Unknown', 'Unknown', 'Unknown', 'Unknown', '20 mph', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', '25 mph', '25 mph', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', '25 mph', 'Unknown', '25 mph', '25 mph', 'Unknown', 'Unknown', 'Unknown', '20 mph', '20 mph', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', '25 mph', '25 mph', '25 mph', '25 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '20 mph', '25 mph', 'Un

In [10]:
print("Sample Nodes:", list(G.nodes(data=True))[:5])
print("Sample Nodes:", list(G.edges(data=True))[:5])

Sample Nodes: [('39076461', {'y': '40.7863451', 'x': '-73.7947484', 'highway': 'motorway_junction', 'ref': '33', 'street_count': '3'}), ('39076490', {'y': '40.7624294', 'x': '-73.7570906', 'highway': 'motorway_junction', 'ref': '31W', 'street_count': '3'}), ('39076504', {'y': '40.7534671', 'x': '-73.7441643', 'highway': 'motorway_junction', 'ref': '30W', 'street_count': '3'}), ('42421728', {'y': '40.7980478', 'x': '-73.9600437', 'highway': 'traffic_signals', 'street_count': '3'}), ('42421731', {'y': '40.7986542', 'x': '-73.9614745', 'highway': 'traffic_signals', 'street_count': '4'})]
Sample Nodes: [('39076461', '274283981', {'osmid': '25161349', 'highway': 'motorway', 'lanes': '2', 'maxspeed': '50 mph', 'name': 'Cross Island Parkway', 'oneway': 'True', 'ref': 'CI', 'reversed': 'False', 'length': '819.5016661477803', 'geometry': 'LINESTRING (-73.7947484 40.7863451, -73.794615 40.7863898, -73.7944856 40.7864343, -73.7943563 40.7864809, -73.7942289 40.7865266, -73.7940993 40.7865739, -73

In [11]:
import networkx as nx


# Validate street_count vs degree
validation_results = []

for node in G.nodes():
    degree = G.degree(node)  # Count edges
    street_count = int(G.nodes[node].get('street_count', 0))  # Extract street count
    
    validation_results.append((node, degree, street_count))

# Print the first 10 nodes for comparison
for node, degree, street_count in validation_results[:10]:
    print(f"Node: {node} | Degree: {degree} | Street Count: {street_count}")


Node: 39076461 | Degree: 3 | Street Count: 3
Node: 39076490 | Degree: 3 | Street Count: 3
Node: 39076504 | Degree: 3 | Street Count: 3
Node: 42421728 | Degree: 6 | Street Count: 3
Node: 42421731 | Degree: 8 | Street Count: 4
Node: 42421737 | Degree: 6 | Street Count: 4
Node: 42421741 | Degree: 6 | Street Count: 4
Node: 42421745 | Degree: 6 | Street Count: 4
Node: 42421749 | Degree: 7 | Street Count: 4
Node: 42421751 | Degree: 4 | Street Count: 3
