In [1]:



import cv2 as cv
from multiprocessing import Pool, cpu_count
from time import time
from stitching import Stitcher

# 封装图像拼接函数
def stitch_images_from_videos(video_paths, frame_index, settings, settings1):
    # 使用 settings 进行初步拼接
    stitcher = Stitcher(**settings)
    panoramas = []
    images = []
    for video_path in video_paths:
        cap = cv.VideoCapture(video_path)
        cap.set(cv.CAP_PROP_POS_FRAMES, frame_index)
        ret, frame = cap.read()
        if not ret:
            raise ValueError(f"Could not read frame {frame_index} from video {video_path}")
        images.append(frame)
        cap.release()
    
    # 进行初步拼接
    panorama0 = stitcher.stitch([images[0], images[1]])
    panorama1 = stitcher.stitch([images[2], images[3]])
    
    # 使用 settings1 进行最终拼接
    stitcher1 = Stitcher(**settings1)
    final_panorama = stitcher1.stitch([panorama0, panorama1])
    return final_panorama

# 多进程处理函数
def multiprocess_stitch(video_path_groups, frame_indices, settings, settings1, num_processes):
    with Pool(num_processes) as pool:
        args = [(group, frame_index, settings, settings1) for group, frame_index in zip(video_path_groups, frame_indices)]
        panoramas = pool.starmap(stitch_images_from_videos, args)
    return panoramas

# if __name__ == "__main__":
#     # 定义视频路径组
#     video_paths = ["../data/testolabc1.avi", "../data/testolabc2.avi", "../data/testolabc3.avi", "../data/testolabc4.avi"]
    
#     # 假设我们要处理前8帧，每帧组成一组
#     num_frames = 16
#     video_path_groups = [video_paths for _ in range(num_frames)]
#     frame_indices = list(range(num_frames))

#     # 设置参数
#     settings = {"detector": "orb", "range_width": 600, "confidence_threshold": 0.01, "print_time": False, "crop": False}
#     settings1 = {"detector": "sift", "range_width": 900, "confidence_threshold": 0.01, "print_time": False, "crop": False}

#     # 测试性能
#     num_processes = max(8, cpu_count())  # 使用最多8个进程

#     start_time = time()
#     panoramas = multiprocess_stitch(video_path_groups, frame_indices, settings, settings1, num_processes)
    
#     # 保存最终拼接图像
#     for idx, panorama in enumerate(panoramas):
#         cv.imwrite(f"./output/panoramav_{idx}.jpg", panorama)
    
#     end_time = time()

#     print(f"Total stitching time with {num_processes} processes: {end_time - start_time:.4f} seconds")
#     print(f"fps : {num_frames / (end_time-start_time):.3f} ")



In [2]:


from multiprocessing import Pool, TimeoutError
import signal

# 设置超时信号处理
def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def multiprocess_stitch(video_path_groups, frame_indices, settings, settings1, num_processes):
    with Pool(num_processes, initializer=init_worker) as pool:
        try:
            args = [(group, frame_index, settings, settings1) for group, frame_index in zip(video_path_groups, frame_indices)]
            results = pool.starmap_async(stitch_images_from_videos, args)
            panoramas = results.get(timeout=600)  # 设置超时时间为600秒
            return panoramas
        except TimeoutError:
            print("Stitching process timed out.")
            pool.terminate()
            pool.join()
            return []
if __name__ == "__main__":
    # 定义视频路径组
    video_paths = ["../data/testolabc1.avi", "../data/testolabc2.avi", "../data/testolabc3.avi", "../data/testolabc4.avi"]
    
    # 假设我们要处理前16帧，每帧组成一组
    num_frames = 16
    video_path_groups = [video_paths for _ in range(num_frames)]
    frame_indices = list(range(num_frames))

    # 设置参数
    settings = {
        "detector": "orb", "range_width": 600, "confidence_threshold": 0.01, 
        "print_time": False, "crop": False, "use_momentum": True, "momentum_interval": 5
    }
    settings1 = {
        "detector": "sift", "range_width": 900, "confidence_threshold": 0.01, 
        "print_time": False, "crop": False, "use_momentum": True, "momentum_interval": 5
    }

    # 测试性能
    num_processes = min(8, cpu_count())  # 使用最多8个进程

    start_time = time.time()
    print(f"Starting stitching with {num_processes} processes")
    panoramas = multiprocess_stitch(video_path_groups, frame_indices, settings, settings1, num_processes)
    
    # 保存最终拼接图像
    for idx, panorama in enumerate(panoramas):
        cv.imwrite(f"./output/panoramav_{idx}.jpg", panorama)
        print(f"Saved panorama {idx}")
    
    end_time = time.time()

    print(f"Total stitching time with {num_processes} processes: {end_time - start_time:.4f} seconds")
    print(f"fps : {num_frames / (end_time - start_time):.3f}")


AttributeError: 'builtin_function_or_method' object has no attribute 'time'

In [1]:

import cv2 as cv
from multiprocessing import Pool, cpu_count, TimeoutError
from time import time
import signal
from stitching import Stitcher # type: ignore

# 封装图像拼接函数
def stitch_images_from_videos(video_paths, frame_index, settings, settings1):
    try:
        # 使用 settings 进行初步拼接
        stitcher = Stitcher(**settings)
        panoramas = []
        images = []
        for video_path in video_paths:
            cap = cv.VideoCapture(video_path)
            cap.set(cv.CAP_PROP_POS_FRAMES, frame_index)
            ret, frame = cap.read()
            if not ret:
                raise ValueError(f"Could not read frame {frame_index} from video {video_path}")
            images.append(frame)
            cap.release()

        # 进行初步拼接
        panorama0 = stitcher.stitch([images[0], images[1]])
        panorama1 = stitcher.stitch([images[2], images[3]])

        # 使用 settings1 进行最终拼接
        stitcher1 = Stitcher(**settings1)
        final_panorama = stitcher1.stitch([panorama0, panorama1])
        return final_panorama
    except Exception as e:
        print(f"Error during stitching frame {frame_index}: {e}")
        return None

# 设置超时信号处理
def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

# 多进程处理函数
def multiprocess_stitch(video_path_groups, frame_indices, settings, settings1, num_processes):
    with Pool(num_processes, initializer=init_worker) as pool:
        try:
            args = [(group, frame_index, settings, settings1) for group, frame_index in zip(video_path_groups, frame_indices)]
            results = pool.starmap_async(stitch_images_from_videos, args)
            panoramas = results.get(timeout=500)  # 设置超时时间为600秒
            return panoramas
        except TimeoutError:
            print("Stitching process timed out.")
            pool.terminate()
            pool.join()
            return []

if __name__ == "__main__":
    # 定义视频路径组
    video_paths = ["../data/testolabc1.avi", "../data/testolabc2.avi", "../data/testolabc3.avi", "../data/testolabc4.avi"]
    
    # 假设我们要处理前16帧，每帧组成一组
    num_frames = 256
    video_path_groups = [video_paths for _ in range(num_frames)]
    frame_indices = list(range(num_frames))

    # 设置参数
    settings = {
        "detector": "orb", "range_width": 600, "confidence_threshold": 0.01, 
        "print_time": True, "crop": False, "use_momentum": True, "momentum_interval": 10
    }
    settings1 = {
        "detector": "sift", "range_width": 900, "confidence_threshold": 0.01, 
        "print_time": False, "crop": False, "use_momentum": True, "momentum_interval": 10
    }

    # 测试性能
    num_processes = min(8, cpu_count())  # 使用最多8个进程

    start_time = time()
    print(f"Starting stitching with {num_processes} processes")
    panoramas = multiprocess_stitch(video_path_groups, frame_indices, settings, settings1, num_processes)
    
    # 保存最终拼接图像
    for idx, panorama in enumerate(panoramas):
        if panorama is not None:
            cv.imwrite(f"./output/panoramav_{idx}.jpg", panorama)
            print(f"Saved panorama {idx}")
    
    end_time = time()

    print(f"Total stitching time with {num_processes} processes: {end_time - start_time:.4f} seconds")
    print(f"fps : {num_frames / (end_time - start_time):.3f}")

Starting stitching with 8 processes
Stitching call count: 1
Stitching call count: 1Function resize_medium_resolution took 0.0003 seconds

Stitching call count: 1Function resize_medium_resolution took 0.0004 seconds

Function resize_medium_resolution took 0.0003 seconds
Stitching call count: 1
Stitching call count: 1Stitching call count: 1Function resize_medium_resolution took 0.0003 seconds


Function resize_medium_resolution took 0.0006 secondsFunction resize_medium_resolution took 0.0007 seconds

Stitching call count: 1Function find_features took 0.0223 secondsFunction find_features took 0.0173 seconds


Function find_features took 0.0215 seconds
Function resize_medium_resolution took 0.0002 seconds
Stitching call count: 1
Function resize_medium_resolution took 0.0003 secondsFunction find_features took 0.0194 seconds
Function find_features took 0.0142 seconds

Function find_features took 0.0173 seconds
Function find_features took 0.0187 seconds
Function match_features took 0.0215 sec

In [11]:







import os
import cv2

# 定义包含图像的文件夹
image_folder = './output'  # 更新为正确的路径

# 定义输出视频路径
video_path = 'output_video.mp4'  # 更新为所需的输出路径

# 获取图像列表
images = [img for img in os.listdir(image_folder) if img.endswith(".jpg")]
images.sort()  # 确保图像是排序的

# 找到最大尺寸
max_width, max_height = 0, 0
for image in images:
    img_path = os.path.join(image_folder, image)
    frame = cv2.imread(img_path)
    if frame is not None:
        height, width, layers = frame.shape
        if width > max_width:
            max_width = width
        if height > max_height:
            max_height = height

# 定义帧率
fps = 30  # 根据需要调整帧率

# 定义视频编解码器并创建 VideoWriter 对象
fourcc = cv2.VideoWriter_fourcc(*'XVID')  # 定义编解码器
video = cv2.VideoWriter(video_path, fourcc, fps, (max_width, max_height), True)  # 指定帧率和是否彩色

# 将图像写入视频
for image in images:
    img_path = os.path.join(image_folder, image)
    frame = cv2.imread(img_path)
    if frame is not None:
        resized_frame = cv2.resize(frame, (max_width, max_height))
        video.write(resized_frame)

# 释放视频写入对象
video.release()

print("视频创建成功。")







OpenCV: FFMPEG: tag 0x44495658/'XVID' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


视频创建成功。
