In [1]:
import os
import math
import warnings
import numpy as np
import pandas as pd
import geopandas as gpd
import networkx as nx
import pyomo.environ as pyo
import folium
import osmnx as ox
from shapely.geometry import Point

warnings.filterwarnings("ignore", category=UserWarning)
ox.settings.log_console = True
ox.settings.use_cache = True

# Config
PLACE_NAME = "Nairobi, Kenya"
TIME_THRESHOLDS_MIN = [10, 15, 20]   # minutes
HEX_SIZE_KM = 1.2
RESULTS_DIR = "results"
os.makedirs(RESULTS_DIR, exist_ok=True)



In [2]:
def km_to_deg(km: float) -> float:
    return km / 111.32

def build_hex_grid(boundary_gdf, hex_km):
    poly = boundary_gdf.geometry.unary_union
    minx, miny, maxx, maxy = poly.bounds
    dx = km_to_deg(hex_km)
    dy = dx * math.sqrt(3)/2
    xs = np.arange(minx - dx, maxx + dx, dx)
    ys = np.arange(miny - dy, maxy + dy, dy)
    points = []
    for j, y in enumerate(ys):
        offset = 0 if j % 2 == 0 else dx/2
        for x in xs:
            points.append(Point(x+offset, y))
    pts_gdf = gpd.GeoDataFrame(geometry=points, crs="EPSG:4326")
    pts_gdf = pts_gdf[pts_gdf.within(poly)].reset_index(drop=True)
    pts_gdf["id"] = [f"d{idx:04d}" for idx in range(len(pts_gdf))]
    pts_gdf["weight"] = 1.0
    return pts_gdf[["id", "weight", "geometry"]]


In [4]:
# 1) Boundary (use union_all with Shapely 2)
polygon = boundary.geometry.union_all()

# 2) Drive graph with travel time
ox.settings.timeout = 180  # bump Overpass/network timeouts
G = ox.graph_from_polygon(polygon, network_type="drive", retain_all=True, simplify=True)
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)

# 3) Candidate sites: malls & supermarkets (one request, OR semantics on values)
import time
tags = {
    "shop": ["mall", "supermarket"],  # shop=mall OR shop=supermarket
    "amenity": "supermarket"          # OR amenity=supermarket
}

def fetch_pois_with_retry(polygon, tags, n_retries=3, wait=10):
    last_err = None
    for k in range(n_retries):
        try:
            g = ox.features_from_polygon(polygon, tags=tags)
            if g is not None and len(g) > 0:
                return g
            else:
                last_err = RuntimeError("Empty result from Overpass for given tags.")
        except Exception as e:
            last_err = e
        # backoff then retry
        time.sleep(wait * (k + 1))
    raise last_err

pois = fetch_pois_with_retry(polygon, tags, n_retries=3, wait=10)
candidates = pois.to_crs("EPSG:4326").copy()

# Prefer point geometry; for polygons, take centroids
candidates["geometry"] = candidates.geometry.centroid
candidates = candidates.drop_duplicates(subset=["geometry"]).reset_index(drop=True)
candidates["id"] = [f"s{idx:04d}" for idx in range(len(candidates))]

print(f"Candidates fetched: {len(candidates)}")

# 4) Demand grid
demand = build_hex_grid(boundary, HEX_SIZE_KM)
print(f"Demand points: {len(demand)}")



  overpass_settings = _make_overpass_settings()
  yield _overpass_request(data={"data": query_str})
  overpass_settings = _make_overpass_settings()
  yield _overpass_request(data={"data": query_str})
  this_pause = _get_overpass_pause(overpass_endpoint)


Candidates fetched: 395
Demand points: 560



  candidates["geometry"] = candidates.geometry.centroid
  poly = boundary_gdf.geometry.unary_union


In [5]:
def nearest_nodes(G, gdf_points):
    return ox.distance.nearest_nodes(G, gdf_points.geometry.x, gdf_points.geometry.y)

def compute_reach_sets(G, cand_nodes, demand_nodes, thresholds_min):
    node_to_dem = {}
    for j, dn in enumerate(demand_nodes):
        node_to_dem.setdefault(int(dn), []).append(j)
    cover = {}
    for tmin in thresholds_min:
        cutoff = tmin * 60
        cover[tmin] = {j: [] for j in range(len(demand_nodes))}
        for i, cn in enumerate(cand_nodes):
            lengths = nx.single_source_dijkstra_path_length(G, cn, weight="travel_time", cutoff=cutoff)
            for node_id in lengths:
                if node_id in node_to_dem:
                    for j in node_to_dem[node_id]:
                        cover[tmin][j].append(i)
    return cover

def solve_set_cover(num_sites, num_demands, cover_dict):
    model = pyo.ConcreteModel()
    model.I = pyo.RangeSet(0, num_sites-1)
    model.J = pyo.RangeSet(0, num_demands-1)
    model.y = pyo.Var(model.I, domain=pyo.Binary)

    def cover_rule(m, j):
        if len(cover_dict[j]) > 0:
            return sum(m.y[i] for i in cover_dict[j]) >= 1
        return pyo.Constraint.Skip
    model.cover = pyo.Constraint(model.J, rule=cover_rule)
    model.obj = pyo.Objective(expr=sum(model.y[i] for i in model.I), sense=pyo.minimize)

    solver = pyo.SolverFactory("highs")  # or "cbc", "glpk"
    solver.solve(model)
    picks = [i for i in range(num_sites) if pyo.value(model.y[i]) > 0.5]
    return picks


In [6]:
cand_nodes = nearest_nodes(G, candidates)
dem_nodes = nearest_nodes(G, demand)
cover_maps = compute_reach_sets(G, cand_nodes, dem_nodes, TIME_THRESHOLDS_MIN)

results = {}
for t in TIME_THRESHOLDS_MIN:
    picks = solve_set_cover(len(candidates), len(demand), cover_maps[t])
    results[t] = picks
    print(f"{t} min -> {len(picks)} sites selected")


10 min -> 11 sites selected
15 min -> 5 sites selected
20 min -> 3 sites selected


In [7]:
center = [boundary.geometry.centroid.y.values[0], boundary.geometry.centroid.x.values[0]]
m = folium.Map(location=center, zoom_start=11)

# add candidate sites
for _, r in candidates.iterrows():
    folium.CircleMarker([r.geometry.y, r.geometry.x], radius=3, color="gray").add_to(m)

# add selected sites for each scenario
colors = {10:"red", 15:"blue", 20:"green"}
for t, picks in results.items():
    for i in picks:
        r = candidates.iloc[i]
        folium.Marker([r.geometry.y, r.geometry.x],
                      popup=f"{r.get('name', r['id'])} ({t}min)",
                      icon=folium.Icon(color=colors[t])).add_to(m)

m



  center = [boundary.geometry.centroid.y.values[0], boundary.geometry.centroid.x.values[0]]
