# Video Time Series Data Extraction

In [None]:
import cv2
import numpy as np
import csv
from collections import defaultdict

VIDEO_PATH = 0  # 0 表示摄像头；也可替换为 "video.mp4"
MAX_CORNERS = 1000
QUALITY_LEVEL = 0.01
MIN_DISTANCE = 12
BLOCK_SIZE = 3
USE_HARRIS = False
K_HARRIS = 0.04

WIN_SIZE = (21, 21)
MAX_LEVEL = 3
TERM_CRITERIA = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)

REDETECT_INTERVAL = 10   # 每隔多少帧补点
FB_ERR_THRESH = 1.5      # 前后向一致性阈值（像素）
ERR_THRESH = 30.0        # LK 返回的误差阈值（经验）

DRAW_TRAJ_LEN = 30       # 仅可视化最近多少个点
OUTPUT_CSV = "tracks.csv"  # 轨迹导出

# ========== 工具函数 ==========
def detect_features(gray, mask=None, max_corners=MAX_CORNERS):
    pts = cv2.goodFeaturesToTrack(
        gray,
        maxCorners=max_corners,
        qualityLevel=QUALITY_LEVEL,
        minDistance=MIN_DISTANCE,
        blockSize=BLOCK_SIZE,
        useHarrisDetector=USE_HARRIS,
        k=K_HARRIS,
        mask=mask
    )
    if pts is None:
        return np.empty((0,1,2), dtype=np.float32)
    return np.float32(pts)

def forward_backward_check(prev_gray, curr_gray, p0):
    # 正向
    p1, st1, err1 = cv2.calcOpticalFlowPyrLK(
        prev_gray, curr_gray, p0, None,
        winSize=WIN_SIZE, maxLevel=MAX_LEVEL, criteria=TERM_CRITERIA
    )
    # 反向
    p0_back, st2, err2 = cv2.calcOpticalFlowPyrLK(
        curr_gray, prev_gray, p1, None,
        winSize=WIN_SIZE, maxLevel=MAX_LEVEL, criteria=TERM_CRITERIA
    )
    # 前后向误差
    fb_err = np.linalg.norm(p0 - p0_back, axis=2).reshape(-1)
    err1_ = err1.reshape(-1) if err1 is not None else np.full(len(p0), np.inf)
    st = (st1.reshape(-1) == 1) & (st2.reshape(-1) == 1)
    return p1, st, fb_err, err1_

def in_bounds(p, w, h):
    x, y = p[:,0], p[:,1]
    return (x >= 0) & (x < w) & (y >= 0) & (y < h)

# ========== 主流程 ==========
cap = cv2.VideoCapture(VIDEO_PATH)
ok, frame0 = cap.read()
if not ok:
    raise RuntimeError("无法读取视频/摄像头")

h, w = frame0.shape[:2]
prev_gray = cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)

# 初始化角点与轨迹容器
p0 = detect_features(prev_gray, max_corners=MAX_CORNERS)
next_track_id = 0
# 为每个点分配一个track_id
active_ids = []
tracks = defaultdict(list)  # track_id -> [(t,x,y)]
colors = {}

def add_points(points, t):
    global next_track_id
    for pt in points.reshape(-1,2):
        tracks[next_track_id].append((t, float(pt[0]), float(pt[1])))
        active_ids.append(next_track_id)
        colors[next_track_id] = tuple(np.random.randint(0, 255, 3).tolist())
        next_track_id += 1

add_points(p0, t=0)
t = 0

# 用于可视化的轨迹缓存（只保留最近 DRAW_TRAJ_LEN 个）
vis_traj = defaultdict(list)  # track_id -> [(x,y), ...]

while True:
    ok, frame = cap.read()
    if not ok:
        break
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    t += 1

    if len(active_ids) > 0:
        # 组织上一帧的点坐标数组（与 active_ids 顺序一致）
        p0_arr = []
        for tid in active_ids:
            xlast, ylast = tracks[tid][-1][1], tracks[tid][-1][2]
            p0_arr.append([xlast, ylast])
        p0_arr = np.float32(p0_arr).reshape(-1,1,2)

        # LK + 前后向一致性
        p1, st, fb_err, err1_ = forward_backward_check(prev_gray, gray, p0_arr)

        # 质量筛选
        st = st & (fb_err < FB_ERR_THRESH) & (err1_ < ERR_THRESH)
        st = st & in_bounds(p1.reshape(-1,2), w, h)

        # 更新轨迹
        new_active_ids = []
        p1_flat = p1.reshape(-1,2)
        for keep, tid, new_pt in zip(st, active_ids, p1_flat):
            if keep:
                tracks[tid].append((t, float(new_pt[0]), float(new_pt[1])))
                new_active_ids.append(tid)
                # 更新可视化缓存
                vis_traj[tid].append((int(new_pt), int(new_pt[1])))
                if len(vis_traj[tid]) > DRAW_TRAJ_LEN:
                    vis_traj[tid] = vis_traj[tid][-DRAW_TRAJ_LEN:]
        active_ids = new_active_ids

    # 周期性补点（避免与已有点太近）
    if t % REDETECT_INTERVAL == 0:
        mask = np.full((h, w), 255, dtype=np.uint8)
        for tid in active_ids:
            x, y = tracks[tid][-1][1], tracks[tid][-1][2]
            cv2.circle(mask, (int(x), int(y)), MIN_DISTANCE, 0, -1)
        new_pts = detect_features(gray, mask=mask, max_corners=MAX_CORNERS // 2)
        if new_pts.shape > 0:
            add_points(new_pts, t)

    # 可视化
    vis = frame.copy()
    for tid in active_ids:
        c = colors[tid]
        pts_list = vis_traj[tid]
        for i in range(1, len(pts_list)):
            cv2.line(vis, pts_list[i-1], pts_list[i], c, 2)
        if len(pts_list) > 0:
            cv2.circle(vis, pts_list[-1], 3, c, -1)

    cv2.putText(vis, f"t={t} active_tracks={len(active_ids)}",
                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0,255,0), 2)
    cv2.imshow("KLT Tracks", vis)
    key = cv2.waitKey(1) & 0xFF
    if key == 27 or key == ord('q'):
        break

    prev_gray = gray

cap.release()
cv2.destroyAllWindows()

# 导出轨迹到 CSV：列为 track_id, t, x, y
with open(OUTPUT_CSV, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["track_id", "t", "x", "y"])
    for tid, seq in tracks.items():
        for (tt, xx, yy) in seq:
            writer.writerow([tid, tt, f"{xx:.3f}", f"{yy:.3f}"])

print(f"保存轨迹到 {OUTPUT_CSV}")
