In [None]:
from typing import Iterator, Optional, Dict, List
from pathlib import Path
import hashlib
import random
import csv
import yaml
import ffmpeg
import utils


# ==============================
# 配置加载
# ==============================
def load_config(filename: str = "pipeline_config.yaml") -> dict:
    """
    从与当前脚本同目录的 pipeline_config.yaml 读取配置。
    使用绝对路径，避免从其他工作目录运行时找不到文件。
    """
    cfg_path = Path(__file__).resolve().parent / filename
    with open(cfg_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


# ==============================
# 工具函数
# ==============================
def get_video_time(video_path: str) -> float:
    """
    使用 ffmpeg.probe 获取视频总时长（秒）。
    注意 ffmpeg 返回的是字符串，这里转换为 float。
    异常时返回 0.0，便于上层跳过该视频。
    """
    try:
        duration_str = ffmpeg.probe(video_path)["format"]["duration"]
        return float(duration_str)
    except Exception as e:
        print(f"[warn] 获取时长失败：{video_path} | {e}")
        return 0.0


def get_video_list() -> List[str]:
    """
    按 CSV 顺序读取视频文件名（默认第一列）。
    CSV 路径由 utils.get_train_path_str() 决定（你现有的工具函数）。
    """
    names: List[str] = []
    with open(utils.get_train_path_str(), newline="", encoding="utf-8") as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            if row and row[0]:
                names.append(row[0].strip())
    return names


def _stable_int_from_str(s: str, bits: int = 32) -> int:
    """
    基于 sha1 的稳定哈希，返回固定宽度的整数（默认 32bit）。
    用于把字符串（如文件名）映射为稳定的随机种子。
    """
    h = hashlib.sha1(s.encode("utf-8")).hexdigest()
    return int(h[: bits // 4], 16)


# ==============================
# 核心：稳定随机滑窗
# ==============================
def iter_stable_random_windows(
    total_sec: float,
    base_step_sec: float,
    jitter_sec: float,
    min_len_sec: float,
    max_len_sec: float,
    seed: Optional[int] = None,
    fps: Optional[int] = None,
) -> Iterator[Dict]:
    """
    生成 “稳定随机” 滑动窗口：
      - 起点按：start_{k+1} = start_k + base_step_sec + U[-jitter, +jitter]
      - 时长按：dur_k ~ U[min_len_sec, max_len_sec]
      - 若越界（尾部不足）：返回 valid_sec / pad_sec，供 mask 或 tpad 使用
      - 若提供 fps：返回帧级 mask（有效=1，padding=0）

    返回字段：
      {
        "index": idx,             # 第几个窗口（从0开始）
        "start_sec": start_time,  # 窗口起点（秒）
        "dur_sec": dur,           # 目标窗口时长（秒）
        "valid_sec": valid_sec,   # 有效（不越界）时长（秒）
        "pad_sec": pad_sec,       # 需要补齐的时长（秒）
        # 可选（传入 fps 时）：
        "target_frames": int,
        "valid_frames": int,
        "mask": List[int],        # 长度 = target_frames；有效=1，padding=0
      }
    """
    if total_sec <= 0:
        return
    if base_step_sec <= 0:
        raise ValueError("base_step_sec 必须 > 0")
    if min_len_sec <= 0 or max_len_sec <= 0 or max_len_sec < min_len_sec:
        raise ValueError("窗口长度范围非法")

    # 独立的随机引擎，确保不同视频/不同seed的可复现性
    rng = random.Random(seed)

    start_time = 0.0
    idx = 0
    min_advance = 1e-6  # 最小前进，防止极端情况下原地踏步
    # 抖动幅度上限，避免“反向走”或卡死
    jitter_cap = min(abs(jitter_sec), base_step_sec * 0.9)

    while start_time < total_sec:
        # 随机目标窗口时长
        dur = rng.uniform(min_len_sec, max_len_sec)

        # 有效时长 + 需要补齐的时长（若越界）
        valid_sec = max(0.0, min(dur, max(0.0, total_sec - start_time)))
        pad_sec = max(0.0, dur - valid_sec)

        item: Dict = {
            "index": idx,
            "start_sec": start_time,
            "dur_sec": dur,
            "valid_sec": valid_sec,
            "pad_sec": pad_sec,
        }

        # 帧级信息（用于训练 mask 或对齐）
        if fps and fps > 0:
            target_frames = max(1, int(round(dur * fps)))
            valid_frames = max(0, int(round(valid_sec * fps)))
            pad_frames = max(0, target_frames - valid_frames)
            mask = [1] * valid_frames + [0] * pad_frames
            item.update(
                {
                    "target_frames": target_frames,
                    "valid_frames": valid_frames,
                    "mask": mask,
                }
            )

        yield item

        # 计算下一窗口起点：基础步长 + 小范围抖动
        step_jitter = rng.uniform(-jitter_cap, jitter_cap)
        next_start = start_time + base_step_sec + step_jitter
        start_time = max(start_time + min_advance, next_start)
        idx += 1


# ==============================
# 片段导出（可选）
# ==============================
def export_clip(
    src_path: str,
    out_path: Path,
    start_sec: float,
    dur_sec: float,
    pad_sec: float,
    target_fps: Optional[int] = None,
) -> None:
    """
    用 ffmpeg 导出一个视频片段：
      - 从 start_sec 开始，导出 dur_sec 长度
      - 若 pad_sec > 0：用 tpad 在尾部克隆最后一帧补齐到目标长度
      - 若给定 target_fps：统一帧率，便于下游处理或对齐
      - 使用重编码（x264 + aac），保证切点和时长精确
    """
    stream = ffmpeg.input(src_path, ss=start_sec)
    if target_fps:
        stream = stream.filter("fps", fps=target_fps)
    if pad_sec > 0:
        stream = stream.filter("tpad", stop_mode="clone", stop_duration=pad_sec)

    out_path.parent.mkdir(parents=True, exist_ok=True)

    (
        ffmpeg.output(
            stream,
            str(out_path),
            t=dur_sec,
            vcodec="libx264",
            preset="veryfast",
            crf=23,
            acodec="aac",
            audio_bitrate="128k",
        )
        .overwrite_output()
        .run(quiet=True)
    )


# ==============================
# 主流程
# ==============================
def main():
    # ---- 1) 读取配置 ----
    config = load_config()
    BASE_STEP_SEC = float(config["base_step_sec"])
    JITTER_SEC = float(config["jitter_sec"])
    MIN_LEN_SEC = float(config["min_len_sec"])
    MAX_LEN_SEC = float(config["max_len_sec"])
    TARGET_FPS = int(config.get("target_fps", 0)) or None
    GLOBAL_SEED = int(config["global_seed"])

    # 输出目录：以仓库根（utils.get_repository_path()）+ 配置的 output_dir 构建
    OUTPUT_DIR: Path = utils.get_repository_path() / config["output_dir"]
    video_out_dir: Path = OUTPUT_DIR / "video"
    video_out_dir.mkdir(parents=True, exist_ok=True)

    # ---- 2) 读取视频清单 ----
    list_video = get_video_list()
    print(f"[info] 待处理视频数：{len(list_video)}")

    # ---- 3) 遍历视频 ----
    for video in list_video:
        # 你的工具函数：根据视频名拿到原始视频的绝对路径
        src_path = utils.get_origin_video_path_str(video)

        total = get_video_time(src_path)
        if total <= 0:
            print(f"[skip] 视频不可读或时长为0：{src_path}")
            continue

        # 每个视频一个稳定子种子：全局种子 XOR sha1(视频名)
        per_video_seed = (GLOBAL_SEED ^ _stable_int_from_str(video)) & 0xFFFFFFFF

        # ---- 4) 生成窗口并（可选）导出片段 ----
        for window in iter_stable_random_windows(
            total_sec=total,
            base_step_sec=BASE_STEP_SEC,
            jitter_sec=JITTER_SEC,
            min_len_sec=MIN_LEN_SEC,
            max_len_sec=MAX_LEN_SEC,
            seed=per_video_seed,
            fps=TARGET_FPS,
        ):
            idx = window["index"]
            start_sec = window["start_sec"]
            dur_sec = window["dur_sec"]
            valid_sec = window["valid_sec"]
            pad_sec = window["pad_sec"]
            mask = window.get("mask")  # 可能为 None

            # 这里可以直接把 (start_sec, dur_sec, mask) 喂给下游模型/处理流程
            print(
                f"[{video}] idx={idx:04d} start={start_sec:.3f}s dur={dur_sec:.3f}s "
                f"| valid={valid_sec:.3f}s pad={pad_sec:.3f}s "
                f"{'(frames='+str(len(mask))+')' if mask else ''}"
            )

            # 如需实际导出 MP4 片段，取消注释下方两行
            out_path = video_out_dir / f"{Path(video).stem}_idx{idx:04d}_s{start_sec:.2f}_t{dur_sec:.2f}.mp4"
            export_clip(
                src_path=src_path,
                out_path=out_path,
                start_sec=start_sec,
                dur_sec=dur_sec,
                pad_sec=pad_sec,
                target_fps=TARGET_FPS,
            )


if __name__ == "__main__":
    main()


In [1]:
import ffmpeg

video = r"D:\python_project\FYP\data\CMLRdataset\video\s1\20170121\section_1_000.80_002.91.mp4"  # 换成真实视频文件
info = ffmpeg.probe(video, cmd=r"D:\ffmpeg\bin\ffprobe.exe")
print("duration:", info["format"]["duration"])


duration: 2.040000
