In [2]:
import os
import chardet
import pandas as pd
import geopandas as gpd

BASE = "data"
traj_path = os.path.join(BASE, "Take-out workers(1)", "外卖员每小时对应基站_基站编码.csv")
map_path  = os.path.join(BASE, "Take-out workers(1)", "新业态基站_编码(1).csv")
shp_path  = os.path.join(BASE, "wgs84", "gzlbs.shp")

def read_csv_safely(path):
    with open(path, "rb") as f:
        raw = f.read(200000)
    enc = chardet.detect(raw).get("encoding") or "utf-8"
    try:
        return pd.read_csv(path, encoding=enc)
    except Exception:
        # 常见回退
        for e in ["utf-8-sig", "gbk", "ansi"]:
            try:
                return pd.read_csv(path, encoding=e)
            except Exception:
                continue
        raise

# 读 CSV
df_traj = read_csv_safely(traj_path)
df_map  = read_csv_safely(map_path)

# 读 shapefile
gdf = gpd.read_file(shp_path)
if gdf.crs is None or gdf.crs.to_epsg() != 4326:
    try:
        gdf = gdf.to_crs(epsg=4326)
    except Exception:
        # 若原始 CRS 未设置且本就为 WGS84，可忽略；否则需用户确认
        pass
gdf["lon"] = gdf.geometry.x
gdf["lat"] = gdf.geometry.y

print("轨迹数据列名:", list(df_traj.columns)[:20])
print("基站编码映射列名:", list(df_map.columns)[:20])
print("基站矢量列名:", list(gdf.columns)[:30])
print("样例轨迹行:")
display(df_traj.head(3))
print("样例映射行:")
display(df_map.head(3))
print("样例矢量行:")
display(gdf.head(3))

  return pd.read_csv(path, encoding=enc)


轨迹数据列名: ['UID', 'cgi_0', 'cgi_1', 'cgi_2', 'cgi_3', 'cgi_4', 'cgi_5', 'cgi_6', 'cgi_7', 'cgi_8', 'cgi_9', 'cgi_10', 'cgi_11', 'cgi_12', 'cgi_13', 'cgi_14', 'cgi_15', 'cgi_16', 'cgi_17', 'cgi_18']
基站编码映射列名: ['cgi_id', 'lon_gz', 'lat_gz']
基站矢量列名: ['cgi_id', 'lon_gz', 'lat_gz', 'geometry', 'lon', 'lat']
样例轨迹行:


Unnamed: 0,UID,cgi_0,cgi_1,cgi_2,cgi_3,cgi_4,cgi_5,cgi_6,cgi_7,cgi_8,...,cgi_14,cgi_15,cgi_16,cgi_17,cgi_18,cgi_19,cgi_20,cgi_21,cgi_22,cgi_23
0,169649,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,...,73648.0,1,1,73648,2,2.0,1,1,2.0,2.0
1,169648,565.0,,,,565.0,565.0,565.0,3.0,4.0,...,5.0,5,5,5,3,565.0,565,565,565.0,
2,169645,2577.0,7.0,6.0,,2577.0,2577.0,2577.0,,,...,,7,7,15096,55771,102237.0,50131,67920,2577.0,67920.0


样例映射行:


Unnamed: 0,cgi_id,lon_gz,lat_gz
0,110082,29557.547815,64409.446807
1,84781,74146.816341,28973.560021
2,119413,48669.37212,28968.959635


样例矢量行:


Unnamed: 0,cgi_id,lon_gz,lat_gz,geometry,lon,lat
0,110082,29557.547815,64409.446807,POINT (113.18222 23.4466),113.182215,23.446596
1,84781,74146.816341,28973.560021,POINT (113.61701 23.12727),113.617011,23.127267
2,119413,48669.37212,28968.959635,POINT (113.36813 23.127),113.368131,23.126998


In [4]:
import re

# 可能的基站编码字段候选
station_keys = [
    "基站编码","基站id","基站ID","站点编码","站点id","站点ID","cellid","cell_id","eci","ECI",
    "lac","LAC","ci","CI","site_id","siteID","gNodeB","ENBID","NR_CELL_ID","小区ID","小区编码"
]

time_keys = ["时间","采样时间","hour","Hour","时刻","timestamp","datetime","time","TIME"]
user_keys = ["外卖员","外卖员id","外卖员ID","courier_id","user_id","uid","ID","id"]

def find_col(candidates, columns):
    cols_lower = {c.lower(): c for c in columns}
    for k in candidates:
        if k in columns: return k
        if k.lower() in cols_lower: return cols_lower[k.lower()]
    # 模糊匹配
    for c in columns:
        cl = c.lower()
        if any(re.search(rf"{re.escape(k.lower())}", cl) for k in candidates):
            return c
    return None

traj_station_col = find_col(station_keys, df_traj.columns)
map_station_left  = find_col(station_keys, df_map.columns)
map_station_right = None  # 若映射表两列：一列是轨迹里的编码，一列是矢量里的编码
if df_map.shape[1] >= 2:
    # 猜测“另一列”为非同名的另一种编码
    other_cols = [c for c in df_map.columns if c != map_station_left]
    # 优先挑一个非数字占比低/字符串列
    map_station_right = other_cols[0] if other_cols else None

# 矢量表里的“基站标识列”（用来与映射右列或轨迹列连接）
gdf_station_col = find_col(station_keys, gdf.columns)

traj_time_col = find_col(time_keys, df_traj.columns)
traj_user_col = find_col(user_keys, df_traj.columns)

print("识别到字段：")
print("轨迹-基站列:", traj_station_col, "轨迹-时间列:", traj_time_col, "轨迹-人员列:", traj_user_col)
print("映射-轨迹端列:", map_station_left, "映射-矢量端列(猜测):", map_station_right)
print("矢量-基站列:", gdf_station_col)

# 统一字符串以避免类型不一致
def norm_str_series(s):
    return s.astype(str).str.strip().str.replace(r"\.0$", "", regex=True)

df_traj2 = df_traj.copy()
if traj_station_col:
    df_traj2[traj_station_col] = norm_str_series(df_traj2[traj_station_col])

df_map2 = df_map.copy()
if map_station_left:
    df_map2[map_station_left] = norm_str_series(df_map2[map_station_left])
if map_station_right and map_station_right in df_map2.columns:
    df_map2[map_station_right] = norm_str_series(df_map2[map_station_right])

gdf2 = gdf.copy()
if gdf_station_col and gdf_station_col in gdf2.columns:
    gdf2[gdf_station_col] = norm_str_series(gdf2[gdf_station_col])

# 三种合并策略：
merged = None
merge_info = ""

try_order = []

# A. 轨迹基站列 直接对上 矢量基站列
if traj_station_col and gdf_station_col:
    try_order.append(("A", df_traj2, traj_station_col, gdf2, gdf_station_col))

# B. 轨迹 -> 映射(左) -> 映射(右) -> 矢量
if traj_station_col and map_station_left and map_station_right and gdf_station_col:
    # 先左连接到映射表，再用映射右列对接矢量
    tmp = df_traj2.merge(df_map2[[map_station_left, map_station_right]], left_on=traj_station_col, right_on=map_station_left, how="left")
    try_order.append(("B", tmp, map_station_right, gdf2, gdf_station_col))

# C. 轨迹 -> 映射(左) 直接等名对接矢量（若映射左列名与矢量列名相同）
if traj_station_col and map_station_left and (map_station_left == gdf_station_col):
    tmp = df_traj2.merge(df_map2[[map_station_left]], on=map_station_left, how="left")
    try_order.append(("C", tmp, map_station_left, gdf2, gdf_station_col or map_station_left))

for tag, left_df, left_key, right_gdf, right_key in try_order:
    if left_key and right_key and left_key in left_df.columns and right_key in right_gdf.columns:
        merged_try = left_df.merge(right_gdf[[right_key, "lon", "lat"]], left_on=left_key, right_on=right_key, how="left")
        hit_rate = merged_try["lon"].notna().mean()
        if hit_rate > 0.2:  # 命中率阈值，可调整
            merged = merged_try
            merge_info = tag
            break

if merged is None:
    print("未能自动匹配字段，请确认以下列名关系：")
    print("- 轨迹中的‘基站编码’列:", traj_station_col)
    print("- 映射表用于轨迹的列:", map_station_left, "用于矢量的列:", map_station_right)
    print("- 矢量中的‘基站标识’列:", gdf_station_col)
else:
    print(f"合并成功，策略 = {merge_info}，合并后行数={len(merged):,}，经纬度命中率={merged['lon'].notna().mean():.1%}")

merged_head = merged.head(3) if merged is not None else None
display(merged_head)

识别到字段：
轨迹-基站列: None 轨迹-时间列: None 轨迹-人员列: UID
映射-轨迹端列: None 映射-矢量端列(猜测): cgi_id
矢量-基站列: None
未能自动匹配字段，请确认以下列名关系：
- 轨迹中的‘基站编码’列: None
- 映射表用于轨迹的列: None 用于矢量的列: cgi_id
- 矢量中的‘基站标识’列: None


None

In [6]:
import os
import folium
import geopandas as gpd

# 已有的 gdf 为基站点集（来自 gzlbs.shp）
# 自动寻找路网（LineString/MultiLineString）shapefile
def find_road_shp(base_dir):
    cands = []
    for root, _, files in os.walk(base_dir):
        for f in files:
            if f.lower().endswith(".shp"):
                cands.append(os.path.join(root, f))
    for shp in cands:
        try:
            g = gpd.read_file(shp)
            if g.crs is None:
                # 优先假定为 WGS84，可按需要修改
                pass
            geom_types = set(g.geom_type.str.upper().dropna().unique().tolist())
            if any(t in {"LINESTRING","MULTILINESTRING"} for t in geom_types):
                return shp
        except Exception:
            continue
    return None

road_shp = find_road_shp(BASE)
roads = None
if road_shp:
    roads = gpd.read_file(road_shp)
    try:
        if roads.crs is None or roads.crs.to_epsg() != 4326:
            roads = roads.to_crs(epsg=4326)
    except Exception:
        pass

# 基站坐标
pts = gdf.dropna(subset=["lon","lat"]).copy()
center_lat = float(pts["lat"].median()) if not pts.empty else 23.1291
center_lon = float(pts["lon"].median()) if not pts.empty else 113.2644

m = folium.Map(location=[center_lat, center_lon], zoom_start=11, tiles="CartoDB positron")

# 路网图层
if roads is not None and not roads.empty:
    folium.GeoJson(
        roads.to_json(),
        name="Road Network",
        style_function=lambda x: {"color": "#1f78b4", "weight": 1, "opacity": 0.6}
    ).add_to(m)
else:
    print("未找到路网线要素 shapefile（LineString/MultiLineString）。已仅展示基站。")

# 基站点图层
for _, r in pts.iterrows():
    folium.CircleMarker(
        location=[float(r["lat"]), float(r["lon"])],
        radius=3,
        color="#e31a1c",
        fill=True,
        fill_opacity=0.8,
    ).add_to(m)

folium.LayerControl().add_to(m)

out_map = os.path.join(BASE, "network_overview.html")
m.save(out_map)
print("已保存路网+基站可视化:", out_map)

未找到路网线要素 shapefile（LineString/MultiLineString）。已仅展示基站。
已保存路网+基站可视化: data\network_overview.html


In [9]:
import os
import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Point
import folium

# 1) 选择一个快递员（UID）
if "UID" not in df_traj.columns:
    raise RuntimeError("未找到列 `UID`，请确认轨迹表包含快递员标识。")

uid = int(df_traj["UID"].iloc[0])  # 可改成指定的 UID，如 169649
print("示例快递员 UID:", uid)

# 2) 展开该快递员 24 小时的基站序列（确保严格按小时顺序）
hour_cols = [c for c in df_traj.columns if c.startswith("cgi_")]
if len(hour_cols) == 0:
    raise RuntimeError("未发现以 `cgi_` 开头的小时列，请确认轨迹数据格式。")

row = df_traj.loc[df_traj["UID"] == uid, hour_cols]
if row.empty:
    raise RuntimeError(f"未找到 UID={uid} 的轨迹行。")
row = row.iloc[0]

def parse_hour(col):
    try:
        return int(col.split("_")[1])
    except Exception:
        return np.nan

long_df = (
    row.rename(index=parse_hour)
       .reset_index()
       .rename(columns={"index":"hour", 0:"cgi"})
       .dropna(subset=["hour"])
       .astype({"hour": int})
       .sort_values("hour")
)

# 标准化编码类型，便于与 gdf 的 cgi_id 对齐（不丢弃重复，保持顺序）
long_df["cgi"] = long_df["cgi"].astype(str).str.strip().str.replace(r"\.0$", "", regex=True)

# 3) 通过基站矢量表 gdf 获得每小时点的经纬度（保持顺序）
if "cgi_id" not in gdf.columns or not {"lon","lat"}.issubset(gdf.columns):
    raise RuntimeError("矢量基站表 gdf 需包含 `cgi_id`, `lon`, `lat` 列。")

gdf_key = gdf[["cgi_id", "lon", "lat"]].copy()
gdf_key["cgi_id"] = gdf_key["cgi_id"].astype(str).str.strip().str.replace(r"\.0$", "", regex=True)

pts_df = long_df.merge(gdf_key, left_on="cgi", right_on="cgi_id", how="left").sort_values("hour")
pts_df = pts_df.dropna(subset=["lon","lat"]).copy()

print(f"该快递员共有 {len(long_df)} 个小时记录，其中成功映射到基站坐标的点数: {len(pts_df)}")
if len(pts_df) < 2:
    raise RuntimeError("可用点少于 2 个，无法绘制轨迹，请检查编码映射或选择其他 UID。")

# 4) 读取/自动发现路网（LineString/MultiLineString）
def find_road_shp(base_dir):
    cands = []
    for root, _, files in os.walk(base_dir):
        for f in files:
            if f.lower().endswith(".shp"):
                cands.append(os.path.join(root, f))
    for shp in cands:
        try:
            g = gpd.read_file(shp)
            geom_types = set(g.geom_type.str.upper().dropna().unique().tolist())
            if any(t in {"LINESTRING","MULTILINESTRING"} for t in geom_types):
                return shp
        except Exception:
            continue
    return None

road_shp = find_road_shp(BASE)
roads = None
if road_shp:
    roads = gpd.read_file(road_shp)
    try:
        if roads.crs is None or roads.crs.to_epsg() != 4326:
            roads = roads.to_crs(epsg=4326)
    except Exception:
        pass

# 5) 生成点几何（保持顺序）
pts_gdf = gpd.GeoDataFrame(
    pts_df[["hour","lon","lat"]].copy(),
    geometry=[Point(xy) for xy in zip(pts_df["lon"], pts_df["lat"])],
    crs="EPSG:4326"
).sort_values("hour").reset_index(drop=True)

# 6) 地图匹配：逐点投影到最近路段（若无路网，则直接用原始点序列）
def snap_points_to_roads(points_gdf, roads_gdf):
    if roads_gdf is None or roads_gdf.empty:
        print("未发现路网，将跳过地图匹配，使用原始点连线。")
        out = points_gdf.copy()
        out["geometry_matched"] = out.geometry.values
        return out

    sidx = roads_gdf.sindex if hasattr(roads_gdf, "sindex") else None
    matched_geoms = []
    for pt in points_gdf.geometry:
        # 候选索引：有 sindex 用最近包围盒，否则全表
        if sidx is not None:
            try:
                cand_idx = list(sidx.query(pt.buffer(0.01).bounds))
            except Exception:
                cand_idx = []
        else:
            cand_idx = []

        cand = roads_gdf.geometry if not cand_idx else roads_gdf.iloc[cand_idx].geometry
        # 选最近线并投影
        dists = cand.distance(pt)
        nearest_idx = cand.index[dists.values.argmin()]
        line = roads_gdf.geometry.loc[nearest_idx]
        matched_geoms.append(line.interpolate(line.project(pt)))

    out = points_gdf.copy()
    out["geometry_matched"] = matched_geoms
    return out

pts_matched = snap_points_to_roads(pts_gdf, roads)

# 7) 在底图上按“小时顺序”连线（用全部点，不仅首尾）
center_lat = float(pts_matched["lat"].median())
center_lon = float(pts_matched["lon"].median())
m = folium.Map(location=[center_lat, center_lon], zoom_start=12, tiles="CartoDB positron")

line_coords = [(p.y, p.x) for p in pts_matched.sort_values("hour")["geometry_matched"].tolist()]
folium.PolyLine(locations=line_coords, color="#ff7f00", weight=4, opacity=0.9).add_to(m)

# 可选：标注起点终点
folium.CircleMarker(location=line_coords[0], radius=5, color="#2ca02c", fill=True, fill_opacity=1.0, tooltip="起点").add_to(m)
folium.CircleMarker(location=line_coords[-1], radius=5, color="#d62728", fill=True, fill_opacity=1.0, tooltip="终点").add_to(m)

out_html = os.path.join(BASE, f"courier_{uid}_matched_route.html")
m.save(out_html)
print("已保存单快递员地图匹配轨迹:", out_html)

示例快递员 UID: 169649
该快递员共有 24 个小时记录，其中成功映射到基站坐标的点数: 24
未发现路网，将跳过地图匹配，使用原始点连线。
已保存单快递员地图匹配轨迹: data\courier_169649_matched_route.html
