In [1]:
import geopandas as gpd
import numpy as np
import pandas as pd
import networkx as nx
import h3
from shapely.geometry import Point
from mdptoolbox.mdp import ValueIteration

In [2]:
# Load the hexagonal geojson file
hex_gdf = gpd.read_file("geojson\\hex_dataset.geojson")

# Ensure necessary columns exist
if "avg_imd_score" not in hex_gdf.columns or "total_population" not in hex_gdf.columns:
    raise ValueError("Missing column")

In [3]:
# Declare states and num_states
hex_indices = hex_gdf["h3_index"]
states = list(hex_indices)
num_states = len(states)

In [4]:
# Load existing stations
existing_stations_gdf = gpd.read_file("geojson\\existing_bike_stations.geojson")

if hex_gdf.crs != existing_stations_gdf.crs:
    existing_stations_gdf = existing_stations_gdf.to_crs(hex_gdf.crs)

existing_station_coords = existing_stations_gdf.geometry.apply(lambda geom: (geom.y, geom.x))
resolution = h3.get_resolution(states[0])
existing_station_hexes = set(h3.latlng_to_cell(lat, lon, resolution) for lat, lon in existing_station_coords)

# Build a reverse lookup for station hexes
station_density_lookup = {}

for h in states:
    neighbors = h3.grid_disk(h, 1)
    count = sum(1 for n in neighbors if n in existing_station_hexes)
    station_density_lookup[h] = count

In [5]:
# Create an adjacency graph based on H3 neighbors
G = nx.Graph()
hex_indices = hex_gdf["h3_index"]
hex_indices_values = hex_indices.values  # for faster lookup

for h in hex_indices:
    neighbors = h3.grid_disk(h, 1)
    for n in neighbors:
        if n in hex_indices_values:
            G.add_edge(h, n)


In [6]:
# Initialise rewards
rewards = np.zeros(num_states)

# Extract population and IMD arrays for normalization
all_pop = hex_gdf["total_population"]
all_imd = hex_gdf["avg_imd_score"]

min_pop, max_pop = all_pop.min(), all_pop.max()
min_imd, max_imd = all_imd.min(), all_imd.max()

# Tunable weights for demand vs equity
alpha = 0.5  # weight for population
beta = 0.5   # weight for deprivation (equity)
gamma = 0.4 # Station Density Weight

max_station_density = max(station_density_lookup.values()) or 1 # avoid division by zero

for i, h in enumerate(states):
    row = hex_gdf[hex_gdf["h3_index"] == h]
    if not row.empty:
        imd_score = row["avg_imd_score"].values[0]
        population = row["total_population"].values[0]

        norm_pop = (population - min_pop) / (max_pop - min_pop)
        norm_imd = (imd_score - min_imd) / (max_imd - min_imd)

        # Less dense areas get higher rewards
        station_density = station_density_lookup[h]
        norm_inverse_density = 1 - (station_density / gamma)

        reward = (
            alpha * norm_pop
            + beta * norm_imd
            + gamma * norm_inverse_density
        )

        rewards[i] = reward

In [7]:
num_actions = 2
transition_matrix = np.zeros((num_actions, num_states, num_states))

# Action 0: Do nothing
for i in range(num_states):
    transition_matrix[0, i, i] = 1.0

# Action 1: Place a station
for i, h in enumerate(states):
    neighbors = h3.grid_disk(h, 1)
    valid_neighbors = [n for n in neighbors if n in states]
    prob = 1 / len(valid_neighbors) if valid_neighbors else 0
    for n in valid_neighbors:
        j = states.index(n)
        transition_matrix[1, i, j] = prob
    transition_matrix[1, i, i] += 1.0  # self-loop

# Normalize transitions
for a in range(num_actions):
    for s in range(num_states):
        row_sum = np.sum(transition_matrix[a, s, :])
        if row_sum > 0:
            transition_matrix[a, s, :] /= row_sum

In [8]:
allowed_hexes = set()
for h in existing_station_hexes:
    allowed_hexes.update(h3.grid_disk(h, 3))  # within 3 rings (525m, which was chosen because people are only willing to walk around 400-500m from a bike station to their destination https://www.welovecycling.com/wide/2020/04/30/how-far-are-you-willing-to-walk-for-bike-sharing/)

filtered_states = [h for h in states if h in allowed_hexes]

if not filtered_states:
    raise ValueError("No candidate hexagons remain after filtering.")

filtered_indices = [states.index(h) for h in filtered_states]
filtered_rewards = rewards[filtered_indices]
filtered_transition_matrix = transition_matrix[:, filtered_indices, :][:, :, filtered_indices]

# Re-normalize transition matrix after slicing
for a in range(filtered_transition_matrix.shape[0]):
    for s in range(filtered_transition_matrix.shape[1]):
        row_sum = np.sum(filtered_transition_matrix[a, s, :])
        if row_sum > 0:
            filtered_transition_matrix[a, s, :] /= row_sum
        else:
            # self-loop if no transitions
            filtered_transition_matrix[a, s, s] = 1.0


In [9]:
mdp = ValueIteration(filtered_transition_matrix, filtered_rewards, discount=0.9)
mdp.run()

In [11]:
best_hex_indices = np.argsort(mdp.V)[-5:]
best_hexes = [filtered_states[i] for i in best_hex_indices]

best_locations = hex_gdf[hex_gdf["h3_index"].isin(best_hexes)][
    ["h3_index", "geometry", "total_population", "avg_imd_score"]
]

best_locations.to_file("Results\\best_hexagons.geojson", driver="GeoJSON")
best_locations.drop(columns="geometry").to_csv("Results\\best_hexagons.csv", index=False)

print("Results saved to 'best_hexagons.geojson' and 'best_hexagons.csv'")

Results saved to 'best_hexagons.geojson' and 'best_hexagons.csv'
