In [1]:
import os
import pandas as pd
import numpy as np
import warnings
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib import cm

import networkx as nx
import geopandas as gpd
from shapely.geometry import Point
import transbigdata as tbd
from geopy.distance import geodesic

# 关闭警告提醒
warnings.filterwarnings("ignore")
# 显示所有列
pd.set_option('display.max_columns', None)

todo 读取数据

In [3]:
# 读取路径重构的结果
folder_path = r'C:\01 毕业论文\7.论文代码和结果\硕士毕业论文代码\应用案例分析\车辆路径重构\2.重构车辆路径\有效处理结果\path_result.csv'
target_data = pd.read_csv(folder_path, encoding='gbk')
# 时间数据格式转换
target_data = target_data.rename(columns={'datetime': 'time_point'})
target_data['time_point'] = pd.to_datetime(target_data['time_point'])
origin_record = len(target_data)

print('原始数据量:', origin_record)
target_data.head(3)

In [4]:
# 读取路网拓扑的连接信息
edges = pd.read_csv(r'C:\01 毕业论文\7.论文代码和结果\硕士毕业论文代码\应用案例分析\车辆路径重构\1.构建路网拓扑结构\center_edges.csv', encoding='gbk')

# 将索引变为新的属性列
edges.reset_index(drop=False, inplace=True)

# 提取相邻节点的连接边属性
# start_node = 'LINK-39092939-637199088-637199090'
# end_node = '2-39092939'
# roadtype, length = edges[(edges['start_node'] == start_node) & (edges['end_node'] == end_node)][['road_type', 'road_length']].values[0]
# print(f"{start_node} -> {end_node}  roadtype:{roadtype} roadlength:{length}")

# 打印示例
edges.head(3)

todo 标记时间段

In [5]:
# 标注时间段

def categorize_time_period(time):
    """标记时间段（高峰期、平峰期、低峰期）"""
    if time.hour in range(7, 9) or time.hour in range(17, 19):  # 高峰期
        return 'high_peak'
    elif time.hour in range(22, 24) or time.hour in range(0, 7):  # 低峰期
        return 'low_peak'
    else:
        return 'off_peak'  # 平峰期

target_data['time_period'] = target_data['time_point'].apply(categorize_time_period)
target_data.head(3)

todo 标记道路类型

In [6]:
# 匹配道路类型

# 1. 生成 `start_node` 和 `end_node` 列
target_data['start_node'] = target_data['posname']
target_data['end_node'] = target_data['posname'].shift(-1)  # 取下一行的 posname 作为 end_node

# 2. 确保相同 travel_id 才能匹配
target_data['valid_pair'] = target_data['travel_id'] == target_data['travel_id'].shift(-1)

# 3. 过滤掉 `valid_pair == False` 的行（防止跨 travel_id 匹配）
valid_target_data = target_data[target_data['valid_pair']].copy()

# 4. 批量匹配 `road_type` 和 `road_length`
matched_data = valid_target_data.merge(
    edges, 
    left_on=['start_node', 'end_node'], 
    right_on=['start_node', 'end_node'], 
    how='left'
)

# 5. 将 `road_type` 和 `road_length` 赋值回原表
target_data.loc[matched_data.index, 'road_type'] = matched_data['road_type']
target_data.loc[matched_data.index, 'edge_length'] = matched_data['road_length']
target_data.loc[matched_data.index, 'edge_index'] = matched_data['index']   # 记录 edges 中的索引

# 6. 处理未匹配到的行，即上一次出行的终点
target_data['road_type'] = target_data['road_type'].fillna(target_data['road_type'].shift(1))  # 继承上一行的值
target_data['edge_length'].fillna(0, inplace=True)
target_data['edge_index'].fillna(-1, inplace=True)  # 未匹配的边设为 -1

# 7. 清理临时列
target_data.drop(columns=['start_node', 'end_node', 'valid_pair'], inplace=True)

# 8. 剔除road_type=NaN对应travel_id的所有数据
nan_rows = target_data[target_data['road_type'].isna()]  # 筛选出 road_type 为 NaN 的行
invalid_travel_ids = nan_rows['travel_id'].unique()      # 获取包含 NaN 的 travel_id 列表
target_data = target_data[~target_data['travel_id'].isin(invalid_travel_ids)]    # 删除对应 travel_id 所有行

print(f"匹配后的数据: {len(target_data)} / {origin_record}", '占比: {:.2%}'.format(len(target_data) / origin_record))
target_data.head(3)

In [7]:
total_travel = target_data['travel_id'].drop_duplicates().tolist()
print('出行次数:', len(total_travel))

todo 保存结果

In [8]:
# 定义空间类型
target_data['in_urban_area'] = True
target_data['edge_index'] = target_data['edge_index'].astype(int)
target_data['edge_length'] = target_data['edge_length'].round(2)

# 分离时间和日期列  because: .shp文件不能识别空格
target_data['date'] = target_data['time_point'].dt.date.astype(str)
target_data['time'] = target_data['time_point'].dt.time.astype(str)

# 选择字段
result = target_data[['device_id', 'travel_id', 'date', 'time', 'bayonetname', 'posname', 'longitude', 'latitude', 
                      'distance', 'c_distance', 'edge_length', 'edge_index', 'time_period', 'road_type', 'in_urban_area']]
# 重设索引
result.reset_index(drop=True, inplace=True)

# 保存结果
result.to_csv('test_data.csv', index=False, encoding='gbk')
result.head(3)

In [9]:
# 构建目标数据的 GeoDataFrame
geometry = [Point(xy) for xy in zip(target_data['longitude'], target_data['latitude'])]
geo_data = gpd.GeoDataFrame(target_data, crs='EPSG:4326', geometry=geometry)

# 保存为 Shapefile 文件
geo_data.to_file(r'C:\01 毕业论文\7.论文代码和结果\硕士毕业论文代码\应用案例分析\车辆速度估计\test_data.shp', driver='ESRI Shapefile', encoding='utf-8')
# geo_data.to_file('F:\研究生毕业论文\Map\测试数据/test_data.shp', driver='ESRI Shapefile', encoding='utf-8')
geo_data.head(3)

# 提取特定时间段的车辆路径

In [10]:
# 读取已经处理好的测试数据
test_data = pd.read_csv('test_data.csv', encoding='gbk')
test_data['time'] = pd.to_datetime(test_data['time'])
print('数据量:', len(test_data))

In [8]:
# 提取4:00/7:00/12:00的研究数据

def categorize_hour(time):
    """标记指定的小时数据"""
    target_hour = [4, 7, 12]
    if time.hour in target_hour:  # 高峰期
        return True
    else:
        return False

test_data['study_hour'] = test_data['time'].apply(categorize_hour)
study_data = test_data[test_data['study_hour'] == True]
study_data.drop(columns=['study_hour'], inplace=True)
study_data.reset_index(drop=True, inplace=True)

print(f'在研究时间内的数据: {len(study_data)}', '占比: {:.2%}'.format(len(study_data) / len(test_data)))
study_data.head(3)

In [9]:
study_data.to_csv('study_data.csv', index=False, encoding='gbk')