In [9]:
# ===== Cell 1: 设置 =====
import warnings
from pathlib import Path

import numpy as np
import pandas as pd

# Resampling / bucketing frequency (pandas offset alias) shared by both builders.
FREQ = "3min"
# Directory that receives the generated parquet files; created on cell execution.
OUT_DIR = Path("outputs_fast")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Input paths (change these to point at your own files).
TOMTOM_CSV = r"C:/Users/elvinli/OneDrive/CodeProjects/TomTom_data_20_24Aug2025.csv"
VESSEL_FILE = r"C:/Users/elvinli/OneDrive/CodeProjects/Vesselposition_data_20_24Aug2025.csv"

# CSV parser engine handed to both builder functions in the run cell.
# NOTE(review): the original note suggested switching to "pyarrow" when it is
# installed, but both builders call pd.read_csv with chunksize, which pandas
# documents as unsupported by the pyarrow engine — keep "c" unless verified.
CSV_ENGINE = "c"


In [10]:
# ===== Cell 2: TomTom（行内均值 + 分块聚合，静态指标） =====
def _mean_from_inner_csv(s: str) -> float:
    """从 'id,traffic_level\n...' 字符串快速求 traffic_level 的均值（不展开成大表）。"""
    if not isinstance(s, str) or not s:
        return np.nan
    i = s.find("\n")
    if i == -1:
        return np.nan
    total = 0.0
    n = 0
    for line in s[i+1:].splitlines():
        if not line:
            continue
        try:
            total += float(line.rsplit(",", 1)[-1])
            n += 1
        except Exception:
            continue
    return (total / n) if n else np.nan

def build_tomtom_3min_static(file_path: str,
                             out_path=OUT_DIR / "tomtom_3min.parquet",
                             chunksize: int = 2000,
                             engine: str = "pyarrow"):
    """Aggregate the TomTom CSV into one mean traffic level per FREQ bucket.

    The CSV is streamed in chunks of ``chunksize`` rows. For every row the
    inline CSV text in the ``data`` column is reduced to a single mean by
    ``_mean_from_inner_csv``; those row means are then averaged per FREQ
    bucket (``resample(FREQ).mean()``) and written to ``out_path`` as parquet.

    Parameters
    ----------
    file_path : path of the TomTom CSV; must contain ``time`` and ``data`` columns.
    out_path : destination parquet file.
    chunksize : number of CSV rows parsed per chunk.
    engine : pandas ``read_csv`` engine. ``"pyarrow"`` cannot read in chunks,
        so it is replaced by ``"c"`` with a warning.

    Returns
    -------
    DataFrame with columns ``timestamp`` (naive UTC) and ``traffic_level_mean``.

    Raises
    ------
    ValueError
        If the CSV yields no chunks at all.
    """
    if engine == "pyarrow":
        # pandas rejects chunksize / low_memory for the pyarrow parser, so the
        # documented default would otherwise fail before reading a single row.
        warnings.warn(
            "engine='pyarrow' does not support chunked reading; falling back to engine='c'.",
            stacklevel=2,
        )
        engine = "c"

    read_kwargs = {"usecols": ["time", "data"], "chunksize": chunksize, "engine": engine}
    if engine == "c":
        # low_memory is a C-parser option; only pass it to that parser.
        read_kwargs["low_memory"] = False

    ts_chunks, mean_chunks = [], []
    # The context manager closes the underlying file even if parsing fails.
    with pd.read_csv(file_path, **read_kwargs) as reader:
        for chunk in reader:
            # Parse as UTC, then drop the tz so downstream joins use naive UTC.
            t = pd.to_datetime(chunk["time"], utc=True, errors="coerce").dt.tz_convert(None)
            means = [_mean_from_inner_csv(s) for s in chunk["data"].tolist()]
            ts_chunks.append(t.values)
            mean_chunks.append(np.array(means, dtype="float64"))

    if not ts_chunks:
        raise ValueError("TomTom CSV 未解析到数据。")

    df = pd.DataFrame({
        "timestamp": np.concatenate(ts_chunks),
        "traffic_level_mean": np.concatenate(mean_chunks)
    }).dropna(subset=["timestamp"]).sort_values("timestamp")

    # Resample straight to the FREQ grid; buckets without rows come out as NaN.
    df_3min = (df.set_index("timestamp")
                 .resample(FREQ)["traffic_level_mean"]
                 .mean().to_frame().reset_index())

    df_3min.to_parquet(out_path, index=False)
    print(f"[TomTom] 保存：{out_path}  形状={df_3min.shape}")
    return df_3min



In [11]:
# 只保留：timestamp, vessel_count, vessel_avg_speed
def build_vessel_3min_static(file_path: str,
                             out_path=OUT_DIR / "vessel_3min.parquet",
                             convert_speed_to_mps=True,
                             unique_boats=False,
                             chunksize: int = 2_000_000,
                             engine: str = "pyarrow",
                             sep: str = ","):
    """Aggregate vessel position records into per-FREQ counts and mean speed.

    Column names are matched literally:
      - time column:  'upload-timestamp' (e.g. 2025-08-20T06:34:08.177626Z)
      - speed column: 'speed-in-centimeters-per-second'
    The CSV is streamed in chunks; per-bucket totals are accumulated across
    chunks and the result is written to ``out_path`` as parquet.

    Parameters
    ----------
    file_path : path of the vessel CSV.
    out_path : destination parquet file.
    convert_speed_to_mps : divide the speed column by 100 (cm/s -> m/s).
    unique_boats : if True, ``vessel_count`` is the number of distinct ids per
        bucket (column 'id', else 'mmsi-number'); otherwise it is the number
        of records per bucket.
    chunksize : number of CSV rows parsed per chunk.
    engine : pandas ``read_csv`` engine. ``"pyarrow"`` cannot read in chunks,
        so it is replaced by ``"c"`` with a warning.
    sep : field separator of the CSV.

    Returns
    -------
    DataFrame with columns ``timestamp`` (naive UTC bucket start),
    ``vessel_count`` and ``vessel_avg_speed``.

    Raises
    ------
    ValueError
        If the time or speed column is missing, or ``unique_boats`` is True
        and no id column is found.
    """
    if engine == "pyarrow":
        # pandas rejects chunksize / nrows / low_memory for the pyarrow parser,
        # so the documented default would otherwise fail immediately.
        warnings.warn(
            "engine='pyarrow' does not support chunked reading; falling back to engine='c'.",
            stacklevel=2,
        )
        engine = "c"

    read_kwargs = {"engine": engine, "sep": sep, "on_bad_lines": "skip"}
    if engine == "c":
        # low_memory is a C-parser option; only pass it to that parser.
        read_kwargs["low_memory"] = False

    # Read only the header row to validate the expected column names.
    cols = list(pd.read_csv(file_path, nrows=0, **read_kwargs).columns)

    # --- literal column names ---
    TCOL = "upload-timestamp"
    SCOL = "speed-in-centimeters-per-second"
    ICOL = None
    if unique_boats:
        # Change this if another column identifies a vessel in your data.
        ICOL = "id" if "id" in cols else ("mmsi-number" if "mmsi-number" in cols else None)

    if TCOL not in cols:
        raise ValueError(f"未找到时间列 '{TCOL}'；现有列名示例：{cols[:10]}")
    if SCOL not in cols:
        raise ValueError(f"未找到速度列 '{SCOL}'；现有列名示例：{cols[:10]}")
    if unique_boats and ICOL is None:
        raise ValueError("unique_boats=True 但未找到合适的船只ID列（建议 'id' 或 'mmsi-number'）。")

    usecols = [TCOL, SCOL] + ([ICOL] if unique_boats else [])

    # Cross-chunk accumulators, all keyed by bucket start.
    count_acc = {}        # bucket -> number of records (unique_boats=False)
    ids_acc = {}          # bucket -> set of vessel ids (unique_boats=True)
    speed_sum_acc = {}    # bucket -> sum of valid speeds
    speed_cnt_acc = {}    # bucket -> number of valid speeds

    # Explicit ISO-8601 '...Z' format so parsing cannot silently drift.
    TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    # The context manager closes the underlying file even if parsing fails.
    with pd.read_csv(file_path, usecols=usecols, chunksize=chunksize, **read_kwargs) as reader:
        for chunk in reader:
            # Strict format first; rows it rejects (e.g. no fractional
            # seconds) get a second chance with the generic parser.
            ts = pd.to_datetime(chunk[TCOL], format=TS_FORMAT, utc=True, errors="coerce")
            miss = ts.isna()
            if miss.any():
                ts.loc[miss] = pd.to_datetime(chunk.loc[miss, TCOL], utc=True, errors="coerce")
            ts = ts.dt.tz_convert(None)  # naive UTC, avoids tz mismatches in later joins

            sp = pd.to_numeric(chunk[SCOL], errors="coerce")
            if convert_speed_to_mps:
                sp = sp / 100.0

            chunk = chunk.assign(timestamp=ts, speed=sp).dropna(subset=["timestamp"])
            chunk = chunk.assign(bin=chunk["timestamp"].dt.floor(FREQ))

            if unique_boats:
                # Keep the distinct ids per bucket and count once at the end;
                # summing per-chunk nunique() would double-count a vessel that
                # appears in the same bucket in two different chunks.
                pairs = chunk[["bin", ICOL]].dropna().drop_duplicates()
                for b, ids in pairs.groupby("bin")[ICOL]:
                    ids_acc.setdefault(b, set()).update(ids.tolist())
            else:
                for b, n in chunk.groupby("bin").size().items():
                    count_acc[b] = count_acc.get(b, 0) + int(n)

            by_bin = chunk.groupby("bin")["speed"]
            # Plain sum() skips NaN and yields 0.0 for an all-NaN bucket, so a
            # chunk without valid speeds cannot turn the running total into NaN.
            for b, v in by_bin.sum().items():
                speed_sum_acc[b] = speed_sum_acc.get(b, 0.0) + float(v)
            for b, v in by_bin.count().items():
                speed_cnt_acc[b] = speed_cnt_acc.get(b, 0) + int(v)

    if unique_boats:
        count_acc = {b: len(ids) for b, ids in ids_acc.items()}

    # Assemble the final frame on the union of all buckets seen.
    bins = sorted(set(count_acc) | set(speed_sum_acc) | set(speed_cnt_acc))
    index = pd.DatetimeIndex(bins, name="timestamp")
    vessel_count = pd.Series([count_acc.get(b, 0) for b in bins],
                             index=index, dtype="int64", name="vessel_count")
    speed_sum = pd.Series([speed_sum_acc.get(b, 0.0) for b in bins], index=index, dtype="float64")
    speed_cnt = pd.Series([speed_cnt_acc.get(b, 0) for b in bins], index=index, dtype="float64")
    # Buckets without any valid speed get NaN instead of a 0/0 division.
    vessel_avg_speed = (speed_sum / speed_cnt.where(speed_cnt > 0)).rename("vessel_avg_speed")

    df_3min = pd.concat([vessel_count, vessel_avg_speed], axis=1).reset_index()
    df_3min.to_parquet(out_path, index=False)
    print(f"[Vessel] 保存：{out_path}  形状={df_3min.shape}")
    return df_3min


In [12]:
# ===== Cell 4: Run =====
# Build both FREQ-bucketed tables with the parser engine chosen in the setup
# cell; each call also writes its parquet file and prints path and shape.
tt = build_tomtom_3min_static(TOMTOM_CSV, engine=CSV_ENGINE)
vs = build_vessel_3min_static(VESSEL_FILE, engine=CSV_ENGINE)

# Preview the first rows of each result.
display(tt.head())
display(vs.head())


[TomTom] 保存：outputs_fast\tomtom_3min.parquet  形状=(2188, 2)
[Vessel] 保存：outputs_fast\vessel_3min.parquet  形状=(1802, 3)


Unnamed: 0,timestamp,traffic_level_mean
0,2025-08-20 06:36:00,0.825112
1,2025-08-20 06:39:00,0.816597
2,2025-08-20 06:42:00,0.810953
3,2025-08-20 06:45:00,0.800038
4,2025-08-20 06:48:00,0.800142


Unnamed: 0,timestamp,vessel_count,vessel_avg_speed
0,2025-08-20 06:27:00,12,2.408333
1,2025-08-20 06:30:00,30,1.023333
2,2025-08-20 06:33:00,3903,2.84043
3,2025-08-20 06:36:00,5821,3.025631
4,2025-08-20 06:39:00,5900,2.931915
