Skip to content

Commit

Permalink
update: Video2VideoProcessor의 async run*
Browse files Browse the repository at this point in the history
- multi-threading 방식으로 수정
  • Loading branch information
404Vector committed Nov 28, 2023
1 parent d405639 commit a726177
Showing 1 changed file with 79 additions and 98 deletions.
177 changes: 79 additions & 98 deletions v2v/_video2video_processor_.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from asyncio import Queue, Task
from asyncio import Task
from queue import Queue
import functools
import os
from typing import Generator, Optional, Union
Expand All @@ -18,88 +19,61 @@
)


def _create_video2image_task(v2i_proc: Video2ImageProcessor, queue: Queue) -> Task:
async def __job(v2i_proc: Video2ImageProcessor, queue: Queue):
while True:
frame_data = await v2i_proc()
await queue.put(frame_data)
if frame_data.frame is None:
break

task = asyncio.create_task(__job(v2i_proc=v2i_proc, queue=queue))
return task


def _create_video2audio_task(v2a_proc: AudioExtractor) -> Task:
task = asyncio.create_task(asyncio.to_thread(v2a_proc.run))
return task


def _create_image2video_task(i2v_proc: Image2VideoProcessor, queue: Queue) -> Task:
async def __job(i2v_proc: Image2VideoProcessor, queue: Queue):
while True:
frame_data: FrameData = await queue.get()
await i2v_proc(frame_data=frame_data)
if frame_data.frame is None:
break

task = asyncio.create_task(__job(i2v_proc=i2v_proc, queue=queue))
return task
def job_video2image(v2i_proc: Video2ImageProcessor, enqueue: Queue):
while True:
frame_data = v2i_proc()
if frame_data is None:
break
enqueue.put(frame_data)
enqueue.put(None) # end of work


def _create_videoaudio2video_task(va2v_proc: AudioMerger) -> Task:
task = asyncio.create_task(asyncio.to_thread(va2v_proc.run))
return task
def job_video2audio(v2a_proc: AudioExtractor):
v2a_proc.run()


def _create_i2i_processing_task(
def job_image2image(
i2i_pool: IFrameProcessorPool,
input_queue: Queue,
i2i_queue: Queue,
output_queue: Queue,
) -> Task:
async def __job_enqeue(
i2i_pool: IFrameProcessorPool,
input_queue: Queue,
task_queue: Queue,
):
while True:
frame_data: FrameData = await input_queue.get()
task_queue_item = None
if frame_data.frame is not None:
task_queue_item = asyncio.create_task(i2i_pool(frame_data=frame_data))
else:
task_queue_item = asyncio.to_thread(lambda x: x, frame_data)
await task_queue.put(task_queue_item)
if frame_data.frame is None:
dequeue: Queue,
enqueue: Queue,
):
while True:
frame_data: FrameData = dequeue.get()
if frame_data is None:
dequeue.put(None)
break
result = asyncio.run(i2i_pool(frame_data=frame_data))
enqueue.put(result)
enqueue.put(None) # end of work


def job_image2video(
i2v_proc: Image2VideoProcessor,
i2i_pool: IFrameProcessorPool,
dequeue: Queue,
):
temp_state = {}
progress = 0
n_worker = len(i2i_pool)
while True:
frame_data: FrameData = dequeue.get()
if frame_data == None:
n_worker -= 1
if n_worker == 0:
break
else:
continue
key = frame_data.frame_id
temp_state[key] = frame_data
while progress in temp_state:
target_item = temp_state.pop(progress)
i2v_proc(frame_data=target_item)
progress += 1
i2v_proc(None) # end of work

async def __job_dequeue(
task_queue: Queue,
output_queue: Queue,
):
while True:
task_queue_item = await task_queue.get()
frame_data: FrameData = await task_queue_item
assert type(frame_data) is FrameData
await output_queue.put(frame_data)
if frame_data.frame is None:
break

task_enqueue = asyncio.create_task(
__job_enqeue(
i2i_pool=i2i_pool,
input_queue=input_queue,
task_queue=i2i_queue,
)
)
task_dequeue = asyncio.create_task(
__job_dequeue(
task_queue=i2i_queue,
output_queue=output_queue,
)
)
return asyncio.gather(task_enqueue, task_dequeue)
def job_merge_video_and_audio(va2v_proc: AudioMerger):
va2v_proc.run()


class Video2VideoProcessor:
Expand Down Expand Up @@ -171,32 +145,39 @@ async def _async_run(self):
image2image_pool = self._frame_processor_pool
videoaudio2video_proc = self._va2vp
v2i_queue = Queue(self.v2i_queue_size)
i2i_queue = Queue(len(image2image_pool) * 2)
i2v_queue = Queue(self.i2v_queue_size)
video2image_task = _create_video2image_task(
v2i_proc=video2image_proc,
queue=v2i_queue,
)
video2audio_task = _create_video2audio_task(v2a_proc=video2audio_proc)
image2image_task = _create_i2i_processing_task(
i2i_pool=image2image_pool,
input_queue=v2i_queue,
i2i_queue=i2i_queue,
output_queue=i2v_queue,
)
image2video_task = _create_image2video_task(
i2v_proc=image2video_proc, queue=i2v_queue
)
i2i_queue = Queue(len(image2image_pool))
await asyncio.gather(
video2image_task,
video2audio_task,
image2image_task,
image2video_task,
asyncio.to_thread(
job_video2image,
v2i_proc=video2image_proc,
enqueue=v2i_queue,
),
*[
asyncio.to_thread(
job_image2image,
i2i_pool=image2image_pool,
dequeue=v2i_queue,
enqueue=i2i_queue,
)
for _ in range(len(image2image_pool))
],
asyncio.to_thread(
job_image2video,
i2v_proc=image2video_proc,
i2i_pool=image2image_pool,
dequeue=i2i_queue,
),
asyncio.to_thread(
job_video2audio,
v2a_proc=video2audio_proc,
),
)
videoaudio2video_task = _create_videoaudio2video_task(
va2v_proc=videoaudio2video_proc

await asyncio.to_thread(
job_merge_video_and_audio,
va2v_proc=videoaudio2video_proc,
)
await videoaudio2video_task

os.remove(video2audio_proc.dst_audio_path)
os.remove(image2video_proc.dst_video_path)
assert not os.path.exists(video2audio_proc.dst_audio_path)
Expand Down

0 comments on commit a726177

Please sign in to comment.