<a href="https://colab.research.google.com/github/ALJLBL/3D-/blob/main/3D%E7%82%B9%E4%BA%91%E7%89%A9%E4%BD%93%E8%B7%9F%E8%B8%AA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')
!pip install velodyne-decoder > /dev/null

Mounted at /content/drive


In [27]:
"""
LiDAR 点云聚类 + 匈牙利算法轨迹跟踪
================================================================
本脚本示例演示如何在 Colab 环境下：
1. 读取 Velodyne VLP‑16 产生的 .pcap 原始数据；
2. 对每帧点云做 ROI 裁剪、DBSCAN 初步聚类；
3. 用几何约束将被 DBSCAN 过分切分的小簇二次合并；
4. 以质心为观测，跨帧用匈牙利算法（全局最优）进行目标匹配与轨迹维护；
5. 最终用 Plotly 交互式 3D 折线图可视化每条轨迹。

依赖：numpy / scipy / scikit‑learn / plotly / velodyne‑decoder
建议版本：numpy≥1.26, scipy≥1.11, sklearn≥1.4.

作者：Gong
"""

# 标准库 & 第三方库导入
import numpy as np
import plotly.graph_objs as go
from dataclasses import dataclass, field
from typing import List, Dict, Tuple
from sklearn.cluster import DBSCAN
from sklearn.neighbors import KDTree
from scipy.sparse import coo_matrix
from scipy.sparse.csgraph import connected_components
from scipy.optimize import linear_sum_assignment
import velodyne_decoder as vd  # pip install velodyne-decoder


# ──────────────────────────────
# 0. 全局参数配置
#    —— 只需修改 Cfg 类即可快速调参
# ──────────────────────────────
class Cfg:
    """算法与 ROI 参数统一放这里，便于实验时集中修改"""
    # ROI（感兴趣区域）——先在这里裁剪掉车身/远点，可显著减少后续耗时
    x_range = (0.0, 8.5)   # 车辆前方 0~8.5 m
    y_range = (-15.0, 30.) # 左右 −15~30 m
    z_range = (-1.4, 4.0)  # 高度 −1.4~4 m

    # DBSCAN 聚类参数
    db_eps = 0.55          # 邻域半径 (m)
    db_min_samples = 8     # 最小邻居点数

    # 二次合并参数
    merge_dist = 1.5           # 两簇最近点距离 ≤ 1.5 m 才考虑合并
    merge_ang_parallel = 5.0   # 主轴夹角 < 5° 认为近似平行
    merge_ang_perp = 5.0       # 主轴接近 90°±5° 认为近似垂直

    # 跟踪参数
    track_max_dist = 1.0   # 跨帧允许的最大质心位移 (m)
    track_max_age = 5      # 轨迹连续丢失 n 帧后删除


# ──────────────────────────────
# 1. 通用辅助函数
# ──────────────────────────────

def crop_xyz(xyz: np.ndarray) -> np.ndarray:
    """按照 Cfg 中的 ROI 取子集"""
    xr, yr, zr = Cfg.x_range, Cfg.y_range, Cfg.z_range
    mask = (
        (xyz[:, 0] >= xr[0]) & (xyz[:, 0] <= xr[1]) &
        (xyz[:, 1] >= yr[0]) & (xyz[:, 1] <= yr[1]) &
        (xyz[:, 2] >= zr[0]) & (xyz[:, 2] <= zr[1])
    )
    return xyz[mask]


def principal_components(pts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """计算第一&第三主成分方向；点数不足 3 时返回默认轴"""
    if pts.shape[0] < 3:
        return np.array([1., 0., 0.]), np.array([0., 0., 1.])
    u, _, _ = np.linalg.svd(pts - pts.mean(0), full_matrices=False)
    return u[0], u[2]  # 第一、第三主成分


# ──────────────────────────────
# 2. 数据类定义：Cluster & Track
# ──────────────────────────────
@dataclass(slots=True)
class Cluster:
    """保存单个簇的关键属性"""
    pts: np.ndarray        # (N,3) 点云
    centroid: np.ndarray   # 质心，用于跟踪
    axis: np.ndarray       # 第一主成分方向（单位向量）

    @staticmethod
    def from_indices(xyz: np.ndarray, idxs: np.ndarray) -> "Cluster":
        """由索引列表快速构造簇对象"""
        p = xyz[idxs]
        return Cluster(p, p.mean(0), principal_components(p)[0])


@dataclass
class Track:
    id: int                         # 轨迹编号
    traj: List[np.ndarray] = field(default_factory=list)  # 历史质心序列
    age: int = 0                    # 未匹配帧计数（越大越老）


# ──────────────────────────────
# 3. 初步聚类（DBSCAN）与二次合并
# ──────────────────────────────

def dbscan_clusters(xyz: np.ndarray) -> List[Cluster]:
    """对单帧点云做 DBSCAN，返回簇列表"""
    labels = DBSCAN(eps=Cfg.db_eps, min_samples=Cfg.db_min_samples).fit_predict(xyz)
    # labels 为 −1 … K-1，-1 表示噪声点，这里忽略噪声簇
    return [Cluster.from_indices(xyz, np.where(labels == k)[0])
            for k in range(labels.max() + 1)]


def merge_clusters(clusters: List[Cluster]) -> List[Cluster]:
    """利用几何约束把被 DBSCAN 过细分的相邻小簇再合并"""
    if len(clusters) <= 1:
        return clusters

    # 3.1 先对簇质心建 KD-Tree，找距离阈值内的簇对
    cents = np.vstack([c.centroid for c in clusters])
    kd = KDTree(cents)
    neigh = kd.query_radius(cents, r=Cfg.merge_dist)

    # 3.2 按夹角 & 共面条件筛选可合并边
    edges = []  # 存储 (i,j) 需要连通的簇索引对
    for i, nb in enumerate(neigh):
        for j in nb:
            if j <= i:  # 去重：仅保留上三角
                continue
            a, b = clusters[i], clusters[j]
            # 两簇第一主成分方向夹角
            ang = np.degrees(np.arccos(np.clip(abs(a.axis @ b.axis), -1, 1)))

            merge_flag = False
            if ang < Cfg.merge_ang_parallel:  # 近似平行
                # 用簇 a 的第三主成分作为局部法向量，检查共面
                _, normal = principal_components(a.pts)
                if abs((b.centroid - a.centroid) @ normal) < 0.1:
                    merge_flag = True
            elif abs(ang - 90) < Cfg.merge_ang_perp:  # 近似垂直
                merge_flag = True

            if merge_flag:
                edges.append((i, j))

    if not edges:  # 无需合并
        return clusters

    # 3.3 把可合并边构成无向图，求连通域
    row, col = zip(*edges)
    row_idx = np.array(row + col, dtype=int)
    col_idx = np.array(col + row, dtype=int)

    n_nodes = int(max(len(clusters), row_idx.max() + 1, col_idx.max() + 1))

    graph = coo_matrix(
        (np.ones(len(row_idx), dtype=bool), (row_idx, col_idx)),
        shape=(n_nodes, n_nodes)
    )
    n_comp, labels = connected_components(graph, directed=False)

    # 3.4 连通域内部合并成一个新簇
    merged: List[Cluster] = []
    for cid in range(n_comp):
        idxs = np.where(labels == cid)[0]
        # 过滤掉哑元节点（>len(clusters)-1）
        valid_idxs = [k for k in idxs if k < len(clusters)]
        if len(valid_idxs) == 1:
            merged.append(clusters[valid_idxs[0]])
        else:
            pts = np.concatenate([clusters[k].pts for k in valid_idxs])
            merged.append(Cluster(pts, pts.mean(0), principal_components(pts)[0]))
    return merged


# ──────────────────────────────
# 4. 匈牙利算法跟踪器
# ──────────────────────────────
class HungarianTracker:
    """基于质心 + 匈牙利分配的简单多目标跟踪器"""
    def __init__(self):
        self.trk: Dict[int, Track] = {}
        self.next = 0  # 下一条新轨迹使用的 id

    # —— 主入口 ——
    def update(self, detections: List[np.ndarray]):
        """输入当前帧所有检测质心坐标，更新内部轨迹"""
        # 4.1 所有存活轨迹先 age+1
        for tr in self.trk.values():
            tr.age += 1

        # 4.2 没有历史轨迹：全部新建
        if not self.trk:
            for det in detections:
                self._add(det)
            return

        # 4.3 当前帧无检测：只做清理
        if not detections:
            self._clean()
            return

        # 4.4 构造 |T|×|D| 成本矩阵（欧氏距离）
        ids, last = zip(*[(tid, tr.traj[-1]) for tid, tr in self.trk.items()])
        cost = np.linalg.norm(np.stack(last)[:, None, :] - np.stack(detections)[None, :, :], axis=2)

        row_idx, col_idx = linear_sum_assignment(cost)  # 全局最优匹配

        used_trk, used_det = set(), set()
        # 4.5 更新匹配成功的轨迹
        for r, c in zip(row_idx, col_idx):
            if cost[r, c] < Cfg.track_max_dist:
                tid = ids[r]
                self.trk[tid].traj.append(detections[c])
                self.trk[tid].age = 0
                used_trk.add(tid)
                used_det.add(c)

        # 4.6 未匹配检测 → 新轨迹
        for idx, det in enumerate(detections):
            if idx not in used_det:
                self._add(det)

        # 4.7 清理过老轨迹
        self._clean()

    # —— 内部辅助 ——
    def _add(self, det: np.ndarray):
        """创建新轨迹"""
        self.trk[self.next] = Track(self.next, [det], 0)
        self.next += 1

    def _clean(self):
        """丢弃 age > max_age 的轨迹"""
        dead = [tid for tid, tr in self.trk.items() if tr.age > Cfg.track_max_age]
        for tid in dead:
            self.trk.pop(tid)


# ──────────────────────────────
# 5. 数据读取与主流程
# ──────────────────────────────

def read_frames(pcap_path: str):
    """兼容不同版本 velodyne-decoder 的 read_pcap 接口"""
    try:
        return vd.read_pcap(pcap_path, as_pcl_structs=True)
    except TypeError:
        return vd.read_pcap(pcap_path)


def run_pipeline(pcap_path: str) -> Dict[int, Track]:
    """主流程：逐帧处理 pcap，输出完整轨迹字典"""
    tracker = HungarianTracker()

    for f_idx, (stamp, pts) in enumerate(read_frames(pcap_path)):
        # Velodyne struct → (N,3) float32
        xyz = np.column_stack((pts['x'], pts['y'], pts['z'])).astype(np.float32)
        xyz = crop_xyz(xyz)
        print(f"frame {f_idx:04d}: {len(xyz)} pts")
        if xyz.size == 0:
            continue  # 当前帧无兴趣点

        # 5.1 初步 DBSCAN + 合并
        clusters = merge_clusters(dbscan_clusters(xyz))

        # 5.2 更新跟踪器
        detections = [c.centroid for c in clusters]
        tracker.update(detections)

    return tracker.trk


# ──────────────────────────────
# 6. 轨迹可视化函数
# ──────────────────────────────

def plot_tracks(tracks: Dict[int, Track]):
    """Plotly 交互式 3D 折线图"""
    traces = []
    for tid, tr in tracks.items():
        arr = np.vstack(tr.traj)
        traces.append(
            go.Scatter3d(
                x=arr[:, 0], y=arr[:, 1], z=arr[:, 2],
                mode="lines+markers",
                name=f"ID {tid}",
                marker=dict(size=3),
                line=dict(width=2)
            )
        )
    fig = go.Figure(
        data=traces,
        layout=dict(
            scene=dict(aspectmode="data"),
            title="3D Trajectories (Hungarian Tracker)",
            margin=dict(l=0, r=0, b=0, t=30)
        )
    )
    fig.show()

In [28]:
# ──────────────────────────────
# 7. 示例入口（Colab 下直接运行）
# ──────────────────────────────
if __name__ == "__main__":
    # 修改为自己的 pcap 路径：可放在 /content 或 Google Drive
    pcap_file = "/content/drive/MyDrive/日本大学/高梨研/data/2022-10-28-13-25-46_Velodyne-VLP-16-Data.pcap"

    tracks = run_pipeline(pcap_file)
    print(f"共生成 {len(tracks)} 条轨迹")

    # 绘图
    plot_tracks(tracks)

frame 0000: 1738 pts
frame 0001: 1737 pts
frame 0002: 575 pts
frame 0003: 580 pts
frame 0004: 576 pts
frame 0005: 580 pts
frame 0006: 580 pts
frame 0007: 579 pts
frame 0008: 578 pts
frame 0009: 575 pts
frame 0010: 575 pts
frame 0011: 578 pts
frame 0012: 577 pts
frame 0013: 578 pts
frame 0014: 580 pts
frame 0015: 590 pts
frame 0016: 584 pts
frame 0017: 586 pts
frame 0018: 590 pts
frame 0019: 586 pts
frame 0020: 583 pts
frame 0021: 585 pts
frame 0022: 591 pts
frame 0023: 585 pts
frame 0024: 585 pts
frame 0025: 590 pts
frame 0026: 587 pts
frame 0027: 590 pts
frame 0028: 588 pts
frame 0029: 591 pts
frame 0030: 590 pts
frame 0031: 589 pts
frame 0032: 592 pts
frame 0033: 595 pts
frame 0034: 593 pts
frame 0035: 591 pts
frame 0036: 594 pts
frame 0037: 597 pts
frame 0038: 599 pts
frame 0039: 600 pts
frame 0040: 600 pts
frame 0041: 594 pts
frame 0042: 597 pts
frame 0043: 605 pts
frame 0044: 596 pts
frame 0045: 597 pts
frame 0046: 604 pts
frame 0047: 600 pts
frame 0048: 602 pts
frame 0049: 597 pt