In [1]:
import cv2
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import matplotlib as mpl
from matplotlib import cm
from matplotlib import colors
import networkx as nx
from scipy.interpolate import interp1d
from scipy import ndimage

# 中线数据导入和检查

## 中线骨架和angle_m等数据导入并检查

In [2]:
# Load the midline (skeleton) analysis results of one recording.
# folder_path = "Y:\\SZX\\2025_wbi_analysis\\good_WBI\\20241219_4.5g-ov_06"
folder_path = "Y:\\SZX\\2025_wbi_analysis\\202509_TurnLabelCheck\\20250716_lg9624-1gNa-002-24d-ov_002"
# Build the path with os.path.join: the former literal '\output-0516.npz' relied on
# the invalid escape sequence "\o", which newer Python versions warn about.
# NOTE: allow_pickle=True unpickles arbitrary objects; only load trusted files.
npz_data = np.load(os.path.join(folder_path, 'output-0516.npz'), allow_pickle=True)
# 'arr_0' holds a 0-d object array wrapping a dict; .item() unwraps it.
data = npz_data['arr_0'].item()
data_keys = list(data.keys())
# Per-entry skeleton polylines, keyed like `data`.
paths = {key: value['all_paths'] for key, value in data.items()}


### 读取angle_m, 检查骨架数量和角度分布

In [3]:

# Check the number of skeleton entries loaded from the npz file
print('骨架数量：',len(paths))
def normalize_points(points):
    """Convert skeleton points into a stacked int32 array for OpenCV drawing.

    The input is cast to float32, its first axis is stacked vertically (which
    collapses a singleton middle axis such as (N, 1, 2) into (N, 2)), and the
    result is truncated to int32.
    """
    stacked = np.vstack(np.array(points, dtype=np.float32))
    return stacked.astype(np.int32)  # OpenCV expects int32 coordinates
# Collect angle_m of every entry (in `data` order) as a float array
angle_m = np.array([value['angle_m'] for value in data.values()], dtype=float)
# angle_m_check = angle_m[~np.isnan(angle_m)]
# plt.hist(angle_m_check)

骨架数量： 46170


In [4]:
# Collect the signed angle (angle_md) of every entry (in `data` order) as a float array
angle_md = np.array([value['angle_md'] for value in data.values()], dtype=float)
# angle_md_check = angle_md[~np.isnan(angle_md)]
# plt.hist(angle_md_check)

### 读取环形和交叉骨架信息

In [5]:
# Per-entry skeleton classification fields, listed in `data` order
closest_idx = [value['closest path'] for value in data.values()]
circular = [value['circular'] for value in data.values()]
branching = [value['branching'] for value in data.values()]

### 读取头部方向和速度向量数据

In [6]:
# Each position holds either nan or a (dx, dy) tuple used as a direction vector
vector_m = [value['vector_m'] for value in data.values()]
vector_h = [value['vector_h'] for value in data.values()]
pos_phr = [value['pos_phr'] for value in data.values()]

In [None]:

# This cell used to assign the 'closest path' field to `pos_phr`, which replaced
# the pharynx coordinates read from 'pos_phr' and broke the video cells that
# index pos_phr[idx] as an (x, y) position when the notebook is run top to
# bottom. The 'closest path' values belong in `closest_idx`.
closest_idx = list({key: value['closest path'] for key, value in data.items()}.values())

In [7]:
# Rotate the movement vector by the signed angle to obtain the head-direction vector
def rotation_mat_2(theta_degrees, mov_vec):
    """Rotate a 2D vector with the standard rotation matrix for angle -theta.

    Parameters
    ----------
    theta_degrees : float or None
        Rotation angle in degrees. The sign is negated before the matrix is
        built, so the matrix applied is R(-theta).
    mov_vec : array-like of length 2, or None
        Vector to rotate.

    Returns
    -------
    numpy.ndarray or None
        The rotated vector, or None when the angle or the vector is missing
        (None or containing NaN).
    """
    # `x == np.nan` is always False, so missing values are detected with
    # `is None` / np.isnan instead.
    if theta_degrees is None or mov_vec is None:
        return None
    if np.isnan(theta_degrees):
        return None
    # asarray lets tuples/lists through (they have no .any()) and turns a bare
    # nan placeholder into a 0-d NaN array that the check below catches.
    vec = np.asarray(mov_vec, dtype=float)
    if np.isnan(vec).any():
        return None

    theta_rad = -np.radians(theta_degrees)
    cos_theta = np.cos(theta_rad)
    sin_theta = np.sin(theta_rad)
    rot_mat = np.array([
        [cos_theta, -sin_theta],
        [sin_theta,  cos_theta]
    ])
    return np.dot(rot_mat, vec)

# NOTE(review): np.vectorize treats every argument element-wise, so this wrapper
# would hand scalars (not 2D vectors) to rotation_mat_2; it is not used in the
# visible cells.
rotation_mat_vec = np.vectorize(rotation_mat_2)

In [8]:
# Rotate every movement vector by its signed angle (angle_md); the result list
# is index-aligned with angle_md / vector_m.
vector_mh = []
for i, ang in enumerate(angle_md):
    vector_mh.append(rotation_mat_2(ang, vector_m[i]))

In [9]:
# Vector projection
def project_vector_A_on_B(vector_A, vector_B):
    """Project vector A onto vector B.

    Returns the vector that lies along B and whose signed length is the scalar
    projection of A on B. When B has zero norm a message is printed and None
    is returned.
    """
    a_vec = np.array(vector_A)
    b_vec = np.array(vector_B)

    b_len = np.linalg.norm(b_vec)
    if b_len == 0:
        print('B向量不能为零')
        return None

    direction = b_vec / b_len              # unit vector along B
    scalar_proj = np.dot(a_vec, direction)  # signed length of A along B
    return scalar_proj * direction

# 骨架分析

## Omega Turn labelling

### 骨架合并

#### 函数定义

In [None]:
def path_length(path):
    """Return the polyline length of a skeleton (sum of segment lengths).

    `path` is a sequence of points (one point per row). None, empty and
    single-point inputs have no segments and yield 0.0, matching the guarded
    `path_length` definition used by the splitting code.
    """
    if path is None or len(path) < 2:
        return 0.0
    diffs = np.diff(path, axis=0)
    return float(np.sum(np.sqrt((diffs ** 2).sum(axis=1))))


#### 轨迹拆分

In [None]:
def path_length(coords: np.ndarray) -> float:
    """Polyline length: Euclidean segment lengths summed along the path.

    coords holds one point per row. None or fewer than two points gives 0.0.
    """
    has_segment = coords is not None and len(coords) >= 2
    if not has_segment:
        return 0.0
    steps = np.diff(coords, axis=0)
    seg_lengths = np.sqrt(np.square(steps).sum(axis=1))
    return float(seg_lengths.sum())

def quantize_point(pt, q=1):
    """Snap a point onto a grid of spacing q (tolerance clustering).

    Each of the first two coordinates is divided by q, rounded with the
    built-in round(), and scaled back. q=1 amounts to integer-pixel matching.
    """
    snapped_x = int(round(pt[0] / q) * q)
    snapped_y = int(round(pt[1] / q) * q)
    return (snapped_x, snapped_y)

def cut_paths_to_edges(all_paths, q=1):
    """
    Cut several skeleton polylines at their end points and crossing points and
    return a MultiGraph:
    - nodes: key-point coordinates (after quantization with spacing q)
    - edges: the sub-polyline between two key points, stored with the
      attributes coords/length
    Every node also gets a 'pos' attribute holding a representative pixel.
    """
    # 1) For every (quantized) pixel, record which paths hit it and at which
    #    local index; used to detect crossings
    occ = {}  # key -> list of (path_idx, local_idx)
    for pi, path in enumerate(all_paths):
        if path is None or len(path) == 0:
            continue
        for si, pt in enumerate(path):
            key = quantize_point(pt, q)
            occ.setdefault(key, []).append((pi, si))

    # 2) Key points: first/last point of each polyline + points hit more than
    #    once (crossing / coincident points)
    keypoints = set()
    for pi, path in enumerate(all_paths):
        if path is None or len(path) == 0:
            continue
        keypoints.add(quantize_point(path[0], q))
        keypoints.add(quantize_point(path[-1], q))
    for key, hits in occ.items():
        # More than one hit: treated as a crossing/shared point (this also
        # fires when a single path visits the same cell twice)
        if len(hits) > 1:
            keypoints.add(key)

    # 3) Cut every polyline at the key points and create the edges
    G = nx.MultiGraph()
    # Node positions (used later to orient edges)
    node_pos = {}  # key(node) -> original coordinate (first real pixel seen)
    for kp in keypoints:
        # Pick a representative coordinate, preferring a real pixel from occ
        if kp in occ:
            pi, si = occ[kp][0]
            node_pos[kp] = tuple(map(int, all_paths[pi][si]))
        else:
            node_pos[kp] = (int(kp[0]), int(kp[1]))
        G.add_node(kp)

    for pi, path in enumerate(all_paths):
        if path is None or len(path) < 2:
            continue
        # Indices along this path that fall on a key point
        cut_idx = []
        for si, pt in enumerate(path):
            if quantize_point(pt, q) in keypoints:
                cut_idx.append(si)
        # Make sure the first and last index are included
        if 0 not in cut_idx:
            cut_idx.insert(0, 0)
        if (len(path)-1) not in cut_idx:
            cut_idx.append(len(path)-1)
        # De-duplicate and sort
        cut_idx = sorted(set(cut_idx))
        # Each pair of consecutive key points forms one edge
        for a, b in zip(cut_idx[:-1], cut_idx[1:]):
            if b <= a:
                continue
            sub = path[a:b+1]
            if len(sub) < 2:
                continue
            u = quantize_point(sub[0], q)
            v = quantize_point(sub[-1], q)
            w = path_length(sub)
            # u == v can happen (zero length / fold-back); skip those
            if u == v or w == 0:
                continue
            # Add to the graph; the edge stores coords and length
            G.add_edge(u, v, length=w, coords=sub)

    # Store the real node coordinate as well (for later orientation checks)
    nx.set_node_attributes(G, {n: {'pos': node_pos[n]} for n in G.nodes})
    return G

def oriented_coords(edge_data, start_node_pos):
    """Return an edge polyline oriented so that it starts near start_node_pos.

    edge_data['coords'] is the edge polyline; start_node_pos is the coordinate
    of the node the walk is leaving. The polyline is returned as stored when
    its first point is at least as close to that node as its last point,
    otherwise reversed. An empty polyline is returned unchanged.
    """
    polyline = edge_data['coords']
    if len(polyline) == 0:
        return polyline
    dist_first = np.linalg.norm(polyline[0] - start_node_pos)
    dist_last = np.linalg.norm(polyline[-1] - start_node_pos)
    if dist_first <= dist_last:
        return polyline
    return polyline[::-1]

def extract_non_branching_chains(G: nx.MultiGraph):
    """
    Extract all "non-branching chains": interior nodes have degree 2 and a
    chain ends at a node whose degree is not 2 (or when no edge is left).
    Pure rings (edges left unvisited after the terminal pass) are handled in a
    second pass.
    Returns: list of dict, each {'nodes': [...], 'coords': np.ndarray, 'length': float}
    """
    chains = []
    visited = set()  # specific edges already walked, as (u,v,key) in normalized (undirected) order

    def mark(u,v,k):  # undirected normalization of an edge identifier
        return (u, v, k) if u <= v else (v, u, k)

    deg = dict(G.degree())
    terminals = [n for n,d in deg.items() if d != 2]

    # ---- all chains that start from a terminal node ---- #
    for start in terminals:
        for _, nbr, key, data in G.edges(start, keys=True, data=True):
            m = mark(start, nbr, key)
            if m in visited:
                continue
            # Start walking along the non-branching chain
            path_nodes = [start]
            path_coords = []
            cur, prev = start, None

            while True:
                # Pick an unvisited edge leaving cur
                next_edge = None
                for _, nb, k, d in G.edges(cur, keys=True, data=True):
                    mk = mark(cur, nb, k)
                    if mk in visited:
                        continue
                    # Avoid turning straight back (if another option exists)
                    if prev is not None and nb == prev:
                        continue
                    next_edge = (cur, nb, k, d)
                    break
                if next_edge is None:
                    # Nothing else available: allow the edge leading back to
                    # the previous node as well, if it is still unvisited
                    for _, nb, k, d in G.edges(cur, keys=True, data=True):
                        mk = mark(cur, nb, k)
                        if mk not in visited:
                            next_edge = (cur, nb, k, d)
                            break
                if next_edge is None:
                    break

                u, v, k, d = next_edge
                # Orient the edge coordinates and append them (skipping the
                # repeated first point)
                start_pos = np.array(G.nodes[u]['pos'])
                seg = oriented_coords(d, start_pos)
                if len(path_coords) == 0:
                    path_coords = seg.copy()
                else:
                    path_coords = np.vstack([path_coords, seg[1:]])
                visited.add(mark(u, v, k))

                prev, cur = u, v
                path_nodes.append(cur)
                # Stop on reaching a terminal (degree != 2)
                if deg[cur] != 2:
                    break

            if len(path_coords) >= 2:
                chains.append({
                    'nodes': path_nodes,
                    'coords': path_coords,
                    'length': path_length(path_coords)
                })

    # ---- pure rings: every node of the connected component has degree 2 ---- #
    # Edges that are still unvisited are treated as belonging to a ring
    for u, v, k in list(G.edges(keys=True)):
        m = mark(u, v, k)
        if m in visited:
            continue
        # Walk from u around the ring until no edge can be followed
        path_nodes = [u]
        path_coords = []
        cur, prev = u, None
        while True:
            next_edge = None
            for _, nb, kk, d in G.edges(cur, keys=True, data=True):
                mk = mark(cur, nb, kk)
                if mk in visited:
                    continue
                # The first step may take any edge; afterwards avoid turning
                # straight back
                if prev is not None and nb == prev:
                    continue
                next_edge = (cur, nb, kk, d)
                break
            if next_edge is None:
                # No unvisited edge left: the ring is complete
                break
            a, b, kk, d = next_edge
            start_pos = np.array(G.nodes[a]['pos'])
            seg = oriented_coords(d, start_pos)
            if len(path_coords) == 0:
                path_coords = seg.copy()
            else:
                path_coords = np.vstack([path_coords, seg[1:]])
            visited.add(mark(a, b, kk))
            prev, cur = a, b
            path_nodes.append(cur)

        if len(path_coords) >= 2:
            chains.append({
                'nodes': path_nodes,
                'coords': path_coords,
                'length': path_length(path_coords)
            })

    return chains

def merge_and_find_longest_non_branching(all_paths, quant=1):
    """
    Entry point:
    1) cut the paths at crossings/end points and build the graph
    2) extract all non-branching chains (pure rings included)
    3) return the coordinates and length of the longest chain

    Returns (longest_coords, longest_length, third, G). When no chain exists
    this is (None, 0.0, <empty chain list>, G).
    NOTE(review): in the non-empty case `third` is the coords of the FIRST
    chain only (chains[0]), although the original comment spoke of returning
    all tracks - confirm what callers expect.
    """
    graph = cut_paths_to_edges(all_paths, q=quant)
    chain_list = extract_non_branching_chains(graph)
    if len(chain_list) == 0:
        return None, 0.0, chain_list, graph

    # First chain with the maximum length (ties keep the earliest one).
    best = chain_list[0]
    for candidate in chain_list[1:]:
        if candidate['length'] > best['length']:
            best = candidate

    first_chain_coords = chain_list[0]['coords']
    # all_chains = list({key: value['coords'] for key, value in chain_dict.items()}.values())
    return best['coords'], best['length'], first_chain_coords, graph
def merge_and_find_non_branching(all_paths, quant=1):
    """
    Entry point:
    1) cut the paths at crossings/end points and build the graph
    2) extract all non-branching chains (pure rings included)
    3) return the coordinates of all chains, longest first

    Returns
    -------
    (all_seq_path, G)
        all_seq_path is a list of coordinate arrays sorted by chain length in
        descending order (an empty list when no chain exists); G is the graph.
    """
    G = cut_paths_to_edges(all_paths, q=quant)
    chains = extract_non_branching_chains(G)
    if not chains:
        # Always return a 2-tuple: the caller unpacks `seq_path_i, G`, so the
        # former 4-value return raised "too many values to unpack" here.
        return [], G
    # chains is a list of per-chain dicts; order it by the 'length' entry.
    chains = sorted(chains, key=lambda c: c['length'], reverse=True)
    all_seq_path = [chain['coords'] for chain in chains]
    return all_seq_path, G

##### 拆分操作

In [None]:
# Split the skeletons of every entry into non-branching chains.
# seq_paths maps each key of `paths` to the list of chain coordinates.
seq_paths = {}
for key, value in paths.items():
    seq_path_i, G = merge_and_find_non_branching(
        value,
        quant=1   # if crossing points of different segments do not coincide exactly, use quant=2 or 3 pixels to merge nearby points into one node
    )
    seq_paths[key] = seq_path_i

#### 轨迹合并

##### 轨迹合并函数

In [None]:

def path_length(path):
    """Return the geometric length of one path (sum of segment lengths).

    `path` is a sequence of points (one point per row). None, empty and
    single-point inputs have no segments and yield 0.0, so this redefinition
    behaves like the guarded `path_length` used by the splitting code.
    """
    if path is None or len(path) < 2:
        return 0.0
    diffs = np.diff(path, axis=0)
    return float(np.sum(np.sqrt((diffs ** 2).sum(axis=1))))

def build_graph(paths, tol=1e-6):
    """Merge all paths into one undirected graph.

    Each path becomes an edge between its first and last point. End points
    that agree within `tol` share a node.

    Returns
    -------
    (G, node_id)
        G is an nx.Graph whose edges carry `weight` (path length) and `coords`
        (the path itself); node_id maps node number -> representative point.
    """
    G = nx.Graph()
    node_id = {}

    def get_node_id(point):
        # Linear scan over the known nodes; fine for the handful of chains per frame.
        for nid, coord in node_id.items():
            if np.allclose(coord, point, atol=tol):
                return nid
        nid = len(node_id)
        node_id[nid] = point
        return nid

    for path in paths:
        # Empty entries have no end points to connect.
        if path is None or len(path) == 0:
            continue
        src, dst = path[0], path[-1]
        src_id, dst_id = get_node_id(src), get_node_id(dst)
        length = path_length(path)
        # nx.Graph keeps a single edge per node pair and add_edge silently
        # replaces its data. Keep the longest of several parallel paths so a
        # later, shorter path cannot overwrite it.
        if G.has_edge(src_id, dst_id) and G[src_id][dst_id]['weight'] >= length:
            continue
        G.add_edge(src_id, dst_id, weight=length, coords=path)

    return G, node_id

def longest_path(paths):
    """Return the coordinates of the longest path on the merged graph.

    All simple paths between every pair of degree-1 nodes are enumerated, and
    the one with the largest summed edge weight wins. Edge polylines are
    oriented along the walking direction and joined without repeating the
    shared junction point. Returns None when no such path exists (for example
    when the graph has fewer than two degree-1 nodes).
    """
    G, node_id = build_graph(paths)
    degrees = dict(G.degree())
    endpoints = [n for n, d in degrees.items() if d == 1]

    longest_len = -1
    longest_coords = None

    for i in range(len(endpoints)):
        for j in range(i+1, len(endpoints)):
            u, v = endpoints[i], endpoints[j]
            for path_nodes in nx.all_simple_paths(G, u, v):
                segments = []
                total_len = 0
                for a, b in zip(path_nodes[:-1], path_nodes[1:]):
                    edge = G[a][b]
                    seg = np.asarray(edge["coords"])

                    # Orient the polyline so that it starts at node a.
                    if not np.allclose(seg[0], node_id[a]):
                        seg = seg[::-1]

                    # Every edge after the first starts on the junction point
                    # that ended the previous edge; drop that repeated point so
                    # the result has no zero-length steps.
                    segments.append(seg if not segments else seg[1:])
                    total_len += edge["weight"]

                if total_len > longest_len:
                    longest_len = total_len
                    longest_coords = np.vstack(segments)

    return longest_coords


def longest_path_old(paths):
    """Legacy variant: coordinates of the longest path on the merged graph.

    Enumerates all simple paths between each pair of degree-1 nodes and keeps
    the one with the largest summed edge weight. Edge polylines are
    concatenated exactly as stored, without re-orienting them.
    """
    G, node_id = build_graph(paths)
    endpoints = [node for node, degree in G.degree() if degree == 1]

    best_len = -1
    best_coords = None

    for i, u in enumerate(endpoints):
        for v in endpoints[i+1:]:
            for node_seq in nx.all_simple_paths(G, u, v):
                edges = [G[a][b] for a, b in zip(node_seq[:-1], node_seq[1:])]
                coords = [pt for edge in edges for pt in edge["coords"].tolist()]
                total_len = sum(edge["weight"] for edge in edges)
                if total_len > best_len:
                    best_len = total_len
                    best_coords = np.array(coords)

    return best_coords


##### 合并操作

In [None]:
# For every entry, merge its chains and keep the longest path.
# long_paths[key] is whatever longest_path returns (None when no path is found).
long_paths = {}
for key, value in seq_paths.items():
    longest_coords = longest_path(value)
    long_paths[key] = longest_coords

### 计算轨迹曲率

##### 函数定义

In [None]:
from scipy.interpolate import splprep, splev

def resample_skeleton(skeleton, step=1.0):
    """
    Resample a skeleton at a fixed arc-length spacing using a parametric spline.

    skeleton: (N,2) array-like
    step: spacing, in pixels of arc length, between resampled points

    The input is returned (as an array) when it has fewer than 3 points, also
    after consecutive duplicates are removed, or when splprep fails.
    """
    pts = np.array(skeleton)
    if len(pts) < 3:
        return pts

    # Drop points that repeat their predecessor
    moved = np.any(np.diff(pts, axis=0) != 0, axis=1)
    pts = pts[np.concatenate(([True], moved))]
    if len(pts) < 3:
        return pts

    x, y = pts[:, 0], pts[:, 1]
    dx, dy = np.diff(x), np.diff(y)
    arc = np.concatenate(([0], np.cumsum(np.sqrt(dx**2 + dy**2))))   # unit: pixels
    total_len = arc[-1]

    # splprep gets the arc length normalized to [0, 1] as its parameter
    try:
        tck, _ = splprep([x, y], s=0, u=arc / total_len)
    except Exception as e:
        print("splprep error:", e)
        return pts

    # Evenly spaced samples on [0, total_len) (the end point is excluded)
    sample_u = np.arange(0, total_len, step) / total_len
    return np.array(splev(sample_u, tck)).T

def resample_skeleton_2(skeleton, step=1.0):
    """Subsample a skeleton by taking every `step`-th point (index stride).

    skeleton: (N,2) array-like
    step: index stride; fractional positions are truncated to integers

    Returns the selected rows as an array, or an empty list when the skeleton
    has no more than 2*step points.
    """
    total_len = len(skeleton)
    if total_len <= step*2:
        return []
    # np.arange with a float step (the default 1.0) yields float values, which
    # numpy rejects as indices; cast them to integers explicitly.
    idx_series = np.arange(0, total_len, step).astype(int)
    # asarray lets plain lists be fancy-indexed as well.
    selected_points = np.asarray(skeleton)[idx_series, :]
    return selected_points

def curvature_from_angles(skeleton, step=1.0):
    """
    Signed turning angles along a subsampled skeleton.

    The skeleton is subsampled with resample_skeleton_2(step), the direction
    angle of every segment is computed and unwrapped, and the differences
    between consecutive segment angles are returned in degrees.
    Returns np.nan when the subsampling yields no points.
    """
    pts = resample_skeleton_2(skeleton, step=step)
    if len(pts) == 0:
        return np.nan

    seg = np.diff(pts, axis=0)
    # Unwrap so that crossing +-pi does not show up as a jump
    headings = np.unwrap(np.arctan2(seg[:, 1], seg[:, 0]))
    # # Curvature index (could also be np.mean(np.abs(dtheta)))
    # mean_curvature = np.sqrt(np.mean(dtheta**2))
    return np.degrees(np.diff(headings))

In [None]:
# 等间距重采样和滑动平均

def moving_average(data, window_size=5):
    """Box-filter the first two columns of `data` (moving average).

    Each column is convolved with a uniform kernel of length window_size in
    'same' mode; the result has one row per input row and two columns.
    """
    kernel = np.ones(window_size) / window_size
    smoothed_cols = [np.convolve(data[:, col], kernel, mode='same') for col in (0, 1)]
    return np.vstack(smoothed_cols).T

def resample_and_smooth(points, window_size=5):
    """Resample a polyline uniformly in arc length, then smooth it.

    The output keeps the number of points of the input. Returns the pair
    (resampled, smoothed), where smoothed is moving_average(resampled).
    """
    # --- uniform arc-length resampling (linear interpolation) ---
    xs, ys = points[:, 0], points[:, 1]
    seg_len = np.sqrt(np.diff(xs)**2 + np.diff(ys)**2)
    arc = np.concatenate(([0], np.cumsum(seg_len)))
    arc_uniform = np.linspace(0, arc[-1], len(points))
    resampled = np.vstack([
        interp1d(arc, coord, kind='linear')(arc_uniform) for coord in (xs, ys)
    ]).T

    # --- smoothing ---
    return resampled, moving_average(resampled, window_size)


In [None]:
def remove_outliers_by_jump(points, max_step=10):
    """
    Drop points that lie too far from their predecessor.

    points: (N,2) array
    max_step: largest allowed distance between neighbouring points (pixels)

    The first point is always kept. Distances are measured between original
    neighbours, so the point following a dropped point is judged against the
    dropped one.
    """
    step_len = np.sqrt(np.diff(points[:, 0])**2 + np.diff(points[:, 1])**2)
    # A point is removed when the step leading to it exceeds the threshold.
    keep = np.concatenate(([True], ~(step_len > max_step)))
    return points[keep]

In [None]:
def resample_and_dthatas(long_paths, step, max_jump=10, window_size=50, min_points=200):
    """Resample every skeleton and compute its turning angles.

    Parameters
    ----------
    long_paths : dict
        key -> (N,2) skeleton array, or None when no skeleton exists.
    step : int
        Index stride passed to curvature_from_angles.
    max_jump : float
        Threshold passed to remove_outliers_by_jump (default 10).
    window_size : int
        Smoothing window passed to resample_and_smooth (default 50); only the
        resampled, unsmoothed skeleton is kept.
    min_points : int
        Skeletons with fewer resampled points get no angles (default 200).

    Returns
    -------
    (resampled, dtheta)
        resampled: key -> resampled skeleton ([] when unavailable).
        dtheta: key -> {'dtheta': angles, 'sum_dtheta': rounded sum} or None.
    """
    resampled = {}
    for key, skelon in long_paths.items():
        if skelon is None:
            resampled[key] = []
            continue
        # Remove points that jump away from their predecessor
        skelon = remove_outliers_by_jump(skelon, max_step=max_jump)
        # interp1d needs at least two points; treat shorter results as missing
        if len(skelon) < 2:
            resampled[key] = []
            continue
        resampled[key], _ = resample_and_smooth(skelon, window_size=window_size)

    dtheta = {}
    for key, skelon in resampled.items():
        if skelon is None or len(skelon) < min_points:
            dtheta[key] = None
            continue
        dtheta_i = curvature_from_angles(skelon, step=step)
        sum_dtheta_i = round(np.sum(dtheta_i), 2)
        dtheta[key] = {'dtheta': dtheta_i, 'sum_dtheta': sum_dtheta_i}
    return resampled, dtheta

##### (*) 重采样轨迹和曲率计算

In [None]:
# # 采样和平均
# resampled = {}
# # smoothed = {}
# for key, skelon in long_paths.items():
#     if type(skelon) == type(None):
#         resampled[key] =  []
#     else:
#         # 去掉瞬移离群点
#         skelon = remove_outliers_by_jump(skelon, max_step=10)
#         resampled[key],_ = resample_and_smooth(skelon, window_size=50)

In [None]:
# # 对于合并后取最长的骨架(重采样))
# dtheta = {}
# for key, skelon in resampled.items():
#     if type(None)==type(skelon):
#         dtheta[key] = None
#     elif (len(skelon)) < 200:
#         dtheta[key] = None
#     else:
#         dtheta_i = curvature_from_angles(skelon, step=100)
#         dtheta[key]=dtheta_i

##### (*)曲率关键帧检查

In [None]:
# # 选择关键帧检查delta
# keys = [33070, 33085, 33095, 33090]
# step = 100
# for key in keys:
#     dtheta_i = dtheta[key]
#     long_paths_i = long_paths[key]
#     resampled_i = resampled[key]
#     smoothed_i = smoothed[key]
#     # 原骨架采样
#     points_sel = long_paths_i[np.arange(0,len(long_paths_i), step)]
#     # 重采样后骨架采样
#     points_res = resampled_i[np.arange(0,len(resampled_i), step)]
#     # 平滑
#     points_smh = smoothed_i[np.arange(0,len(smoothed_i), step)]

#     fig, ax = plt.subplots(1,4, figsize = (24,5))
#     ax[0].plot(dtheta_i)
#     ax[0].set_title(f"sum:{round(np.sum(dtheta_i),2)}deg")
#     ax[1].scatter(long_paths_i[:,0], long_paths_i[:,1], c = np.arange(len(long_paths_i)),cmap='bwr')
#     ax[1].scatter(points_sel[:,0], points_sel[:,1], color='red')
#     # 重采样
#     ax[2].scatter(resampled_i[:,0], resampled_i[:,1])
#     ax[2].scatter(points_res[:,0], points_res[:,1], color='red')
#     # 平滑
#     ax[3].scatter(smoothed_i[:,0], smoothed_i[:,1])
#     ax[3].scatter(points_smh[:,0], points_smh[:,1], color='red')
#     plt.show()

#### 重采样骨架和计算相邻角

In [None]:
# Resample all merged skeletons and compute turning angles with stride 20
resampled, dtheta = resample_and_dthatas(long_paths, step=20)
# Summed turning angle per key; None where no angles were computed
sum_dtheta = {
    key: (value.get('sum_dtheta') if isinstance(value, dict) else None)
    for key, value in dtheta.items()
}
df_sum_dtheta = pd.Series(sum_dtheta, name='sum_dtheta').to_frame()

#### (*)结合人工标记数据检查

In [None]:
# # 合并人工标注数据
# df_label = pd.DataFrame(omega_np, columns=['label'])

In [None]:
# steps = [10,20,30,40,50,60,70,80,90,100]
# rotation_threshold = 210

# for s in np.arange(10,110,10):
#     _, dtheta = resample_and_dthatas(long_paths,step=s)
#     sum_dtheta = {
#     key: (value['sum_dtheta'] if isinstance(value, dict) and 'sum_dtheta' in value else None)
#     for key, value in dtheta.items()
#     }
#     df_sum_dtheta = pd.Series(sum_dtheta, name='sum_dtheta').to_frame()
#     df_sum_dtheta['coil'] = (df_sum_dtheta['sum_dtheta'].abs()>=rotation_threshold)
#     df_sum_dtheta['color'] = df_sum_dtheta['coil'].map({True:'r', False:'b'})
#     df_sum_dtheta['color'] = df_sum_dtheta['color'].astype(str)
#     df_sum_dtheta = df_sum_dtheta.join(df_label, how='left')

#     # 图
#     # 假设 df_sum_dtheta 已经存在
#     labels = df_sum_dtheta['label'].values
#     n = len(labels)
#     # 找出连续的 label==1 区间
#     turn_intervals = []
#     in_turn = False
#     start = 0

#     for i in range(n):
#         if labels[i] == 1 and not in_turn:
#             # turn开始
#             start = i
#             in_turn = True
#         elif labels[i] != 1 and in_turn:
#             # turn结束
#             end = i - 1
#             turn_intervals.append((start, end))
#             in_turn = False

#     # 如果最后一个点也是turn状态
#     if in_turn:
#         turn_intervals.append((start, n - 1))

#     # 绘图
#     plt.figure(figsize=(20,5))

#     # 先绘制背景矩形
#     for start, end in turn_intervals:
#         plt.axvspan(start, end, color='lemonchiffon', alpha=0.5)  # alpha控制透明度

#     # 绘制散点
#     plt.scatter(
#         x=range(n),
#         y=df_sum_dtheta['sum_dtheta'],
#         c=df_sum_dtheta['color'],
#         s=2
#     )
#     plt.title(f'dtheta_sum.abs >={rotation_threshold},step-{s}')
#     plt.show()

## 骨架分析视频检查

### 视频可视化骨架和骨架信息

In [11]:
# Extract the per-key turning-angle arrays (None where no angles were computed).
# Requires `dtheta` from the resampling cell; running this cell first raises NameError.
dtheta_dict = {
    key: (value.get('dtheta') if isinstance(value, dict) else None)
    for key, value in dtheta.items()
}

NameError: name 'dtheta' is not defined

In [None]:
# Side-by-side video check: left = frame + raw skeletons, right = frame +
# merged/resampled skeleton colored by the summed turning angle.
# Skeleton information
series_skeleton = pd.Series(paths)        # raw skeletons
long_skeleton = pd.Series(resampled)
# dtheta_skeleton = pd.Series(dtheta)
dtheta_skeleton = pd.Series(dtheta_dict)
# Open the video
cap = cv2.VideoCapture(os.path.join(folder_path, "c1_onnx.mp4"))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)

# Parameters
# NOTE(review): check_inv and frame_idx_end are assigned but not used in this cell.
check_inv = 25
start_frame = 32000
delay = 20
frame_idx_end = total_frames
step  = 2
rotation_threshold = 210
    
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

for frame_idx in range(start_frame, total_frames, step):
    # Seek the video to the current frame
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)  # jump to frame_idx
    ret, frame = cap.read()
    # NOTE(review): the None check runs before the `ret` check, so a failed read
    # that returns frame=None skips the frame instead of ending the loop.
    if frame is None:
        continue
    if not ret:
        break
    
    # ================= left panel (video + raw skeletons) =================
    frame_left = frame.copy()
    cv2.putText(frame_left, f"Frame: {frame_idx}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
    circular_i, branching_i = False, False
    points_raw = []
    # NOTE(review): the bare except hides every error, not only a frame missing
    # from data_keys.
    try:
        idx = data_keys.index(frame_idx)
        points_raw = series_skeleton.iloc[idx]

        # Skeleton classification info (not used further in this cell)
        closest_idx_i = closest_idx[idx]
        # Pharynx point coordinates (not used further in this cell)
        pos_phr_i = np.array(pos_phr[idx], dtype=float)
        # Circular / branching skeleton flags
        circular_i = circular[idx]
        branching_i = branching[idx]
    except:
        pass
    # NOTE(review): this rebinds the global name `colors`, shadowing the
    # `from matplotlib import colors` import at the top of the notebook.
    colors = [(0,0,255), (0,255,0),(255,0,0)]
    # NOTE(review): colors_2 is not used in this cell.
    colors_2 = [(176, 106, 28),   # cobalt blue
    (51, 51, 204),    # science red
    (0, 204, 102),    # laser green
    (0, 255, 255),    # fluorescent yellow
    (255, 0, 255),    # magenta
    (0, 77, 255)]

    # Point counts of all skeletons, sorted in descending order
    if len(points_raw):
        len_ls = [len(i) for i in points_raw]
        len_ls.sort(reverse=True)
        if len(len_ls) >= 2:
            # With at least two skeletons, show the ratio of the two longest
            ratio = round(len_ls[0]/len_ls[1], 2)
            cv2.putText(frame_left, f"skel_ratio:", (200, 30),
            cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0,0,255), 2)
            cv2.putText(frame_left, f"{ratio}", (200, 50),
            cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0,0,255), 2)
    for i, points in enumerate(points_raw):
        # Number of points of this skeleton
        len_i = len(points) # number of points
        cv2.putText(frame_left, f"skel{i}:{len_i}", (350, 30+(i*20)),
        cv2.FONT_HERSHEY_SIMPLEX, 0.75, colors[i % 3], 2)
        # Draw the skeleton; each point is passed to cv2 as (y, x)
        pts = normalize_points(points) if len(points) > 0 else []
        for (x, y) in pts:
            # NOTE(review): normalize_points returns int32, so these NaN checks
            # can never be true.
            if np.isnan(x) or np.isnan(y):
                continue
            cv2.circle(frame_left, (int(y), int(x)), 5, colors[i % 3], -1)
    # Text overlays
    # Circular or branching
    if circular_i:
        cv2.putText(frame_left, f"circle", (10, 70),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
    if branching_i:
        cv2.putText(frame_left, f"branching", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (0, 0, 255), 2)
        
    # ================= right panel (video + merged skeleton) =================
    frame_right = frame.copy()

    # Print the curvature indices
    sum_i = None
    if len(dtheta):
        try:
            idx = data_keys.index(frame_idx)
            # Turning angles of this frame
            dtheta_i = dtheta_skeleton.iloc[idx]
            # Mean theta
            # mean_delta_i = round(np.mean(dtheta_i), 2)
            # Sum (absolute value)
            sum_i = abs(round(np.sum(dtheta_i),2))
            # Root mean square
            mean_sqrt_delta_i = round(np.sqrt(np.mean(dtheta_i**2)),2)
            # Variance
            var_delta_i = round(np.var(dtheta_i),2)
            # Print
            # cv2.putText(frame_right, f"mean_cur: {mean_delta_i}", (10, 30),
            #     cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 128, 0), 2)
            if sum_i >= rotation_threshold:
                cv2.putText(frame_right, f"rotation: {sum_i}deg", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
            else:
                cv2.putText(frame_right, f"rotation: {sum_i}deg", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.75, (200, 200, 200), 2)
            cv2.putText(frame_right, f"m_sqrt_cur: {mean_sqrt_delta_i}", (10, 55),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (200, 200, 200), 2)
            cv2.putText(frame_right, f"var_cur: {var_delta_i}", (10, 80),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (200, 200, 200), 2)
        except:
            # NOTE(review): any failure above (e.g. dtheta_i being None) lands
            # here; sum_i may already have been set before the failure.
            dtheta_i = []
    # Draw the longest skeleton (a single polyline)
    try:
        idx = data_keys.index(frame_idx)
        # Merged skeleton of this frame
        points_long = long_skeleton.iloc[idx]
        if points_long is None:
            points_long = []
    except:
        points_long = []
    pts = normalize_points(points_long) if len(points_long) > 0 else []
    for (x, y) in pts:
        if np.isnan(x) or np.isnan(y):
            continue
        # Points are only drawn when a turning-angle sum is available
        if type(sum_i)!=type(None):
            if sum_i >= rotation_threshold:
                cv2.circle(frame_right, (int(y), int(x)), 5, (0,0,255), -1)
            else:
                cv2.circle(frame_right, (int(y), int(x)), 5, (200,200,200), -1)

    # ================= stack left and right panels =================
    combined = np.hstack((frame_left, frame_right))

    cv2.imshow("Skeleton Compare", combined)
    key = cv2.waitKey(delay) & 0xFF
    # 'p' and 'q' both stop playback; space pauses until the next key press
    if key == ord('p'):
        break
    if key == ord('q'):
        break
    elif key == ord(' '):
        cv2.waitKey(0)
cap.release()
cv2.destroyAllWindows()


### 视频可视化骨架+方向向量

In [21]:
# Video check: overlay raw skeletons, the head-direction line and the movement
# vectors on every check_inv-th frame, optionally exporting the result.
# Open the video
cap = cv2.VideoCapture(os.path.join(folder_path, "c1_onnx.mp4"))
series_skeleton = pd.Series(paths)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)
# Pixel length used when drawing the head-direction line
body_l = 200

# Check interval (every n-th frame)
check_inv = 10
delay = 200
export_video = True         # whether to export the video to the folder
output_path = os.path.join(folder_path, "c1_midline.mp4")  # export path
frame_idx = 13900
# Last frame
frame_idx_end = total_frames
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)



# NOTE(review): error_ls is not used in this cell.
error_ls = []

# Video export initialization
if export_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'XVID'
    # fps = cap.get(cv2.CAP_PROP_FPS)
    fps = 20
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
while cap.isOpened() and frame_idx <= frame_idx_end:
    ret, frame = cap.read()
    if not ret:
        break
    # Frames are read sequentially; only every check_inv-th one is processed
    if frame_idx % check_inv != 0:
        frame_idx += 1
        continue
    # Show the frame index
    cv2.putText(frame, f"Frame: {frame_idx}", (10, 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    # Colors of the drawn vectors
    color_vector_h = (128, 0, 128)  # purple
    color_vector_m = (0, 165, 255)    # bright yellow (per the original note)
    color_vector_mh= (216, 191, 216) # light purple
    try:
        idx = data_keys.index(frame_idx)   # list index of this frame
    except:
        print('不存在这一帧：frame',frame_idx)
        frame_idx += 1
        continue
    # Pharynx point coordinates
    pos_phr_i = np.array(pos_phr[idx], dtype=float)
    # Read the vectors and the angle
    vector_h_i= np.array(vector_h[idx], dtype=float)
    # Print the head-direction vector
    # NOTE(review): indexing vector_h_i[0] assumes a 2-element vector; a bare
    # nan entry becomes a 0-d array and would fail here - confirm the data.
    vec_h_print = [round(vector_h_i[0], 3), round(vector_h_i[1], 3)]
    cv2.putText(frame, f"vector_h: {vec_h_print}", (10, 60), 
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    vector_m_i= np.array(vector_m[idx], dtype=float)
    vector_mh_i = np.array(vector_mh[idx], dtype=float)
    angle_m_i = angle_md[idx]
    # NOTE(review): NaN is truthy and 0.0 is falsy, so this test lets NaN
    # through and skips an angle of exactly 0.
    if angle_m_i:
        # Print the angle
        # Image height and width
        frame_height, frame_width = frame.shape[:2]
        text = f"angle_md: {round(angle_m_i,2)}"
        text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
        if angle_m_i <= 100:
            color_m_t = (0,255,0)
        else:
            color_m_t = (0,0,255)
        cv2.putText(frame, text, (frame_width-10-text_size[0], text_size[1]+10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, color_m_t, 2)
    # Draw the vectors
    if np.isnan(vector_m_i).any() or np.isnan(vector_h_i).any():
        print('无方向向量，不打印')
        pass
    else:
        norm_m = np.linalg.norm(vector_m_i)
        norm_mh = np.linalg.norm(vector_mh_i)
        norm_h = np.linalg.norm(vector_h_i)
        # NOTE(review): only norm_m is checked for zero; norm_h / norm_mh are
        # used as divisors without a check.
        if norm_m == 0:
            pass
        else:
            # Normalize the vectors to unit length
            vector_h_i = vector_h_i/norm_h
            vector_m_i = vector_m_i/norm_m
            vector_mh_i = vector_mh_i/norm_mh
        # Start and end points of the drawn lines
        length_factor = body_l
        length_factor_m = 150
        length_factor_mh  = 150
        start_point_h = (int(pos_phr_i[0] - vector_h_i[0]*length_factor), int(pos_phr_i[1] - vector_h_i[1]*length_factor))
        end_point_h = (int(pos_phr_i[0] + vector_h_i[0]*length_factor), int(pos_phr_i[1] + vector_h_i[1]*length_factor))
        
        end_point2 = (int(pos_phr_i[0] + vector_m_i[0]*length_factor_m), int(pos_phr_i[1] + vector_m_i[1]*length_factor_m))
        end_point3 = (int(pos_phr_i[0] + vector_mh_i[0]*length_factor_mh), int(pos_phr_i[1] + vector_mh_i[1]*length_factor_mh))
        
        # Coordinates must be plain ints before they can be drawn
        pos_phr_i = (int(pos_phr_i[0]), int(pos_phr_i[1]))
        end_point2 = (int(end_point2[0]), int(end_point2[1]))
        end_point3 = (int(end_point3[0]), int(end_point3[1]))
        start_point_h = (int(start_point_h[0]), int(start_point_h[1]))
        end_point_h = (int(end_point_h[0]), int(end_point_h[1]))
 
        cv2.arrowedLine(frame, pos_phr_i, end_point2, color_vector_m, 5, tipLength=0.1)
        cv2.arrowedLine(frame, pos_phr_i, end_point3, color_vector_mh, 5, tipLength=0.1)   # movement direction after rotation
        cv2.line(frame, start_point_h, end_point_h, color_vector_h, 5)
        cv2.circle(frame, pos_phr_i, 10, color_vector_h, thickness=-1)

    if frame_idx < max(data_keys):
        try:
            idx = data_keys.index(frame_idx)   # list index of this frame
            points_raw = series_skeleton.iloc[idx]  # raw skeletons
        except:
            print('不存在骨架：frame',frame_idx)
            frame_idx += 1
            continue
        # Several skeletons may exist; draw each in its own color
        # NOTE(review): this rebinds the global name `colors`, shadowing the
        # `from matplotlib import colors` import at the top of the notebook.
        colors = [(0,0,255), (0,255,0),(255,0,0)]
        i = 0
        for points in points_raw:
            midline_points = normalize_points(points)  # returns (N,2) int32
        # cv2.polylines(frame, [midline_points.reshape(-1,1,2)], isClosed=False, color=(0, 0, 255), thickness=2)
            for (x, y) in midline_points:
                cv2.circle(frame, (y, x), 3, colors[i%3], -1)
            i += 1

    cv2.imshow("Skeleton Check", frame)
    # Export the frame
    if export_video:
        out.write(frame)
    key = cv2.waitKey(delay) & 0xFF   # playback speed (original note says 30 ms/frame; `delay` is set to 200 above)
    if key == ord('q'):   # press q to quit
        break
    elif key == ord(' '): # press space to pause
        cv2.waitKey(0)

    frame_idx += 1

cap.release()
if export_video:
    out.release()
cv2.destroyAllWindows()


视频总帧数: 46721


# 运动数据导入，检查并计算头部运动速度

## 数据读取和运动方向信息计算

### 运动数据读取

In [None]:
# Folder holding the motion/midline CSV of this recording.
# NOTE(review): the raw string keeps the doubled backslashes literally - confirm the path resolves as intended.
path = r'Y:\\SZX\\BehaviorData\\20250716_WBI\\midline_debug\\20250716_lg9624-1gNa-002-24d-ov_002'
# First file whose name contains '_mot_vid_mid' (raises IndexError when none exists)
f_path = [f for f in os.listdir(path) if '_mot_vid_mid' in f][0]
df_mot_mid = pd.read_csv(os.path.join(path, f_path))

### 分割前进后退

In [None]:
# Split forward / backward movement with an angle threshold:
# rows with angle_m >= threshold get forward = 1, all other rows 0.
threshold = 120
df_mot_mid.loc[:,'forward'] = 0
df_mot_mid.loc[df_mot_mid["angle_m"]>=threshold,'forward'] = 1

# Assign colors
# .copy() makes `df` an independent frame, so adding the 'color' column does
# not write into a slice of df_mot_mid (avoids pandas' SettingWithCopyWarning).
df = df_mot_mid[['X', 'Y', 'forward']].copy()
df.loc[:,'color'] = df['forward'].map({0:'grey',1:'red'})

### 求运动向量，头部方向，以及在头部方向的速度投影

In [None]:
def rotation_mat_2(theta_degrees, mov_vec):
    """
    Rotate a 2-D vector by ``-theta_degrees``.

    The angle is negated before the standard rotation matrix
    ``[[cos, -sin], [sin, cos]]`` is built, so the result equals
    ``[[cos t, sin t], [-sin t, cos t]] @ mov_vec`` with ``t = theta_degrees``.

    Parameters
    ----------
    theta_degrees : angle in degrees.
    mov_vec : length-2 sequence or array.

    Returns
    -------
    numpy.ndarray holding the rotated vector.
    """
    angle = np.deg2rad(-theta_degrees)
    c, s = np.cos(angle), np.sin(angle)
    rotation = np.array([[c, -s], [s, c]])
    return rotation.dot(mov_vec)
# 求向量投影
def project_vector_A_on_B(vector_A, vector_B):
    """
    Project ``vector_A`` onto ``vector_B``.

    Parameters
    ----------
    vector_A, vector_B : array-like vectors of equal length.

    Returns
    -------
    numpy.ndarray
        A vector parallel to ``vector_B`` whose signed length is the scalar
        projection of ``vector_A`` on ``vector_B``. When ``vector_B`` has zero
        length the projection is undefined: a message is printed and a
        NaN-filled vector of the same shape is returned. (The previous
        ``None`` return could not be stacked with ``np.vstack`` or indexed
        by the code that consumes the ``proj_mov_vec`` column.)
    """
    A = np.asarray(vector_A, dtype=float)
    B = np.asarray(vector_B, dtype=float)
    B_norm = np.linalg.norm(B)
    if B_norm == 0:
        print('B向量不能为零')
        return np.full(B.shape, np.nan)
    # Unit vector along B, then scale it by the scalar projection of A.
    B_unit = B / B_norm
    projection_length = np.dot(A, B_unit)
    return projection_length * B_unit

In [None]:
# Movement direction vector: one (x_velocity, y_velocity) tuple per row.
df_mot_mid.loc[:,'moving_vec'] = df_mot_mid.apply(lambda x: (x['x_velocity'], x['y_velocity']),axis = 1)

In [None]:
# Head orientation vector: the row's moving_vec passed through rotation_mat_2
# with the row's angle_md as the angle argument.
df_mot_mid.loc[:,'heading_vec'] = df_mot_mid.apply(lambda x: rotation_mat_2(x['angle_md'], x['moving_vec']), axis=1)

In [None]:
# Movement vector projected onto the head orientation vector (row by row).
df_mot_mid.loc[:,'proj_mov_vec'] = df_mot_mid.apply(lambda x: project_vector_A_on_B(x['moving_vec'], x['heading_vec']), axis=1)

## 标记转向

In [None]:
# Heading angle in radians, computed from the stacked heading vectors.
# NOTE(review): arctan2 receives (component 0, component 1), so the angle is
# measured from the second axis rather than the usual arctan2(y, x) form;
# confirm this convention is intended.
arr_head_vec = np.vstack(df_mot_mid["heading_vec"].values)
df_mot_mid['heading_agl'] = np.arctan2(arr_head_vec[:,0],arr_head_vec[:,1])

#### 利用头部朝向在一段时间内的平均值

In [None]:
def delta_of_avg_vec(df, t, window=5, vec_col='heading_vec'):
    """
    Signed angle between the mean vector before and after time ``t``.

    Parameters
    ----------
    df : DataFrame with a ``'Time'`` column and a column of 2-D vectors.
    t : time at which the change is evaluated (same unit as ``df['Time']``).
    window : length of each averaging window, in ``Time`` units.
    vec_col : name of the vector column to average.

    Returns
    -------
    float
        Angle in radians from the mean vector over ``[t - window, t)`` to the
        mean vector over ``[t, t + window]``, obtained with
        ``arctan2(cross, dot)``. ``nan`` is returned when either window would
        extend beyond the recorded time range or contains no samples.
    """
    times = df['Time']
    if (t - window < times.min()) or (t + window > times.max()):
        return np.nan

    pre_mask = (times >= t - window) & (times < t)       # window before t
    pos_mask = (times >= t) & (times <= t + window)      # window after t (includes t)
    # A gap in the time axis can leave a window without samples; np.vstack
    # raises on an empty sequence, so report the angle as undefined instead.
    if not pre_mask.any() or not pos_mask.any():
        return np.nan

    mean_vec_pre = np.nanmean(np.vstack(df.loc[pre_mask, vec_col].values), axis=0)
    mean_vec_pos = np.nanmean(np.vstack(df.loc[pos_mask, vec_col].values), axis=0)

    dot = np.dot(mean_vec_pre, mean_vec_pos)
    # 2-D cross product; its sign carries the turning direction.
    det = mean_vec_pre[0]*mean_vec_pos[1] - mean_vec_pre[1]*mean_vec_pos[0]
    return np.arctan2(det, dot)

In [None]:
# Change of the window-averaged head orientation at every time stamp
# (delta_of_avg_vec with its default window and vector column).
# Each call filters the whole frame, so this cell scales with rows squared.
df_mot_mid['heading_avg_delta'] = df_mot_mid['Time'].apply(
    lambda t: delta_of_avg_vec(df_mot_mid, t)
)

In [None]:
# Absolute heading change, plus a rescaled copy stored under a "_deg" name.
# NOTE(review): heading_avg_delta comes from np.arctan2 (radians); multiplying
# by 180 is not a radian-to-degree conversion (that factor is 180/pi).
# Confirm the intended unit before changing it: thresholds applied to this
# column (e.g. color_threshold) are tuned on the current scale.
df_mot_mid['heading_avg_delta_abs']  = df_mot_mid['heading_avg_delta'].abs()
df_mot_mid['heading_avg_delta_deg']  = df_mot_mid['heading_avg_delta_abs']*180

In [None]:
# Change of the window-averaged movement direction at every time stamp,
# using a 2-unit window on the moving_vec column.
df_mot_mid['moving_avg_delta'] = df_mot_mid['Time'].apply(
    lambda t: delta_of_avg_vec(df_mot_mid, t,window=2,vec_col='moving_vec' )
)

In [None]:
# Absolute movement-direction change, plus a rescaled copy named "_deg".
# NOTE(review): moving_avg_delta is in radians (np.arctan2); "*180" is not a
# radian-to-degree conversion (180/pi) — confirm the intended unit.
df_mot_mid['moving_avg_delta_abs']  = df_mot_mid['moving_avg_delta'].abs()
df_mot_mid['moving_avg_delta_deg']  = df_mot_mid['moving_avg_delta_abs']*180

In [None]:
step = 3000   # rows per figure
# One color scale for every figure, computed once from the whole frame
# (it does not depend on the segment being drawn).
vmin = df_mot_mid['heading_avg_delta'].min()
vmax = df_mot_mid['heading_avg_delta'].max()
norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
for i in range(0,len(df_mot_mid), step):
    # iloc's stop bound is exclusive: i:i+step covers exactly `step` rows
    # (the previous i+step-1 dropped the last row of every segment).
    df_plot = df_mot_mid.iloc[i:i+step]
    # `plt.figure` without parentheses only referenced the function.
    plt.figure()
    sc = plt.scatter(df_plot['X'], df_plot['Y'], s = 0.8, c=df_plot['heading_avg_delta'],
                cmap='bwr', norm=norm)
    plt.colorbar(sc)
    plt.show()

#### 利用头部朝向角变化率分turn
效果不太好

In [None]:
# Unwrap the heading angle so it is continuous across the +/-pi boundary.
# Gaps are forward-filled first (leading gaps become 0) because np.unwrap
# cannot step over NaN. `.ffill()` replaces the deprecated
# `fillna(method='ffill')` form.
heading = df_mot_mid['heading_agl'].ffill().fillna(0).values
df_mot_mid['heading_agl_unw'] = np.unwrap(heading)

In [None]:
# Sliding central difference of the unwrapped heading angle.
window = 0.5 # half-width in Time units; the difference spans 2*window
# Number of rows corresponding to `window`, from the mean sampling interval.
shift_rows = int(window/df_mot_mid['Time'].diff().mean())
heading_unw = df_mot_mid['heading_agl_unw']
df_mot_mid['heading_agl_vel'] = (
    heading_unw.shift(-shift_rows) - heading_unw.shift(shift_rows)
)/(2 * window)

In [None]:
# Flag rows whose absolute heading angular velocity reaches 3 as turns (1),
# everything else (including NaN velocities) as 0.
is_turn = df_mot_mid['heading_agl_vel'].abs() >= 3
df_mot_mid['heading_turn'] = is_turn.astype('int64')

# Full velocity trace in blue, flagged samples overdrawn in red.
plt.plot(df_mot_mid['heading_agl_vel'], color='b')
plt.plot(df_mot_mid.loc[is_turn, 'heading_agl_vel'], color='r')
plt.show()

In [None]:
# step = 3000
# for i in range(0,len(df_mot_mid), step):
#     df_plot = df_mot_mid.iloc[i:i+step-1]
#     df_turn = df_plot[df_plot['heading_turn']==1]
#     plt.figure
#     # 先计算整个 df 的最小最大值，用于统一 colorbar
#     vmin = df_mot_mid['heading_agl_vel'].min()
#     vmax = df_mot_mid['heading_agl_vel'].max()
#     norm = mcolors.Normalize(vmin=-5, vmax=5)
#     sc = plt.scatter(df_plot['X'], df_plot['Y'], s = 0.8, c=df_plot['heading_agl_vel'],
#                 cmap='bwr', norm=norm)
#     sc = plt.scatter(df_turn['X'], df_turn['Y'], s = 0.8, color='r')
#     plt.colorbar(sc)
#     plt.show()

#### 利用运动方向变化区分

In [None]:
# Movement direction angle in radians, computed from the movement vectors.
# Note: the array name arr_head_vec is reused here but holds moving_vec
# values, not heading vectors.
arr_head_vec = np.vstack(df_mot_mid["moving_vec"].values)
df_mot_mid['moving_agl'] = np.arctan2(arr_head_vec[:,0],arr_head_vec[:,1])

In [None]:
# Unwrap the movement angle; gaps are forward-filled first (leading gaps
# become 0). `.ffill()` replaces the deprecated `fillna(method='ffill')`.
heading = df_mot_mid['moving_agl'].ffill().fillna(0).values
df_mot_mid['moving_agl_unw'] = np.unwrap(heading)
# Sliding central difference of the unwrapped angle.
window = 1 # half-width in Time units; the difference spans 2*window
df_mot_mid['moving_agl_vel'] = (
df_mot_mid['moving_agl_unw'].shift(-int(window/df_mot_mid['Time'].diff().mean()))
- df_mot_mid['moving_agl_unw'].shift(int(window/df_mot_mid['Time'].diff().mean()))
)/(2 * window)

In [None]:
# Absolute value of the movement angular velocity, rescaled by 180.
# NOTE(review): moving_agl_vel derives from np.arctan2 (radians); a degree
# conversion would use 180/pi rather than 180 — confirm the intended unit.
df_mot_mid['abs_mov_agl_vel'] = df_mot_mid['moving_agl_vel'].abs()*180

#### 轨迹验证头部朝向角度变化

In [None]:
def norm_vec(series):
    '''
    Scale every vector in a Series to unit length.

    ``series`` holds one vector per element; the vectors are stacked into an
    N x 2 array, each row is divided by its Euclidean norm, and the rows are
    returned as a list of tuples [(x1, y1), (x2, y2), ...].
    '''
    stacked = np.vstack(series.values)
    lengths = np.linalg.norm(stacked, axis=1, keepdims=True)
    return [tuple(unit_row) for unit_row in stacked / lengths]

In [None]:
segment = 4000          # rows per trajectory figure
n = 1000                # draw one set of arrows every n rows
frame_rate = 48         # approximate frame rate (~48)
arrow_scale = 15.0      # unit vectors are divided by this (larger -> shorter arrows)
head_width = 0.02       # arrow head width
head_length = 0.02      # arrow head length
alpha = 0.7             # arrow transparency
test_col = "abs_mov_agl_vel"   # column used to color the trajectory points
color_vector_h  = '#800080'  # purple (128,0,128)
color_vector_m  = '#FFA500'  # orange (255,165,0)
color_vector_mh = '#D8BFD8'  # thistle (216,191,216)
# Arrows drawn at each sampled row, in drawing order:
# (column of unit vectors, arrow color).
arrow_specs = [
    ('norm_moving_vec', color_vector_m),      # movement direction
    ('norm_heading_vec', color_vector_h),     # head orientation
    ('norm_proj_mov_vec', color_vector_mh),   # movement projected on head orientation
]
for i in range(0, len(df_mot_mid), segment):
    # iloc's stop bound is exclusive: i:i+segment covers rows i..i+segment-1,
    # matching the title below (i+segment-1 as stop dropped the last row).
    df_cut = df_mot_mid.iloc[i:i+segment,:]
    df_vec = df_cut.iloc[np.arange(0, len(df_cut), n),:].copy()
    # Unit-length versions of the three vectors for the sampled rows.
    df_vec.loc[:,'norm_moving_vec'] = norm_vec(df_vec["moving_vec"])
    df_vec.loc[:,'norm_heading_vec'] = norm_vec(df_vec["heading_vec"])
    df_vec.loc[:,'norm_proj_mov_vec'] = norm_vec(df_vec["proj_mov_vec"])

    plt.figure()
    # Grey underlay first, so points without a value in test_col stay visible.
    plt.scatter(df_cut.X, df_cut.Y, s=5, color = 'grey', alpha=0.1)
    sc = plt.scatter(df_cut.X, df_cut.Y, s=5, c=df_cut[test_col], cmap='bwr')
    for _, row in df_vec.iterrows():
        for vec_col, arrow_color in arrow_specs:
            dx, dy = row[vec_col][0]/arrow_scale, row[vec_col][1]/arrow_scale
            # Use the `alpha` parameter defined above (it was hard-coded before).
            plt.arrow(row.X, row.Y, dx, dy, color=arrow_color,
                      head_width=head_width, head_length=head_length, alpha=alpha)

    plt.xlabel('X')
    plt.colorbar(sc)
    plt.ylabel('Y')
    plt.title(f'Trajectory segment {i}-{i+segment-1}')
    plt.axis('equal')
    plt.show()

## 运动轨迹视频验证

In [None]:
# Parameters
segment_len = 2000  # rows per background segment
n = 50  # draw one set of vectors every n rows
scale_factor = 100 # divisor applied to the arrow length
fps = 400  # target playback rate, frames per second
jump = 10 # show every jump-th row
# cv2.waitKey takes milliseconds. The previous int(fps / fps) was always 1
# regardless of fps; derive the delay from fps and clamp it to 1 because
# waitKey(0) blocks until a key is pressed.
delay = max(1, int(1000 / fps))

# Per-row color from the forward flag
df_mot_mid.loc[:, 'color'] = df_mot_mid['forward'].map({0: (255, 0, 0), 1: (0, 0, 255)})  # BGR
total_frames = len(df_mot_mid)

# Canvas size and border (pixels)
width, height = 800, 800
margin = 50

# 逐帧显示
for frame in range(total_frames):
    if frame % jump!= 0:
        continue
    seg_id = (frame // segment_len) * segment_len
    df_cut = df_mot_mid.iloc[seg_id:seg_id + segment_len, :]

    # 该段独立归一化
    # x_min_seg, x_max_seg = df_cut["X"].min(), df_cut["X"].max()
    # y_min_seg, y_max_seg = df_cut["Y"].min(), df_cut["Y"].max()
    x_min_seg, x_max_seg = df_cut["X"].dropna().min(), df_cut["X"].dropna().max()
    y_min_seg, y_max_seg = df_cut["Y"].dropna().min(), df_cut["Y"].dropna().max()


    def to_img_coords_seg(x, y):
        if np.isnan(x) or np.isnan(y):
            return np.nan, np.nan
        else:
            x_img = int((x - x_min_seg) / (x_max_seg - x_min_seg + 1e-8) * (width - 2*margin) + margin)
            y_img = int(height - ((y - y_min_seg) / (y_max_seg - y_min_seg + 1e-8) * (height - 2*margin) + margin))
            return x_img, y_img

    # 初始化白色背景
    img = np.ones((height, width, 3), dtype=np.uint8) * 255

    # 背景轨迹
    for _, row in df_cut.iterrows():
        x, y = to_img_coords_seg(row.X, row.Y)
        if np.isnan(x) or np.isnan(y):
            continue
        else:
            cv2.circle(img, (x, y), 3, (150, 150, 150), -1)

    # 背景速度向量（每 n 帧取一个）
    for idx in range(0, len(df_cut), n):
        row = df_cut.iloc[idx]
        x, y = to_img_coords_seg(row.X, row.Y)
        if np.isnan(x) or np.isnan(y):
            continue
        else:
            # 运动方向
            dm, dn = row.moving_vec[0], row.moving_vec[1]
            norm = np.sqrt(dm**2 + dn**2)
            if norm != 0:
                dm, dn = dm/norm, dn/norm
            if (~np.isnan(dm)) & (~np.isnan(dn)):
                end_m = int(x + dm * segment_len*2/scale_factor)
                end_n = int(y - dn * segment_len*2/scale_factor)
                cv2.arrowedLine(img, (x, y), (end_m, end_n), (0, 165, 255), 3, tipLength=0.15)

            # 头部方向
            dx, dy = row.heading_vec[0], row.heading_vec[1]
            norm = np.sqrt(dx**2 + dy**2)
            if norm != 0:
                dx, dy = dx/norm, dy/norm
            if (~np.isnan(dx)) & (~np.isnan(dy)):
                end_x = int(x + dx * segment_len*2/scale_factor)
                end_y = int(y - dy * segment_len*2/scale_factor)
                cv2.arrowedLine(img, (x, y), (end_x, end_y), row.color, 3, tipLength=0.3)
            # 头部运动方向
            # 头部方向
            dp, dq = row.proj_mov_vec[0], row.proj_mov_vec[1]
            norm = np.sqrt(dp**2 + dq**2)
            if norm != 0:
                dp, dq = dp/norm, dq/norm
            if (~np.isnan(dp)) & (~np.isnan(dq)):
                end_p = int(x + dp * segment_len/scale_factor)
                end_q = int(y - dq * segment_len/scale_factor)
                cv2.arrowedLine(img, (x, y), (end_p, end_q),(216, 191, 216), 3, tipLength=0.3)
        
    # 当前点
    row = df_mot_mid.iloc[frame]
    x, y = to_img_coords_seg(row.X, row.Y)
    if np.isnan(x) or np.isnan(y):
            continue
    else:
        if row.angle_m >= threshold:
            cv2.circle(img, (x, y), 20, (0, 0, 255), -1)
        else:
            cv2.circle(img, (x, y), 20, (255, 0, 0), -1)
    # 显示
    cv2.imshow("Worm Motion", img)
    key = cv2.waitKey(delay) & 0xFF
    if key == ord('q'):   # 按 q 退出
        break
    elif key == ord(' '): # 按空格暂停
        cv2.waitKey(0)

cv2.destroyAllWindows()


## 合并骨架分析和运动数据

### 合并运动数据，骨架信息和人工标注数据

#### 函数定义

In [None]:
# Ratio and lengths of the two longest paths, as a DataFrame
def path_ratio(paths):
    """
    Summarise the two longest paths of every entry in ``paths``.

    ``paths`` maps a key to a list of paths. For entries with at least two
    paths, ``path_ratio`` is the longest length divided by the second longest
    and ``path_len`` is ``[longest, second_longest]``; otherwise
    ``path_ratio`` is NaN and ``path_len`` is an empty list.

    Returns a DataFrame indexed by the keys of ``paths`` with the columns
    ``path_ratio`` and ``path_len``.
    """
    ratio_by_key = {}
    lens_by_key = {}
    for key, frame_paths in paths.items():
        if len(frame_paths) < 2:
            ratio_by_key[key] = np.nan
            lens_by_key[key] = []
            continue
        top_two = sorted((len(p) for p in frame_paths), reverse=True)[:2]
        ratio_by_key[key] = top_two[0] / top_two[1]
        lens_by_key[key] = top_two
    ratio_frame = pd.Series(ratio_by_key, name='path_ratio').to_frame()
    len_frame = pd.Series(lens_by_key, name='path_len').to_frame()
    return ratio_frame.join(len_frame, how='left')


In [None]:
# Start and end positions of every run of 1s
def get_turn_interval(df, col_name):
    """
    Find the contiguous runs where ``df[col_name]`` equals 1.

    Returns a list of ``(start, end)`` tuples of positional row numbers, both
    inclusive. A run that is still open at the last row ends at ``n - 1``.
    """
    labels = df[col_name].values
    intervals = []
    run_start = None          # position where the current run began, if any
    for pos, value in enumerate(labels):
        is_turn = value == 1
        if is_turn and run_start is None:
            run_start = pos
        elif not is_turn and run_start is not None:
            intervals.append((run_start, pos - 1))
            run_start = None

    # Close a run that reaches the end of the column.
    if run_start is not None:
        intervals.append((run_start, len(labels) - 1))
    return intervals

#### 合并数据

In [None]:
# Take the Time/X/Y columns of the motion data and left-join the
# curvature-angle table (df_sum_dtheta, built elsewhere) on the index.
df_mot_slice = df_mot_mid[['Time','X', 'Y']]
df_mot_slice = df_mot_slice.join(df_sum_dtheta, how='left')

In [None]:
# Build the path-ratio table from the skeleton paths and left-join it onto
# the motion slice. Note: this rebinds the general-purpose name `df`.
df = path_ratio(paths)
df_mot_slice = df_mot_slice.join(df, how='left')

In [None]:
# Per-frame 'circular' and 'branching' flags keyed by the frame key of `data`.
# Note: the same names are bound to plain lists near the top of the notebook;
# from here on they are dicts.
circular = {key: value['circular'] for key, value in data.items()}
branching = {key: value['branching'] for key, value in data.items()}

In [None]:
# Build a frame with one row per frame key and the two columns
# 'circular' and 'branching': from_dict(orient="index") puts the two flags on
# the rows, and the transpose turns them into columns.
df_cir_branching = pd.DataFrame.from_dict(
    {"circular": circular, "branching": branching},
    orient="index"
).T

In [None]:
# Add the skeleton-analysis flags to the motion data.
# Any existing 'circular'/'branching' columns are removed first so the cell
# can be re-run; errors='ignore' keeps it working when they are absent
# (the in-place drop raised KeyError in that case).
df_mot_slice = df_mot_slice.drop(columns=['circular', 'branching'], errors='ignore')
df_mot_slice = df_mot_slice.join(df_cir_branching, how='left')

In [None]:
# A row is a turn (1) when any criterion holds, otherwise 0:
# circular skeleton, branching skeleton, path_ratio <= 3, or |sum_dtheta| >= 220.
turn_mask = (
    df_mot_slice['circular'].eq(1)
    | df_mot_slice['branching'].eq(1)
    | df_mot_slice['path_ratio'].le(3)
    | df_mot_slice['sum_dtheta'].abs().ge(220)
)
df_mot_slice['turn'] = turn_mask.astype('int64')

#### 可视化人工标注和自动标注

In [None]:
df = df_mot_slice.copy()
# path_lim is 1 when path_len holds at least two lengths and the second one
# is >= 200, otherwise 0.
df['path_lim'] = df['path_len'].apply(
    lambda lens: int(isinstance(lens, (list, tuple, np.ndarray)) and len(lens) > 1 and lens[1] >= 200)
)

# Combined criterion: circular, branching, |sum_dtheta| >= 220, or a
# path_ratio <= 1.5 that also satisfies path_lim.
short_ratio = df['path_ratio'].le(1.5) & df['path_lim'].eq(1)
turn_mask = (
    df['circular'].eq(1)
    | df['branching'].eq(1)
    | short_ratio
    | df['sum_dtheta'].abs().ge(220)
)
df['turn'] = turn_mask.astype('int64')

In [None]:
# Lengths (in rows) of the 1-D structuring elements.
close_size = 100
open_size = 100
turn_raw = df['turn'].values
# Closing (dilate then erode) fills short gaps inside turn runs ...
closing_kernel = np.ones(close_size)
closed = ndimage.binary_closing(turn_raw, structure=closing_kernel)
# ... then opening (erode then dilate) removes short isolated runs.
opening_kernel = np.ones(open_size)
opened = ndimage.binary_opening(closed, structure=opening_kernel)
# Store the cleaned labels as integers.
df['turn_pc'] = opened.astype(int)

In [None]:
# Runs of 1 for the combined label, its opened/closed version and each
# individual criterion.
turn_ints = get_turn_interval(df, 'turn')
turn_pc_ints = get_turn_interval(df, 'turn_pc')
cir_ints = get_turn_interval(df, 'circular')
bra_ints = get_turn_interval(df, 'branching')
# Discretised path_ratio criterion
df['path_ratio_dis'] = np.where(df['path_ratio'] <= 1.5, 1, 0)
ratio_ints = get_turn_interval(df,'path_ratio_dis')
# Discretised sum_dtheta criterion
df['sum_dtheta_dis'] = np.where(df['sum_dtheta'].abs() >= 220, 1, 0)
dtheta_ints = get_turn_interval(df, "sum_dtheta_dis")
# Manual labels
label_ints = get_turn_interval(df, 'label')

# One row of shaded spans per criterion, sharing the x axis.
fig,ax = plt.subplots(7,1,figsize=(20,10), sharex=True)


def _shade_intervals(axis, intervals, span_color, title):
    """Shade every (start, end) interval on `axis` and set its title."""
    for start, end in intervals:
        axis.axvspan(start, end, color=span_color, alpha=1)
    axis.set_title(title)


_shade_intervals(ax[0], turn_ints, 'orange', 'automated labeling of turn')
_shade_intervals(ax[1], label_ints, 'red', 'artificial labeling of turn')
_shade_intervals(ax[6], cir_ints, 'orange', 'circular')
_shade_intervals(ax[3], bra_ints, 'orange', 'branching')
_shade_intervals(ax[4], ratio_ints, 'orange', 'path_ratio')
_shade_intervals(ax[5], dtheta_ints, 'orange', 'sum_dtheta')
# Tick every 2000 rows on the bottom axis.
desired_ticks = np.arange(0, len(df), 2000)
ax[6].set_xticks(desired_ticks)
_shade_intervals(ax[2], turn_pc_ints, 'orange',
                 f'turn_open_closed. close:{close_size},open:{open_size}')

plt.show()

### 视频分析比较人工标注和自动标注

In [None]:
# Color constants as 3-tuples in OpenCV channel order (B, G, R).
WHITE = (255, 255, 255)
YELLOW = (0, 255, 255)
CYAN = (255, 255, 0)
LIME_GREEN = (0, 255, 0)
PINK = (147, 20, 255)
ORANGE = (0, 165, 255)
RED = (0, 0, 255)
GOLD = (0, 215, 255)
# Cycle of three colors used to tell multiple skeletons apart.
# NOTE(review): this rebinds `colors`, which the import cell binds to the
# matplotlib.colors module (`from matplotlib import colors`).
colors = [(0,0,255), (0,255,0),(255,0,0)]
colors_2 = [(176, 106, 28),   # cobalt blue
(51, 51, 204),    # "science" red
(0, 204, 102),    # laser green
(0, 255, 255),    # fluorescent yellow
(255, 0, 255),    # magenta
(0, 77, 255)]

In [None]:
df = df
# 骨架信息
series_skeleton = pd.Series(paths)        # 原始骨架
long_skeleton = pd.Series(resampled)
# 读取视频
cap = cv2.VideoCapture(os.path.join(folder_path, "c1_onnx.mp4"))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)

# 参数
check_inv = 5
output_path = os.path.join(folder_path, "c1_midline_compare.mp4")
start_frame = 15000
delay = 100
frame_idx_end = total_frames
step  = 5
rotation_threshold = 220
label_col = 'turn_pc' 
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

for frame_idx in range(start_frame, total_frames, step):
    # 视频定位当前帧
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)  # 定位到 frame_idx
    ret, frame = cap.read()
    if frame is None:
        continue
    if not ret:
        break
    
    # ================= 左图（原视频 + 自动标注（原始骨架） =================
    frame_left = frame.copy()
    cv2.putText(frame_left, f"Frame: {frame_idx}", (10, 500),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)

    try:
        idx = data_keys.index(frame_idx)
        row = df.iloc[frame_idx]
        points_raw = series_skeleton.iloc[idx]
    except:
        pass
        
    for i, points in enumerate(points_raw):
        # 轨迹长度
        len_i = len(points) # 轨迹长度
        cv2.putText(frame_left, f"skel{i}:{len_i}", (350, 30+(i*20)),
        cv2.FONT_HERSHEY_SIMPLEX, 0.75, colors[i % 3], 2)
        # 轨迹
        pts = normalize_points(points) if len(points) > 0 else []
        for (x, y) in pts:
            if np.isnan(x) or np.isnan(y):
                continue
            cv2.circle(frame_left, (int(y), int(x)), 5, colors[i % 3], -1)
    
    # 字符打印
    # 判断turn
    if row[label_col]:
        cv2.putText(frame_left, label_col, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
    else:
        cv2.putText(frame_left, f"turn", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (200, 200, 200), 2)
    # 环形或交叉
    if row.circular:
        cv2.putText(frame_left, f"circle", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
    else:
        cv2.putText(frame_left, f"circle", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.75, (200, 200, 200), 2)
        
    if row.branching:
        cv2.putText(frame_left, f"branching", (80, 50),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (0, 0, 255), 2)
    else:
        cv2.putText(frame_left, f"branching", (80, 50),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (200, 200, 200), 2)
    if row.path_ratio<=3:
        cv2.putText(frame_left, f"ratio:{round(row.path_ratio,2)}", (10, 70),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (0, 0, 255), 2)
    else:
        cv2.putText(frame_left, f"ratio:{round(row.path_ratio,2)}", (10, 70),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (200, 200, 200), 2)
    if np.abs(row.sum_dtheta)>=rotation_threshold:
        cv2.putText(frame_left, f"rotation:{row.sum_dtheta}", (10, 90),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (0, 0, 255), 2)  
    else:    
        cv2.putText(frame_left, f"rotation:{row.sum_dtheta}", (10, 90),
                cv2.FONT_HERSHEY_SIMPLEX,0.75, (200, 200, 200), 2)  
        
    # ================= 右图（原视频 + 合并骨架） =================
    frame_right = frame.copy()

    # 打印最长骨架（只有一条）
    try:
        idx = data_keys.index(frame_idx)
        row = df.iloc[frame_idx]
        # 拆分骨架
        points_long = long_skeleton.iloc[idx]
        if points_long is None:
            points_long = []
    except:
        points_long = []

    # 轨迹
    pts = normalize_points(points_long) if len(points_long) > 0 else []
    for (x, y) in pts:
        if np.isnan(x) or np.isnan(y):
            continue
        if row.label == 1:
            cv2.circle(frame_right, (int(y), int(x)), 5, (0,0,255), -1)
        else: 
            cv2.circle(frame_right, (int(y), int(x)), 5, (200,200,200), -1)
    # 打印label标记
    if row.label == 1:
        cv2.putText(frame_right, f"label", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,1, YELLOW, 2)
        
    # ================= 拼接左右视频 =================
    combined = np.hstack((frame_left, frame_right))

    cv2.imshow("Skeleton Compare", combined)

    key = cv2.waitKey(delay) & 0xFF
    if key == ord('p'):
        break
    if key == ord('q'):
        break
    elif key == ord(' '):
        cv2.waitKey(0)


cap.release()
cv2.destroyAllWindows()


In [None]:
colors

# 合并视频骨架可视化和运动轨迹验证分析

### 轨迹打印运动方向和平均头部朝向
### （根据阈值筛选turn并显示轨迹颜色）

In [None]:
# === 参数设置 ===
# check_inv = 10
fps = 50       # 提高导出视频帧率（原来 20，可以改 60 或更高）
segment_len = 3000
n = 150
scale_factor = 200
step = 20       # 每隔 100 帧取 1 帧
delay = 1      # cv2.imshow 的等待时间（毫秒，1 表示最快）
body_l  =200
start_frame = 13800
margin = 50
export_video = True
output_path = os.path.join(folder_path, "merged.mp4")
color_col = "heading_avg_delta_deg"    # 轨迹显色列，一般是角度值
traj_colormap = False    # 选择根据colormap显示轨迹还是根据前进后退
color_threshold = 350   # 打印高角速度点的阈值
traj_vec_visible = False   # 是否在轨迹上打印向量
agl_vec_col = 'moving_vec'   # 用来计算平均向量的列，moving_vec或heading_vec

def to_img_coords_seg(x, y):
    """
    Map a trajectory point to integer pixel coordinates.

    The point is scaled with the notebook-level segment bounds
    (x_min_seg, x_max_seg, y_min_seg, y_max_seg) into a width x height image
    with a border of `margin` pixels; the y axis is flipped so larger y values
    end up higher in the image. Returns (nan, nan) when either input is NaN.
    """
    if np.isnan(x) or np.isnan(y):
        return np.nan, np.nan
    # The small epsilon avoids a division by zero for a degenerate segment.
    span_x = x_max_seg - x_min_seg + 1e-8
    span_y = y_max_seg - y_min_seg + 1e-8
    usable_w = width - 2*margin
    usable_h = height - 2*margin
    x_img = int((x - x_min_seg) / span_x * usable_w + margin)
    y_img = int(height - ((y - y_min_seg) / span_y * usable_h + margin))
    return x_img, y_img
        
# 运动点颜色映射
# 颜色映射
df_mot_mid.loc[:, 'color'] = df_mot_mid['forward'].map({0: (255, 0, 0), 1: (0, 0, 255)})  # BGR

# 轨迹点和向量颜色
red_bgr = (0, 0, 255)        # 红色
blue_bgr = (255, 0, 0)       # 蓝色
pink_bgr = (180, 105, 255)   # 粉色
light_blue_bgr = (255, 191, 128)  # 浅蓝色
# 轨迹颜色映射
# 建立一个 colormap（这里用从灰到红）
# cmap_traj = mpl.colormaps['bwr']   # 或者 'viridis', 'plasma' 等
cmap_traj = mpl.colors.LinearSegmentedColormap.from_list(
    "gray_red", ["lightgray", "red"]
)
norm_color = mcolors.Normalize(vmin=df_mot_mid[color_col].dropna().min(), vmax=df_mot_mid[color_col].dropna().max())
series_skeleton = pd.Series(paths)

# === 打开视频 ===
cap = cv2.VideoCapture(os.path.join(folder_path, "c1_onnx.mp4"))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)

# === 准备 VideoWriter ===
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
canvas_size = 800  # 右边轨迹图大小

if export_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width + canvas_size, max(height, canvas_size)))

# === 分段循环 ===
# 定位到起始帧
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

for frame_idx in range(start_frame, total_frames, step):


    # 左侧: 视频帧 + 叠加骨架/矢量 (代码1的逻辑)
    # 视频定位当前帧
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)  # 定位到 frame_idx
    ret, frame = cap.read()
    if not ret:
        break
    
    # 找到帧对应的骨架索引
    try:
        idx = data_keys.index(frame_idx)   # 找到对应帧的列表索引
    except:
        print('不存在这一帧：frame',frame_idx)
        frame_idx += 1
        continue
    
    # 读取咽喉点坐标
    pos_phr_i = np.array(pos_phr[idx], dtype=float)
    # 读取两个向量和角速度
    vector_h_i= np.array(vector_h[idx], dtype=float)
    vector_m_i= np.array(vector_m[idx], dtype=float)
    vector_mh_i = np.array(vector_mh[idx], dtype=float)
    angle_m_i = angle_md[idx]

    # 字符打印
    # 帧数
    cv2.putText(frame, f"Frame: {frame_idx}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    # 其它字符
    if angle_m_i:

        # 角度差打印
        frame_height, frame_width = frame.shape[:2]
        text = f"angle_md: {round(angle_m_i,2)}"
        text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
        if angle_m_i <= 100:
            color_m_t = (0,255,0)
        else:
            color_m_t = (0,0,255)
        cv2.putText(frame, text, (frame_width-10-text_size[0], text_size[1]+10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, color_m_t, 2)
        
        # # 头部朝向变化角
        # head_delta = df_mot_mid['heading_avg_delta'].iloc[frame_idx]
        # text_head_delta = f'delta_heading:{round(head_delta,2)}'
        # cv2.putText(frame, text_head_delta, (10, frame_height-10), 
        #             cv2.FONT_HERSHEY_SIMPLEX, 1, color_vector_m, 2)
    
    # 向量打印
    # 向量颜色
    color_vector_h = (128, 0, 128)  # 紫色
    color_vector_m = (0, 165, 255)    # 明黄色
    color_vector_mh= (216, 191, 216) # 淡紫色
    if np.isnan(vector_m_i).any() or np.isnan(vector_h_i).any():
        print('无方向向量，不打印')
        pass
    else:
        norm_m = np.linalg.norm(vector_m_i)
        norm_mh = np.linalg.norm(vector_mh_i)
        norm_h = np.linalg.norm(vector_h_i)
        if norm_m == 0:
            pass
        else:
            # 向量归一化模长为1
            vector_h_i = vector_h_i/norm_h
            vector_m_i = vector_m_i/norm_m
            vector_mh_i = vector_mh_i/norm_mh
        # 计算起始结束点位置
        length_factor = body_l
        length_factor_m = 150
        length_factor_mh  = 150
        start_point_h = (int(pos_phr_i[0] - vector_h_i[0]*length_factor), int(pos_phr_i[1] - vector_h_i[1]*length_factor))
        end_point_h = (int(pos_phr_i[0] + vector_h_i[0]*length_factor), int(pos_phr_i[1] + vector_h_i[1]*length_factor))
        end_point2 = (int(pos_phr_i[0] + vector_m_i[0]*length_factor_m), int(pos_phr_i[1] + vector_m_i[1]*length_factor_m))
        end_point3 = (int(pos_phr_i[0] + vector_mh_i[0]*length_factor_mh), int(pos_phr_i[1] + vector_mh_i[1]*length_factor_mh))
        
        # 需要强制转换为int才能打印
        pos_phr_i = (int(pos_phr_i[0]), int(pos_phr_i[1]))
        end_point2 = (int(end_point2[0]), int(end_point2[1]))
        end_point3 = (int(end_point3[0]), int(end_point3[1]))
        start_point_h = (int(start_point_h[0]), int(start_point_h[1]))
        end_point_h = (int(end_point_h[0]), int(end_point_h[1]))

        cv2.arrowedLine(frame, pos_phr_i, end_point2, color_vector_m, 5, tipLength=0.1)
        cv2.arrowedLine(frame, pos_phr_i, end_point3, color_vector_mh, 5, tipLength=0.1)   # 旋转后的运动方向
        cv2.line(frame, start_point_h, end_point_h, color_vector_h, 5)
        cv2.circle(frame, pos_phr_i, 10, color_vector_h, thickness=-1)

    # 打印骨架
    if frame_idx < max(data_keys):
        try:
            idx = data_keys.index(frame_idx)   # 找到对应帧的列表索引
            points_raw = series_skeleton.iloc[idx]  # 原始骨架
        except:
            print('不存在骨架：frame',frame_idx)
            continue
        # 有多条骨架，分别画出
        colors = [(0,0,255), (0,255,0),(255,0,0)]
        i = 0
        for points in points_raw:
            midline_points = normalize_points(points)  # 返回 (N,2) int32
        # cv2.polylines(frame, [midline_points.reshape(-1,1,2)], isClosed=False, color=(0, 0, 255), thickness=2)
            for (x, y) in midline_points:
                cv2.circle(frame, (y, x), 3, colors[i%3], -1)
            i += 1

    # 右侧: 绘制轨迹背景 (代码2的逻辑)

    img = np.ones((canvas_size, canvas_size, 3), dtype=np.uint8) * 255
    seg_id = (frame_idx // segment_len) * segment_len
    df_cut = df_mot_mid.iloc[seg_id:seg_id + segment_len, :]

    # 该段独立归一化
    x_min_seg, x_max_seg = df_cut["X"].min(), df_cut["X"].max()
    y_min_seg, y_max_seg = df_cut["Y"].min(), df_cut["Y"].max()

    

    # 初始化白色背景
    img = np.ones((height, width, 3), dtype=np.uint8) * 255

    # 背景轨迹
    
    for _, row in df_cut.iterrows():
        x, y = to_img_coords_seg(row.X, row.Y)
        angle_m_i = row.angle_m
        color_val = row[color_col]

        if np.isnan(x) or np.isnan(y) or np.isnan(color_val):
            continue
        if traj_colormap:
            # 将数值映射到 [0,1]，再取 colormap
            rgba = cmap_traj(norm_color(color_val))       # 结果是 (r,g,b,a)，范围[0,1]
            bgr = tuple(int(255 * c) for c in rgba[:3][::-1])  # 转成OpenCV的BGR

            cv2.circle(img, (x, y), 3, bgr, -1)
        else:
            if color_val >= color_threshold:
                cv2.circle(img, (x, y), 3, (0,0,255), -1)
            else:
                if angle_m_i:
                    if angle_m_i <= 130:
                        cv2.circle(img, (x, y), 3, light_blue_bgr, -1)
                    else:
                        cv2.circle(img, (x, y), 3, pink_bgr, -1)
            # cv2.circle(img, (x, y), 3, (200,200,200), -1)

    # Background vectors (one every n rows), drawn only when enabled.
    if traj_vec_visible:
        for idx in range(0, len(df_cut), n):
            row = df_cut.iloc[idx]
            # Convert this row's position before testing it: the previous code
            # checked the x, y left over from an earlier loop and only then
            # recomputed them, so rows were skipped or drawn by mistake.
            x, y = to_img_coords_seg(row.X, row.Y)
            if np.isnan(x) or np.isnan(y):
                continue

            # Movement direction (unit length unless the vector is zero).
            dm, dn = row.moving_vec[0], row.moving_vec[1]
            norm = np.sqrt(dm**2 + dn**2)
            if norm != 0:
                dm, dn = dm/norm, dn/norm
            if not (np.isnan(dm) or np.isnan(dn)):
                end_m = int(x + dm * segment_len*2/scale_factor)
                end_n = int(y - dn * segment_len*2/scale_factor)
                cv2.arrowedLine(img, (x, y), (end_m, end_n), (0, 165, 255), 3, tipLength=0.15)

            # Mean vectors of the windows before and after this row's time.
            agl_window = 5   # window length in Time units
            t = row.Time
            if (t-agl_window >= df_mot_mid.Time.min()) and (t+agl_window <= df_mot_mid.Time.max()):
                # Window before t: [t - agl_window, t)
                pre_mask = (df_mot_mid['Time']>=t-agl_window) & (df_mot_mid['Time']<t)
                vec_pre = np.vstack(df_mot_mid.loc[pre_mask,agl_vec_col].values)
                mean_vec_pre = np.mean(vec_pre, axis=0)
                dx, dy = mean_vec_pre[0], mean_vec_pre[1]
                norm = np.sqrt(dx**2 + dy**2)
                if norm != 0:
                    dx, dy = dx/norm, dy/norm
                if not (np.isnan(dx) or np.isnan(dy)):
                    end_x = int(x + dx * segment_len*2/scale_factor)
                    end_y = int(y - dy * segment_len*2/scale_factor)
                    cv2.arrowedLine(img, (x, y), (end_x, end_y), pink_bgr, 3, tipLength=0.3)
                # Window after t: [t, t + agl_window]
                pos_mask = (df_mot_mid['Time']>=t) & (df_mot_mid['Time']<= t+agl_window)
                vec_pos = np.vstack(df_mot_mid.loc[pos_mask,agl_vec_col].values)
                mean_vec_pos = np.mean(vec_pos, axis=0)
                dp, dq = mean_vec_pos[0], mean_vec_pos[1]
                norm = np.sqrt(dp**2 + dq**2)
                if norm != 0:
                    dp, dq = dp/norm, dq/norm
                if not (np.isnan(dp) or np.isnan(dq)):
                    # This arrow is drawn at half the length of the others.
                    end_p = int(x + dp * segment_len/scale_factor)
                    end_q = int(y - dq * segment_len/scale_factor)
                    cv2.arrowedLine(img, (x, y), (end_p, end_q),light_blue_bgr, 3, tipLength=0.3)
                

    # 当前点
    row = df_mot_mid.iloc[frame_idx]
    x, y = to_img_coords_seg(row.X, row.Y)
    if np.isnan(x) or np.isnan(y):
            continue
    else:
        if row.angle_m >= threshold:
            cv2.circle(img, (x, y), 20, (0, 0, 255), -1)
        else:
            cv2.circle(img, (x, y), 20, (255, 0, 0), -1)

    # Render a vertical colorbar for the trajectory colormap.
    fig, ax = plt.subplots(figsize=(1, 7))
    fig.subplots_adjust(left=0.5, right=0.9, top=0.7, bottom=0.05)
    cb = plt.colorbar(cm.ScalarMappable(norm=norm_color, cmap=cmap_traj),
                  cax=ax, orientation='vertical')
    # Ticks and label on the left side of the bar.
    cb.ax.yaxis.set_ticks_position("left")
    cb.ax.yaxis.set_label_position("left")
    cb.set_label(color_col, labelpad=0, fontsize=10)

    # Rasterise the figure into an RGB array. buffer_rgba() replaces
    # tostring_rgb(), which recent Matplotlib releases deprecated and removed;
    # the alpha channel is dropped and the data copied so OpenCV receives a
    # contiguous H x W x 3 uint8 array.
    fig.canvas.draw()
    colorbar_img = np.asarray(fig.canvas.buffer_rgba())[:, :, :3].copy()
    plt.close(fig)
    # Matplotlib delivers RGB, OpenCV expects BGR.
    colorbar_img = cv2.cvtColor(colorbar_img, cv2.COLOR_RGB2BGR)
    # Match the height of the trajectory image, then append on its right.
    colorbar_img = cv2.resize(colorbar_img, (60, img.shape[0]))
    img = np.hstack((img, colorbar_img))

    # === Combine left (video) and right (trajectory) images ===
    # The right panel is always brought to canvas_size x frame height. It was
    # previously resized only when the heights differed, which left it wider
    # than the canvas_size the VideoWriter was created for.
    panel_size = (canvas_size, frame.shape[0])          # (width, height)
    if (img.shape[1], img.shape[0]) != panel_size:
        img = cv2.resize(img, panel_size)
    combined = np.hstack((frame, img))

    cv2.imshow("Merged", combined)
    if export_video:
        # Same size expression as used when `out` was created; OpenCV's
        # VideoWriter generally does not store frames of a different size.
        writer_size = (width + canvas_size, max(height, canvas_size))
        if (combined.shape[1], combined.shape[0]) != writer_size:
            out.write(cv2.resize(combined, writer_size))
        else:
            out.write(combined)

    key = cv2.waitKey(delay) & 0xFF
    if key == ord('q'):
        break
    elif key == ord(' '):
        cv2.waitKey(0)

cap.release()
if export_video:
    out.release()
cv2.destroyAllWindows()

### 轨迹打印运动方向和平均头部朝向

In [None]:
# === 参数设置 ===
# check_inv = 10
fps = 50       # 提高导出视频帧率（原来 20，可以改 60 或更高）
segment_len = 3000
n = 150
scale_factor = 200
step = 10       # 每隔 100 帧取 1 帧
delay = 1      # cv2.imshow 的等待时间（毫秒，1 表示最快）
body_l  =200
start_frame = 13800
margin = 50
export_video = True
output_path = os.path.join(folder_path, "merged.mp4")
color_col = "moving_avg_delta_deg"    # 轨迹显色列，一般是角度值
agl_vec_col = 'moving_vec'   # 用来计算平均向量的列，moving_vec或heading_vec
def to_img_coords_seg(x, y):
    """Map a trajectory point to integer pixel coordinates of the right panel.

    The point is normalised with the current segment's bounds (module-level
    `x_min_seg`, `x_max_seg`, `y_min_seg`, `y_max_seg`) and scaled into a
    `width` x `height` canvas with `margin` pixels of border. The y axis is
    flipped so larger y values are drawn higher up.

    Returns (x_img, y_img) as ints, or (nan, nan) when either input is NaN.
    """
    if np.isnan(x) or np.isnan(y):
        return np.nan, np.nan

    # The 1e-8 keeps the division defined when the segment has zero extent.
    span_x = x_max_seg - x_min_seg + 1e-8
    span_y = y_max_seg - y_min_seg + 1e-8
    col = (x - x_min_seg) / span_x * (width - 2*margin) + margin
    rise = (y - y_min_seg) / span_y * (height - 2*margin) + margin
    return int(col), int(height - rise)
        
# Per-row BGR colour by movement direction: forward == 1 -> red, forward == 0 -> blue.
# Rows whose `forward` is neither 0 nor 1 end up as NaN.
df_mot_mid.loc[:, 'color'] = df_mot_mid['forward'].map({0: (255, 0, 0), 1: (0, 0, 255)})  # BGR

# Trajectory colormap (light gray -> red), normalised over the whole recording
# so colours are comparable between segments.
# cmap = mpl.colormaps['bwr']   # alternatives: 'viridis', 'plasma', ...
cmap_traj = mpl.colors.LinearSegmentedColormap.from_list(
    "gray_red", ["lightgray", "red"]
)
norm_color = mcolors.Normalize(vmin=df_mot_mid[color_col].dropna().min(), vmax=df_mot_mid[color_col].dropna().max())
series_skeleton = pd.Series(paths)

# === Open the input video ===
video_path = os.path.join(folder_path, "c1_onnx.mp4")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail fast: otherwise total_frames is 0 and the render loop silently does nothing.
    raise IOError(f"无法打开视频: {video_path}")
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)

# === Prepare the VideoWriter ===
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
canvas_size = 800  # width in pixels of the right-hand trajectory panel in the output

if export_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width + canvas_size, max(height, canvas_size)))
    if not out.isOpened():
        cap.release()
        raise IOError(f"无法创建输出视频: {output_path}")

# Seek to the first frame to render.
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

# === Render loop =============================================================
# Left panel : video frame with skeleton paths, pharynx point and direction vectors.
# Right panel: trajectory of the current `segment_len`-row segment coloured by
#              `color_col`, arrows every `n` rows (motion direction plus the
#              mean of `agl_vec_col` before/after that row), the current
#              position marker and a colorbar.

def _unit_or_none(vec):
    """Return `vec` scaled to unit length, or None if it has NaN/inf, zero length or < 2 components."""
    arr = np.atleast_1d(np.asarray(vec, dtype=float))
    if arr.size < 2:
        return None
    length = np.linalg.norm(arr)
    if not np.isfinite(length) or length == 0:
        return None
    return arr / length


def _offset_point(origin, direction, length):
    """Integer pixel at `origin + direction * length` (image coordinates)."""
    return (int(origin[0] + direction[0] * length),
            int(origin[1] + direction[1] * length))


def _draw_unit_arrow(canvas, origin, vec, length, color, tip_length):
    """Draw `vec` as an arrow of `length` pixels starting at pixel `origin`.

    `vec` is in trajectory coordinates, so its y component is flipped for the
    image. A non-zero vector is normalised first; NaN vectors are skipped.
    """
    dx, dy = vec[0], vec[1]
    norm = np.sqrt(dx**2 + dy**2)
    if norm != 0:
        dx, dy = dx / norm, dy / norm
    if np.isnan(dx) or np.isnan(dy):
        return
    tip = (int(origin[0] + dx * length), int(origin[1] - dy * length))
    cv2.arrowedLine(canvas, origin, tip, color, 3, tipLength=tip_length)


# Frame number -> position in the per-frame lists (one dict lookup per frame
# instead of a linear list.index scan).
frame_to_idx = {key: pos for pos, key in enumerate(data_keys)}
last_data_key = max(data_keys)

# BGR colours. The list is called skeleton_colors so that it does not shadow
# the `matplotlib.colors` module imported at the top of the notebook.
skeleton_colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0)]
color_vector_h = (128, 0, 128)      # purple: heading line
color_vector_m = (0, 165, 255)      # orange: motion direction
color_vector_mh = (216, 191, 216)   # light purple: rotated motion direction
red_bgr = (0, 0, 255)               # mean vector of the window before a row
blue_bgr = (255, 0, 0)              # mean vector of the window after a row

arrow_len_long = segment_len * 2 / scale_factor
arrow_len_short = segment_len / scale_factor
agl_window = 5   # half-window, in the units of the Time column
time_min, time_max = df_mot_mid.Time.min(), df_mot_mid.Time.max()

# Size the VideoWriter was opened with; frames of any other size are not written.
out_w, out_h = width + canvas_size, max(height, canvas_size)

# The colorbar only depends on norm_color / cmap_traj / color_col, so render it once.
fig, ax = plt.subplots(figsize=(1, 7))
fig.subplots_adjust(left=0.5, right=0.9, top=0.7, bottom=0.05)
cb = plt.colorbar(cm.ScalarMappable(norm=norm_color, cmap=cmap_traj),
                  cax=ax, orientation='vertical')
cb.ax.yaxis.set_ticks_position("left")
cb.ax.yaxis.set_label_position("left")
cb.set_label(color_col, labelpad=0, fontsize=10)
fig.canvas.draw()
# buffer_rgba() replaces tostring_rgb(), which newer Matplotlib no longer provides.
colorbar_img = np.array(fig.canvas.buffer_rgba())
plt.close(fig)
colorbar_img = cv2.cvtColor(colorbar_img, cv2.COLOR_RGBA2BGR)
colorbar_img = cv2.resize(colorbar_img, (60, height))

cached_seg_id = None   # segment whose background is currently cached
background = None

try:
    for frame_idx in range(start_frame, total_frames, step):
        # ---- left panel: video frame -------------------------------------
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret:
            break

        idx = frame_to_idx.get(frame_idx)
        if idx is None:
            print('不存在这一帧：frame', frame_idx)
            continue

        pos_phr_i = np.array(pos_phr[idx], dtype=float)       # pharynx point
        vector_h_i = np.array(vector_h[idx], dtype=float)
        vector_m_i = np.array(vector_m[idx], dtype=float)
        vector_mh_i = np.array(vector_mh[idx], dtype=float)
        angle_m_i = angle_md[idx]

        cv2.putText(frame, f"Frame: {frame_idx}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Explicit NaN test: a plain truthiness test accepts NaN and rejects 0.0.
        if not np.isnan(angle_m_i):
            frame_height, frame_width = frame.shape[:2]
            text = f"angle_md: {round(angle_m_i,2)}"
            text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            color_m_t = (0, 255, 0) if angle_m_i <= 100 else (0, 0, 255)
            cv2.putText(frame, text, (frame_width-10-text_size[0], text_size[1]+10),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, color_m_t, 2)

        if np.isnan(vector_m_i).any() or np.isnan(vector_h_i).any():
            print('无方向向量，不打印')
        elif pos_phr_i.size >= 2 and np.all(np.isfinite(pos_phr_i)):
            # Each vector is normalised on its own; a zero-length or NaN vector
            # is skipped instead of producing int(nan) errors.
            unit_h = _unit_or_none(vector_h_i)
            unit_m = _unit_or_none(vector_m_i)
            unit_mh = _unit_or_none(vector_mh_i)
            origin = (int(pos_phr_i[0]), int(pos_phr_i[1]))
            if unit_m is not None:
                cv2.arrowedLine(frame, origin, _offset_point(pos_phr_i, unit_m, 150),
                                color_vector_m, 5, tipLength=0.1)
            if unit_mh is not None:
                cv2.arrowedLine(frame, origin, _offset_point(pos_phr_i, unit_mh, 150),
                                color_vector_mh, 5, tipLength=0.1)
            if unit_h is not None:
                cv2.line(frame, _offset_point(pos_phr_i, unit_h, -body_l),
                         _offset_point(pos_phr_i, unit_h, body_l), color_vector_h, 5)
            cv2.circle(frame, origin, 10, color_vector_h, thickness=-1)

        # Skeleton paths; points are stored with their two coordinates swapped
        # relative to OpenCV's (x, y) order, hence (second, first).
        if frame_idx < last_data_key:
            for path_no, points in enumerate(series_skeleton.iloc[idx]):
                midline_points = normalize_points(points)
                for (p_first, p_second) in midline_points:
                    cv2.circle(frame, (int(p_second), int(p_first)), 3,
                               skeleton_colors[path_no % 3], -1)

        # ---- right panel: trajectory -------------------------------------
        seg_id = (frame_idx // segment_len) * segment_len
        if seg_id != cached_seg_id:
            # The background only changes with the segment, so draw it once per segment.
            df_cut = df_mot_mid.iloc[seg_id:seg_id + segment_len, :]
            # Segment bounds are read as globals by to_img_coords_seg.
            x_min_seg, x_max_seg = df_cut["X"].min(), df_cut["X"].max()
            y_min_seg, y_max_seg = df_cut["Y"].min(), df_cut["Y"].max()
            background = np.full((height, width, 3), 255, dtype=np.uint8)

            for _, row in df_cut.iterrows():
                x, y = to_img_coords_seg(row.X, row.Y)
                color_val = row[color_col]
                if np.isnan(x) or np.isnan(y) or np.isnan(color_val):
                    continue
                rgba = cmap_traj(norm_color(color_val))            # (r, g, b, a) in [0, 1]
                bgr = tuple(int(255 * c) for c in rgba[:3][::-1])  # to OpenCV BGR
                cv2.circle(background, (x, y), 3, bgr, -1)

            for row_pos in range(0, len(df_cut), n):
                row = df_cut.iloc[row_pos]
                # Map this row first, then test it (not the previous point).
                x, y = to_img_coords_seg(row.X, row.Y)
                if np.isnan(x) or np.isnan(y):
                    continue
                _draw_unit_arrow(background, (x, y), row.moving_vec,
                                 arrow_len_long, (0, 165, 255), 0.15)

                t = row.Time
                if (t - agl_window >= time_min) and (t + agl_window <= time_max):
                    # Mean vector over [t - window, t)
                    pre_mask = (df_mot_mid['Time'] >= t - agl_window) & (df_mot_mid['Time'] < t)
                    mean_vec_pre = np.mean(np.vstack(df_mot_mid.loc[pre_mask, agl_vec_col].values), axis=0)
                    _draw_unit_arrow(background, (x, y), mean_vec_pre, arrow_len_long, red_bgr, 0.3)
                    # Mean vector over [t, t + window]
                    pos_mask = (df_mot_mid['Time'] >= t) & (df_mot_mid['Time'] <= t + agl_window)
                    mean_vec_pos = np.mean(np.vstack(df_mot_mid.loc[pos_mask, agl_vec_col].values), axis=0)
                    _draw_unit_arrow(background, (x, y), mean_vec_pos, arrow_len_short, blue_bgr, 0.3)
            cached_seg_id = seg_id

        img = background.copy()

        # Current position: red when angle_m >= threshold, blue otherwise.
        row = df_mot_mid.iloc[frame_idx]
        x, y = to_img_coords_seg(row.X, row.Y)
        if np.isnan(x) or np.isnan(y):
            continue   # as before, frames without a valid position are not shown/exported
        marker_color = (0, 0, 255) if row.angle_m >= threshold else (255, 0, 0)
        cv2.circle(img, (x, y), 20, marker_color, -1)

        # ---- merge panels ------------------------------------------------
        # The right panel (trajectory + colorbar) is always scaled to
        # canvas_size so the merged frame matches the VideoWriter size.
        right = cv2.resize(np.hstack((img, colorbar_img)), (canvas_size, frame.shape[0]))
        combined = np.hstack((frame, right))
        if combined.shape[0] < out_h:
            combined = cv2.copyMakeBorder(combined, 0, out_h - combined.shape[0], 0, 0,
                                          cv2.BORDER_CONSTANT, value=(255, 255, 255))

        cv2.imshow("Merged", combined)
        if export_video:
            out.write(combined)

        key = cv2.waitKey(delay) & 0xFF
        if key == ord('q'):
            break
        elif key == ord(' '):
            cv2.waitKey(0)   # paused until any key is pressed
finally:
    # Release the capture/writer even if rendering fails or is interrupted.
    cap.release()
    if export_video:
        out.release()
    cv2.destroyAllWindows()

### 轨迹打印运动方向和头部朝向

未裁剪的 `df_mot_mid` 的行数与视频总帧数相同：该表是按视频帧时间戳对齐后的运动数据和中线数据，视频和该表的长度都比中线骨架的数量大。
因此视频帧号可以直接用作 `df_mot_mid`（csv）的行号，例如 `df_mot_mid.iloc[frame_idx]`。

In [None]:
# === Parameters for the "motion direction + heading" video ===
# check_inv = 10
fps = 100       # frame rate passed to the exported cv2.VideoWriter
segment_len = 3000   # number of df_mot_mid rows drawn as one trajectory segment
n = 300   # draw direction arrows on every n-th row of the segment
scale_factor = 200   # arrow length on the trajectory is segment_len / scale_factor (x2 for the long arrows)
step = 25       # render every `step`-th video frame (here: every 25th)
delay = 1      # cv2.waitKey delay in milliseconds (1 = as fast as possible)
body_l  =200   # half-length in pixels of the heading line drawn through the pharynx point
start_frame = 0   # first video frame to render
margin = 50   # pixel margin kept around the trajectory by to_img_coords_seg
export_video = True   # also write the merged frames to output_path
# NOTE(review): the other visualisation cells write to the same "merged.mp4" and overwrite it.
output_path = os.path.join(folder_path, "merged.mp4")
color_col = "abs_mov_agl_vel"   # df_mot_mid column that colours the trajectory

def to_img_coords_seg(x, y):
    """Convert a trajectory point into pixel coordinates on the trajectory canvas.

    Uses the module-level bounds of the current segment (`x_min_seg`,
    `x_max_seg`, `y_min_seg`, `y_max_seg`) and the canvas size `width` x
    `height`, keeping `margin` pixels free on every side. The y axis is
    inverted so that larger y values are drawn nearer the top.

    Returns a pair of ints, or (nan, nan) if x or y is NaN.
    """
    if np.isnan(x) or np.isnan(y):
        return np.nan, np.nan

    usable_w = width - 2*margin
    usable_h = height - 2*margin
    # 1e-8 avoids a division by zero for a segment with no extent.
    px = (x - x_min_seg) / (x_max_seg - x_min_seg + 1e-8) * usable_w + margin
    py_from_bottom = (y - y_min_seg) / (y_max_seg - y_min_seg + 1e-8) * usable_h + margin
    return int(px), int(height - py_from_bottom)
        
# Per-row BGR colour by movement direction: forward == 1 -> red, forward == 0 -> blue.
# Rows whose `forward` is neither 0 nor 1 end up as NaN.
df_mot_mid.loc[:, 'color'] = df_mot_mid['forward'].map({0: (255, 0, 0), 1: (0, 0, 255)})  # BGR

# Trajectory colormap (light gray -> red), normalised over the whole recording
# so colours are comparable between segments.
# cmap = mpl.colormaps['bwr']   # alternatives: 'viridis', 'plasma', ...
cmap = mpl.colors.LinearSegmentedColormap.from_list(
    "gray_red", ["lightgray", "red"]
)
norm_color = mcolors.Normalize(vmin=df_mot_mid[color_col].dropna().min(), vmax=df_mot_mid[color_col].dropna().max())
series_skeleton = pd.Series(paths)

# === Open the input video ===
video_path = os.path.join(folder_path, "c1_onnx.mp4")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail fast: otherwise total_frames is 0 and the render loop silently does nothing.
    raise IOError(f"无法打开视频: {video_path}")
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)

# === Prepare the VideoWriter ===
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
canvas_size = 800  # width in pixels of the right-hand trajectory panel in the output

if export_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width + canvas_size, max(height, canvas_size)))
    if not out.isOpened():
        cap.release()
        raise IOError(f"无法创建输出视频: {output_path}")

# Seek to the first frame to render.
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

# === Render loop =============================================================
# Left panel : video frame with skeleton paths, pharynx point and direction vectors.
# Right panel: trajectory of the current `segment_len`-row segment coloured by
#              `color_col`, with motion / heading / projected-motion arrows
#              every `n` rows and the current position marker.

def _unit_or_none(vec):
    """Return `vec` scaled to unit length, or None if it has NaN/inf, zero length or < 2 components."""
    arr = np.atleast_1d(np.asarray(vec, dtype=float))
    if arr.size < 2:
        return None
    length = np.linalg.norm(arr)
    if not np.isfinite(length) or length == 0:
        return None
    return arr / length


def _offset_point(origin, direction, length):
    """Integer pixel at `origin + direction * length` (image coordinates)."""
    return (int(origin[0] + direction[0] * length),
            int(origin[1] + direction[1] * length))


def _draw_unit_arrow(canvas, origin, vec, length, color, tip_length):
    """Draw `vec` as an arrow of `length` pixels starting at pixel `origin`.

    `vec` is in trajectory coordinates, so its y component is flipped for the
    image. A non-zero vector is normalised first; NaN vectors are skipped.
    """
    dx, dy = vec[0], vec[1]
    norm = np.sqrt(dx**2 + dy**2)
    if norm != 0:
        dx, dy = dx / norm, dy / norm
    if np.isnan(dx) or np.isnan(dy):
        return
    tip = (int(origin[0] + dx * length), int(origin[1] - dy * length))
    cv2.arrowedLine(canvas, origin, tip, color, 3, tipLength=tip_length)


# Frame number -> position in the per-frame lists (one dict lookup per frame
# instead of a linear list.index scan).
frame_to_idx = {key: pos for pos, key in enumerate(data_keys)}
last_data_key = max(data_keys)

# BGR colours. The list is called skeleton_colors so that it does not shadow
# the `matplotlib.colors` module imported at the top of the notebook.
skeleton_colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0)]
color_vector_h = (128, 0, 128)      # purple: heading line
color_vector_m = (0, 165, 255)      # orange: motion direction
color_vector_mh = (216, 191, 216)   # light purple: rotated motion direction

arrow_len_long = segment_len * 2 / scale_factor
arrow_len_short = segment_len / scale_factor

# Size the VideoWriter was opened with; frames of any other size are not written.
out_w, out_h = width + canvas_size, max(height, canvas_size)

cached_seg_id = None   # segment whose background is currently cached
background = None

try:
    for frame_idx in range(start_frame, total_frames, step):
        # ---- left panel: video frame -------------------------------------
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret:
            break

        idx = frame_to_idx.get(frame_idx)
        if idx is None:
            print('不存在这一帧：frame', frame_idx)
            continue

        pos_phr_i = np.array(pos_phr[idx], dtype=float)       # pharynx point
        vector_h_i = np.array(vector_h[idx], dtype=float)
        vector_m_i = np.array(vector_m[idx], dtype=float)
        vector_mh_i = np.array(vector_mh[idx], dtype=float)
        angle_m_i = angle_md[idx]

        cv2.putText(frame, f"Frame: {frame_idx}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Explicit NaN test: a plain truthiness test accepts NaN and rejects 0.0.
        if not np.isnan(angle_m_i):
            frame_height, frame_width = frame.shape[:2]
            text = f"angle_md: {round(angle_m_i,2)}"
            text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            color_m_t = (0, 255, 0) if angle_m_i <= 100 else (0, 0, 255)
            cv2.putText(frame, text, (frame_width-10-text_size[0], text_size[1]+10),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, color_m_t, 2)

        if np.isnan(vector_m_i).any() or np.isnan(vector_h_i).any():
            print('无方向向量，不打印')
        elif pos_phr_i.size >= 2 and np.all(np.isfinite(pos_phr_i)):
            # Each vector is normalised on its own; a zero-length or NaN vector
            # is skipped instead of producing int(nan) errors.
            unit_h = _unit_or_none(vector_h_i)
            unit_m = _unit_or_none(vector_m_i)
            unit_mh = _unit_or_none(vector_mh_i)
            origin = (int(pos_phr_i[0]), int(pos_phr_i[1]))
            if unit_m is not None:
                cv2.arrowedLine(frame, origin, _offset_point(pos_phr_i, unit_m, 150),
                                color_vector_m, 5, tipLength=0.1)
            if unit_mh is not None:
                cv2.arrowedLine(frame, origin, _offset_point(pos_phr_i, unit_mh, 150),
                                color_vector_mh, 5, tipLength=0.1)
            if unit_h is not None:
                cv2.line(frame, _offset_point(pos_phr_i, unit_h, -body_l),
                         _offset_point(pos_phr_i, unit_h, body_l), color_vector_h, 5)
            cv2.circle(frame, origin, 10, color_vector_h, thickness=-1)

        # Skeleton paths; points are stored with their two coordinates swapped
        # relative to OpenCV's (x, y) order, hence (second, first).
        if frame_idx < last_data_key:
            for path_no, points in enumerate(series_skeleton.iloc[idx]):
                midline_points = normalize_points(points)
                for (p_first, p_second) in midline_points:
                    cv2.circle(frame, (int(p_second), int(p_first)), 3,
                               skeleton_colors[path_no % 3], -1)

        # ---- right panel: trajectory -------------------------------------
        seg_id = (frame_idx // segment_len) * segment_len
        if seg_id != cached_seg_id:
            # The background only changes with the segment, so draw it once per segment.
            df_cut = df_mot_mid.iloc[seg_id:seg_id + segment_len, :]
            # Segment bounds are read as globals by to_img_coords_seg.
            x_min_seg, x_max_seg = df_cut["X"].min(), df_cut["X"].max()
            y_min_seg, y_max_seg = df_cut["Y"].min(), df_cut["Y"].max()
            background = np.full((height, width, 3), 255, dtype=np.uint8)

            for _, row in df_cut.iterrows():
                x, y = to_img_coords_seg(row.X, row.Y)
                color_val = row[color_col]
                if np.isnan(x) or np.isnan(y) or np.isnan(color_val):
                    continue
                rgba = cmap(norm_color(color_val))                 # (r, g, b, a) in [0, 1]
                bgr = tuple(int(255 * c) for c in rgba[:3][::-1])  # to OpenCV BGR
                cv2.circle(background, (x, y), 3, bgr, -1)

            for row_pos in range(0, len(df_cut), n):
                row = df_cut.iloc[row_pos]
                # Map this row first, then test it (not the previous point).
                x, y = to_img_coords_seg(row.X, row.Y)
                if np.isnan(x) or np.isnan(y):
                    continue
                # Motion direction
                _draw_unit_arrow(background, (x, y), row.moving_vec,
                                 arrow_len_long, (0, 165, 255), 0.15)
                # Heading, coloured by forward/backward; rows without a
                # forward flag have a NaN colour and are skipped.
                if not (np.isscalar(row.color) and pd.isna(row.color)):
                    _draw_unit_arrow(background, (x, y), row.heading_vec,
                                     arrow_len_long, row.color, 0.3)
                # Projected motion direction (proj_mov_vec)
                _draw_unit_arrow(background, (x, y), row.proj_mov_vec,
                                 arrow_len_short, (216, 191, 216), 0.3)
            cached_seg_id = seg_id

        img = background.copy()

        # Current position: red when angle_m >= threshold, blue otherwise.
        row = df_mot_mid.iloc[frame_idx]
        x, y = to_img_coords_seg(row.X, row.Y)
        if np.isnan(x) or np.isnan(y):
            continue   # as before, frames without a valid position are not shown/exported
        marker_color = (0, 0, 255) if row.angle_m >= threshold else (255, 0, 0)
        cv2.circle(img, (x, y), 20, marker_color, -1)

        # ---- merge panels ------------------------------------------------
        # The right panel is always scaled to canvas_size so the merged frame
        # matches the VideoWriter size.
        right = cv2.resize(img, (canvas_size, frame.shape[0]))
        combined = np.hstack((frame, right))
        if combined.shape[0] < out_h:
            combined = cv2.copyMakeBorder(combined, 0, out_h - combined.shape[0], 0, 0,
                                          cv2.BORDER_CONSTANT, value=(255, 255, 255))

        cv2.imshow("Merged", combined)
        if export_video:
            out.write(combined)

        key = cv2.waitKey(delay) & 0xFF
        if key == ord('q'):
            break
        elif key == ord(' '):
            cv2.waitKey(0)   # paused until any key is pressed
finally:
    # Release the capture/writer even if rendering fails or is interrupted.
    cap.release()
    if export_video:
        out.release()
    cv2.destroyAllWindows()

### 可视化环形和交叉骨架

In [None]:
# === Parameters for the "circular / branching skeleton" video ===
# check_inv = 10
fps = 50       # frame rate passed to the exported cv2.VideoWriter
segment_len = 3000   # number of df_mot_mid rows drawn as one trajectory segment
n = 150   # draw direction arrows on every n-th row of the segment
scale_factor = 200   # arrow length on the trajectory is segment_len / scale_factor (x2 for the long arrows)
step = 20       # render every `step`-th video frame (here: every 20th)
delay = 1      # cv2.waitKey delay in milliseconds (1 = as fast as possible)
body_l  =200   # half-length in pixels of the heading line drawn through the pharynx point
start_frame = 31000   # first video frame to render
margin = 50   # pixel margin kept around the trajectory by to_img_coords_seg
export_video = True   # also write the merged frames to output_path
# NOTE(review): the other visualisation cells write to the same "merged.mp4" and overwrite it.
output_path = os.path.join(folder_path, "merged.mp4")

# Options for the right-hand trajectory panel
color_col = "heading_avg_delta_deg"    # df_mot_mid column that colours the trajectory (usually an angle)
traj_colormap = False    # True: colour points through the colormap; False: threshold / angle_m colouring
traj_omega = True     # additionally mark rows with omega_lab == 1
color_threshold = 350   # with traj_colormap False, rows with color_col >= this value are drawn red
traj_vec_visible = False   # draw direction arrows on the trajectory
agl_vec_col = 'moving_vec'   # column averaged over the time windows: 'moving_vec' or 'heading_vec'

def to_img_coords_seg(x, y):
    """Project a trajectory point onto the trajectory image.

    The module-level segment bounds (`x_min_seg`, `x_max_seg`, `y_min_seg`,
    `y_max_seg`) are stretched over a `width` x `height` image with a border
    of `margin` pixels; y is flipped so larger values appear higher up.

    Returns integer (x_img, y_img), or (nan, nan) when x or y is NaN.
    """
    missing = np.isnan(x) or np.isnan(y)
    if missing:
        return np.nan, np.nan

    # 1e-8 keeps the ratios finite when all points of the segment coincide.
    ratio_x = (x - x_min_seg) / (x_max_seg - x_min_seg + 1e-8)
    ratio_y = (y - y_min_seg) / (y_max_seg - y_min_seg + 1e-8)
    x_img = int(ratio_x * (width - 2*margin) + margin)
    y_img = int(height - (ratio_y * (height - 2*margin) + margin))
    return x_img, y_img
        
# Per-row BGR colour by movement direction: forward == 1 -> red, forward == 0 -> blue.
# Rows whose `forward` is neither 0 nor 1 end up as NaN.
df_mot_mid.loc[:, 'color'] = df_mot_mid['forward'].map({0: (255, 0, 0), 1: (0, 0, 255)})  # BGR

# BGR colours for trajectory points and arrows
red_bgr = (0, 0, 255)
blue_bgr = (255, 0, 0)
pink_bgr = (180, 105, 255)
light_blue_bgr = (255, 191, 128)
bright_orange_yellow = (0, 165, 255)

# Trajectory colormap (light gray -> red), normalised over the whole recording
# so colours are comparable between segments.
# cmap_traj = mpl.colormaps['bwr']   # alternatives: 'viridis', 'plasma', ...
cmap_traj = mpl.colors.LinearSegmentedColormap.from_list(
    "gray_red", ["lightgray", "red"]
)
norm_color = mcolors.Normalize(vmin=df_mot_mid[color_col].dropna().min(), vmax=df_mot_mid[color_col].dropna().max())
series_skeleton = pd.Series(paths)

# === Open the input video ===
video_path = os.path.join(folder_path, "c1_onnx.mp4")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail fast: otherwise total_frames is 0 and the render loop silently does nothing.
    raise IOError(f"无法打开视频: {video_path}")
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print("视频总帧数:", total_frames)

# === Prepare the VideoWriter ===
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
canvas_size = 800  # width in pixels of the right-hand trajectory panel in the output

if export_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width + canvas_size, max(height, canvas_size)))
    if not out.isOpened():
        cap.release()
        raise IOError(f"无法创建输出视频: {output_path}")

# Seek to the first frame to render.
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

# === Render loop =============================================================
# Left panel : video frame with skeleton paths, pharynx point, direction
#              vectors and "circle" / "branching" flags.
# Right panel: trajectory of the current `segment_len`-row segment (coloured
#              via colormap or via threshold / angle_m, optional omega marks
#              and arrows), the current position marker and a colorbar.

def _unit_or_none(vec):
    """Return `vec` scaled to unit length, or None if it has NaN/inf, zero length or < 2 components."""
    arr = np.atleast_1d(np.asarray(vec, dtype=float))
    if arr.size < 2:
        return None
    length = np.linalg.norm(arr)
    if not np.isfinite(length) or length == 0:
        return None
    return arr / length


def _offset_point(origin, direction, length):
    """Integer pixel at `origin + direction * length` (image coordinates)."""
    return (int(origin[0] + direction[0] * length),
            int(origin[1] + direction[1] * length))


def _draw_unit_arrow(canvas, origin, vec, length, color, tip_length):
    """Draw `vec` as an arrow of `length` pixels starting at pixel `origin`.

    `vec` is in trajectory coordinates, so its y component is flipped for the
    image. A non-zero vector is normalised first; NaN vectors are skipped.
    """
    dx, dy = vec[0], vec[1]
    norm = np.sqrt(dx**2 + dy**2)
    if norm != 0:
        dx, dy = dx / norm, dy / norm
    if np.isnan(dx) or np.isnan(dy):
        return
    tip = (int(origin[0] + dx * length), int(origin[1] - dy * length))
    cv2.arrowedLine(canvas, origin, tip, color, 3, tipLength=tip_length)


# Frame number -> position in the per-frame lists (one dict lookup per frame
# instead of a linear list.index scan).
frame_to_idx = {key: pos for pos, key in enumerate(data_keys)}
last_data_key = max(data_keys)

# BGR colours. The list is called skeleton_colors so that it does not shadow
# the `matplotlib.colors` module imported at the top of the notebook.
skeleton_colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0)]
color_vector_h = (128, 0, 128)      # purple: heading line
color_vector_m = (0, 165, 255)      # orange: motion direction
color_vector_mh = (216, 191, 216)   # light purple: rotated motion direction

label_font_scale = 0.75   # used both to measure and to draw the angle text
arrow_len_long = segment_len * 2 / scale_factor
arrow_len_short = segment_len / scale_factor
agl_window = 5   # half-window, in the units of the Time column
time_min, time_max = df_mot_mid.Time.min(), df_mot_mid.Time.max()

# Size the VideoWriter was opened with; frames of any other size are not written.
out_w, out_h = width + canvas_size, max(height, canvas_size)

# The colorbar only depends on norm_color / cmap_traj / color_col, so render it once.
fig, ax = plt.subplots(figsize=(1, 7))
fig.subplots_adjust(left=0.5, right=0.9, top=0.7, bottom=0.05)
cb = plt.colorbar(cm.ScalarMappable(norm=norm_color, cmap=cmap_traj),
                  cax=ax, orientation='vertical')
cb.ax.yaxis.set_ticks_position("left")
cb.ax.yaxis.set_label_position("left")
cb.set_label(color_col, labelpad=0, fontsize=10)
fig.canvas.draw()
# buffer_rgba() replaces tostring_rgb(), which newer Matplotlib no longer provides.
colorbar_img = np.array(fig.canvas.buffer_rgba())
plt.close(fig)
colorbar_img = cv2.cvtColor(colorbar_img, cv2.COLOR_RGBA2BGR)
colorbar_img = cv2.resize(colorbar_img, (60, height))

cached_seg_id = None   # segment whose background is currently cached
background = None

try:
    for frame_idx in range(start_frame, total_frames, step):
        # ---- left panel: video frame -------------------------------------
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret:
            break

        idx = frame_to_idx.get(frame_idx)
        if idx is None:
            print('不存在这一帧：frame', frame_idx)
            continue

        pos_phr_i = np.array(pos_phr[idx], dtype=float)       # pharynx point
        circular_i = circular[idx]                            # circular-skeleton flag
        branching_i = branching[idx]                          # branching-skeleton flag
        vector_h_i = np.array(vector_h[idx], dtype=float)
        vector_m_i = np.array(vector_m[idx], dtype=float)
        vector_mh_i = np.array(vector_mh[idx], dtype=float)
        angle_m_i = angle_md[idx]

        cv2.putText(frame, f"Frame: {frame_idx}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
        if circular_i:
            cv2.putText(frame, f"circle", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
        if branching_i:
            cv2.putText(frame, f"branching", (10, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)

        # Explicit NaN test: a plain truthiness test accepts NaN and rejects 0.0.
        if not np.isnan(angle_m_i):
            frame_height, frame_width = frame.shape[:2]
            text = f"angle_md: {round(angle_m_i,2)}"
            # Measure with the same scale that is used for drawing so the text
            # is really right-aligned.
            text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, label_font_scale, 2)
            color_m_t = (0, 255, 0) if angle_m_i <= 100 else (0, 0, 255)
            cv2.putText(frame, text, (frame_width-10-text_size[0], text_size[1]+10),
                        cv2.FONT_HERSHEY_SIMPLEX, label_font_scale, color_m_t, 2)

        if np.isnan(vector_m_i).any() or np.isnan(vector_h_i).any():
            print('无方向向量，不打印')
        elif pos_phr_i.size >= 2 and np.all(np.isfinite(pos_phr_i)):
            # Each vector is normalised on its own; a zero-length or NaN vector
            # is skipped instead of producing int(nan) errors.
            unit_h = _unit_or_none(vector_h_i)
            unit_m = _unit_or_none(vector_m_i)
            unit_mh = _unit_or_none(vector_mh_i)
            origin = (int(pos_phr_i[0]), int(pos_phr_i[1]))
            if unit_m is not None:
                cv2.arrowedLine(frame, origin, _offset_point(pos_phr_i, unit_m, 150),
                                color_vector_m, 5, tipLength=0.1)
            if unit_mh is not None:
                cv2.arrowedLine(frame, origin, _offset_point(pos_phr_i, unit_mh, 150),
                                color_vector_mh, 5, tipLength=0.1)
            if unit_h is not None:
                cv2.line(frame, _offset_point(pos_phr_i, unit_h, -body_l),
                         _offset_point(pos_phr_i, unit_h, body_l), color_vector_h, 5)
            cv2.circle(frame, origin, 10, color_vector_h, thickness=-1)

        # Skeleton paths; points are stored with their two coordinates swapped
        # relative to OpenCV's (x, y) order, hence (second, first).
        if frame_idx < last_data_key:
            for path_no, points in enumerate(series_skeleton.iloc[idx]):
                midline_points = normalize_points(points)
                for (p_first, p_second) in midline_points:
                    cv2.circle(frame, (int(p_second), int(p_first)), 3,
                               skeleton_colors[path_no % 3], -1)

        # ---- right panel: trajectory -------------------------------------
        seg_id = (frame_idx // segment_len) * segment_len
        if seg_id != cached_seg_id:
            # The background only changes with the segment, so draw it once per segment.
            df_cut = df_mot_mid.iloc[seg_id:seg_id + segment_len, :]
            # Segment bounds are read as globals by to_img_coords_seg.
            x_min_seg, x_max_seg = df_cut["X"].min(), df_cut["X"].max()
            y_min_seg, y_max_seg = df_cut["Y"].min(), df_cut["Y"].max()
            background = np.full((height, width, 3), 255, dtype=np.uint8)

            for _, row in df_cut.iterrows():
                x, y = to_img_coords_seg(row.X, row.Y)
                row_angle = row.angle_m
                color_val = row[color_col]
                if np.isnan(x) or np.isnan(y) or np.isnan(color_val):
                    continue
                if traj_colormap:
                    rgba = cmap_traj(norm_color(color_val))            # (r, g, b, a) in [0, 1]
                    bgr = tuple(int(255 * c) for c in rgba[:3][::-1])  # to OpenCV BGR
                    cv2.circle(background, (x, y), 3, bgr, -1)
                elif color_val >= color_threshold:
                    cv2.circle(background, (x, y), 3, (0, 0, 255), -1)
                elif pd.notna(row_angle):
                    # Rows without an angle are left undrawn (NaN no longer
                    # falls through to the "> 130" colour).
                    dot_color = light_blue_bgr if row_angle <= 130 else pink_bgr
                    cv2.circle(background, (x, y), 3, dot_color, -1)
                if traj_omega and row.omega_lab == 1:
                    cv2.circle(background, (x, y), 4, bright_orange_yellow, -1)

            if traj_vec_visible:
                for row_pos in range(0, len(df_cut), n):
                    row = df_cut.iloc[row_pos]
                    # Map this row first, then test it (not the previous point).
                    x, y = to_img_coords_seg(row.X, row.Y)
                    if np.isnan(x) or np.isnan(y):
                        continue
                    _draw_unit_arrow(background, (x, y), row.moving_vec,
                                     arrow_len_long, (0, 165, 255), 0.15)

                    t = row.Time
                    if (t - agl_window >= time_min) and (t + agl_window <= time_max):
                        # Mean vector over [t - window, t)
                        pre_mask = (df_mot_mid['Time'] >= t - agl_window) & (df_mot_mid['Time'] < t)
                        mean_vec_pre = np.mean(np.vstack(df_mot_mid.loc[pre_mask, agl_vec_col].values), axis=0)
                        _draw_unit_arrow(background, (x, y), mean_vec_pre, arrow_len_long, pink_bgr, 0.3)
                        # Mean vector over [t, t + window]
                        pos_mask = (df_mot_mid['Time'] >= t) & (df_mot_mid['Time'] <= t + agl_window)
                        mean_vec_pos = np.mean(np.vstack(df_mot_mid.loc[pos_mask, agl_vec_col].values), axis=0)
                        _draw_unit_arrow(background, (x, y), mean_vec_pos, arrow_len_short, light_blue_bgr, 0.3)
            cached_seg_id = seg_id

        img = background.copy()

        # Current position: red when angle_m >= threshold, blue otherwise.
        row = df_mot_mid.iloc[frame_idx]
        x, y = to_img_coords_seg(row.X, row.Y)
        if np.isnan(x) or np.isnan(y):
            continue   # as before, frames without a valid position are not shown/exported
        marker_color = (0, 0, 255) if row.angle_m >= threshold else (255, 0, 0)
        cv2.circle(img, (x, y), 10, marker_color, -1)

        # ---- merge panels ------------------------------------------------
        # The right panel (trajectory + colorbar) is always scaled to
        # canvas_size so the merged frame matches the VideoWriter size.
        right = cv2.resize(np.hstack((img, colorbar_img)), (canvas_size, frame.shape[0]))
        combined = np.hstack((frame, right))
        if combined.shape[0] < out_h:
            combined = cv2.copyMakeBorder(combined, 0, out_h - combined.shape[0], 0, 0,
                                          cv2.BORDER_CONSTANT, value=(255, 255, 255))

        cv2.imshow("Merged", combined)
        if export_video:
            out.write(combined)

        key = cv2.waitKey(delay) & 0xFF
        if key == ord('q'):
            break
        elif key == ord(' '):
            cv2.waitKey(0)   # paused until any key is pressed
finally:
    # Release the capture/writer even if rendering fails or is interrupted.
    cap.release()
    if export_video:
        out.release()
    cv2.destroyAllWindows()

# 读取视频人工标注omega turn

### 函数定义

In [None]:
def label_omega_turn(video_path, save_path="labels.npy", step=1, delay=30):
    """Play a video and interactively record a binary omega-turn label per frame.

    Every ``step``-th frame is shown in an OpenCV window with the frame number
    and the current omega state drawn on it. Keyboard controls:

    * ``o``   - toggle the omega-turn state (applies from the next shown frame)
    * space   - pause; press space again to resume, or ``q`` to save and quit
    * ``q``   - stop labelling; everything labelled so far is saved

    Parameters
    ----------
    video_path : str
        Path handed to ``cv2.VideoCapture``.
    save_path : str, default "labels.npy"
        Destination handed to ``np.save``.
    step : int, default 1
        Number of frames to advance between displayed frames. The label of a
        displayed frame is applied to the whole span of ``step`` frames it
        stands for, so the saved array has no unlabelled gaps.
    delay : int, default 30
        Milliseconds ``cv2.waitKey`` waits for a key before moving on.

    Returns
    -------
    numpy.ndarray or None
        Integer array (0/1) with one entry per frame reported by
        ``CAP_PROP_FRAME_COUNT``; frames that were never reached stay 0.
        ``None`` if the video could not be opened.

    Raises
    ------
    ValueError
        If ``step`` is not a positive integer (0 would never advance and a
        negative value would index from the end of the label array).
    """
    if not isinstance(step, (int, np.integer)) or step < 1:
        raise ValueError(f"step must be a positive integer, got {step!r}")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("无法打开视频")
            return

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"总帧数: {total_frames}")

        # One label per frame, 0 until marked otherwise.
        labels = np.zeros(total_frames, dtype=int)

        frame_idx = 0
        omega_mode = False       # True while the user is marking an omega turn
        quit_requested = False   # set when 'q' is pressed during a pause

        while frame_idx < total_frames and not quit_requested:
            # Sequential reads already land on the next frame when step == 1,
            # so only seek when frames are actually being skipped.
            if step != 1:
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                break

            # Label the displayed frame together with the frames skipped after
            # it; the slice is clipped automatically at the end of the array.
            labels[frame_idx:frame_idx + step] = int(omega_mode)

            # Overlay the current frame number and omega state.
            status_text = f"Frame: {frame_idx}/{total_frames} | Omega: {int(omega_mode)}"
            cv2.putText(frame, status_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            cv2.imshow("Video Labeling", frame)

            # Wait up to `delay` ms for a key; with no key, playback continues.
            key = cv2.waitKey(delay) & 0xFF

            if key == ord('q'):  # quit
                break
            elif key == ord('o'):  # toggle omega-turn state
                omega_mode = not omega_mode
                print(f"Omega mode 切换为 {omega_mode}")
            elif key == ord(' '):  # pause / resume
                print("暂停，按空格继续...")
                while True:
                    key2 = cv2.waitKey(0) & 0xFF
                    if key2 == ord(' '):  # space again resumes playback
                        break
                    elif key2 == ord('q'):
                        quit_requested = True
                        break

            # Advance by `step` frames.
            frame_idx += step
    finally:
        # Always free the capture and the window, even if an error occurred.
        cap.release()
        cv2.destroyAllWindows()

    # Persist the result (both normal completion and early quit end up here).
    np.save(save_path, labels)
    print(f"标注完成，结果已保存到 {save_path}")

    return labels

### 标注

In [None]:
# Run an interactive labelling session on one recording: the player advances
# 10 frames at a time and waits up to 200 ms for a key press on each shown
# frame. The label array is saved into the same folder as the video.
path_folder = 'Y:\\SZX\\2025_wbi_analysis\\good_WBI\\no_binary\\20250115_4.5g-24d-ov_08'
video_name = '\\c1_new.mp4'
save_name = "\\omega_labels.npy"
labels = label_omega_turn(
    path_folder + video_name,
    save_path=path_folder + save_name,
    step=10,
    delay=200,
)

### 读取标注结果

In [None]:
# Load a previously saved omega-turn label array for this recording.
# The folder is a normal (non-raw) string: combining the r'' prefix with "\\"
# would put doubled backslashes into the actual path.
path_dir = 'Y:\\SZX\\2025_wbi_analysis\\good_WBI\\20241219_4.5g-ov_06'
omega_file = '\\omega_labels.npy'
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects and is
# only required for object arrays; if this file is a plain numeric array,
# allow_pickle=False is the safer choice - confirm before changing.
omega_np = np.load(path_dir+omega_file, allow_pickle=True)

In [None]:
# Load the saved omega-turn labels for this recording and attach them to the
# motion table as a 'label' column.
# The folder is a normal (non-raw) string: combining the r'' prefix with "\\"
# would put doubled backslashes into the actual path.
path_dir = 'Y:\\SZX\\2025_wbi_analysis\\good_WBI\\no_binary\\20250115_4.5g-24d-ov_08'
omega_file = '\\omega_labels.npy'
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects and is
# only required for object arrays; if this file is a plain numeric array,
# allow_pickle=False is the safer choice - confirm before changing.
omega_np = np.load(path_dir+omega_file, allow_pickle=True)
# The assignment requires omega_np to have exactly one entry per row of
# df_mot_slice; pandas raises ValueError on a length mismatch.
df_mot_slice['label'] = np.array(omega_np)

In [None]:
# Show the number of entries in the loaded label array.
len(omega_np)

In [None]:
# Store a copy of the loaded labels as the 'label' column of df_mot_slice.
# NOTE(review): the same assignment is already made at the end of the loading
# cell for the 20250115 recording - this cell looks redundant; confirm.
df_mot_slice['label'] = np.array(omega_np)

In [None]:
# Quick visual check: plot the loaded label values against their array index.
plt.plot(omega_np)