In [3]:
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import combinations
from pathlib import Path
from typing import Dict, List, Optional

import cv2
import numpy as np

In [None]:
# Configure the root logger once for the whole notebook: show INFO and above,
# each record prefixed with its timestamp and severity.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)


def load_videos_from_folder(folder: str) -> List[str]:
    """Return the paths of all ``.mp4`` files directly inside *folder*.

    The search is not recursive. Paths are returned sorted so that the result
    (and any truncation a caller applies to it) is reproducible; the raw
    ``glob`` order depends on the filesystem.

    Args:
        folder: Directory to scan.

    Returns:
        Sorted list of matching file paths as strings; empty when nothing
        matches.
    """
    return sorted(str(p) for p in Path(folder).glob("*.mp4"))


def extract_sift_features_from_video(
    video_path: str, frame_indices: Optional[List[int]] = None
) -> List:
    """Extract SIFT descriptors from selected frames of a video.

    Frames are decoded sequentially from the start of the video, and decoding
    stops as soon as the highest requested frame has been processed.

    Args:
        video_path: Path of the video file to open with OpenCV.
        frame_indices: Zero-based positions of the frames to sample. ``None``
            selects the default positions 0, 60 and 120.

    Returns:
        One descriptor array per sampled frame in which SIFT produced
        descriptors, in ascending frame order. The list is empty when no
        frame is requested, the video cannot be read, or no sampled frame
        yields descriptors.
    """
    # A set gives O(1) membership tests and avoids a mutable default argument.
    wanted = {0, 60, 120} if frame_indices is None else set(frame_indices)
    if not wanted:
        # Nothing to sample: do not open the video at all.
        return []
    last_wanted = max(wanted)

    sift = cv2.SIFT.create()
    cap = cv2.VideoCapture(video_path)
    descriptors_list = []

    try:
        if not cap.isOpened():
            logging.warning("Could not open video %s", video_path)
            return descriptors_list

        frame_count = 0
        # Frames after the last requested index are never needed.
        while frame_count <= last_wanted:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count in wanted:
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                # No mask: keypoints are searched over the whole image.
                _, descriptors = sift.detectAndCompute(image, None)
                if descriptors is not None:
                    descriptors_list.append(descriptors)

            frame_count += 1
    finally:
        # Release the capture even if decoding or feature extraction raises.
        cap.release()

    return descriptors_list


def precompute_sift_features(
    video_paths: List[str], frame_indices: List[int], max_workers: int = 4
) -> Dict[str, List]:
    """Compute the SIFT features of every video using a thread pool.

    Args:
        video_paths: Videos to process.
        frame_indices: Frame positions sampled in each video.
        max_workers: Size of the thread pool.

    Returns:
        Mapping from video path to its extracted descriptor list. A video
        whose extraction raised is logged and left out of the mapping.
    """
    collected: Dict[str, List] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which video each submitted task belongs to.
        task_to_path = {}
        for video in video_paths:
            task = pool.submit(
                extract_sift_features_from_video, video, frame_indices
            )
            task_to_path[task] = video

        # Collect results in completion order.
        for finished in as_completed(task_to_path):
            path = task_to_path[finished]
            try:
                extracted = finished.result()
            except Exception as e:
                logging.error(f"Error computing SIFT features for {path}: {e}")
            else:
                collected[path] = extracted
                logging.info(f"SIFT features computed for {path}")

    return collected


def compare_sift_features(descriptors1: List, descriptors2: List) -> int:
    """Count SIFT matches between the sampled frames of two videos.

    Every descriptor array of the first video is matched against every
    descriptor array of the second one with a cross-checked brute-force
    L2 matcher, and the number of matches is summed over all frame pairs.

    Args:
        descriptors1: Per-frame descriptor arrays of the first video.
        descriptors2: Per-frame descriptor arrays of the second video.

    Returns:
        Total number of matches over all frame pairs (0 if either list is
        empty).
    """
    matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)
    return sum(
        len(matcher.match(left, right))
        for left in descriptors1
        for right in descriptors2
    )


def analyze_video_pairs(
    precomputed_features: Dict[str, List], max_workers: int = 4
) -> List[Dict]:
    """Compare the features of every unordered pair of videos in parallel.

    Args:
        precomputed_features: Mapping from video path to its descriptor list.
        max_workers: Size of the thread pool.

    Returns:
        One dict per successfully compared pair, with the keys
        ``"video_pair"`` (tuple of the two paths) and ``"sift_matches"``
        (match count), in completion order. Pairs whose comparison raised are
        logged and omitted.
    """
    pairs = list(combinations(precomputed_features, 2))
    outcomes: List[Dict] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which pair each submitted comparison belongs to.
        task_to_pair = {}
        for first, second in pairs:
            task = pool.submit(
                compare_sift_features,
                precomputed_features[first],
                precomputed_features[second],
            )
            task_to_pair[task] = (first, second)

        for finished in as_completed(task_to_pair):
            v1, v2 = task_to_pair[finished]
            try:
                match_count = finished.result()
            except Exception as e:
                logging.error(f"Error comparing video pair ({v1}, {v2}): {e}")
                continue

            outcomes.append({"video_pair": (v1, v2), "sift_matches": match_count})
            logging.info(
                f"Comparison done between {v1} and {v2}: SIFT Matches = {match_count}"
            )

    return outcomes


def analyze_videos(
    folder: str,
    frame_indices: Optional[List[int]] = None,
    max_workers: int = 4,
    max_videos: Optional[int] = 10,
):
    """Run the whole pipeline: extract features, then compare all video pairs.

    Args:
        folder: Directory containing the videos to analyse.
        frame_indices: Frame positions sampled in each video. ``None`` selects
            the default positions 0, 60 and 120.
        max_workers: Thread-pool size used for both extraction and comparison.
        max_videos: Upper bound on the number of videos analysed. The default
            of 10 keeps the original debugging limit; pass ``None`` to analyse
            every video found.

    Returns:
        The list of per-pair result dicts produced by ``analyze_video_pairs``.

    Raises:
        ValueError: If ``max_videos`` is negative.
    """
    if max_videos is not None and max_videos < 0:
        raise ValueError(f"max_videos must be non-negative, got {max_videos}")
    if frame_indices is None:
        frame_indices = [0, 60, 120]

    video_paths = load_videos_from_folder(folder)
    if max_videos is not None and len(video_paths) > max_videos:
        # Make the truncation visible instead of silently dropping videos.
        logging.warning(
            "Found %d videos in %s; only the first %d will be analysed",
            len(video_paths),
            folder,
            max_videos,
        )
        video_paths = video_paths[:max_videos]

    precomputed_features = precompute_sift_features(
        video_paths, frame_indices, max_workers
    )
    return analyze_video_pairs(precomputed_features, max_workers)

In [None]:
# Interactive driver: prompt for the folder that contains the videos, run the
# full analysis, then print the match count of every compared pair.
folder = input("请输入包含视频的文件夹路径: ").strip()
max_workers = 4  # number of parallel worker threads
results = analyze_videos(folder, max_workers=max_workers)

# Each result holds the pair of video paths and their total SIFT match count.
for res in results:
    print(f"视频对: {res['video_pair']}")
    print(f"  SIFT匹配点数量: {res['sift_matches']}")