In [2]:
import polars as pl
import osmnx as ox
import networkx as nx
import polars as pl
from functools import partial
import pickle
import numpy as np
from scipy.spatial import KDTree
from tqdm.auto import tqdm

In [2]:
# Precomputed lookup that is later indexed as distances_data[pickup_node, dropoff_node].
# NOTE: pickle.load can execute arbitrary code, so only load files produced by this project.
with open('datasets/travel_times_simplified.pkl', 'rb') as pickle_file:
    distances_data = pickle.load(pickle_file)

In [3]:
# Road network used to snap trip endpoints to nodes
# (the file name suggests a 0.00025 simplification/tolerance setting — TODO confirm).
GRAPHML_PATH = 'new_york_0.00025.graphml'
G = ox.load_graphml(GRAPHML_PATH)

In [10]:
# Bounding box around New York: [(lat_min, lat_max), (lon_min, lon_max)].
NEW_YORK_AREA = [(40.506797, 41.130785), (-74.268086, -73.031593)]
lat_bounds, lon_bounds = NEW_YORK_AREA

# Keep trips whose pickup AND dropoff both lie inside the box (bounds inclusive).
in_new_york = (
    pl.col("pickup_longitude").is_between(*lon_bounds, closed='both')
    & pl.col("pickup_latitude").is_between(*lat_bounds, closed='both')
    & pl.col("dropoff_longitude").is_between(*lon_bounds, closed='both')
    & pl.col("dropoff_latitude").is_between(*lat_bounds, closed='both')
)
# ...and that carried at least one passenger.
has_passengers = pl.col("passenger_count") > 0

# Raw timestamps carry a literal " UTC" suffix; parse them strictly into Datetime.
pickup_ts = pl.col("pickup_datetime").str.strptime(
    pl.Datetime, fmt="%Y-%m-%d %H:%M:%S UTC", strict=True
)

df = (
    pl.read_parquet('datasets/raw_base.parquet')
    .filter(in_new_york & has_passengers)
    .with_columns([pickup_ts])
    .drop('key')
)

In [4]:
# KD-tree over the (x, y) coordinates of every graph node. Row k of the tree is
# the k-th node in G.nodes iteration order, so query results are node *positions*,
# not OSM node ids. Distances are Euclidean in raw degree space.
points = np.array(
    [(node_data['x'], node_data['y']) for _, node_data in G.nodes(data=True)],
    dtype=float,
).reshape(-1, 2)
tree = KDTree(points)

In [17]:
def nearest_nodes(coords: pl.Series, prefix: str, G: nx.MultiDiGraph,
                  tree: "KDTree | None" = None) -> pl.Series:
    """Snap each (longitude, latitude) pair in a struct Series to its nearest graph node.

    Parameters
    ----------
    coords : pl.Series
        Struct series with fields ``{prefix}_longitude`` and ``{prefix}_latitude``.
    prefix : str
        Field-name prefix, e.g. ``'pickup'`` or ``'dropoff'``.
    G : nx.MultiDiGraph
        Road graph whose nodes carry ``'x'`` / ``'y'`` coordinate attributes
        (queried as longitude / latitude respectively).
    tree : KDTree, optional
        Prebuilt KD-tree over ``G``'s node coordinates, rows in ``G.nodes``
        iteration order. When omitted it is built from ``G``, so the result
        always refers to the graph that was passed in.

    Returns
    -------
    pl.Series
        Integer *positions* of the nearest nodes in ``G.nodes`` iteration order
        (not OSM node ids).

    Notes
    -----
    Nearest neighbours are found with Euclidean distance on raw degrees, which
    over-weights longitude differences relative to metres; this is an
    approximation, not a true metric nearest node.
    """
    if tree is None:
        # Same construction and node order as the notebook's module-level tree;
        # presumably also the order used to index the travel-time matrix — confirm.
        node_xy = np.array(
            [(data['x'], data['y']) for _, data in G.nodes(data=True)],
            dtype=float,
        ).reshape(-1, 2)
        tree = KDTree(node_xy)

    query_points = np.column_stack([
        coords.struct.field(f'{prefix}_longitude').to_numpy(),
        coords.struct.field(f'{prefix}_latitude').to_numpy(),
    ]).astype(np.float64, copy=False)
    _, node_positions = tree.query(query_points)
    return pl.Series(node_positions)

def calculate_travel_distance(coords: pl.Series, distance_matrix: np.ndarray) -> pl.Series:
    """Look up the matrix value for every (pickup_node, dropoff_node) pair.

    Parameters
    ----------
    coords : pl.Series
        Struct series with integer fields ``pickup_node`` and ``dropoff_node``
        (node positions as produced by ``nearest_nodes``).
    distance_matrix : np.ndarray
        Pairwise lookup indexed as ``distance_matrix[pickup, dropoff]``. Any other
        container supporting ``m[i, j]`` is accepted via a slower per-row path.

    Returns
    -------
    pl.Series
        One looked-up value per row, in input order. Despite the function name,
        the values are whatever the matrix stores (the notebook aliases them
        as ``travel_time``).
    """
    pickup_nodes = coords.struct.field('pickup_node')
    dropoff_nodes = coords.struct.field('dropoff_node')

    if isinstance(distance_matrix, np.ndarray) and distance_matrix.ndim == 2:
        # Vectorized gather: one integer fancy-index replaces a Python loop
        # over every row (tens of millions here).
        return pl.Series(distance_matrix[pickup_nodes.to_numpy(), dropoff_nodes.to_numpy()])

    # Fallback for non-ndarray containers (e.g. tuple-keyed dicts): per-row lookup.
    distances = [
        distance_matrix[pn, dn]
        for pn, dn in tqdm(zip(pickup_nodes, dropoff_nodes), total=len(dropoff_nodes))
    ]
    return pl.Series(distances)

In [11]:
# ~ 1m 15s
# ~ 1m 15s
# Snap each trip endpoint to its nearest graph node, pickup first, then dropoff.
for endpoint in ('pickup', 'dropoff'):
    snap_to_node = partial(nearest_nodes, prefix=endpoint, G=G)
    df = df.with_columns(
        pl.struct([f'{endpoint}_longitude', f'{endpoint}_latitude'])
        .map(snap_to_node)
        .alias(f'{endpoint}_node')
    )

In [13]:
# Summary statistics after snapping endpoints to graph nodes. In the recorded
# run, outliers remain: fare_amount ranges from -300 to 93963.36 and
# passenger_count reaches 208 — the bounding-box/passenger filters do not remove them.
df.describe()

describe,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,pickup_node,dropoff_node
str,f64,str,f64,f64,f64,f64,f64,f64,f64
"""count""",54030130.0,"""54030130""",54030130.0,54030130.0,54030130.0,54030130.0,54030130.0,54030130.0,54030130.0
"""null_count""",0.0,"""0""",0.0,0.0,0.0,0.0,0.0,0.0,0.0
"""mean""",11.324604,,-73.975394,40.750868,-73.974479,40.751265,1.69135,7506.678392,8667.538882
"""std""",20.83751,,0.035001,0.02707,0.034782,0.031004,1.313965,14085.663191,14704.172504
"""min""",-300.0,"""2009-01-01 00:...",-74.268085,40.506906,-74.268085,40.50683,1.0,0.0,0.0
"""max""",93963.36,"""2015-06-30 23:...",-73.033332,41.130701,-73.031918,41.130642,208.0,47691.0,47691.0
"""median""",8.5,,-73.982098,40.753363,-73.980606,40.753852,1.0,1277.0,1411.0


In [18]:
# ~ 3m 30s
# ~ 3m 30s (measured with the per-row lookup loop)
# Attach the precomputed value for each (pickup_node, dropoff_node) pair.
travel_time_expr = (
    pl.struct(['pickup_node', 'dropoff_node'])
    .map(partial(calculate_travel_distance, distance_matrix=distances_data))
    .alias('travel_time')
)
df = df.with_columns(travel_time_expr)

Start to calculate travel_time


  0%|          | 0/54030130 [00:00<?, ?it/s]

In [20]:
# Persist the enriched trips. The node-position columns were only needed for the
# travel_time lookup, so they are dropped before saving.
(
    df.drop(['dropoff_node', 'pickup_node'])
    .write_parquet('datasets/all_data_with_distances.parquet')
)