<a href="https://colab.research.google.com/github/Van-Wu1/cycle/blob/main/scr/py/s3_index.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# ============= 安装依赖（Colab 第一格运行） =============
!pip -q install geopandas shapely pyproj fiona rtree python-igraph tqdm

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.6/56.6 kB[0m [31m599.3 kB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.3/17.3 MB[0m [31m52.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m507.6/507.6 kB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.4/4.4 MB[0m [31m54.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
# ============= 导入包 =============
import geopandas as gpd
import numpy as np
import igraph as ig
from tqdm.auto import tqdm
import random

In [4]:
from google.colab import drive
drive.mount('/content/drive')
!ls '/content/drive/MyDrive/CASA0004_Cycling/data'

Mounted at /content/drive
BoroughShp  GreatLondonShp  s1	s2_Env	s3


In [5]:
# ============= 参数区（按需改） =============
# 输入：伦敦范围已裁剪好的路网（线要素）
IN_GPKG = "/content/drive/MyDrive/CASA0004_Cycling/data/s3/emptyroad/edges_s3.gpkg"
IN_LAYER = None  # 如果是 GPKG，多图层时填具体图层名；单图层或 geojson 填 None

# 输出
OUT_GEOJSON = "/content/drive/MyDrive/CASA0004_Cycling/data/s3/export/cen_s3_2_5.gpkg"

In [6]:
ziduan = gpd.read_file(IN_GPKG)
print(ziduan.columns)

Index(['id', 'name', 'way_type', 'geometry'], dtype='object')


In [7]:
# 长度权重字段名（如果没有，就用 geometry.length）
LEN_FIELD = "metres"    # 若没有该字段，会自动改用 geometry.length
TOL = 1.0               # 端点量化容差（米），用于“吸附”断点
SEED = 42               # 随机种子

# betweenness 计算模式
BET_MODE = "approx"     # "exact" 或 "approx"
K_SAMPLES = 1200        # 近似模式下采样源点数量（建议：500~3000 之间按机器调）

# closeness 选项
CLOSENESS_HARMONIC = True   # 非连通图建议用 harmonic 口径

In [8]:
# ============= 读取与预处理 =============
if IN_LAYER:
    roads = gpd.read_file(IN_GPKG, layer=IN_LAYER)
else:
    roads = gpd.read_file(IN_GPKG)

# 修正/设置 CRS（OpenMapping 通常是 EPSG:27700）
if roads.crs is None:
    roads = roads.set_crs(27700)
elif str(roads.crs).endswith("4326"):
    # 若误读成经纬度，通常需要改回 27700；你也可以根据 bounds 判断再 set_crs
    roads = roads.set_crs(27700, allow_override=True)

# 单部件化，清理空几何
roads = roads.explode(index_parts=False, ignore_index=True)
roads = roads[~roads.geometry.is_empty & roads.geometry.notna()].copy()

# 生成长度字段
if LEN_FIELD in roads.columns:
    roads["length_m"] = roads[LEN_FIELD].astype(float)
else:
    roads["length_m"] = roads.geometry.length

In [9]:
# ============= 建图：端点 -> 节点；路段 -> 边（带 orig_road 映射） =============
def qpt(xy, tol=TOL):
    return (round(xy[0] / tol) * tol, round(xy[1] / tol) * tol)

# 清理无效/零长度几何
roads = roads[roads.geometry.notna() & (~roads.geometry.is_empty)].copy()
geom_len = roads.geometry.length
bad_zero = (geom_len <= 0)
if bad_zero.any():
    print(f"[WARN] 发现 {bad_zero.sum()} 条零长度几何，已剔除。")
    roads = roads[~bad_zero].copy()
roads = roads.reset_index(drop=True)

node_index = {}
nodes_xy = []
edges_uv = []
edge_weights = []
edge_orig_row = []

for i, (geom, w) in enumerate(tqdm(zip(roads.geometry, roads["length_m"]), total=len(roads), desc="Build graph")):
    coords = list(geom.coords)
    u_xy = qpt(coords[0])
    v_xy = qpt(coords[-1])
    # 顶点去重并编号
    for xy in (u_xy, v_xy):
        if xy not in node_index:
            node_index[xy] = len(nodes_xy)
            nodes_xy.append(xy)
    u = node_index[u_xy]; v = node_index[v_xy]
    edges_uv.append((u, v))
    edge_weights.append(float(w) if float(w) > 0 else 1e-6)  # 防 0
    edge_orig_row.append(i)

# 建图（允许多重边）
g = ig.Graph()
g.add_vertices(len(nodes_xy))
g.add_edges(edges_uv)
g.es["length"] = edge_weights
g.es["orig_road"] = edge_orig_row
g.vs["xy"] = nodes_xy

# 是否只保留最大连通子图
USE_GIANT = False  # ← 如需减少计算量可设 True，但非 giant 部分会成为 NaN
if USE_GIANT:
    comps = g.clusters()
    gi = comps.giant()
    # giant 子图仍然保留了 es/ vs 的属性（包括 orig_road / xy）
    G = gi
    print(f"[INFO] 使用 giant 子图：V={G.vcount()} E={G.ecount()} / 原图 V={g.vcount()} E={g.ecount()}")
else:
    G = g
    print(f"[INFO] 使用全图：V={G.vcount()} E={G.ecount()}")

Build graph:   0%|          | 0/164138 [00:00<?, ?it/s]

[INFO] 使用全图：V=184964 E=164138


In [11]:
# ============= Local Closeness within 2km & 5km（节点→边） =============
import numpy as np
from tqdm import tqdm

try:
    from scipy.spatial import cKDTree as KDTree
except Exception as e:
    raise RuntimeError("需要 SciPy 的 KDTree。请先安装 scipy 再运行。") from e

# 读取顶点坐标
coords = np.asarray(G.vs["xy"], dtype=float)
nV = G.vcount()
kdt = KDTree(coords)

# 半径设置（米）
RADII = [2000.0, 5000.0]
SLACK = 1.05  # 欧氏筛选放宽 5%，降低漏检风险（网络距离 >= 欧氏距离）

# 用于把节点值映射到边（两端点均值）
s_idx = np.fromiter((e.tuple[0] for e in G.es), dtype=int, count=G.ecount())
t_idx = np.fromiter((e.tuple[1] for e in G.es), dtype=int, count=G.ecount())

def local_closeness_R(radius_m: float) -> np.ndarray:
    """基于半径 R（米）的局部 closeness：count(sum of reachable within R) / sum(distances within R)"""
    out = np.zeros(nV, dtype=float)
    for u in tqdm(range(nV), desc=f"Closeness R={int(radius_m/1000)}km"):
        # 1) 欧氏预筛：在 R*SLACK 的圆内寻找候选目标
        cand = kdt.query_ball_point(coords[u], r=radius_m * SLACK)
        if not cand:
            continue
        # 排除自己
        if u in cand:
            cand.remove(u)
        if len(cand) == 0:
            continue

        # 2) 仅对候选目标计算加权最短路
        d = G.shortest_paths_dijkstra(source=u, target=cand, weights="length")[0]
        d = np.asarray(d, dtype=float)

        # 3) 仅保留 0<距离<=R 的可达目标
        mask = np.isfinite(d) & (d > 0) & (d <= radius_m)
        if not np.any(mask):
            continue

        # 4) 局部 closeness： 可达目标数 / 距离和（越大越“近”）
        out[u] = mask.sum() / d[mask].sum()
    return out

# 分别计算 2km、5km 的节点级局部 closeness
clo_2k_nodes = local_closeness_R(2000.0)
clo_5k_nodes = local_closeness_R(5000.0)

# 映射到边（两端点均值）
clo_2k_edges = ((clo_2k_nodes[s_idx] + clo_2k_nodes[t_idx]) / 2.0).astype(float)
clo_5k_edges = ((clo_5k_nodes[s_idx] + clo_5k_nodes[t_idx]) / 2.0).astype(float)

# 写入边属性
G.es["closeness_2km"] = clo_2k_edges.tolist()
G.es["closeness_5km"] = clo_5k_edges.tolist()

print("[INFO] 已生成边属性：closeness_2km, closeness_5km")

  d = G.shortest_paths_dijkstra(source=u, target=cand, weights="length")[0]
Closeness R=2km: 100%|██████████| 184964/184964 [1:52:37<00:00, 27.37it/s]
Closeness R=5km: 100%|██████████| 184964/184964 [1:56:45<00:00, 26.40it/s]


[INFO] 已生成边属性：closeness_2km, closeness_5km


In [12]:
# ============= Betweenness（边） =============
random.seed(SEED)
np.random.seed(SEED)

BET_MODE = BET_MODE  # 沿用你参数区
K_SAMPLES = min(int(K_SAMPLES), max(1, G.vcount()))

if BET_MODE == "exact":
    with tqdm(total=1, desc="Edge betweenness (exact)") as pbar:
        eb = G.edge_betweenness(weights="length")
        G.es["betweenness_edge"] = eb
        pbar.update(1)
else:
    # 近似：随机抽取 K_SAMPLES 个源点，统计最短路经过次数
    counts = np.zeros(G.ecount(), dtype=np.float64)
    all_nodes = list(range(G.vcount()))
    sources = random.sample(all_nodes, K_SAMPLES)

    for s in tqdm(sources, desc=f"Edge betweenness approx (K={K_SAMPLES})"):
        epaths = G.get_shortest_paths(s, to=all_nodes, weights="length", output="epath")
        for epath in epaths:
            if not epath:  # 自身或不可达
                continue
            counts[epath] += 1.0

    counts /= K_SAMPLES
    G.es["betweenness_edge"] = counts.tolist()

  epaths = G.get_shortest_paths(s, to=all_nodes, weights="length", output="epath")
Edge betweenness approx (K=1200): 100%|██████████| 1200/1200 [48:09<00:00,  2.41s/it]


In [13]:
# ============= 回写到 GeoDataFrame（含 2km/5km 局部 closeness） =============
# 先初始化为 NaN（防止索引缺失）
roads["edge_betweenness"]   = roads.get("edge_betweenness",   np.nan)
roads["edge_closeness_hc"]  = roads.get("edge_closeness_hc",  np.nan)  # 如果你还保留全局/调和版
roads["edge_closeness_2km"] = np.nan
roads["edge_closeness_5km"] = np.nan

# 基于 orig_road 做一一对应的精确回写
orig_idx = np.array(G.es["orig_road"])

# 已有 betweenness
if "betweenness_edge" in G.es.attributes():
    roads.loc[orig_idx, "edge_betweenness"] = np.array(G.es["betweenness_edge"], dtype=float)

# 若你前面还保留了全局/调和 closeness（可选）
if "closeness_hc" in G.es.attributes():
    roads.loc[orig_idx, "edge_closeness_hc"] = np.array(G.es["closeness_hc"], dtype=float)

# 新增：2km / 5km 局部 closeness
roads.loc[orig_idx, "edge_closeness_2km"] = np.array(G.es["closeness_2km"], dtype=float)
roads.loc[orig_idx, "edge_closeness_5km"] = np.array(G.es["closeness_5km"], dtype=float)

# 统计
tot = len(roads)
def stat(col):
    miss = roads[col].isna().sum()
    print(f"       {col:>18} NaN: {miss:,}  ({miss/tot:.1%})")

print(f"[STAT] 回写完成：总边 {tot:,}")
for col in ["edge_betweenness","edge_closeness_hc","edge_closeness_2km","edge_closeness_5km"]:
    if col in roads.columns:
        stat(col)

[STAT] 回写完成：总边 164,138
         edge_betweenness NaN: 0  (0.0%)
        edge_closeness_hc NaN: 164,138  (100.0%)
       edge_closeness_2km NaN: 0  (0.0%)
       edge_closeness_5km NaN: 0  (0.0%)


In [14]:
# ============= 导出 =============
roads.to_file(OUT_GEOJSON, driver="gpkg")
print("Saved:", OUT_GEOJSON)
print("Rows:", len(roads))

Saved: /content/drive/MyDrive/CASA0004_Cycling/data/s3/export/cen_s3_2_5.gpkg
Rows: 164138


In [15]:
print(roads.columns)

Index(['id', 'name', 'way_type', 'geometry', 'length_m', 'edge_betweenness',
       'edge_closeness_hc', 'edge_closeness_2km', 'edge_closeness_5km'],
      dtype='object')


In [16]:
# ========= 参数区（按需改） =========
# IN_PATH  = "/mnt/data/cen_s3_2_5.gpkg"   # 你的输入 GPKG
OUT_PATH = "/content/drive/MyDrive/CASA0004_Cycling/data/s3/export/index_s3.gpkg"  # 输出 GPKG

In [17]:
# 双尺度 closeness 权重（不含 hc）
W_C2, W_C5 = 0.6, 0.4

# index_3 内部权重：B vs C_multi
W_B, W_C = 0.4, 0.6

# betweenness 裁剪的分位数（稳健处理长尾）
P_LOW, P_HIGH = 1, 99
# ===================================


# 读数据（如果你已有 gdf，就把这一行替换掉）
gdf = roads

In [18]:
# --------- 一些健壮性检查 ----------
need_cols = ["edge_betweenness", "edge_closeness_2km", "edge_closeness_5km"]
missing = [c for c in need_cols if c not in gdf.columns]
if missing:
    raise ValueError(f"缺少必要字段: {missing}")

In [20]:
import pandas as pd
# --------- 工具函数 ----------
def minmax(series: pd.Series):
    s = pd.to_numeric(series, errors="coerce")
    vmin = np.nanmin(s) if np.isfinite(np.nanmin(s)) else np.nan
    vmax = np.nanmax(s) if np.isfinite(np.nanmax(s)) else np.nan
    if not np.isfinite(vmin) or not np.isfinite(vmax) or vmax == vmin:
        return pd.Series(np.zeros(len(s)), index=s.index)
    out = (s - vmin) / (vmax - vmin)
    return out.fillna(0)

In [21]:
# --------- 1) 处理 betweenness（裁剪 + log1p + 0-1） ----------
bet = pd.to_numeric(gdf["edge_betweenness"], errors="coerce")
# 百分位裁剪（防极值支配）
p1, p99 = np.nanpercentile(bet.dropna(), [P_LOW, P_HIGH])
bet_clip = bet.clip(lower=p1, upper=p99)
# 对数变换（0 安全）
gdf["_B_log"] = np.log1p(bet_clip)
# 归一化
gdf["B_norm"] = minmax(gdf["_B_log"])

In [22]:
# --------- 2) 处理 2km / 5km closeness（各自 0-1） ----------
gdf["C2_norm"] = minmax(gdf["edge_closeness_2km"])
gdf["C5_norm"] = minmax(gdf["edge_closeness_5km"])

# --------- 3) 双尺度 closeness 合成 ----------
den = W_C2 + W_C5
gdf["C_multi"] = (W_C2 * gdf["C2_norm"] + W_C5 * gdf["C5_norm"]) / den

# --------- 4) Section3 指数：index_3 ----------
den2 = W_B + W_C
gdf["index_3"] = (W_B * gdf["B_norm"] + W_C * gdf["C_multi"]) / den2

In [23]:
# 写 GPKG（新建/覆盖），图层名给个更清晰的
layer_name = "index3_section"
gdf.to_file(OUT_PATH, layer=layer_name, driver="GPKG")
print(f"已导出：{OUT_PATH} (layer='{layer_name}')")

已导出：/content/drive/MyDrive/CASA0004_Cycling/data/s3/export/index_s3.gpkg (layer='index3_section')
