In [1]:
import math

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
from haversine import haversine
from dask.diagnostics import ProgressBar
from dask.distributed import progress

# Local dask cluster: 4 worker processes x 16 threads each. `memory_limit` is
# applied per worker, so the cluster as a whole may use up to 4 x 64GB.
# NOTE(review): row-wise pandas `apply` is largely GIL-bound, so many threads
# per worker may add little; more workers with fewer threads could scale better.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=16,
    memory_limit='64GB',
    memory_target_fraction=0.95,
)
client = Client(cluster)

# School locations. High schools are split into 4 partitions so the row-wise
# nearest-school search can run in parallel; middle schools are the lookup set.
high_schools = dd.read_csv(
    "../data/additional_data/schools/high_schools_NYC_2021_processed.csv"
).repartition(npartitions=4)
middle_schools = dd.read_csv(
    "../data/additional_data/schools/middle_schools_NYC_2021_processed.csv"
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 61711 instead


In [2]:
from rtree import index

def create_rtree_index(df, name=1, lat=2, long=3):
    """Build an in-memory R-tree holding one point per row of ``df``.

    ``name``, ``lat`` and ``long`` are *positions* in the tuples produced by
    ``df.itertuples()``. Position 0 of those tuples is the row index, so the
    defaults pick the first three data columns (presumably name, latitude,
    longitude -- confirm against the CSV column order).

    Each row becomes the degenerate box ``(lat, long, lat, long)``, keyed by its
    0-based row position, carrying ``{"name", "lat", "long"}`` as the payload
    that ``nearest(..., objects=True)`` hands back.
    """
    tree = index.Index()
    for position, record in enumerate(df.itertuples()):
        point_lat = record[lat]
        point_long = record[long]
        payload = {"name": record[name], "lat": point_lat, "long": point_long}
        tree.insert(position, (point_lat, point_long, point_lat, point_long), obj=payload)
    return tree

In [3]:
def test_function(idx, lat, lang):
    """Return ``(name, distance)`` for the indexed point nearest to ``(lat, lang)``.

    ``idx`` is expected to be an R-tree from ``create_rtree_index``: points are
    stored as ``(lat, long, lat, long)`` boxes carrying a
    ``{"name", "lat", "long"}`` payload. ``lang`` is the query longitude.
    "Nearest" is by great-circle distance. The returned distance comes from
    ``haversine`` with its default unit (kilometres).

    Raises ``ValueError`` if the index contains no points.
    """
    query_point = (lat, lang)
    # The R-tree ranks by Euclidean distance in raw degrees, which overweights
    # longitude: a degree of longitude covers only cos(lat) of the ground
    # distance of a degree of latitude. Its answer is therefore used only as a
    # seed, and the true nearest point is chosen by haversine below.
    seed = next(iter(idx.nearest((lat, lang, lat, lang), 1, objects=True)), None)
    if seed is None:
        raise ValueError("spatial index is empty")
    seed_deg = math.hypot(seed.object['lat'] - lat, seed.object['long'] - lang)
    # Locally, ground distance is proportional to
    # sqrt(dlat**2 + (cos(lat) * dlon)**2). That is <= the plain degree distance
    # and >= cos(lat) times it. So any point at least as close as the seed on the
    # ground lies within seed_deg / cos(lat) degrees of the query. The 1% margin
    # absorbs the approximation over city-scale distances. cos is clamped so the
    # box stays finite near the poles.
    cos_lat = max(math.cos(math.radians(lat)), 1e-6)
    radius = seed_deg / cos_lat * 1.01 + 1e-9
    search_box = (lat - radius, lang - radius, lat + radius, lang + radius)
    candidates = [hit.object for hit in idx.intersection(search_box, objects=True)]
    if not candidates:
        candidates = [seed.object]
    distance, best = min(
        ((haversine(query_point, (obj['lat'], obj['long'])), obj) for obj in candidates),
        key=lambda pair: pair[0],
    )
    return (best['name'], distance)

def test_wrapper(row, df, idx=None):
    """Return ``(name, distance)`` of the point in ``df`` nearest to ``row``.

    ``row`` must provide ``'Latitude'`` and ``'Longitude'`` entries, such as a
    row handed over by ``DataFrame.apply(..., axis=1)``.

    ``idx`` is an optional prebuilt ``create_rtree_index(df)``. When it is
    omitted, the index is rebuilt from ``df`` on every call, which costs
    O(len(df)) per row. Pass a shared index when applying over many rows; in
    that case ``df`` is not read.
    """
    if idx is None:
        idx = create_rtree_index(df)
    return test_function(idx, row['Latitude'], row['Longitude'])

In [5]:
# Materialise the small middle-school lookup table once, on the client, as a
# pandas frame. Passing the lazy dask frame into a per-row apply would make
# create_rtree_index() re-execute its graph (re-reading the CSV) for every row,
# from inside a worker task.
middle_schools_local = middle_schools.compute()


def nearest_school_columns(part, schools):
    """Return a frame with columns 0 (nearest school name) and 1 (haversine distance).

    ``part`` is one pandas partition with ``Latitude``/``Longitude`` columns, and
    ``schools`` is the pandas lookup table passed to ``create_rtree_index``.
    The R-tree is built once per partition, not once per row.
    """
    idx = create_rtree_index(schools)
    return part.apply(
        lambda row: test_function(idx, row['Latitude'], row['Longitude']),
        axis=1,
        result_type='expand',
    )


res = high_schools.map_partitions(
    lambda part: nearest_school_columns(part, middle_schools_local),
    meta={0: str, 1: float},
)
res.columns = ['nearest_middle_school', 'distance_to_nms']

In [6]:
# Join each high school to its nearest-middle-school columns by index, then
# start the computation on the cluster; `m` is the resulting future.
high_schools_with_nms = dd.merge(high_schools, res, left_index=True, right_index=True)
m = client.compute(high_schools_with_nms)

In [7]:
# Show a progress bar for the future `m`. In a notebook this renders as a widget
# (the VBox() output) rather than blocking until the result is ready.
progress(m)

VBox()

In [13]:
# Block until the merge finishes and display the resulting pandas frame.
client.gather(m)

Unnamed: 0,school_name,Latitude,Longitude,nearest_middle_school,distance_to_nms
12,Food and Finance High School,40.765250,-73.993105,Professional Performing Arts School,0.601838
20,"James Baldwin School, The",40.742867,-74.002136,Quest to Learn,0.000000
22,Gramercy Arts High School,40.735494,-73.987659,"Ballet Tech, NYC Public School for Dance",0.394449
32,N.Y.C. Lab School for Collaborative Studies,40.742129,-74.002198,New York City Lab Middle School for Collaborat...,0.000000
33,School of the Future High School,40.738952,-73.985479,School of the Future Middle and High School,0.000000
...,...,...,...,...,...
425,CSI High School for International Studies,40.581315,-74.158589,Marsh Avenue School for Expeditionary Learning,0.000000
430,Curtis High School,40.644735,-74.081222,I.S. 061 William A Morris,1.839917
433,Ralph R. McKee Career and Technical Education ...,40.642793,-74.078730,I.S. 061 William A Morris,1.884222
438,"Brooklyn School for Social Justice, The",40.697185,-73.911170,All City Leadership Secondary School,0.169942


In [4]:
# Cleaned parking-ticket records, split into TICKET_PARTITIONS (2**18) very small
# partitions -- presumably so a single partition can serve as a cheap test subset.
# NOTE(review): a 262,144-partition graph carries heavy scheduler overhead for
# full-data runs; consider a smaller count (or sampling) instead.
TICKET_PARTITIONS = 2**18
tickets = dd.read_parquet(
    "../data/parking_tickets/parquet/full_data_cleaned.parquet"
).repartition(npartitions=TICKET_PARTITIONS)
# Only the coordinates are needed for the nearest-school lookup.
tmp_tickets = tickets[["Latitude", "Longitude"]]

In [5]:
# Restrict the coordinate frame to its first partition for a small trial run.
tmp_tickets = tmp_tickets.partitions[0]

In [6]:
# Materialise the small middle-school lookup table once, on the client, as a
# pandas frame. Passing the lazy dask frame into a per-row apply would make
# create_rtree_index() re-execute its graph (re-reading the CSV) for every
# ticket row, from inside a worker task.
middle_schools_local = middle_schools.compute()


def nearest_school_columns(part, schools):
    """Return a frame with columns 0 (nearest school name) and 1 (haversine distance).

    ``part`` is one pandas partition with ``Latitude``/``Longitude`` columns, and
    ``schools`` is the pandas lookup table passed to ``create_rtree_index``.
    The R-tree is built once per partition, not once per row.
    """
    idx = create_rtree_index(schools)
    return part.apply(
        lambda row: test_function(idx, row['Latitude'], row['Longitude']),
        axis=1,
        result_type='expand',
    )


res = tmp_tickets.map_partitions(
    lambda part: nearest_school_columns(part, middle_schools_local),
    meta={0: str, 1: float},
)
res.columns = ['nearest_middle_school', 'distance_to_nms']

In [7]:
# Attach the nearest-school columns to the ticket rows they were computed from.
# `res` covers only partition 0 of the tickets (tmp_tickets was cut to that
# partition), so merge against the same partition of the full ticket frame,
# not all 2**18 partitions -- keep this index in sync with that cut.
# client.compute() lets the scheduler run the graph directly, instead of
# scattering lazy dask collections and calling .compute() inside a worker task.
m = client.compute(
    tickets.get_partition(0).merge(res, left_index=True, right_index=True)
)

In [8]:
# Show a progress bar for the ticket-merge future `m`. In a notebook this renders
# as a widget (the VBox() output) rather than blocking until the result is ready.
progress(m)

VBox()