# Scaling Proximity and Accessibility Analysis Using Dask

## 1. Introduction

In this notebook, we use [Dask](https://www.dask.org) to parallelize the proximity and accessibility analysis introduced in the first notebook, which is based on [Pandana](https://udst.github.io/pandana/), and run it on a set of cities.

## 2. Dask 

Dask is a Python library for parallel and distributed computing. 

From the [Dask documentation](https://docs.dask.org):

![](https://docs.dask.org/en/stable/_images/dask-overview.svg)

Dask can lead to:

* faster calculations.
* lower memory requirements.

... but the actual speed-up and memory footprint depend heavily on:
* the calculations/algorithm that we want to parallelize.
* the size of the dataset.
* the parallelization parameters (e.g. chunk size and shape).

Always start from serial calculations!
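
A serial baseline can be as simple as timing one plain run of the function before any Dask machinery is involved. The sketch below uses a hypothetical `timed` helper and a toy `toy_analysis` function as stand-ins; neither is part of this notebook.

```python
import time


def timed(func, *args, **kwargs):
    """Call func once and return (result, elapsed seconds). Hypothetical helper."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def toy_analysis(n):
    """Stand-in for a real, serial analysis step."""
    return sum(i * i for i in range(n))


result, elapsed = timed(toy_analysis, 100_000)
print(f"serial run took {elapsed:.4f} s")
```

Keeping this number around makes it possible to judge later whether a parallel version actually pays off.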

## 3. The analysis

In [None]:
import os
os.environ["OMP_NUM_THREADS"] = "1"  # disable Pandana multithreading 
os.environ["USE_PYGEOS"] = "0"  # suppress geopandas warning

from pathlib import Path

import pandana
import geopandas as gpd
import matplotlib
import matplotlib.pyplot as plt

The following functions define the proximity and accessibility analysis that we want to carry out for a set of cities:

In [None]:
def load_features(data_folder, city, features):
    """
    Load features for the provided city, return a GeoDataFrame
    """
    file_path = Path(data_folder) / city / f"{features}_{city}.shp"
    features = gpd.read_file(file_path)
    return features
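
`load_features` assumes one sub-folder per city, with files named `<features>_<city>.shp`. The following sketch only builds and checks such a path; `feature_path` is a hypothetical helper and `data_9_cities` is used here as a relative placeholder folder.

```python
from pathlib import Path


def feature_path(data_folder, city, features):
    """Build the shapefile path following the layout used by load_features."""
    return Path(data_folder) / city / f"{features}_{city}.shp"


path = feature_path("data_9_cities", "Delft", "parks")
print(path.as_posix())  # data_9_cities/Delft/parks_Delft.shp
print(path.exists())    # whether the file is present on this machine
```

Checking `path.exists()` before calling `gpd.read_file` can give a clearer error when a city folder is incomplete.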

In [None]:
def build_network(nodes, edges, parks):
    """
    Set up a Network object using parks as points of interest (POIs)
    """
    # set indexes for nodes and edges
    nodes = nodes.set_index("osmid", drop=False)
    edges = edges.set_index(["u", "v"], drop=False)
    
    # set up the network
    network = pandana.Network(
        node_x=nodes["x"], 
        node_y=nodes["y"], 
        edge_from=edges["u"], 
        edge_to=edges["v"], 
        edge_weights=edges[["length"]],
    )

    # set park centroids as points of interest
    network.set_pois(
        category="parks",
        maxdist=1000,
        maxitems=25,
        x_col=parks.centroid.x,
        y_col=parks.centroid.y,
    )
    
    return network

In [None]:
def proximity_analysis(nodes, edges, parks):
    """
    Run proximity analysis: for all nodes, find the distances to the
    closest parks.
    """
    # build network
    network = build_network(nodes, edges, parks)    

    # for all nodes, find 3 closest parks within 800m
    proximity = network.nearest_pois(
        distance=800,
        category="parks",
        num_pois=3,
        include_poi_ids=False
    )

    return proximity

In [None]:
def accessibility_analysis(nodes, edges, parks, buildings, d=800):
    """
    Run accessibility analysis: for all buildings, find how many
    parks fall within a given distance. 
    """
    # build network
    network = build_network(nodes, edges, parks)

    # add target points to network
    node_ids = network.get_node_ids(
        x_col=parks.centroid.x, 
        y_col=parks.centroid.y
    )
    network.set(node_ids, name="parks")

    # for all nodes, find how many parks fall within distance d
    accessibility = network.aggregate(
        distance=d,
        type="count",
        name="parks"
    )

    # assign accessibility of buildings using closest nodes 
    node_ids = network.get_node_ids(
        x_col=buildings.centroid.x, 
        y_col=buildings.centroid.y
    )
    buildings["accessibility"] = node_ids.map(
        accessibility.to_dict()
    )

    return buildings
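
The last step of `accessibility_analysis` transfers per-node values to buildings through a dictionary lookup. The toy example below illustrates only that step with made-up node IDs and counts; it assumes the aggregation result is a pandas Series indexed by node ID.

```python
import pandas as pd

# toy per-node result: node ID -> number of parks within reach (made-up values)
accessibility = pd.Series({101: 2.0, 102: 0.0, 103: 5.0})

# toy nearest-node lookup, indexed like the buildings table
node_ids = pd.Series([103, 101, 103, 102], index=["b0", "b1", "b2", "b3"])

# each building inherits the value of its nearest network node
building_accessibility = node_ids.map(accessibility.to_dict())
print(building_accessibility)
```

Buildings that snap to the same node (here `b0` and `b2`) end up with the same accessibility value.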

We will load data from the following path on Spider:

In [None]:
data_folder = Path("/project/stursdat/Data/ScalableGIS/Part2/data_9_cities")

In [None]:
!tree $data_folder

Let's test these functions for a city, Delft:

In [None]:
city = "Delft"

In [None]:
%%time
parks = load_features(data_folder, city, "parks")
nodes = load_features(data_folder, city, "nodes")
edges = load_features(data_folder, city, "edges")
buildings = load_features(data_folder, city, "buildings")

In [None]:
%%time
proximity = proximity_analysis(nodes, edges, parks)

In [None]:
%%time
accessibility = accessibility_analysis(nodes, edges, parks, buildings)

## 4. Run steps in parallel using Dask Delayed

The [Dask Delayed](https://docs.dask.org/en/stable/delayed.html) abstraction allows one to easily generate task graphs from Python functions.
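
As a minimal illustration of the idea, the sketch below wraps two toy functions (`inc` and `add`, not part of the analysis) and only runs them when `compute` is called. The `scheduler="synchronous"` argument is used here just to keep the example independent of any cluster.

```python
from dask import delayed


def inc(x):
    return x + 1


def add(a, b):
    return a + b


# calling the wrapped functions only records tasks in a graph
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)

# the graph is executed only here
result = total.compute(scheduler="synchronous")
print(result)  # 5
```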

In [None]:
from dask.delayed import delayed

In [None]:
load_features_lazy = delayed(load_features)

In [None]:
parks_p = load_features_lazy(data_folder, city, "parks")
nodes_p = load_features_lazy(data_folder, city, "nodes")
edges_p = load_features_lazy(data_folder, city, "edges")
buildings_p = load_features_lazy(data_folder, city, "buildings")

In [None]:
proximity_analysis_lazy = delayed(proximity_analysis)

In [None]:
proximity_p = proximity_analysis_lazy(nodes_p, edges_p, parks_p)

In [None]:
import dask
dask.visualize(proximity_p)

In [None]:
accessibility_analysis_lazy = delayed(accessibility_analysis)

In [None]:
accessibility_p = accessibility_analysis_lazy(nodes_p, edges_p, parks_p, buildings_p)

In [None]:
dask.visualize(accessibility_p)

In [None]:
dask.visualize([proximity_p, accessibility_p])

In [None]:
from dask.distributed import Client

# connect to an already running Dask scheduler (replace with your scheduler's address)
client = Client("tcp://10.0.2.120:42289")
client

In [None]:
%%time
proximity, accessibility = dask.compute(proximity_p, accessibility_p)
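
Passing both results to a single `dask.compute` call lets Dask merge the two graphs, so tasks they share (here, the loading steps) run only once. The toy sketch below makes this visible by counting executions of a stand-in `load` function; all names are illustrative, and the synchronous scheduler is used so that the counter lives in the same process.

```python
import dask
from dask import delayed

calls = []  # records every execution of the toy loader


def load(name):
    calls.append(name)
    return list(range(5))


def total(values):
    return sum(values)


def largest(values):
    return max(values)


data_p = delayed(load)("toy")
total_p = delayed(total)(data_p)
largest_p = delayed(largest)(data_p)

# one compute call: the shared loader task runs once
both = dask.compute(total_p, largest_p, scheduler="synchronous")
calls_joint = len(calls)

# two separate compute calls: the loader runs once per call
total_p.compute(scheduler="synchronous")
largest_p.compute(scheduler="synchronous")
calls_separate = len(calls) - calls_joint

print(both, calls_joint, calls_separate)
```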

## 5. Run the analyses for all cities using Dask Bags

[Dask Bags](https://docs.dask.org/en/latest/bag.html) implement operations such as `map`/`reduce` on Python collections.

In [None]:
# each city has its own sub-folder; skip any stray files in the data folder
cities = sorted(el.name for el in data_folder.iterdir() if el.is_dir())

In [None]:
import dask.bag as db
cities_bag = db.from_sequence(cities)
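
How the elements of a bag are grouped into partitions determines how much work each task carries. As a sketch with placeholder city names, `partition_size` can be passed to `db.from_sequence` to control this grouping explicitly:

```python
import dask.bag as db

toy_cities = [f"city_{i}" for i in range(9)]  # placeholder names

default_bag = db.from_sequence(toy_cities)
grouped_bag = db.from_sequence(toy_cities, partition_size=3)

# number of partitions chosen by default vs. requested explicitly
print(default_bag.npartitions, grouped_bag.npartitions)
```

With only a handful of cities and heavy per-city work, small partitions are usually a sensible starting point, since they leave room for the cities to be processed in parallel.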

In [None]:
nodes_bag = db.map(load_features, data_folder, cities_bag, "nodes")
edges_bag = db.map(load_features, data_folder, cities_bag, "edges")
parks_bag = db.map(load_features, data_folder, cities_bag, "parks")
buildings_bag = db.map(load_features, data_folder, cities_bag, "buildings")

In [None]:
accessibility_bag = db.map(accessibility_analysis, nodes_bag, edges_bag, parks_bag, buildings_bag)
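
In the calls above, `db.map` pairs up bag arguments element by element, while non-bag arguments (such as `data_folder` or `"nodes"`) are passed unchanged to every call. A small sketch with toy functions and placeholder city names:

```python
import dask.bag as db


def describe(folder, city, kind):
    return f"{folder}/{city}/{kind}_{city}.shp"


def pair(city, path):
    return (city, path)


toy_cities = db.from_sequence(["A", "B", "C"])  # placeholder city names

# "data" and "nodes" are broadcast; toy_cities is iterated element-wise
paths = db.map(describe, "data", toy_cities, "nodes")

# two bags with matching partitions are combined element by element
pairs = db.map(pair, toy_cities, paths)

result = pairs.compute(scheduler="synchronous")
print(result)
```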

In [None]:
dask.visualize(accessibility_bag)

Let's generate one plot for each of the cities:

In [None]:
fig_folder = Path.cwd().parent / "fig"
fig_folder.mkdir(exist_ok=True)

In [None]:
def plot(city, parks, buildings, fig_folder):
    """
    Visualize the buildings using the results of the
    accessibility analysis, then save the figure.
    """
    fig, ax = plt.subplots(figsize=(10, 10))
    
    parks.plot(
        ax=ax,
        color="green"
    )
    
    # the figure size is set in plt.subplots above; figsize is ignored when ax is given
    buildings.plot(
        ax=ax,
        column="accessibility",
        cmap="autumn",
        norm=matplotlib.colors.LogNorm(),
    )

    ax.set_axis_off()
    fig_path = Path(fig_folder) / f"{city}.png"
    fig.savefig(fig_path)
    plt.close(fig)  # release the figure once it has been saved

In [None]:
res = db.map(plot, cities_bag, parks_bag, accessibility_bag, fig_folder)

In [None]:
dask.compute(res)
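
Note that `dask.compute` always returns a tuple with one entry per argument, whereas the `.compute()` method of a bag returns the list of results directly. Since `plot` returns nothing, the computed value here is only a list of `None`; the figures on disk are the actual output. A toy bag illustrates the difference in return values:

```python
import dask
import dask.bag as db

toy_bag = db.from_sequence([1, 2, 3]).map(lambda x: x * 10)

as_tuple = dask.compute(toy_bag, scheduler="synchronous")
as_list = toy_bag.compute(scheduler="synchronous")

print(as_tuple)  # ([10, 20, 30],)
print(as_list)   # [10, 20, 30]
```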