In [1]:
from tqdm import tqdm  # 用于显示进度条，可选
import json
import ast
import re
from datetime import datetime, time
import pandas as pd
import geopandas as gpd
from shapely import wkt
from shapely.geometry import Point, Polygon, LineString, MultiLineString, MultiPoint, GeometryCollection
import statistics


import utils.data_loader

import importlib
importlib.reload(utils.data_loader)  # 重新加载修改后的模块

<module 'utils.data_loader' from 'C:\\DCLASS\\myJupyter\\paper2\\application\\shanghai_visualization\\utils\\data_loader.py'>

In [2]:
from py2neo import Node, Relationship, Graph, NodeMatcher, RelationshipMatcher, Subgraph
g = Graph('bolt://localhost:7687',auth=('neo4j','your password'), name = 'trkg')

In [3]:
# 读取 CSV 文件
file_path = r'D:\paper2\result\batch_time(..].csv'
batch_data = utils.data_loader.CSV_load(file_path)
# 转换为 datetime 类型
batch_data['start_datetime'] = pd.to_datetime(batch_data['start_datetime'])

# 加入节假日列表（清明节）
holiday_dates = pd.to_datetime(["2015-04-04", "2015-04-05", "2015-04-06"])

# 提取日期和星期几
batch_data['date'] = batch_data['start_datetime'].dt.date
batch_data['weekday'] = batch_data['start_datetime'].dt.weekday  # 0=周一, 6=周日
# 分类函数
def classify_day(row):
    if pd.to_datetime(row['date']) in holiday_dates:
        return 'holiday'
    elif row['weekday'] >= 5:
        return 'weekend'
    else:
        return 'weekday'

# 应用分类
batch_data['day_type'] = batch_data.apply(classify_day, axis=1)

# 转换时间字段为 time 对象
batch_data['start_time_only'] = pd.to_datetime(batch_data['start_time_only'], format='%H:%M:%S').dt.time
batch_data['end_time_only'] = pd.to_datetime(batch_data['end_time_only'], format='%H:%M:%S').dt.time

batch_data

Unnamed: 0,start_datetime,end_datetime,batch,start_time_only,end_time_only,date,weekday,day_type
0,2015-04-01 00:00:00,2015-04-01 00:05:00,batch_1,00:00:00,00:05:00,2015-04-01,2,weekday
1,2015-04-01 00:05:00,2015-04-01 00:10:00,batch_2,00:05:00,00:10:00,2015-04-01,2,weekday
2,2015-04-01 00:10:00,2015-04-01 00:15:00,batch_3,00:10:00,00:15:00,2015-04-01,2,weekday
3,2015-04-01 00:15:00,2015-04-01 00:20:00,batch_4,00:15:00,00:20:00,2015-04-01,2,weekday
4,2015-04-01 00:20:00,2015-04-01 00:25:00,batch_5,00:20:00,00:25:00,2015-04-01,2,weekday
...,...,...,...,...,...,...,...,...
8635,2015-04-30 23:35:00,2015-04-30 23:40:00,batch_8636,23:35:00,23:40:00,2015-04-30,3,weekday
8636,2015-04-30 23:40:00,2015-04-30 23:45:00,batch_8637,23:40:00,23:45:00,2015-04-30,3,weekday
8637,2015-04-30 23:45:00,2015-04-30 23:50:00,batch_8638,23:45:00,23:50:00,2015-04-30,3,weekday
8638,2015-04-30 23:50:00,2015-04-30 23:55:00,batch_8639,23:50:00,23:55:00,2015-04-30,3,weekday


In [4]:
def filter_batches_by_time_slot(batch_data, time_slot_id):
    # 定义 5 个时间段
    time_slots = {
        1: ("00:00:00", "06:30:00"),
        2: ("06:30:00", "09:00:00"),
        3: ("09:00:00", "17:00:00"),
        4: ("17:00:00", "19:30:00"),
        5: ("19:30:00", "23:59:59")
    }

    if time_slot_id not in time_slots:
        raise ValueError("time_slot_id must be an integer from 1 to 5.")

    start_time_str, end_time_str = time_slots[time_slot_id]

    start_time = datetime.strptime(start_time_str, "%H:%M:%S").time()
    end_time = datetime.strptime(end_time_str, "%H:%M:%S").time()

    # 过滤符合时间段的批次
    if start_time > end_time:  # 跨午夜情况（本例无）
        filtered_batches = batch_data[
            (batch_data['start_time_only'] >= start_time) |
            (batch_data['end_time_only'] <= end_time)
        ]
    else:
        filtered_batches = batch_data[
            (batch_data['start_time_only'] >= start_time) &
            (batch_data['end_time_only'] <= end_time)
        ]

    print(f"✅ [DEBUG] start_time: {start_time}, end_time: {end_time}")
    print(f"✅ [DEBUG] filtered_batches:\n{filtered_batches}")

    return filtered_batches['batch'].to_list()

In [5]:
# 例如：获取“工作日 + 早高峰（7:00–9:00）”的批次
# print(f"\nweekday_batches:\n")
# weekday_batches = filter_batches_by_time_slot(batch_data[batch_data['day_type'] == 'weekday'], 3)
print(f"\nweekend_batches:\n")
weekend_batches = filter_batches_by_time_slot(batch_data[batch_data['day_type'] == 'weekend'], 1)
# print(f"\nholiday_batches:\n")
# holiday_batches = filter_batches_by_time_slot(batch_data[batch_data['day_type'] == 'holiday'], 1)



weekend_batches:

✅ [DEBUG] start_time: 00:00:00, end_time: 06:30:00
✅ [DEBUG] filtered_batches:
          start_datetime        end_datetime       batch start_time_only  \
2880 2015-04-11 00:00:00 2015-04-11 00:05:00  batch_2881        00:00:00   
2881 2015-04-11 00:05:00 2015-04-11 00:10:00  batch_2882        00:05:00   
2882 2015-04-11 00:10:00 2015-04-11 00:15:00  batch_2883        00:10:00   
2883 2015-04-11 00:15:00 2015-04-11 00:20:00  batch_2884        00:15:00   
2884 2015-04-11 00:20:00 2015-04-11 00:25:00  batch_2885        00:20:00   
...                  ...                 ...         ...             ...   
7273 2015-04-26 06:05:00 2015-04-26 06:10:00  batch_7274        06:05:00   
7274 2015-04-26 06:10:00 2015-04-26 06:15:00  batch_7275        06:10:00   
7275 2015-04-26 06:15:00 2015-04-26 06:20:00  batch_7276        06:15:00   
7276 2015-04-26 06:20:00 2015-04-26 06:25:00  batch_7277        06:20:00   
7277 2015-04-26 06:25:00 2015-04-26 06:30:00  batch_7278        06

In [7]:
# 转换 `batch_list` 为 Cypher 查询格式
batch_list = weekend_batches
batch_list_str = ', '.join(f"'{batch}'" for batch in batch_list)
batch_list_str

"'batch_2881', 'batch_2882', 'batch_2883', 'batch_2884', 'batch_2885', 'batch_2886', 'batch_2887', 'batch_2888', 'batch_2889', 'batch_2890', 'batch_2891', 'batch_2892', 'batch_2893', 'batch_2894', 'batch_2895', 'batch_2896', 'batch_2897', 'batch_2898', 'batch_2899', 'batch_2900', 'batch_2901', 'batch_2902', 'batch_2903', 'batch_2904', 'batch_2905', 'batch_2906', 'batch_2907', 'batch_2908', 'batch_2909', 'batch_2910', 'batch_2911', 'batch_2912', 'batch_2913', 'batch_2914', 'batch_2915', 'batch_2916', 'batch_2917', 'batch_2918', 'batch_2919', 'batch_2920', 'batch_2921', 'batch_2922', 'batch_2923', 'batch_2924', 'batch_2925', 'batch_2926', 'batch_2927', 'batch_2928', 'batch_2929', 'batch_2930', 'batch_2931', 'batch_2932', 'batch_2933', 'batch_2934', 'batch_2935', 'batch_2936', 'batch_2937', 'batch_2938', 'batch_2939', 'batch_2940', 'batch_2941', 'batch_2942', 'batch_2943', 'batch_2944', 'batch_2945', 'batch_2946', 'batch_2947', 'batch_2948', 'batch_2949', 'batch_2950', 'batch_2951', 'batc

In [8]:
honeycomb_ids_ls = list(range(0, 204011))

In [9]:
import os
import gc

save_root  = r"C:\DCLASS\myJupyter\paper2\shanghai_data_save\weekend_batches\1"
local_avg_dir = os.path.join(save_root, "local_avg")
global_avg_dir = os.path.join(save_root, "global_avg")

# 创建保存目录
os.makedirs(local_avg_dir, exist_ok=True)
os.makedirs(global_avg_dir, exist_ok=True)

chunk_size = 50
file_index = 1  # ⬅️ 初始化 file_index
page_limit = 50000  # 分页大小

for i in tqdm(range(0, len(honeycomb_ids_ls), chunk_size), desc="Honeycomb Chunks"):

    # if i == chunk_size*5:
    #     break
        
    chunk = honeycomb_ids_ls[i:i+chunk_size]
    honeycomb_to_df = {}  # 存储每个格网的数据
    global_chunk_data = []  # 当前 chunk 的 global data

    for batch in batch_list:
        offset = 0
        while True:
            query = """
            MATCH (tp:trajectory_point)
            WHERE tp.batch = $batch
              AND (tp.speed IS NOT NULL OR tp.speed_2 IS NOT NULL)
              AND tp.honeycomb_name IN $honeycombs
            RETURN DISTINCT tp.batch AS batch,
                            tp.honeycomb_name AS honeycomb_name,
                            tp.speed AS speed,
                            tp.speed_2 AS speed_2
            SKIP $offset LIMIT $limit
            """
            params = {
                "batch": batch,#'batch_514'
                "honeycombs": chunk,
                "offset": offset,
                "limit": page_limit
            }
    
            result = g.run(query, parameters=params).data()
            if not result:
                break  # 当前分页无数据，终止分页循环
    
            df = pd.DataFrame(result)

            # 合并 speed 和 speed_2 列
            df['speed'] = df['speed'].fillna(df['speed_2'])
            df.drop(columns=['speed_2'], inplace=True)

            # ✅ 过滤掉不在 [0.695, 40] 闭区间内的速度
            df = df[(df['speed'] >= 0.695) & (df['speed'] <= 40)]

            if not df.empty:
                global_chunk_data.append(df)  # ⬅️ 临时 global 数据本轮使用
        
                # 将数据按 honeycomb_name 分开存入字典
                for h_id, group in df.groupby('honeycomb_name'):
                    if h_id not in honeycomb_to_df:
                        honeycomb_to_df[h_id] = []
                    honeycomb_to_df[h_id].append(group)

            offset += page_limit  # 下一页

    # 👉 处理 local（一个格网一个文件）
    for h_id, df_list in honeycomb_to_df.items():
        df_h = pd.concat(df_list, ignore_index=True)
        df_h_unique = df_h.drop_duplicates(subset=['batch', 'speed'])
        local_avg = (
            df_h_unique.groupby('batch')
            .agg(local_avg_speed=('speed', lambda x: round(x.mean(), 6)))
            .reset_index()
        )
        local_avg['honeycomb_name'] = h_id

        file_path = os.path.join(local_avg_dir, f"honeycomb_{h_id}.parquet")
        local_avg.to_parquet(file_path, engine="pyarrow", index=False)

    # 🟦 保存当前 chunk 的 global_avg_chunk
    if global_chunk_data:
        df_global_chunk = pd.concat(global_chunk_data, ignore_index=True)
        df_global_chunk = df_global_chunk.drop_duplicates(subset=['honeycomb_name', 'speed'])

        global_avg_chunk = (
            df_global_chunk.groupby('honeycomb_name')
            .agg(global_avg_speed=('speed', lambda x: round(x.mean(), 6)))
            .reset_index()
        )

        chunk_file = os.path.join(global_avg_dir, f"global_avg_chunk_{file_index:04}.parquet")
        global_avg_chunk.to_parquet(chunk_file, engine="pyarrow", index=False)

        file_index += 1  # ⬅️ 每写一个 global chunk 文件就递增编号

    gc.collect()

Honeycomb Chunks: 100%|█████████████████████████████████████████████████████████| 4081/4081 [24:54:43<00:00, 21.98s/it]
