In [1]:
# visualize_stations_in_service_area_map.py
# 交互式地图：Waymo 服务区 + 站点分布（L2/L3 分色，内外分层，聚合显示）

from pathlib import Path
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon, MultiPolygon
from shapely.ops import unary_union
import folium
from folium.plugins import MarkerCluster

# ========== Hard-coded paths (edit as needed) ==========
SERVICE_AREA_GEOJSON = r"../data/raw data/waymo_atx_2025-03-03.geojson"
STATIONS_CSV_IN      = r"../data/stations.csv"   # expected columns: k,zone,level,lat,lon,plugs,util_factor
OUT_HTML             = r"../data/viz/stations_map.html"

# Optional: path to a CSV listing the zones to keep (must contain a `zone`
# column). When set, stations are additionally filtered by zone; leave as ""
# to disable the filter.
ZONES_KEEP_CSV       = r""

# Containment rule: True = points on the boundary count as inside (covered_by);
# False = points must lie strictly inside the polygon (within).
COUNT_BORDER_AS_INSIDE = True

# If the service area is line geometry (LineString/MultiLineString), it is
# buffered by this many metres in a projected CRS to turn it into a polygon.
LINE_BUFFER_METERS = 30.0
# ==========================================


# ---------- 加载服务区（面或线->面） ----------
def load_service_area(path: str) -> gpd.GeoDataFrame:
    """Load the service area and return it as one polygon row in EPSG:4326.

    The file is read with geopandas and brought to EPSG:4326 (a file without
    a CRS is assumed to already be lon/lat). All features are merged into a
    single geometry:

    * if every feature is a LineString/MultiLineString, the merged lines are
      buffered by ``LINE_BUFFER_METERS`` ground metres to form a polygon;
    * otherwise the features are unioned and repaired with ``buffer(0)``.

    Args:
        path: Path to a vector file readable by ``geopandas.read_file``.

    Returns:
        A one-row GeoDataFrame with only a ``geometry`` column holding a
        non-empty Polygon or MultiPolygon.

    Raises:
        ValueError: If the file has no features, or the merged geometry is
            empty or is not a Polygon/MultiPolygon.
    """
    import math  # function-scope import: not part of the file's import block

    invalid_msg = "服务区几何未能转换为有效面，请检查 GeoJSON。"

    gdf = gpd.read_file(path)
    if gdf.crs is None:
        # No CRS declared: treat the coordinates as lon/lat.
        gdf = gdf.set_crs(4326, allow_override=True)
    else:
        gdf = gdf.to_crs(4326)

    # An empty layer would otherwise sail through (`.all()` on an empty
    # series is True and an empty Polygon is still a Polygon).
    if gdf.empty:
        raise ValueError(invalid_msg)

    geom = gdf.geometry
    is_line_like = geom.geom_type.isin(["LineString", "MultiLineString"]).all()

    if is_line_like:
        # EPSG:3857 (Web Mercator) stretches lengths by 1 / cos(latitude), so
        # a buffer of N projected units is only N * cos(lat) metres on the
        # ground. Scale the distance up so the buffer is really
        # LINE_BUFFER_METERS wide at the latitude of the data.
        _, miny, _, maxy = gdf.total_bounds
        mid_lat = (miny + maxy) / 2
        scale = 1.0 / max(math.cos(math.radians(mid_lat)), 1e-6)

        g_m = gdf.to_crs(3857).geometry
        poly_m = unary_union(g_m).buffer(LINE_BUFFER_METERS * scale)  # lines -> polygon
        sa_poly = gpd.GeoDataFrame(geometry=[poly_m], crs=3857).to_crs(4326)
    else:
        poly = unary_union(geom).buffer(0)  # merge, and repair invalid rings
        sa_poly = gpd.GeoDataFrame(geometry=[poly], crs=4326)

    merged = sa_poly.geometry.iloc[0]
    if merged.is_empty or not isinstance(merged, (Polygon, MultiPolygon)):
        raise ValueError(invalid_msg)
    return sa_poly[["geometry"]]


# ---------- 加载站点为点 ----------
def load_stations(csv_path: str) -> gpd.GeoDataFrame:
    """Read the stations CSV and return it as a point GeoDataFrame (EPSG:4326).

    Rows whose ``lat``/``lon`` cannot be parsed as numbers are dropped.
    ``k`` and ``zone`` are converted to strings. ``level`` is stripped and
    lower-cased; a missing ``level`` becomes the empty string rather than the
    literal text ``"nan"``.

    Args:
        csv_path: Path to the stations CSV. Must contain the columns
            ``k``, ``zone``, ``level``, ``lat`` and ``lon``.

    Returns:
        GeoDataFrame with all CSV columns plus a ``geometry`` column of
        ``Point(lon, lat)``.

    Raises:
        ValueError: If any required column is missing.
    """
    df = pd.read_csv(csv_path)
    need = {"k", "zone", "level", "lat", "lon"}
    # Sorted so the reported list is deterministic (set order is not).
    miss = sorted(need - set(df.columns))
    if miss:
        raise ValueError(f"站点CSV缺少列：{miss}；应至少包含 {need}")

    df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
    df["lon"] = pd.to_numeric(df["lon"], errors="coerce")
    df = df.dropna(subset=["lat", "lon"]).copy()
    df["k"] = df["k"].astype(str)
    df["zone"] = df["zone"].astype(str)

    # Normalise level to lower case (l2/l3). Missing values are replaced with
    # "" *before* the string conversion; astype(str) alone would turn NaN into
    # the text "nan", which downstream code cannot tell from a real level.
    level = df["level"].astype(object)
    df["level"] = level.where(level.notna(), "").astype(str).str.strip().str.lower()

    gdf = gpd.GeoDataFrame(
        df,
        geometry=[Point(xy) for xy in zip(df["lon"], df["lat"])],
        crs="EPSG:4326",
    )
    return gdf


# ---------- 根据服务区标记 inside/outside ----------
def tag_inside(stations_gdf: gpd.GeoDataFrame, service_area: gpd.GeoDataFrame, border_inside: bool) -> pd.Series:
    """Return a boolean mask telling which stations lie in the service area.

    Args:
        stations_gdf: Point GeoDataFrame of stations.
        service_area: GeoDataFrame holding the service-area polygon(s). A
            station counts as inside if it matches any of the rows.
        border_inside: If True, points on the boundary count as inside
            (``covered_by``); if False, only strictly interior points do
            (``within``).

    Returns:
        Boolean Series aligned with ``stations_gdf.index`` (one value per
        station row), suitable for ``stations_gdf.loc[mask]``.
    """
    predicate = "covered_by" if border_inside else "within"
    try:
        joined = gpd.sjoin(stations_gdf, service_area, how="left", predicate=predicate)
    except TypeError:
        # Older geopandas has no `predicate` keyword: test each point directly
        # against the union of all service-area geometries.
        polys = list(service_area.geometry)
        poly = polys[0] if len(polys) == 1 else unary_union(polys)
        if border_inside:
            # For a point, "within or touches" is the same as covered_by.
            mask = stations_gdf.geometry.apply(lambda p: p.within(poly) or p.touches(poly))
        else:
            mask = stations_gdf.geometry.within(poly)
        # astype keeps the mask boolean even when there are no stations.
        return mask.astype(bool)

    matched = joined["index_right"].notna()
    if matched.index.has_duplicates:
        # A left join emits one row per matching polygon, so a station that
        # hits several service-area rows appears more than once. Collapse to
        # one flag per station and restore the original row order.
        matched = matched.groupby(level=0).any().reindex(stations_gdf.index)
    return matched


# ---------- 绘制图层 ----------
def add_service_area_layer(m: folium.Map, sa: gpd.GeoDataFrame):
    """Add the service-area geometry to the map as a GeoJson layer.

    The layer is named "Service Area", drawn with a red outline and a faint
    fill, and switches to a lighter, thicker outline while highlighted.

    Args:
        m: Map to add the layer to.
        sa: GeoDataFrame with the service-area geometry.
    """
    normal_style = {"color": "#d62728", "weight": 3, "fillOpacity": 0.05}
    hover_style = {"weight": 4, "color": "#ff9896"}

    def style_for(_feature):
        # Same style for every feature; return a fresh dict on each call.
        return dict(normal_style)

    def highlight_for(_feature):
        return dict(hover_style)

    layer = folium.GeoJson(
        data=sa.to_json(),
        name="Service Area",
        style_function=style_for,
        highlight_function=highlight_for,
    )
    layer.add_to(m)


def add_station_layer(m: folium.Map, gdf: gpd.GeoDataFrame, name: str, color_fn):
    """Add stations to the map as one feature group with a cluster per level.

    Each distinct, non-empty charging level gets its own MarkerCluster inside
    the feature group; stations with a missing/empty level are added to the
    group directly (unclustered). Every station is drawn as a CircleMarker
    with a popup listing k, zone, level, plugs and util_factor.

    Args:
        m: Map to add the layer to.
        gdf: Point GeoDataFrame of stations. Columns are read with
            ``row.get`` so absent columns render as empty text.
        name: Name of the feature group.
        color_fn: Callable mapping a normalised (stripped, lower-case) level
            string to a marker colour.
    """
    from html import escape  # function-scope import: not in the file's import block

    def norm_level(value) -> str:
        # One normalisation for both the cluster keys and the per-row lookup,
        # so e.g. "L2" and "l2" land in the same cluster. Missing -> "".
        if value is None or pd.isna(value):
            return ""
        return str(value).strip().lower()

    def cell(row, key) -> str:
        # CSV text goes into HTML: escape it so "<", "&" etc. show literally.
        return escape(str(row.get(key, "")))

    grp = folium.FeatureGroup(name=name, show=True)

    raw_levels = gdf["level"] if "level" in gdf.columns else []
    known_levels = sorted({norm_level(v) for v in raw_levels} - {""})

    clusters = {}  # normalised level -> MarkerCluster
    for lvl in known_levels:
        clusters[lvl] = MarkerCluster(name=f"{name} • {lvl.upper()}", disableClusteringAtZoom=15)
        clusters[lvl].add_to(grp)

    for _, r in gdf.iterrows():
        p = r.geometry
        lvl = norm_level(r.get("level"))
        color = color_fn(lvl)
        popup_html = (
            f"<b>k</b>: {cell(r, 'k')}"
            f"<br><b>zone</b>: {cell(r, 'zone')}"
            f"<br><b>level</b>: {cell(r, 'level')}"
            f"<br><b>plugs</b>: {cell(r, 'plugs')}"
            f"<br><b>util_factor</b>: {cell(r, 'util_factor')}"
        )
        marker = folium.CircleMarker(
            location=[p.y, p.x],
            radius=4,
            color=color,
            fill=True,
            fill_opacity=0.95,
            weight=1,
            popup=folium.Popup(html=popup_html, max_width=260),
            tooltip=f"k: {cell(r, 'k')} | {escape(lvl.upper())}",
        )
        # Known levels go to their cluster; unknown levels go to the group itself.
        if lvl in clusters:
            marker.add_to(clusters[lvl])
        else:
            marker.add_to(grp)
    grp.add_to(m)


def add_outside_layer(m: folium.Map, gdf: gpd.GeoDataFrame):
    """Add the stations outside the service area as a hidden-by-default layer.

    All stations go into a single MarkerCluster inside a feature group named
    "Stations (outside)", drawn as small grey CircleMarkers with a popup
    listing k, zone, level, plugs and util_factor.

    Args:
        m: Map to add the layer to.
        gdf: Point GeoDataFrame of stations. Columns are read with
            ``row.get`` so absent columns render as empty text.
    """
    from html import escape  # function-scope import: not in the file's import block

    def cell(row, key) -> str:
        # CSV text goes into HTML: escape it so "<", "&" etc. show literally.
        return escape(str(row.get(key, "")))

    grp = folium.FeatureGroup(name="Stations (outside)", show=False)
    cluster = MarkerCluster(name="Outside cluster", disableClusteringAtZoom=15)
    for _, r in gdf.iterrows():
        p = r.geometry
        popup_html = (
            f"<b>k</b>: {cell(r, 'k')}"
            f"<br><b>zone</b>: {cell(r, 'zone')}"
            f"<br><b>level</b>: {cell(r, 'level')}"
            f"<br><b>plugs</b>: {cell(r, 'plugs')}"
            f"<br><b>util_factor</b>: {cell(r, 'util_factor')}"
        )
        folium.CircleMarker(
            location=[p.y, p.x],
            radius=3,
            color="#7f7f7f",
            fill=True,
            fill_opacity=0.8,
            weight=1,
            popup=folium.Popup(html=popup_html, max_width=260),
            tooltip=f"OUT • k: {cell(r, 'k')}",
        ).add_to(cluster)
    cluster.add_to(grp)
    grp.add_to(m)


def main():
    """Build the interactive station map and write it to ``OUT_HTML``.

    Steps: load the service area; load the stations (optionally keeping only
    zones listed in ``ZONES_KEEP_CSV``); split stations into inside/outside
    the service area; draw the service area, the inside stations and, if any,
    the outside stations; print summary counts; save the HTML file.

    Raises:
        ValueError: If ``ZONES_KEEP_CSV`` is set but has no ``zone`` column.
    """
    # 1) Service area
    service_area = load_service_area(SERVICE_AREA_GEOJSON)

    # 2) Stations, optionally restricted to an allow-list of zones
    stations = load_stations(STATIONS_CSV_IN)
    if ZONES_KEEP_CSV:
        zones_df = pd.read_csv(ZONES_KEEP_CSV, dtype={"zone": str})
        if "zone" not in zones_df.columns:
            raise ValueError("ZONES_KEEP_CSV 需包含列 'zone'")
        allowed_zones = set(zones_df["zone"].astype(str))
        zone_ok = stations["zone"].astype(str).isin(allowed_zones)
        stations = stations[zone_ok].copy()

    # 3) Inside / outside split
    is_inside = tag_inside(stations, service_area, COUNT_BORDER_AS_INSIDE)
    inside = stations.loc[is_inside].copy()
    outside = stations.loc[~is_inside].copy()

    # 4) Map centred on the middle of the service-area bounding box
    west, south, east, north = service_area.total_bounds
    midpoint = [(south + north) / 2, (west + east) / 2]
    fmap = folium.Map(location=midpoint, zoom_start=12, tiles="cartodbpositron")

    # 5) Layers: service area first, then stations
    add_service_area_layer(fmap, service_area)

    def level_color(lvl: str) -> str:
        # Red: DC fast charging; green: L2; blue: anything else.
        if lvl in ("l3", "dcfc") or "fast" in lvl:
            return "#e74c3c"
        return "#2ecc71" if lvl == "l2" else "#1f77b4"

    add_station_layer(fmap, inside, "Stations (inside)", level_color)
    if not outside.empty:
        add_outside_layer(fmap, outside)

    # 6) Summary
    print(f"Stations: total={len(stations)}, inside={len(inside)}, outside={len(outside)}")
    print("Inside by level:\n", inside["level"].value_counts(dropna=False))

    folium.LayerControl(collapsed=False).add_to(fmap)
    fmap.fit_bounds([[south, west], [north, east]])

    # Make sure the output directory exists before saving.
    Path(OUT_HTML).parent.mkdir(parents=True, exist_ok=True)
    fmap.save(OUT_HTML)
    print(f"Saved map -> {OUT_HTML}")

# Build the map only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


Stations: total=295, inside=295, outside=0
Inside by level:
 level
l2    275
l3     20
Name: count, dtype: int64
Saved map -> ../data/viz/stations_map.html
