In [5]:
import math
import geopandas as gpd
from shapely.geometry import MultiLineString
import folium
from IPython.display import IFrame

# 1. 读入数据 & 统一 CRS
roads     = gpd.read_file(r"E:/internship/database/road/DBTRoad_metropolitan_primary.shp")
cyclelane = gpd.read_file(r"E:/internship/database/road/osm_cyclelane/osm_cyclelane_split_final.shp")
if roads.crs != cyclelane.crs:
    cyclelane = cyclelane.to_crs(roads.crs)

# 2. 计算每条要素的方向角（0–180°）
def compute_orientation(geom):
    if isinstance(geom, MultiLineString):
        geom = max(geom.geoms, key=lambda g: g.length)
    x0, y0 = geom.coords[0][:2]
    x1, y1 = geom.coords[-1][:2]
    return math.degrees(math.atan2(y1 - y0, x1 - x0)) % 180

roads["road_angle"]      = roads.geometry.apply(compute_orientation)
cyclelane["cycle_angle"] = cyclelane.geometry.apply(compute_orientation)

# 3. 构造道路 20m 缓冲区 & 空间连接（找出缓冲区内的自行车段）
BUFFER_DIST = 30  # 米
roads_buf = roads.copy()
roads_buf["road_idx"] = roads_buf.index
roads_buf.geometry    = roads_buf.geometry.buffer(BUFFER_DIST)

join = gpd.sjoin(
    cyclelane[["geometry", "cycle_angle"]],
    roads_buf[["geometry", "road_idx", "road_angle"]],
    how="left",
    predicate="intersects"
).dropna(subset=["road_idx"])

# 4. 角度差过滤（|Δangle| ≤ 20°）
join["ang_diff"] = (
    join["cycle_angle"] - join["road_angle"]
).abs().apply(lambda d: min(d, 180 - d))
join = join[join["ang_diff"] <= 30]

# 5. 垂足距离计算 & 对每段选择最近的道路
def perp_dist(c_geom, r_geom):
    centroid = c_geom.centroid
    proj     = r_geom.project(centroid)
    foot     = r_geom.interpolate(proj)
    return centroid.distance(foot)

join["perp_dist"] = join.apply(
    lambda row: perp_dist(
        cyclelane.loc[row.name].geometry,
        roads.loc[int(row["road_idx"])].geometry
    ),
    axis=1
)

best = join.loc[join.groupby(level=0)["perp_dist"].idxmin()]
matched_cycle_idx = best.index.unique()
matched_road_idx  = best["road_idx"].astype(int).unique()

# 6. 打标：cyclelane["matched"], roads["osm"]
cyclelane["matched"] = False
cyclelane.loc[matched_cycle_idx, "matched"] = True

roads["osm"] = 0
roads.loc[matched_road_idx, "osm"] = 1

# 7. 统计验证
total     = len(cyclelane)
matched   = cyclelane["matched"].sum()
unmatched = total - matched
print(f"原始自行车段落数：{total}")
print(f"可匹配段落数：{matched}")
print(f"不可匹配段落数：{unmatched}")
assert matched + unmatched == total

# 8. Folium 可视化
roads_wgs  = roads.to_crs(epsg=4326)
cycle_wgs  = cyclelane.to_crs(epsg=4326)
centroid   = roads_wgs.geometry.unary_union.centroid

m = folium.Map(
    location=[centroid.y, centroid.x],
    zoom_start=14,
    tiles=None,
    control_scale=True
)

# 原始道路（灰色）
folium.GeoJson(
    roads_wgs,
    name="原始道路",
    style_function=lambda feat: {"color": "gray", "weight": 1, "opacity": 0.6}
).add_to(m)

# 可匹配自行车段（绿色）
folium.GeoJson(
    cycle_wgs[cycle_wgs["matched"]],
    name="可匹配自行车段",
    style_function=lambda feat: {"color": "green", "weight": 3, "opacity": 0.8}
).add_to(m)

# 未匹配自行车段（红色）
folium.GeoJson(
    cycle_wgs[~cycle_wgs["matched"]],
    name="未匹配自行车段",
    style_function=lambda feat: {"color": "red", "weight": 3, "opacity": 0.8}
).add_to(m)


folium.LayerControl().add_to(m)
m.save("final_match_visualization.html")
# 在 Jupyter Notebook 中可以：
# from IPython.display import IFrame
IFrame("final_match_visualization.html", width="100%", height="600px")


原始自行车段落数：216050
可匹配段落数：173206
不可匹配段落数：42844


  centroid   = roads_wgs.geometry.unary_union.centroid


In [6]:
# 假设上一步你已经得到了 cyclelane、并在其上打好了 cyclelane["matched"] 标记

# 1. 拆分出两个 GeoDataFrame
matched_gdf   = cyclelane[cyclelane["matched"]].copy()
unmatched_gdf = cyclelane[~cyclelane["matched"]].copy()

# 2. 选择输出格式和路径（可改为 GeoJSON）
matched_path   = r"E:/internship/database/road/osmcyclelane_matched.shp"
unmatched_path = r"E:/internship/database/road/osmcyclelane_unmatched.shp"

# 3. 写文件
matched_gdf.to_file(matched_path,   driver="ESRI Shapefile")
unmatched_gdf.to_file(unmatched_path, driver="ESRI Shapefile")

print(f"已导出 可匹配 自行车段：{matched_path}")
print(f"已导出 不可匹配 自行车段：{unmatched_path}")


  matched_gdf.to_file(matched_path,   driver="ESRI Shapefile")
  ogr_write(
  unmatched_gdf.to_file(unmatched_path, driver="ESRI Shapefile")
  ogr_write(


已导出 可匹配 自行车段：E:/internship/database/road/osmcyclelane_matched.shp
已导出 不可匹配 自行车段：E:/internship/database/road/osmcyclelane_unmatched.shp


In [7]:
# 在gis中使用捕捉操作后，用折点打断后再进行
import math
import geopandas as gpd
from shapely.geometry import MultiLineString
import numpy as np

# 1. 读入
roads     = gpd.read_file(r"E:/internship/database/road/DBTRoad_metropolitan_primary.shp")
cyclelane = gpd.read_file(r"E:/internship/database/road/osmcyclelane_matched.shp")

# 2. 统一 CRS
if roads.crs != cyclelane.crs:
    cyclelane = cyclelane.to_crs(roads.crs)
    # —— 新增：删除长度 < 5 米的自行车段 —— 
# 计算每条线的长度
cyclelane["length_m"] = cyclelane.geometry.length
# 保留长度 >= 5 米的
cyclelane = cyclelane.loc[cyclelane["length_m"] >= 5].copy()
# 删除临时字段
cyclelane.drop(columns=["length_m"], inplace=True)

# —— 在开始任何 apply 之前，先把空几何丢掉 —— 
cyclelane = cyclelane.loc[
    cyclelane.geometry.notnull() &
    ~cyclelane.geometry.is_empty
].copy()

# 3. 初筛：最近距离匹配，打 matched 标记
max_dist = 30
nearest = gpd.sjoin_nearest(
    cyclelane, roads,
    how="left",
    distance_col="dist",
    max_distance=max_dist
)
matched_idx = nearest.loc[nearest["dist"].notna()].index.unique()
cyclelane["matched"] = False
cyclelane.loc[matched_idx, "matched"] = True

# 4. 计算方向角（带空值保护）
def compute_orientation(geom):
    # 防守：如果还是空或 None，返回 NaN
    if geom is None or geom.is_empty:
        return np.nan
    # MultiLineString 时取最长那一段
    if isinstance(geom, MultiLineString):
        geom = max(geom.geoms, key=lambda g: g.length)
    x0, y0 = geom.coords[0][:2]
    x1, y1 = geom.coords[-1][:2]
    return math.degrees(math.atan2(y1 - y0, x1 - x0)) % 180

roads["angle"]     = roads.geometry.apply(compute_orientation)
cyclelane["angle"] = cyclelane.geometry.apply(compute_orientation)

# —— 再次过滤掉 angle 为 NaN 的行（保险起见） —— 
cyclelane = cyclelane.loc[cyclelane["angle"].notna()]

# 5. 初始化 osm & 建空间索引
roads["osm"] = 0
sindex = roads.sindex

# 6. 内部片段过滤函数
def is_interior_segment(road_geom, bike_geom, margin):
    cent   = bike_geom.centroid
    proj   = road_geom.project(cent)
    length = road_geom.length
    return (proj > margin) and (proj < length - margin)

# 7. 精匹配循环
BUFFER_DIST  = 30
ANGLE_THRESH = 20
MARGIN       = 0

for _, bike in cyclelane[cyclelane["matched"]].iterrows():
    buff = bike.geometry.buffer(BUFFER_DIST)
    cand_idx = list(sindex.intersection(buff.bounds))
    cands    = roads.loc[cand_idx].copy()
    if cands.empty: 
        continue

    cands = cands[cands.geometry.intersects(buff)]
    if cands.empty:
        continue

    cands["ang_diff"] = cands["angle"].apply(
        lambda a: min(abs(a - bike.angle), 180 - abs(a - bike.angle))
    )
    cands = cands[cands["ang_diff"] <= ANGLE_THRESH]
    if cands.empty:
        continue

    cands["interior_ok"] = cands.geometry.apply(
        lambda r: is_interior_segment(r, bike.geometry, MARGIN)
    )
    cands = cands[cands["interior_ok"]]
    if cands.empty:
        continue

    cands["dist2"] = cands.geometry.distance(bike.geometry)
    best_idx      = cands["dist2"].idxmin()
    roads.at[best_idx, "osm"] = 1

# 8. 保存
roads.to_file(
    r"E:/internship/database/Test/road_with_osm_test.shp",
    driver="ESRI Shapefile"
)


In [8]:
from shapely.geometry import LineString, MultiLineString

# —— A) 收集所有 osm==1 道路端点 —— 
matched_nodes = set()
for idx in roads.index[roads["osm"] == 1]:
    geom = roads.at[idx, "geometry"]
    parts = geom.geoms if isinstance(geom, MultiLineString) else [geom]
    for part in parts:
        x0,y0 = part.coords[0][:2]
        x1,y1 = part.coords[-1][:2]
        matched_nodes.add((round(x0,6), round(y0,6)))
        matched_nodes.add((round(x1,6), round(y1,6)))

# —— B) 把夹在两已匹配端点间的小段也晋升为 osm=1 —— 
to_promote = []
for idx in roads.index[roads["osm"] == 0]:
    geom = roads.at[idx, "geometry"]
    parts = geom.geoms if isinstance(geom, MultiLineString) else [geom]
    for part in parts:
        p0 = (round(part.coords[0][0],6), round(part.coords[0][1],6))
        p1 = (round(part.coords[-1][0],6), round(part.coords[-1][1],6))
        if p0 in matched_nodes and p1 in matched_nodes:
            to_promote.append(idx)
            break

roads.loc[to_promote, "osm"] = 1


In [6]:
from collections import defaultdict

# —— A) 构建端点坐标到边索引的映射 —— 
#（假设 roads 已经加载，且有 roads["osm"] 字段）
# 用四舍五入缓存坐标以应对浮点误差
PREC = 6
pt2edges = defaultdict(set)

for idx, row in roads.iterrows():
    geom = row.geometry
    # 支持 MultiLineString
    parts = geom.geoms if hasattr(geom, "geoms") else [geom]
    for part in parts:
        x0, y0 = part.coords[0][:2]
        x1, y1 = part.coords[-1][:2]
        p0 = (round(x0, PREC), round(y0, PREC))
        p1 = (round(x1, PREC), round(y1, PREC))
        pt2edges[p0].add(idx)
        pt2edges[p1].add(idx)

# —— B) 交叉口扩散循环 —— 
promoted = True
while promoted:
    promoted = False
    # 找本轮所有 osm==1 的边
    ones = set(roads.index[roads["osm"] == 1])
    # 对每条当前 1 的边，检查它两端
    for idx in list(ones):
        geom = roads.at[idx, "geometry"]
        parts = geom.geoms if hasattr(geom, "geoms") else [geom]
        for part in parts:
            for pt in [(part.coords[0][:2]), (part.coords[-1][:2])]:
                p = (round(pt[0], PREC), round(pt[1], PREC))
                # 遍历所有在此端点相连的边
                for nbr in pt2edges[p]:
                    if roads.at[nbr, "osm"] == 0:
                        # 看 nbr 的“另一端”坐标
                        g2 = roads.at[nbr, "geometry"]
                        segs = g2.geoms if hasattr(g2, "geoms") else [g2]
                        # 找那段的对应part
                        for s in segs:
                            q0 = (round(s.coords[0][0], PREC), round(s.coords[0][1], PREC))
                            q1 = (round(s.coords[-1][0], PREC), round(s.coords[-1][1], PREC))
                            if p == q0:
                                other = q1
                            elif p == q1:
                                other = q0
                            else:
                                continue
                            # 计算 other 端点的度
                            deg = len(pt2edges[other])
                            # 若度>2，则晋升
                            if deg > 2:
                                roads.at[nbr, "osm"] = 1
                                promoted = True
                            break
                # end for nbr
            # end for pt
    # end for idx

# —— C) 输出检查 —— 
total_edges = len(roads)
osm_ones    = int((roads["osm"] == 1).sum())
print(f"道路总数：{total_edges}")
print(f"osm=1 的道路数：{osm_ones} (含扩散晋升)")

道路总数：16610
osm=1 的道路数：1695 (含扩散晋升)


In [9]:
import folium
from IPython.display import IFrame

# —— 假设到此你已经有了以下变量：——
# roads: GeoDataFrame，包含字段 "osm"（0/1）
# cyclelane: GeoDataFrame，包含字段 "matched"（True/False）

# 1. 投影到 WGS84（Folium 需要经纬度）
roads_wgs  = roads.to_crs(epsg=4326)
cycle_wgs  = cyclelane.to_crs(epsg=4326)

# 2. 计算地图中心点
centroid = roads_wgs.geometry.unary_union.centroid

# 3. 初始化 Folium 地图，加载底图
m = folium.Map(
    location=[centroid.y, centroid.x],
    zoom_start=14,
    tiles=None,
    control_scale=True
)

# 4. 渲染原始道路（灰色）
folium.GeoJson(
    roads_wgs,
    name="原始道路",
    style_function=lambda feat: {
        "color": "gray",
        "weight": 1,
        "opacity": 0.6
    }
).add_to(m)

# 5. 渲染可匹配自行车段落（红色）
folium.GeoJson(
    cycle_wgs[cycle_wgs["matched"]],
    name="可匹配自行车段落",
    style_function=lambda feat: {
        "color": "red",
        "weight": 3,
        "opacity": 0.8
    }
).add_to(m)

# 6. 渲染 osm=1 的道路（绿色）
folium.GeoJson(
    roads_wgs[roads_wgs["osm"] == 1],
    name="osm=1 道路",
    style_function=lambda feat: {
        "color": "green",
        "weight": 3,
        "opacity": 0.8
    }
).add_to(m)

# 7. 添加图层控件并保存/展示
folium.LayerControl().add_to(m)
m.save("match_visualization.html")

# 在 Jupyter Notebook 中嵌入预览：
# IFrame("match_visualization.html", width="100%", height="600px")


# 添加图层控件并保存
folium.LayerControl().add_to(m)
m.save("match_visualization.html")
# （Notebook 中可用 IFrame 嵌入预览）
# IFrame("final_visualization.html", width="100%", height="600px")
m


  centroid = roads_wgs.geometry.unary_union.centroid


In [11]:
import geopandas as gpd
import pandas as pd
import networkx as nx
from shapely.geometry import Point, LineString, MultiLineString
from shapely.ops import nearest_points

# —— 0. 读取未匹配的自行车段（cycle_unmatched），roads 已在内存 —— 
cycle_unmatched = gpd.read_file(r"E:/internship/database/road/osmcyclelane_unmatched.shp")
if cycle_unmatched.crs != roads.crs:
    cycle_unmatched = cycle_unmatched.to_crs(roads.crs)

# —— 1. 构建无向图，把每条碎段的端点当作节点 —— 
G = nx.Graph()
for idx, row in cycle_unmatched.iterrows():
    geom = row.geometry
    parts = geom.geoms if isinstance(geom, MultiLineString) else [geom]
    for part in parts:
        p0 = tuple(part.coords[0][:2])
        p1 = tuple(part.coords[-1][:2])
        # 在图里连这两个端点
        G.add_edge(p0, p1, edge_idx=idx)

# —— 2. 找出度＝1 的端点 —— 
dead_pts = [pt for pt, deg in G.degree() if deg == 1]
print("度=1 的断头端点数：", len(dead_pts))

# —— 3. 准备 snapping 用的 roads_union —— 
roads_union = roads.geometry.unary_union  # 或 union_all()

# —— 4. 对每条未匹配线：只 snap 那些真正断开的端点 —— 
new_lines = []
for idx, row in cycle_unmatched.iterrows():
    geom = row.geometry
    parts = geom.geoms if isinstance(geom, MultiLineString) else [geom]
    for part in parts:
        coords = list(part.coords)
        p0, p1 = tuple(coords[0][:2]), tuple(coords[-1][:2])
        # 只有断头端点才 snap
        if p0 in dead_pts:
            snap0 = nearest_points(Point(p0), roads_union)[1]
            coord0 = (snap0.x, snap0.y)
        else:
            coord0 = p0
        if p1 in dead_pts:
            snap1 = nearest_points(Point(p1), roads_union)[1]
            coord1 = (snap1.x, snap1.y)
        else:
            coord1 = p1

        # 中间点保留原样二维
        mid_xy = [(c[0], c[1]) for c in coords[1:-1]]
        new_geom = LineString([coord0] + mid_xy + [coord1])
        new_lines.append({"geometry": new_geom, "match": None, "osm": 1})

print("新 snapped 线段数量：", len(new_lines))

# —— 5. 追加回 roads 并打印统计 —— 
new_gdf     = gpd.GeoDataFrame(new_lines, crs=roads.crs)
common_cols = [c for c in roads.columns if c in new_gdf.columns]
before      = len(roads)
roads       = gpd.GeoDataFrame(
    pd.concat([roads, new_gdf[common_cols]], ignore_index=True),
    crs=roads.crs
)
after     = len(roads)
osm_after = int((roads["osm"] == 1).sum())

print("追加前 roads 数:", before)
print("追加后 roads 数:", after)
print("追加后 osm=1 道路数:", osm_after)


度=1 的断头端点数： 5142


  roads_union = roads.geometry.unary_union  # 或 union_all()


新 snapped 线段数量： 42844
追加前 roads 数: 298346
追加后 roads 数: 341190
追加后 osm=1 道路数: 63265


In [12]:
import folium
from IPython.display import IFrame

# 1. 投影到 WGS84（Folium 要求经纬度）
roads_wgs           = roads.to_crs(epsg=4326)
cycle_unmatched_wgs = cycle_unmatched.to_crs(epsg=4326)

# 2. 地图中心
cent = roads_wgs.geometry.unary_union.centroid
m = folium.Map(
    location=[cent.y, cent.x],
    zoom_start=14,
    tiles=None,
    control_scale=True
)

# 3. 渲染原始/追加后的所有道路（灰色）
folium.GeoJson(
    roads_wgs,
    name="全部道路",
    style_function=lambda feat: {
        "color": "gray", "weight": 1, "opacity": 0.6
    }
).add_to(m)

# 4. 渲染 roads 中 osm=1 的部分（绿色）
folium.GeoJson(
    roads_wgs[roads_wgs["osm"] == 1],
    name="osm=1 道路",
    style_function=lambda feat: {
        "color": "green", "weight": 6, "opacity": 0.8
    }
).add_to(m)

# 5. 渲染原始未匹配自行车段（红色）
folium.GeoJson(
    cycle_unmatched_wgs,
    name="未匹配自行车段",
    style_function=lambda feat: {
        "color": "red", "weight": 3, "opacity": 0.8
    }
).add_to(m)

# 6. 图层控件
folium.LayerControl().add_to(m)

# 7. 保存并在 Notebook 中预览
m.save("final_match_and_unmatched.html")
IFrame("final_match_and_unmatched.html", width="100%", height="600px")


  cent = roads_wgs.geometry.unary_union.centroid


In [None]:
roads.to_file(r"E:/internship/database/road/cyclelanematch_step1.shp")