Skip to content

Commit

Permalink
update: Video2VideoProcessor의 i2i processing 개선*
Browse files Browse the repository at this point in the history
- 중간 과정에 task queue를 추가하여 한번에 여러 작업을 예약할 수 있게 변경
  • Loading branch information
404Vector committed Nov 22, 2023
1 parent b82c528 commit 54ff9f4
Showing 1 changed file with 29 additions and 6 deletions.
35 changes: 29 additions & 6 deletions v2v/_video2video_processor_.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,51 @@ def _create_videoaudio2video_task(va2v_proc: AudioMerger) -> Task:
def _create_i2i_processing_task(
i2i_pool: IFrameProcessorPool,
input_queue: Queue,
i2i_queue: Queue,
output_queue: Queue,
) -> Task:
async def __job(
async def __job_enqeue(
i2i_pool: IFrameProcessorPool,
input_queue: Queue,
output_queue: Queue,
task_queue: Queue,
):
while True:
frame_data: FrameData = await input_queue.get()
task_queue_item = None
if frame_data.frame is not None:
frame_data = await i2i_pool(frame_data=frame_data)
task_queue_item = asyncio.create_task(i2i_pool(frame_data=frame_data))
else:
task_queue_item = asyncio.to_thread(lambda x: x, frame_data)
await task_queue.put(task_queue_item)
if frame_data.frame is None:
break

async def __job_dequeue(
task_queue: Queue,
output_queue: Queue,
):
while True:
task_queue_item = await task_queue.get()
frame_data: FrameData = await task_queue_item
assert type(frame_data) is FrameData
await output_queue.put(frame_data)
if frame_data.frame is None:
break

task = asyncio.create_task(
__job(
task_enqueue = asyncio.create_task(
__job_enqeue(
i2i_pool=i2i_pool,
input_queue=input_queue,
task_queue=i2i_queue,
)
)
task_dequeue = asyncio.create_task(
__job_dequeue(
task_queue=i2i_queue,
output_queue=output_queue,
)
)
return task
return asyncio.gather(task_enqueue, task_dequeue)


class Video2VideoProcessor:
Expand Down Expand Up @@ -150,6 +171,7 @@ async def _async_run(self):
image2image_pool = self._frame_processor_pool
videoaudio2video_proc = self._va2vp
v2i_queue = Queue(self.v2i_queue_size)
i2i_queue = Queue(len(image2image_pool) * 2)
i2v_queue = Queue(self.i2v_queue_size)
video2image_task = _create_video2image_task(
v2i_proc=video2image_proc,
Expand All @@ -159,6 +181,7 @@ async def _async_run(self):
image2image_task = _create_i2i_processing_task(
i2i_pool=image2image_pool,
input_queue=v2i_queue,
i2i_queue=i2i_queue,
output_queue=i2v_queue,
)
image2video_task = _create_image2video_task(
Expand Down

0 comments on commit 54ff9f4

Please sign in to comment.