In [1]:
import ffmpeg
import numpy as np
from torch.utils.data import DataLoader, Dataset


class VideoStreamDataset(Dataset):
    """Decode a video with one ffmpeg process and re-encode frames with another.

    Frames are read from an ffmpeg subprocess (``inhale``) that decodes
    ``filename`` to raw rgb24 on its stdout. Frames passed to
    :meth:`write_frame` are piped into a second ffmpeg subprocess (``exhale``)
    that encodes them to ``dst_filename`` as yuv420p.

    Although this subclasses the map-style :class:`~torch.utils.data.Dataset`,
    it is really a stream. ``__getitem__`` ignores its index and returns the
    next decoded frame, so items must be requested in order, once each.
    Iterating with ``for frame in ds`` uses Python's ``__getitem__`` protocol,
    which stops at the ``IndexError`` raised at end of stream.

    Call :meth:`close` (or use the instance as a context manager) to flush the
    encoder and reap both subprocesses.
    """

    _closed = False
    # Class-level defaults so instances created without __init__ still behave.
    verbose = False
    batch_size = 1

    def __init__(self, filename, dst_filename, batch_size=1, *, verbose=False):
        """Probe ``filename`` and start the decoder and encoder subprocesses.

        Args:
            filename: source video passed to ``ffmpeg.probe`` / ``ffmpeg.input``.
            dst_filename: encoder output; overwritten if it already exists.
            batch_size: forwarded to ffmpeg as ``vframes``, i.e. it caps the
                TOTAL number of frames decoded (the default 1 means only the
                first frame). NOTE(review): the name suggests a per-batch size,
                which is not what ``vframes`` does. Pass ``None`` to decode
                every frame.
            verbose: if true, print shape, dtype and min/max of each frame read.

        Raises:
            ValueError: if ``filename`` contains no video stream.
        """
        probe = ffmpeg.probe(filename)
        # Use the first video stream in the container.
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
            None,
        )
        if video_stream is None:
            raise ValueError(f'no video stream found in {filename!r}')
        self.probe = probe
        self.video_stream = video_stream
        self.batch_size = batch_size
        self.verbose = verbose
        # Read the dimensions before spawning any process, so that a malformed
        # probe result cannot leave a decoder running.
        self.width = int(video_stream['width'])
        self.height = int(video_stream['height'])

        output_kwargs = {'format': 'rawvideo', 'pix_fmt': 'rgb24'}
        if batch_size is not None:
            output_kwargs['vframes'] = batch_size
        self.inhale = (
            ffmpeg
            .input(filename)
            .output('pipe:', **output_kwargs)
            .run_async(pipe_stdout=True)
        )

        input_kwargs = {
            'format': 'rawvideo',
            'pix_fmt': 'rgb24',
            's': f'{self.width}x{self.height}',
        }
        # The rawvideo demuxer assumes 25 fps unless told otherwise. Forward
        # the source rate so the output keeps the original playback speed.
        frame_rate = video_stream.get('r_frame_rate')
        if frame_rate and frame_rate != '0/0':
            input_kwargs['framerate'] = frame_rate
        self.exhale = (
            ffmpeg
            .input('pipe:', **input_kwargs)
            .output(dst_filename, pix_fmt='yuv420p')
            .overwrite_output()
            .run_async(pipe_stdin=True)
        )

    @property
    def _frame_nbytes(self):
        """Size in bytes of one rgb24 frame (3 bytes per pixel)."""
        return self.width * self.height * 3

    def __getitem__(self, i):
        """Return the next decoded frame as a writable (height, width, 3) uint8 array.

        ``i`` is ignored. Frames come from a pipe, so they are returned in
        decode order regardless of the index requested.

        Raises:
            IndexError: when no complete frame is left (end of stream or the
                ``vframes`` limit was reached).
        """
        nbytes = self._frame_nbytes
        stdout = self.inhale.stdout
        buf = bytearray()
        # Keep reading until a whole frame arrives or the pipe hits EOF.
        # A trailing partial frame cannot be reshaped, so it counts as the end.
        while len(buf) < nbytes:
            chunk = stdout.read(nbytes - len(buf))
            if not chunk:
                break
            buf += chunk
        if len(buf) < nbytes:
            raise IndexError('video frame out of range')
        # A bytearray-backed array is writable. Callers can embed data in
        # place, and torch's default collate does not warn about read-only input.
        in_frame = np.frombuffer(buf, np.uint8).reshape(self.height, self.width, 3)
        if self.verbose:
            print(in_frame.shape, in_frame.dtype, np.percentile(in_frame, (0,100)))
        return in_frame

    def __len__(self):
        """Return the number of frames this stream is expected to yield.

        This is the probed ``nb_frames`` capped by ``batch_size`` (the
        ``vframes`` limit). If the container does not report ``nb_frames``,
        ``batch_size`` is returned; it is only an upper bound, so a shorter
        video ends early with ``IndexError`` from ``__getitem__``.

        Raises:
            TypeError: when neither ``nb_frames`` nor a ``batch_size`` limit is
                known. This is what ``len()`` raises for unsized objects.
        """
        try:
            total = int(self.video_stream['nb_frames'])
        except (KeyError, TypeError, ValueError):
            total = None
        limit = self.batch_size
        if total is None and limit is None:
            raise TypeError('frame count of this video stream is unknown')
        if total is None:
            return limit
        if limit is None:
            return total
        return min(total, limit)

    def write_frame(self, frame):
        """Send one frame (or a stack of frames) to the encoder's stdin.

        ``frame`` must contain a whole number of ``height * width * 3`` rgb24
        samples, e.g. shape ``(height, width, 3)``. Real-valued non-uint8 input
        is clipped to [0, 255] before the cast. Out-of-range values (common
        after adding a watermark) therefore saturate instead of wrapping
        around. Fractional parts are still truncated by ``astype``.

        Raises:
            ValueError: if the sample count is not a multiple of one frame.
        """
        frame = np.asarray(frame)
        nbytes = self._frame_nbytes
        if frame.size % nbytes:
            raise ValueError(
                f'frame has {frame.size} samples; expected a multiple of '
                f'{nbytes} ({self.height}x{self.width}x3)'
            )
        if frame.dtype != np.uint8 and (
            np.issubdtype(frame.dtype, np.floating)
            or np.issubdtype(frame.dtype, np.integer)
        ):
            # float64 holds every integer in [0, 255] exactly. It also avoids
            # overflow when clipping narrow signed ints (e.g. int8) against 255.
            frame = np.clip(frame.astype(np.float64), 0, 255)
        self.exhale.stdin.write(frame.astype(np.uint8).tobytes())

    def close(self):
        """Flush the encoder and wait for both ffmpeg processes to exit.

        Safe to call more than once, and safe on a partially initialised
        instance.
        """
        if self._closed:
            return
        self._closed = True
        # __init__ may have raised before one or both processes were started.
        inhale = getattr(self, 'inhale', None)
        exhale = getattr(self, 'exhale', None)
        if exhale is not None:
            # EOF on stdin lets the encoder finish writing dst_filename.
            exhale.stdin.close()
        if inhale is not None:
            # Close our end before waiting. If frames were left unread, the
            # decoder may be blocked writing to a full pipe, and wait() would
            # never return.
            inhale.stdout.close()
            inhale.wait()
        if exhale is not None:
            exhale.wait()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        self.close()


Plan: read each video frame (e.g. with OpenCV), embed the watermark (wm), then write the frame to the output file.
Extract the watermark later.