# Load Video

In [10]:
# Frame geometry in pixels. These must match the decoded input video: the raw
# RGB buffer is reshaped to (-1, HEIGHT, WIDTH, 3) and the encoder is given
# a "WIDTHxHEIGHT" size string.
HEIGHT = 360
WIDTH = 640
# Frame rate (frames per second) requested from ffmpeg for the encoded output.
FRAMERATE = 24
# Source video that is decoded into raw frames.
INPUT_PATH = "/Users/mihir/Downloads/in.mov"
# NOTE(review): not referenced by any of the visible cells (the reducer writes
# to 'out.mp4') -- confirm whether this was meant to be the encoder target.
OUTPUT_PATH = "/Users/mihir/Downloads/out.mov"
# Passed as the first argument to MapReduceJob. Empty placeholder here --
# presumably has to be set to a real mapper URL before running the job.
MAPPER_ENDPOINT = ""

In [11]:
import ffmpeg
import numpy as np

# Decode the whole input file to raw, tightly packed RGB24 bytes by having
# ffmpeg write to its stdout ('pipe:') and capturing that stream.
out, _ = ffmpeg.input(INPUT_PATH).output(
    'pipe:',
    format='rawvideo',
    pix_fmt='rgb24',
).run(capture_stdout=True)

# View the byte buffer as (frames, HEIGHT, WIDTH, 3) uint8; the frame count is
# inferred from the buffer length, so HEIGHT/WIDTH must match the source video.
video = np.frombuffer(out, dtype=np.uint8).reshape((-1, HEIGHT, WIDTH, 3))

In [12]:
# Sanity check: the shape should read (num_frames, HEIGHT, WIDTH, 3).
print(video.shape)

(769, 360, 640, 3)


# Define Reducer

In [13]:
from knn import utils
from knn.reducers import Reducer

class VideoReducer(Reducer):
    """Collect mapped frames by index and encode them into an H.264 video.

    Results may arrive in any order; each one is stored in the slot given by
    its ``frame_id`` so that the encoded video keeps the original ordering.
    """

    def __init__(self, num_frames, output_path='out.mp4'):
        """Allocate one slot per expected frame.

        Args:
            num_frames: Number of frames that will be submitted; valid
                ``frame_id`` values index into a list of this length.
            output_path: File the encoded video is written to.
        """
        self.frames = [None] * num_frames
        self.output_path = output_path

    def handle_result(self, input, output):
        """Store one mapper result in the slot of the frame it belongs to.

        Args:
            input: The job input for this result; its ``"frame_id"`` entry is
                used as the list index.
            output: Mapper output, decoded with ``utils.base64_to_numpy``
                (the encoder later treats it as a numpy array).
        """
        self.frames[input["frame_id"]] = utils.base64_to_numpy(output)

    @property
    def result(self):
        """Encode the collected frames and return the output file path.

        Frames whose slot is still ``None`` (no result was received) are
        skipped. Note that every access of this property runs a new ffmpeg
        encode and overwrites the output file.

        Returns:
            The path of the encoded video file.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        process = (
            ffmpeg
            .input(
                'pipe:',
                format='rawvideo',
                pix_fmt='rgb24',
                s='{}x{}'.format(WIDTH, HEIGHT),
                # Raw video carries no timing information; without this ffmpeg
                # assumes 25 fps and drops frames to reach the output rate.
                framerate=FRAMERATE,
            )
            .output(self.output_path, pix_fmt='yuv420p', vcodec='libx264', r=FRAMERATE)
            .overwrite_output()
            .run_async(pipe_stdin=True)
        )
        try:
            for frame in self.frames:
                if frame is not None:
                    process.stdin.write(frame.astype(np.uint8).tobytes())
        finally:
            # Closing stdin signals end-of-stream; waiting lets ffmpeg flush
            # the encoder and finalize the container before we return.
            process.stdin.close()
            process.wait()

        if process.returncode != 0:
            raise RuntimeError(
                'ffmpeg exited with status {}'.format(process.returncode)
            )
        return self.output_path

# Perform Rendering

In [None]:
from knn.jobs import MapReduceJob

iterable = [{"frame_id": i, "frame": utils.numpy_to_base64(frame)}
            for i, frame in enumerate(video)]

job = MapReduceJob(
    MAPPER_ENDPOINT,
    VideoReducer(),
    {},
    n_mappers=1000,
    chunk_size=1,
)
output_path = await job.run_until_complete(iterable)

In [None]:
# Show the value returned by job.run_until_complete().
print(output_path)