导入环境

In [2]:
import cv2
import numpy as np
import os
import tqdm

计算帧差

In [4]:
def preprocess_change_video(video_path, target_frames=50, target_size=(100, 100)):
    """
    1. 均勻採樣 50 個位置
    2. 計算採樣點及其後一幀的差值 (frame_{t+1} - frame_t)
    3. 縮放與歸一化
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"無法打開影片: {video_path}")
        return None

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # 均勻採樣索引計算 (需要取第 t 幀和第 t+1 幀，所以最後一幀索引上限為 total_frames - 2)
    if total_frames > target_frames:
        indices = np.linspace(0, total_frames - 2, target_frames, dtype=int)
    else:
        indices = np.arange(max(0, total_frames - 1))
        print(f"警告: {video_path} 幀數過短")

    diff_frames = []
    
    needed_frames = set(indices) | set(indices + 1)
    frames_dict = {}
    
    current_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if current_idx in needed_frames:
            # 論文要求 100x100
            frame = cv2.resize(frame, target_size)
            # 轉換為 RGB 順序
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # 歸一化到 [0, 1] 作為基礎
            frames_dict[current_idx] = frame.astype(np.float32) / 255.0
        current_idx += 1
    cap.release()

    # 計算差值: frame_{t+1} - frame_t 
    for idx in indices:
        if idx in frames_dict and (idx + 1) in frames_dict:
            # 保留正負號信息
            diff = frames_dict[idx + 1] - frames_dict[idx]
            diff_frames.append(diff)
        else:
            # 如果數據缺失則填充零
            diff_frames.append(np.zeros((target_size[0], target_size[1], 3), dtype=np.float32))

    # 處理幀數不足的情況
    while len(diff_frames) < target_frames:
        diff_frames.append(diff_frames[-1] if diff_frames else np.zeros((target_size[0], target_size[1], 3)))

    # 返回維度: (50, 100, 100, 3)
    return np.array(diff_frames[:target_frames])

批量处理

In [5]:
def batch_convert_to_npy(input_dir, output_dir):
    """
    批量處理資料夾內的所有原始影片，計算變化檢測特徵並保存為 npy
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    video_list = [f for f in os.listdir(input_dir) if f.endswith(('.avi', '.mp4'))]

    for video_name in tqdm.tqdm(video_list, desc="處理 Change Pipeline"):
        video_path = os.path.join(input_dir, video_name)
        processed_data = preprocess_change_video(video_path)

        if processed_data is not None:
            # 保存名稱與 pose 文件保持一致，方便 Dataset 同步讀取
            save_name = os.path.splitext(video_name)[0] + ".npy"
            np.save(os.path.join(output_dir, save_name), processed_data)

主程序

In [7]:
if __name__ == "__main__":
    SOURCE_RGB_DIR = "../data/RWF2000/fights_gamma" 
    OUTPUT_NPY_DIR = "../data/RWF2000/fights_change_npy"

    batch_convert_to_npy(SOURCE_RGB_DIR, OUTPUT_NPY_DIR)

處理 Change Pipeline: 100%|██████████| 1000/1000 [02:46<00:00,  5.99it/s]
