## Cooperative-Competitive Maximum Coverage Location Problem  solved by Genetic Algorithm (GA)

In [None]:
from Algorithm.GA_Hulatang import GeneticAlgorithm
import random
import numpy as np
from matplotlib import pyplot as plt

## Load the real-world datasets

In [None]:
import geopandas as gpd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors
import time

### Demand Population Distribution

In [None]:
%%time
ls = gpd.read_file("data/HuLaTang/Deman_Point.shp")
ls['POINT_X'] = ls.geometry.x
ls['POINT_Y'] = ls.geometry.y
all_pop_col = [c for c in ls.columns if c.lower()=='all_pop'][0]
ls['speed_pct_freeflow_rev'] = ls[all_pop_col]
ls.head(3)

In [None]:
def normalize_to_1_10(data):
    min_val = min(data)
    max_val = max(data)
    normalized_data = [(x - min_val) / (max_val - min_val) * 9 + 1 for x in data]
    return normalized_data
ls['speed_pct_freeflow_rev_norm'] = normalize_to_1_10(ls['speed_pct_freeflow_rev'])

total_pop = sum(ls['speed_pct_freeflow_rev'])
total_pop_norm = sum(ls['speed_pct_freeflow_rev_norm'])
print("The number of records is ", len(ls))
print("The total speed unit are ", total_pop)
print("The total norm speed unit are ", total_pop_norm)

In [None]:
sitedf = pd.read_csv("data/HuLaTang/Candidate_Point.csv", encoding='utf-8-sig')
print("The number of candidate sites is ", len(sitedf))
sitedf.head(3)

## Normalization

In [None]:
def Normalization(x, y):
    max_x, max_y = np.max(x), np.max(y)
    min_x, min_y = np.min(x), np.min(y)
    S_x = (max_x-min_x)
    S_y = (max_y-min_y)
    S = max(S_x, S_y)
    new_x, new_y = (x-min_x)/S, (y-min_y)/S
    data_xy = np.vstack((new_x, new_y))
    Data = data_xy.T
    return new_x, new_y, S

In [None]:
ls_X = np.array(ls['POINT_X'])
ls_Y = np.array(ls['POINT_Y'])
bbs_X = np.array(sitedf['point_x_1'] if 'point_x_1' in sitedf.columns else sitedf['POINT_X'])
bbs_Y = np.array(sitedf['point_y_1'] if 'point_y_1' in sitedf.columns else sitedf['POINT_Y'])
X = np.concatenate([ls_X, bbs_X])
Y = np.concatenate([ls_Y, bbs_Y])
NORM_X, NORM_Y, S = Normalization(X, Y)
ls['NORM_X'] = NORM_X[:len(ls)]
ls['NORM_Y'] = NORM_Y[:len(ls)]
sitedf['NORM_X'] = NORM_X[len(ls):]
sitedf['NORM_Y'] = NORM_Y[len(ls):]

In [None]:
encodings = ["utf-8-sig", "gbk", "utf-8"]
stores_df = None
for enc in encodings:
    try:
        stores_df = pd.read_csv("data/HuLaTang/Hulatang.csv", encoding=enc)
        break
    except Exception:
        pass
if stores_df is None:
    raise RuntimeError("Failed to read Hulatang.csv")

store_x_col = None
store_y_col = None
for c in stores_df.columns:
    if c.lower() in ("x", "point_x"): store_x_col = c
    if c.lower() in ("y", "point_y"): store_y_col = c
if store_x_col is None or store_y_col is None:
    for c in stores_df.columns:
        if c.lower() == "point_x": store_x_col = c
        if c.lower() == "point_y": store_y_col = c
if store_x_col is None or store_y_col is None:
    raise ValueError("Store table is missing coordinate columns (X/Y or POINT_X/POINT_Y).")

Ak_col = None
Pk_col = None
for c in stores_df.columns:
    if c.lower() == "ak": Ak_col = c
    if c.lower() == "pk": Pk_col = c
if Ak_col is None or Pk_col is None:
    raise ValueError("Store table is missing Ak or Pk columns.")

cand_x_col = 'point_x_1' if 'point_x_1' in sitedf.columns else 'POINT_X'
cand_y_col = 'point_y_1' if 'point_y_1' in sitedf.columns else 'POINT_Y'
cand_xy = sitedf[[cand_x_col, cand_y_col]].to_numpy(float)
store_xy = stores_df[[store_x_col, store_y_col]].to_numpy(float)

all_xy = np.vstack([cand_xy, store_xy])
min_x, max_x = all_xy[:,0].min(), all_xy[:,0].max()
min_y, max_y = all_xy[:,1].min(), all_xy[:,1].max()
S = max(max_x - min_x, max_y - min_y)

cand_norm = (cand_xy - np.array([min_x, min_y])) / S
store_norm = (store_xy - np.array([min_x, min_y])) / S

sitedf['NORM_X'] = cand_norm[:,0]
sitedf['NORM_Y'] = cand_norm[:,1]

ls['NORM_X'] = (ls['POINT_X'] - min_x) / S
ls['NORM_Y'] = (ls['POINT_Y'] - min_y) / S

delta_agg = 700.0 / S
delta_comp = 150.0 / S
cd = cand_norm[:, None, :] - store_norm[None, :, :]
dists_cs = np.sqrt((cd ** 2).sum(axis=2))
B = (stores_df[Ak_col].to_numpy(float) * np.exp(- (dists_cs**2) / (delta_agg**2))).sum(axis=1)
C = (stores_df[Pk_col].to_numpy(float) * np.exp(- (dists_cs**2) / (delta_comp**2))).sum(axis=1)

rent_col = 'ZUJIN' if 'ZUJIN' in sitedf.columns else ('Zujin' if 'Zujin' in sitedf.columns else 'rent')
rents = sitedf[rent_col].to_numpy(float)

print(f"Unified normalization done for candidates/stores, S={S:.3f}. Computed B/C and rent.")

In [None]:
def generate_candidate_sites(sites, M=100, heuristic = None):
    '''
    Generate M candidate sites with the convex hull of a point set
    Input:
        sites: a Pandas DataFrame with X, Y and other characteristic
        M: the number of candidate sites to generate
        heuristic:
    Return:
        sites: a Numpy array with shape of (M,2)
    '''

    if M is not None:
        if M > len(sites):
            M = None
    if heuristic is None or heuristic == '':
        if M is None:
            return sites
        np.random.seed(0)
        index = np.random.choice(len(sites), M)
        return sites.iloc[index]
    elif heuristic == 'coverage':
        sites = sites.sort_values(by='pop_covered_2km', ascending=False).reset_index()
        if M is None:
            return sites
        return sites.iloc[:M]
    elif heuristic == 'coverage_e':
        sites = sites.sort_values(by='pop_covered_2km_exclusive', ascending=False).reset_index()
        if M is None:
            return sites
        return sites.iloc[:M]
    elif heuristic == 'impression':
        sites = sites.sort_values(by='weeklyImpr', ascending=False).reset_index()
        if M is None:
            return sites
        return sites.iloc[:M]
    elif heuristic == 'impression_e':
        sites = sites.sort_values(by='weeklyImpr_2km_exclusive', ascending=False).reset_index()
        if M is None:
            return sites
        return sites.iloc[:M]

In [None]:
bbs_ = generate_candidate_sites(sitedf, M=None, heuristic="")

In [None]:
users = np.array(ls[['NORM_X', 'NORM_Y']])
facilities = np.array(bbs_[['NORM_X', 'NORM_Y']])
demand = np.array(ls['speed_pct_freeflow_rev'])
p = 20
real_radius = 1000
radius = real_radius/S

dist = np.sum((facilities[:, np.newaxis, :] - users[np.newaxis, :, :]) ** 2, axis=-1) ** 0.5
rent_col = 'ZUJIN' if 'ZUJIN' in bbs_.columns else ('Zujin' if 'Zujin' in bbs_.columns else 'rent')
rents = np.array(bbs_[rent_col], dtype=float)

In [None]:
start_time = time.time()
w1, w2, w3, w4 = 0.40, 0.20, 0.25, 0.15

genetic = GeneticAlgorithm(
    n=len(ls), m=len(bbs_), p=p,
    cost_matrix=dist, r=radius,
    demand=demand, costs=rents,
    B=B, C=C,
    w1=w1, w2=w2, w3=w3, w4=w4,
    iterations=200, generation_size=80, reproduction_size=30, mutation_prob=0.1,
)

genetic.optimize()
sol = genetic.get_solution_details()
centers = sol['selected_facilities']
end_time = time.time()
execution_time = end_time - start_time

# Component metrics (aligned with Gurobi presentation)
w1, w2, w3, w4 = 0.40, 0.20, 0.25, 0.15
market_benefit = w1 * sol['covered_demand']
agglomeration_benefit = w3 * sol['sum_B']
competition_cost = w4 * sol['sum_C']
rent_cost = w2 * sol['sum_rent']

print("Selected facility indices:", centers)
print("Objective value:", round(sol['objective_value']))
print("Market coverage benefit:", round(market_benefit))
print("Agglomeration benefit:", round(agglomeration_benefit))
print("Competition cost:", -round(competition_cost))  # negative for display
print("Rent cost:", -round(rent_cost))                # negative for display
print("Average rent per facility:", round(sol['sum_rent']/len(centers)) if len(centers)>0 else 0)
print("Covered / Total demand:", f"{round(sol['covered_demand'])} / {round(sol['total_demand'])}")
print("Coverage rate:", f"{sol['coverage_rate']*100:.2f}%")
print(f"Execution time: {execution_time:.2f}s")
print(centers)

In [None]:
def plot_result(ls, opt_sites, radius):
    
    fig, ax = plt.subplots(figsize=(20, 12))

    cmap = plt.cm.Blues
    new_cmap = colors.ListedColormap(cmap(np.linspace(0.15, 1, 256)))
    pop_col = None
    if 'speed_pct_freeflow_rev' in ls.columns:
        pop_col = 'speed_pct_freeflow_rev'
    else:
        for c in ls.columns:
            if c.lower() == 'all_pop':
                pop_col = c
                break
    try:
        if pop_col is not None:
            ls.plot(ax=ax, column=pop_col, k=5, markersize=5, cmap=new_cmap, label='Market')
        else:
            ls.plot(ax=ax, markersize=5, color='#8da0cb', label='Demand')
    except Exception:
        ls.plot(ax=ax, markersize=5, color='#8da0cb', label='Demand')

    def pick_xy_cols(df):
        x_col = None
        y_col = None
        for c in df.columns:
            cl = str(c).lower()
            if cl in ('point_x_1', 'x', 'point_x', 'pointx') and x_col is None:
                x_col = c
            if cl in ('point_y_1', 'y', 'point_y', 'pointy') and y_col is None:
                y_col = c
        if x_col is None and 'POINT_X' in df.columns: x_col = 'POINT_X'
        if y_col is None and 'POINT_Y' in df.columns: y_col = 'POINT_Y'
        if x_col is None or y_col is None:
            raise ValueError('opt_sites is missing XY columns (point_x_1/point_y_1 or X/Y or POINT_X/POINT_Y).')
        return x_col, y_col

    x_col, y_col = pick_xy_cols(opt_sites)

    legend_plot_flag = {'current': False, 'selected': False}
    for _, site in opt_sites.iterrows():
        cx = float(site[x_col])
        cy = float(site[y_col])
        if 'current' in opt_sites.columns and bool(site['current']) is True:
            if legend_plot_flag['current'] is False:
                ax.scatter(cx, cy, c='red', marker='+', s=10, label='Current Sites')
                legend_plot_flag['current'] = True
            ax.scatter(cx, cy, c='red', marker='+', s=100)
            circle = plt.Circle((cx, cy), radius, color='red', fill=False, lw=2)
            ax.add_artist(circle)
        else:
            if legend_plot_flag['selected'] is False:
                ax.scatter(cx, cy, c='C1', marker='+', s=10, label='Optimized Selected Sites')
                legend_plot_flag['selected'] = True
            ax.scatter(cx, cy, c='C1', marker='+', s=100)
            # circle = plt.Circle((cx, cy), radius, color='C1', fill=False, lw=2)
            # ax.add_artist(circle)

    ax.axis('scaled')
    ax.tick_params(axis='both', left=False, top=False, right=False,
                   bottom=False, labelleft=False, labeltop=False,
                   labelright=False, labelbottom=False)
    ax.set_title('Selected {} Sites that Serve {} m'.format(len(opt_sites), radius), fontsize=20)
    render_scale_bar(ax=ax, x=0.05, y=0.05)
    render_north_arrow(ax=ax, x=0.95, y=0.95, size=0.01, ratio=0.7)
    ax.legend(loc='lower right', markerscale=10)
    return ax

In [None]:
def render_scale_bar(ax, x, y, segments=2, height=0.01, seg_length=2000, unit='m', linewidth=1.):
    unit_scale_factor = {
        'm': 1,
        'km': 1000,
        'meters': 1,
        'kilometers': 1000,
        'miles': 1609.34,
        'mi': 1609.34,
        'ft': 0.3,
        }
    x_lim = ax.get_xlim()
    y_lim = ax.get_ylim()

    # how much percent does one unit takes on the x axis
    x_per_unit = 1. / (x_lim[1] - x_lim[0])
    y_per_unit = 1. / (y_lim[1] - y_lim[0])

    # base for ticks (0, 1)
    x_base = [x + seg_length * unit_scale_factor[unit] * x_per_unit * i for i in range(0, segments + 1)]
    ax.axhline(y_lim[0] + y / y_per_unit, x_base[0], x_base[-1], c='black')
    y_base = [y, y + height]
    for i in range(segments + 1):
        ax.axvline(x_lim[0] + x_base[i] / x_per_unit, y, y + height, c='black')
        xy = (x_lim[0] + x_base[i] / x_per_unit, y_lim[0] + (y - 0.015) / y_per_unit)  # data_coords
        ax.text(xy[0], xy[1], s='{}'.format(int(seg_length * i)), horizontalalignment='center', verticalalignment='center')
    ax.text(x_lim[0] + (x_base[-1] + 0.02) / x_per_unit, y_lim[0] + (y - 0.015) / y_per_unit,
            s=unit, horizontalalignment='left',
            verticalalignment='center')
def render_north_arrow(ax, x, y, size, ratio = 1):
    path = [(0, 1), (-ratio, -1), (0, -0.5), (ratio, -1), (0, 1)]
    path = [(i[0] * size + x, i[1] * size + y) for i in path]
    arrow = plt.Polygon(path, color='black', transform=ax.transAxes)
    ax.add_patch(arrow)
    ax.text(x, y-size*2, s = 'N', horizontalalignment='center', verticalalignment='center', transform=ax.transAxes)

In [None]:
opt_sites = bbs_.iloc[centers]
plot_result(ls,opt_sites,2000)

In [None]:
import os
import seaborn as sns
from matplotlib.colors import ListedColormap
from matplotlib.lines import Line2D

def plot_result_pretty(ls, opt_sites, radius_m=2000, roads_path="data/HuLaTang/Road_Network.shp",
                       demand_gridsize=60, demand_cmap="magma", roads_color="#9e9e9e",
                       norm_mode="log", vmin_q=0.10, vmax_q=0.995, gamma=0.6, reduce="sum",
                       overlay_points=True, overlay_points_size=6, overlay_points_alpha=0.28,
                       overlay_hex_grid=False, hex_grid_color="#222222", hex_grid_alpha=0.18, hex_grid_lw=0.25):
    pop_col = None
    if 'speed_pct_freeflow_rev' in ls.columns:
        pop_col = 'speed_pct_freeflow_rev'
    else:
        for c in ls.columns:
            if str(c).lower() == 'all_pop':
                pop_col = c
                break
    if 'X' in ls.columns and 'Y' in ls.columns:
        dx, dy = 'X', 'Y'
    else:
        dx, dy = 'POINT_X', 'POINT_Y'
    if dx not in ls.columns or dy not in ls.columns:
        raise ValueError("ls must contain POINT_X/POINT_Y or X/Y columns.")

    def pick_xy_cols(df):
        x_col = None
        y_col = None
        for c in df.columns:
            cl = str(c).lower()
            if cl in ('point_x_1', 'x', 'point_x', 'pointx') and x_col is None:
                x_col = c
            if cl in ('point_y_1', 'y', 'point_y', 'pointy') and y_col is None:
                y_col = c
        if x_col is None and 'POINT_X' in df.columns: x_col = 'POINT_X'
        if y_col is None and 'POINT_Y' in df.columns: y_col = 'POINT_Y'
        if x_col is None or y_col is None:
            raise ValueError('opt_sites missing XY columns (point_x_1/point_y_1 or X/Y or POINT_X/POINT_Y).')
        return x_col, y_col

    x_col, y_col = pick_xy_cols(opt_sites)

    # 3) Figure
    fig, ax = plt.subplots(figsize=(20, 16))

    # Roads overlay at bottom
    if isinstance(roads_path, str) and os.path.exists(roads_path):
        try:
            roads = gpd.read_file(roads_path)
            try:
                roads.plot(ax=ax, color=roads_color, linewidth=0.3, alpha=0.35, zorder=1, label='Road network')
            except Exception:
                pass
        except Exception:
            pass

    # Demand heat via hexbin (no gaps) with normalization and quantile clipping (plot-only)
    try:
        values = ls[pop_col].to_numpy() if pop_col is not None else None
        if values is not None:
            vmin = float(np.quantile(values, vmin_q)) if 0 <= vmin_q < 1 else None
            vmax = float(np.quantile(values, vmax_q)) if 0 < vmax_q <= 1 else None
            if vmin is not None and vmax is not None and vmax > vmin:
                values_clipped = np.clip(values, vmin, vmax)
            else:
                values_clipped = values
            norm = None
            if norm_mode == 'log':
                from matplotlib.colors import LogNorm
                norm = LogNorm(vmin=max(values_clipped.min(), 1e-6), vmax=values_clipped.max())
            elif norm_mode == 'power':
                from matplotlib.colors import PowerNorm
                norm = PowerNorm(gamma=gamma, vmin=values_clipped.min(), vmax=values_clipped.max())
            else:
                norm = None
        else:
            values_clipped = None
            norm = None

        reducer = np.sum if reduce == 'sum' else np.mean
        hb = ax.hexbin(ls[dx].to_numpy(), ls[dy].to_numpy(),
                       C=values_clipped,
                       reduce_C_function=reducer if values_clipped is not None else None,
                       gridsize=demand_gridsize, cmap=demand_cmap, bins=None, mincnt=1,
                       linewidths=0, alpha=0.95, zorder=5, norm=norm)
        try:
            hb.set_edgecolor('face')
        except Exception:
            pass
        # Remove colorbar to keep the figure clean (optional)
        # cbar = fig.colorbar(hb, ax=ax, shrink=0.8)
        # cbar.set_label('Demand intensity', fontsize=12)
    except Exception:
        sc = ax.scatter(ls[dx], ls[dy], c=ls[pop_col] if pop_col is not None else '#9ecae1',
                        s=8, cmap=demand_cmap, edgecolors='none', zorder=5)
        if pop_col is not None:
            # Remove colorbar (optional)
            # cbar = fig.colorbar(sc, ax=ax, shrink=0.8)
            # cbar.set_label('Demand intensity', fontsize=12)
            pass

    # Optional overlay: demand points and hex grid
    # Hex grid overlay is disabled by default
    # if overlay_hex_grid:
    #     try:
    #         hb_grid = ax.hexbin(ls[dx].to_numpy(), ls[dy].to_numpy(),
    #                             gridsize=demand_gridsize, C=None, reduce_C_function=None,
    #                             linewidths=hex_grid_lw, edgecolors=hex_grid_color, facecolors='none',
    #                             alpha=hex_grid_alpha, zorder=4)
    #     except Exception:
    #         pass
    if overlay_points:
        try:
            ax.scatter(ls[dx], ls[dy], s=overlay_points_size, c='#000000', alpha=overlay_points_alpha,
                       linewidths=0, zorder=6, label='Demand points')
        except Exception:
            pass

    # Facilities and service circles
    legend_flag = {'selected': False, 'current': False}
    for _, row in opt_sites.iterrows():
        cx = float(row[x_col])
        cy = float(row[y_col])
        is_current = ('current' in opt_sites.columns and bool(row['current']) is True)
        if is_current:
            if not legend_flag['current']:
                ax.scatter(cx, cy, s=40, marker='o', facecolor='white', edgecolor='red', linewidths=1.2,
                           label='Current sites', zorder=9)
                legend_flag['current'] = True
            else:
                ax.scatter(cx, cy, s=40, marker='o', facecolor='white', edgecolor='red', linewidths=1.2, zorder=9)
            circ = plt.Circle((cx, cy), radius_m, facecolor='none', edgecolor='red', lw=0.9, alpha=0.9, zorder=8)
            ax.add_artist(circ)
        else:
            if not legend_flag['selected']:
                ax.scatter(cx, cy, s=44, marker='o', facecolor='C1', edgecolor='white', linewidths=0.6,
                           label='Selected facilities', zorder=10)
                legend_flag['selected'] = True
            else:
                ax.scatter(cx, cy, s=44, marker='o', facecolor='C1', edgecolor='white', linewidths=0.6, zorder=10)
            circ = plt.Circle((cx, cy), radius_m, facecolor='C1', edgecolor='C1', lw=0.6, alpha=0.12, zorder=8)
            ax.add_artist(circ)

    # Aesthetics
    ax.axis('scaled')
    ax.tick_params(axis='both', left=False, top=False, right=False,
                   bottom=False, labelleft=False, labeltop=False,
                   labelright=False, labelbottom=False)
    ax.grid(False)
    for spine in ax.spines.values():
        spine.set_visible(True)
        spine.set_linewidth(0.8)
        spine.set_edgecolor('#444444')
    ax.set_title('Optimized Facility Locations with Demand Heat and Roads', fontsize=18)
    try:
        render_scale_bar(ax=ax, x=0.05, y=0.05)
        render_north_arrow(ax=ax, x=0.95, y=0.95, size=0.01, ratio=0.7)
    except Exception:
        pass
    # Compose custom legend entries to ensure demand points and roads appear
    handles, labels = ax.get_legend_handles_labels()
    if not any(lbl == 'Demand points' for lbl in labels):
        handles.append(Line2D([0], [0], marker='o', color='none', markerfacecolor='#000000',
                              markersize=6, alpha=overlay_points_alpha, label='Demand points'))
    if not any(lbl == 'Road network' for lbl in labels):
        handles.append(Line2D([0], [0], color=roads_color, lw=1.0, alpha=0.6, label='Road network'))
    ax.legend(handles=handles, loc='lower right', markerscale=1.6, frameon=True, framealpha=0.9)
    return ax


In [None]:
# Demo call for pretty plot (GA)
try:
    opt_sites = bbs_.iloc[centers]
except Exception:
    # fallback: if centers not defined, skip
    opt_sites = bbs_.iloc[:30]

ax = plot_result_pretty(ls, opt_sites, radius_m=1000, roads_path="data/HuLaTang/Road_Network.shp",
                        demand_gridsize=80, demand_cmap="inferno", norm_mode="log", vmin_q=0.10, vmax_q=0.995,
                        reduce="sum")
plt.show()


In [None]:
# Result figure: demand hexbin gradient + road overlay + facility points
# - Roads: overlay if a GeoDataFrame variable exists or if data/roads.(geojson|shp) is available
# - Demand intensity: weighted sum within hexagons if provided; auto-switch linear/log normalization
# - Axes: hide axes and ticks; keep only the colorbar

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from pathlib import Path

# Try importing GeoPandas for road plotting
try:
    import geopandas as gpd
    _has_gpd = True
except Exception:
    _has_gpd = False

# Guess common variable names (ordered by priority)
possible_demand_xy = [
    ('demand_x', 'demand_y'),
    ('X', 'Y'),
    ('clients_x', 'clients_y'),
    ('demand_points_x', 'demand_points_y'),
    ('dx', 'dy'),
]
possible_demand_strength = ['demand_w', 'weights', 'w', 'demand_strength', 'demands']
possible_facility_xy = [
    ('facility_x', 'facility_y'),
    ('fac_x', 'fac_y'),
    ('selected_facility_x', 'selected_facility_y'),
    ('sites_x', 'sites_y'),
    ('best_x', 'best_y'),
]
possible_roads_vars = ['roads_gdf', 'roads', 'edges_gdf', 'edges', 'road_gdf', 'G_edges', 'gdf_edges']
possible_roads_files = [
    'data/roads.geojson', 'data/roads.shp', 'data/road.geojson', 'data/road.shp',
    'data/roads/roads.shp', 'data/osm_roads.geojson'
]

# Manual fallback (if auto-detection fails, provide variables above and disable auto-detection)
# demand_x, demand_y = your_demand_x_array, your_demand_y_array
# demand_w = your_demand_weight_array  # optional, same length as demand_x/demand_y
# facility_x, facility_y = your_facility_x_array, your_facility_y_array
# roads_gdf = gpd.read_file('your_roads_file.geojson')

locals_dict = globals()


def get_first_existing_pair(candidates):
    for a, b in candidates:
        if a in locals_dict and b in locals_dict:
            ax = np.asarray(locals_dict[a]).ravel()
            ay = np.asarray(locals_dict[b]).ravel()
            if ax.size and ay.size and ax.size == ay.size:
                return ax, ay
    return None, None


def get_first_existing_array(candidates):
    for k in candidates:
        if k in locals_dict:
            arr = np.asarray(locals_dict[k]).ravel()
            if arr.size:
                return arr
    return None


def get_first_existing_roads():
    if not _has_gpd:
        return None
    # Prefer variables first
    for k in possible_roads_vars:
        if k in locals_dict:
            obj = locals_dict[k]
            if isinstance(obj, gpd.GeoDataFrame) and len(obj) > 0:
                return obj
    # Fallback to files
    for rel in possible_roads_files:
        p = Path(rel)
        if p.exists():
            try:
                gdf = gpd.read_file(p)
                if len(gdf) > 0:
                    return gdf
            except Exception:
                pass
    return None


# Get demand coordinates/strength, facilities, and roads
cx, cy = get_first_existing_pair(possible_demand_xy)
cw = get_first_existing_array(possible_demand_strength)
fx, fy = get_first_existing_pair(possible_facility_xy)
roads_gdf = get_first_existing_roads()

if cx is None or cy is None:
    raise RuntimeError('Failed to auto-detect demand coordinate variables. Please provide demand_x/demand_y.')

# Colormap: brighter high values
cmap = mpl.cm.get_cmap('YlOrRd')

# Weight switch & normalization: log for positive wide-range values, otherwise linear
use_weight = cw is not None and cw.shape[0] == cx.shape[0]
norm = None
if use_weight:
    cw = np.asarray(cw).astype(float)
    cw = np.where(np.isfinite(cw), cw, 0.0)
    cw = np.clip(cw, a_min=0.0, a_max=None)
    pos = cw[cw > 0]
    if pos.size:
        vmin = np.percentile(pos, 2)
        vmax = np.percentile(pos, 98)
        vmax = max(vmax, vmin * 1.01)
        dynamic = (vmax / max(vmin, 1e-9))
        if dynamic >= 50:
            norm = mpl.colors.LogNorm(vmin=max(vmin, 1e-6), vmax=vmax)
        else:
            norm = mpl.colors.Normalize(vmin=0, vmax=vmax)
    else:
        norm = mpl.colors.Normalize(vmin=0, vmax=1)

# Plot
fig, ax = plt.subplots(figsize=(9, 8), dpi=150)
fig.patch.set_facecolor('white')

# 1) Roads (bottom layer)
if roads_gdf is not None:
    try:
        roads_gdf.plot(ax=ax, color=(0, 0, 0, 0.12), linewidth=0.6, zorder=1)
    except Exception:
        pass

# 2) Demand hexbin (middle layer)
hex_kwargs = {
    'gridsize': 50,                 # grid density
    'cmap': cmap,
    'linewidths': 0.3,
    'edgecolors': (1, 1, 1, 0.6),   # semi-transparent white edges
    'mincnt': 1,                    # color only when at least one point exists
    'zorder': 2,
}

if use_weight:
    hb = ax.hexbin(cx, cy, C=cw, reduce_C_function=np.sum, norm=norm, **hex_kwargs)
    cbar = fig.colorbar(hb, ax=ax)
    cbar.set_label('Demand intensity (weighted sum within hexagon)', fontsize=10)
else:
    # Count only; use log scale to reveal low values for large datasets
    hb = ax.hexbin(cx, cy, norm=mpl.colors.LogNorm() if len(cx) > 2000 else None, **hex_kwargs)
    cbar = fig.colorbar(hb, ax=ax)
    cbar.set_label('Number of demand points', fontsize=10)

# 3) Facilities (top layer)
if fx is not None and fy is not None:
    ax.scatter(fx, fy, s=40, c='#ff7f0e', marker='o', edgecolors='k', linewidths=0.6, zorder=3, label='Selected facilities')
    ax.legend(loc='lower right', fontsize=9, frameon=True, framealpha=0.6)

# Layout: remove axes and ticks; keep colorbar
ax.set_axis_off()
ax.set_aspect('equal', adjustable='datalim')
plt.tight_layout()

# Save (optional)
# fig.savefig('results/final_hexbin_with_roads.png', dpi=220, bbox_inches='tight')
plt.show()

In [None]:
# Raw data visualization (without selected facilities overlay)
# Show only demand heat and roads, do not show facilities
try:
    opt_sites_empty = bbs_.iloc[0:0]
except Exception:
    opt_sites_empty = sitedf.iloc[0:0]

ax = plot_result_pretty(
    ls,
    opt_sites_empty,
    radius_m=1000,
    roads_path="data/HuLaTang/Road_Network.shp",
    demand_gridsize=80,
    demand_cmap="inferno",
    norm_mode="log",
    vmin_q=0.10,
    vmax_q=0.995,
    reduce="sum",
    overlay_points=True
)
plt.show()
