# video

> Video processing and sampling utils


In [None]:
# | default_exp core.data.video

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

In [None]:
# | export
from __future__ import annotations

import json
import logging
import math
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path

import cv2
import pandas as pd
from tqdm.auto import tqdm

from ds_contrib.core.paths import Directory, PathLike, pathify
from ds_contrib.core.utils import exclusive_args

In [None]:
# | export
# | hide

# Module-level logger named after the importing module, so log records can be
# filtered per module (the string literal "__name__" would name every logger the same).
logger = logging.getLogger(__name__)

## Metadata reading


In [None]:
# | export


def get_video_metadata(video_path):
    """Read container and video-stream metadata of a file with ``ffprobe``.

    Parameters
    ----------
    video_path : PathLike
        path to the video file, passed to ``ffprobe`` as is

    Returns
    -------
    dict
        ``{"file": {...}, "video": {...}}`` where ``file`` holds ``filename``,
        ``format_name``, ``size`` and ``creation_time`` (``None`` when the container
        has no such tag) and ``video`` holds ``codec_name``, ``width``, ``height``,
        ``duration``, ``fps`` and ``frame_count`` of the single stream.

    Raises
    ------
    RuntimeError
        if ``ffprobe`` exits with a non-zero return code
    ValueError
        if the file does not contain exactly one stream
    """
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        video_path,
    ]

    # Capture stderr separately: merging it into stdout would corrupt the JSON
    # payload as soon as ffprobe prints any diagnostic.
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed with exit code {result.returncode} for `{video_path}`"
        )

    metadata = json.loads(result.stdout)
    streams = metadata["streams"]
    # Explicit raise instead of `assert`, which is stripped under `python -O`.
    if len(streams) != 1:
        raise ValueError("Video should have only one stream")

    stream = streams[0]
    container = metadata["format"]

    # r_frame_rate is reported as a "numerator/denominator" string
    rate_parts = stream["r_frame_rate"].split("/")
    fps = int(rate_parts[0]) / int(rate_parts[1])

    video_metadata = {
        "file": {
            "filename": container["filename"],
            "format_name": container["format_name"],
            "size": int(container["size"]),
            # the creation_time tag is optional, do not fail the whole read without it
            "creation_time": container.get("tags", {}).get("creation_time"),
        },
        "video": {
            "codec_name": stream["codec_name"],
            "width": int(stream["width"]),
            "height": int(stream["height"]),
            "duration": float(stream["duration"]),
            "fps": float(fps),
            "frame_count": int(stream["nb_frames"]),
        },
    }
    return video_metadata

In [None]:
# | eval: false
# | hide

# Sample video used by the non-evaluated demo cells of this notebook; sampled frames
# are written to a "frames" directory located next to the video file.
VIDEO_FILE = Path("downloads/2022-07-17_09-05-31_4453D774-04F/video_2")
FRAMES_DIR = VIDEO_FILE.parent / "frames"

In [None]:
# | eval: false

# Probe the sample video with ffprobe and keep the parsed metadata for the samplers
metadata = get_video_metadata(VIDEO_FILE)

In [None]:
# | eval: false

metadata

{'file': {'filename': 'downloads/2022-07-17_09-05-31_4453D774-04F/video_2',
  'format_name': 'mov,mp4,m4a,3gp,3g2,mj2',
  'size': 78215424,
  'creation_time': '2022-07-17T06:05:31.000000Z'},
 'video': {'codec_name': 'h264',
  'width': 1920,
  'height': 1080,
  'duration': 105.796667,
  'fps': 5.0,
  'frame_count': 529}}

In [None]:
# | export


class IFramesSampler(ABC):
    """Abstract iterable over the frame indices of a video that should be sampled.

    Iterating yields ``(batch_index, frame_index)`` tuples. Subclasses define the
    sampling strategy by implementing `_next_frame_ind` and report the expected
    number of sampled frames through `total_frames`.
    """

    def __init__(
        self,
        video_metadata: dict,
        start_frame: int = 0,
        end_frame: int | None = None,
        max_frames: int | None = None,
        batch_size: int | None = None,
    ):
        """Validate the sampling window and initialise the iteration state.

        Parameters
        ----------
        video_metadata : dict
            metadata dict, ``video_metadata["video"]`` must provide ``frame_count`` and ``fps``
        start_frame : int, optional
            first frame index to yield, by default 0
        end_frame : int | None, optional
            exclusive upper bound of frame indices, ``None`` (or 0) means ``frame_count``
        max_frames : int | None, optional
            maximum number of frames to yield, ``None`` (or 0) means ``end_frame - start_frame``
        batch_size : int | None, optional
            number of frames per batch, ``None`` keeps every frame in batch 0

        Raises
        ------
        ValueError
            if the frame window is outside of the video or ``batch_size`` is less than 1
        """
        self._video_metadata = video_metadata

        video_total_frames = self._video_metadata["video"]["frame_count"]
        self._original_fps: float = self._video_metadata["video"]["fps"]

        resolved_end_frame = end_frame if end_frame else video_total_frames

        # Validate args before any state is derived from them. Explicit raises are
        # used because `assert` statements are stripped under `python -O`.
        if not 0 <= start_frame < video_total_frames:
            raise ValueError("start_frame should be in [0, frame_count)")
        if not 0 < resolved_end_frame <= video_total_frames:
            raise ValueError("end_frame should be in [1, frame_count]")
        if not start_frame < resolved_end_frame:
            raise ValueError("start_frame should be less than end_frame")
        if batch_size is not None and batch_size < 1:
            # would otherwise fail later with ZeroDivisionError inside __iter__
            raise ValueError("batch_size should be at least 1")

        self._start_frame = start_frame
        self._end_frame = resolved_end_frame

        self._total_frames = self._end_frame - self._start_frame
        self._max_frames = max_frames if max_frames else self._total_frames

        self._frame_counter = 0
        self._batch_counter = 0
        self._current_frame = self._start_frame

        self._batch_size = batch_size

    def _reset_iter(self):
        """Rewind the iteration state to the first frame of the sampling window."""
        self._current_frame = self._start_frame
        self._frame_counter = 0
        self._batch_counter = 0

    @property
    def batch_size(self):
        """Number of frames per batch, ``None`` when batching is disabled."""
        return self._batch_size

    @abstractmethod
    def _next_frame_ind(self) -> int:
        """Return the index of the frame that follows the current one."""
        raise NotImplementedError

    def __iter__(self):
        """Yield ``(batch_index, frame_index)`` until a limit is reached.

        Iteration stops once ``max_frames`` frames were yielded or the next frame
        index falls outside of ``[0, end_frame)``.
        """
        # Always start from a clean state: a previous iteration may have been
        # abandoned by the consumer (e.g. `break`) before it reached its own reset.
        self._reset_iter()
        while (
            self._frame_counter != self._max_frames
            and 0 <= self._current_frame < self._end_frame
        ):
            yield self._batch_counter, self._current_frame
            self._frame_counter += 1
            # a batch is complete after every `batch_size` yielded frames
            if (
                self._batch_size is not None
                and self._frame_counter % self._batch_size == 0
            ):
                self._batch_counter += 1
            self._current_frame = self._next_frame_ind()
        self._reset_iter()

    @property
    @abstractmethod
    def total_frames(self) -> int | None:
        """Expected number of frames yielded by one full iteration, if known."""
        raise NotImplementedError


class FramesSamplerUniform(IFramesSampler):
    """Sampler that yields frames at a constant step.

    The step is derived from one of ``frame_step``, ``n_frames``, ``fps`` or
    ``time_step``. It may be fractional, so the position is tracked as a float and
    rounded to the nearest frame index on every advance.
    """

    # TODO[Low]: add support for start_time, end_time

    # NOTE(review): `exclusive_args` presumably enforces that only one of the listed
    # args is passed - confirm in ds_contrib.core.utils
    @exclusive_args(["n_frames", "frame_step", "time_step", "fps"])
    def __init__(
        self,
        video_metadata: dict,
        frame_step: int | None = None,
        n_frames: int | None = None,
        fps: float | None = None,
        time_step: float | None = None,
        start_frame: int = 0,
        end_frame: int | None = None,
        max_frames: int | None = None,
        batch_size: int | None = None,
    ):
        """Create a uniform sampler.

        Parameters
        ----------
        video_metadata : dict
            metadata dict, forwarded to `IFramesSampler`
        frame_step : int | None, optional
            distance between sampled frames in frames, must be >= 1
        n_frames : int | None, optional
            number of frames to spread over ``[start_frame, end_frame)``, values below 1 are treated as 1
        fps : float | None, optional
            target sampling rate, cannot exceed the original fps
        time_step : float | None, optional
            distance between sampled frames in seconds
        start_frame, end_frame, max_frames, batch_size
            forwarded to `IFramesSampler`

        Raises
        ------
        ValueError
            if none of the step args is given or the requested step is not achievable
        """
        super().__init__(
            video_metadata,
            start_frame=start_frame,
            end_frame=end_frame,
            max_frames=max_frames,
            batch_size=batch_size,
        )
        self._current_frame_float: float = self._current_frame
        self._frame_step: float = self._init_frame_step(
            frame_step, n_frames, fps, time_step
        )

    def _reset_iter(self):
        """Rewind the iteration state, including the fractional frame position."""
        super()._reset_iter()
        # without this a second iteration would continue from the old position
        self._current_frame_float = self._start_frame

    @property
    def total_frames(self) -> int | None:
        """Expected number of sampled frames, used e.g. as a progress bar total."""
        # `_max_frames` limits the number of *sampled* frames, so it caps the number
        # of steps that fit into the window instead of being divided by the step.
        frames_in_window = math.ceil(self._total_frames / self._frame_step)
        return min(self._max_frames, frames_in_window)

    def _init_frame_step(
        self,
        frame_step: int | None,
        n_frames: int | None,
        fps: float | None,
        time_step: float | None,
    ) -> float:
        """Convert whichever step argument was given into a step in frames.

        Arguments are checked against ``None`` rather than for truthiness, so an
        explicit ``0`` is validated instead of being reported as a missing argument.

        Raises
        ------
        ValueError
            if the resulting step is below 1 frame, deviates from an integer number
            of frames by more than 10% (``fps`` / ``time_step``), or no argument is given
        """
        if frame_step is not None:
            if frame_step < 1:
                raise ValueError(
                    f"frame_step=`{frame_step}` is too small cannot be < `1`"
                )
            return frame_step

        if n_frames is not None:
            n_frames = max(n_frames, 1)  # n_frames should be at least 1
            return max(self._total_frames / n_frames, 1)

        if fps is not None:
            if fps <= 0:
                # guard the division below
                raise ValueError(f"fps=`{fps}` should be positive")
            frame_step = self._original_fps / fps
            if frame_step < 1:
                raise ValueError(
                    f"fps=`{fps}` is too high, because original fps=`{self._original_fps}`"
                )
            if not math.isclose(frame_step, round(frame_step), rel_tol=0.1):
                raise ValueError(
                    f"new fps `{fps}` cannot be achieved from original fps `{self._original_fps}` without precision loss <10%, use such `fps` that `{self._original_fps}/{fps}` is close to integer"
                )
            return frame_step

        if time_step is not None:
            frame_step = time_step * self._original_fps
            if frame_step < 1:
                raise ValueError(
                    f"time_step=`{time_step}` is too small, because original time_step is `{1/self._original_fps}`"
                )
            if not math.isclose(frame_step, round(frame_step), rel_tol=0.1):
                raise ValueError(
                    f"new time_step `{time_step}` cannot be achieved from original timestap `{1/self._original_fps}` without precision loss <10%, use such `time_step` that `time_step'/'{1/self._original_fps}` is close to integer"
                )
            return frame_step

        raise ValueError(
            "One of the following args should be provided: frame_step, n_frames, fps, time_step"
        )

    def _next_frame_ind(self) -> int:
        """Advance the fractional position by one step and return the nearest frame index."""
        self._current_frame_float = self._current_frame_float + self._frame_step
        return round(self._current_frame_float)

In [None]:
# | eval: false

# Demo: the step is derived from n_frames=100 over the window that starts at frame 10,
# at most 10 frames are yielded (max_frames) and they are grouped into batches of 4.
fs = FramesSamplerUniform(
    metadata, n_frames=100, batch_size=4, start_frame=10, max_frames=10
)

In [None]:
# | export


def directorify(d: PathLike | Directory) -> Directory:
    """Coerce `d` to a `Directory`.

    A `Directory` instance is returned unchanged, anything else is wrapped into
    ``Directory(d, temporary=False)``.
    """
    return d if isinstance(d, Directory) else Directory(d, temporary=False)


def sample_frames_from_video(
    input_video: PathLike,
    output_frames_dir: Directory | PathLike,
    video_metadata: dict | None = None,
    sampler: IFramesSampler | None = None,
    with_catalog: bool = True,
):
    """Sample frames from a video and save them as JPEG files to a directory.

    Parameters
    ----------
    input_video : PathLike
        path to the video file
    output_frames_dir : Directory | PathLike
        directory where frames will be saved, it is created if it doesn't exist
    video_metadata : dict | None, optional
        metadata as returned by `get_video_metadata`, by default None which reads
        it from `input_video`
    sampler : IFramesSampler | None, optional
        sampler which yields ``(batch_ind, frame_ind)`` of the frames to save,
        by default None which samples every frame (``frame_step=1``)
    with_catalog : bool, optional
        if True (default), all frames are written directly to `output_frames_dir`
        and a ``frames_catalog.csv`` with ``batch_ind``, ``frame_path`` and
        ``timestamp`` per frame is saved next to them; if False, frames are written
        to per-batch subdirectories ``part_{batch_ind:04d}`` and no catalog is created

    Raises
    ------
    ValueError
        if batch index (number of batches) is too large, max is 9999. Split video to shorter clips.
    """
    frames_dir: Directory = directorify(output_frames_dir)
    input_video_path = pathify(input_video)

    if video_metadata is None:
        logger.info(f"Reaing video metadata from {input_video_path}")
        video_metadata = get_video_metadata(input_video)
    if sampler is None:
        sampler = FramesSamplerUniform(video_metadata, frame_step=1)

    # cv2.imwrite does not create missing directories, it just reports a failure
    frames_dir.path.mkdir(parents=True, exist_ok=True)

    frames_catalog_dict = {}
    cap = cv2.VideoCapture(str(input_video_path))
    try:
        if not cap.isOpened():
            logger.warning(f"Could not open video {input_video_path}")

        for batch_ind, frame_ind in tqdm(
            sampler, desc="Sampling frames", leave=False, total=sampler.total_frames
        ):
            # Seek to the requested frame and decode it
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_ind)
            ret, frame = cap.read()
            if not ret:
                break

            if batch_ind > 9999:
                raise ValueError(
                    f"Batch index {batch_ind} is too large, max is 9999. Split video to shorter clips."
                )

            if with_catalog:
                # flat layout, the batch membership is recorded in the catalog
                frame_path = str(frames_dir.path / f"{frame_ind:08d}.jpg")
                frames_catalog_dict[frame_ind] = {
                    "batch_ind": batch_ind,
                    "frame_path": Path(frame_path).name,
                    "timestamp": cap.get(cv2.CAP_PROP_POS_MSEC),
                }
            else:
                # one subdirectory per batch
                batch_dir = frames_dir.path / f"part_{batch_ind:04d}"
                batch_dir.mkdir(parents=True, exist_ok=True)
                frame_path = str(batch_dir / f"{frame_ind:08d}.jpg")

            written = cv2.imwrite(
                frame_path,
                frame,
                [cv2.IMWRITE_JPEG_QUALITY, 100],
            )
            if not written:
                # imwrite signals failures through its return value only
                logger.warning(f"Could not write frame {frame_ind} to {frame_path}")
    finally:
        # release the capture even when the loop raises
        cap.release()

    if with_catalog:
        frames_catalog_df = pd.DataFrame.from_dict(frames_catalog_dict, orient="index")
        frames_catalog_df.to_csv(frames_dir.path / "frames_catalog.csv")

In [None]:
# | eval: false
# Demo: sample one frame per second (time_step=1) in batches of 10 and write the frames
# together with the catalog (with_catalog defaults to True) into FRAMES_DIR.
out_dir = Directory(FRAMES_DIR, temporary=False)
frame_sampler = FramesSamplerUniform(metadata, time_step=1, batch_size=10)
sample_frames_from_video(VIDEO_FILE, out_dir, sampler=frame_sampler)

Sampling frames:   0%|          | 0/106 [00:00<?, ?it/s]

0 0 0
0 5 1
0 10 2
0 15 3
0 20 4
0 25 5
0 30 6
0 35 7
0 40 8
0 45 9
1 50 10
1 55 11
1 60 12
1 65 13
1 70 14
1 75 15
1 80 16
1 85 17
1 90 18
1 95 19
2 100 20
2 105 21
2 110 22
2 115 23
2 120 24
2 125 25
2 130 26
2 135 27
2 140 28
2 145 29
3 150 30
3 155 31
3 160 32
3 165 33
3 170 34
3 175 35
3 180 36
3 185 37
3 190 38
3 195 39
4 200 40
4 205 41
4 210 42
4 215 43
4 220 44
4 225 45
4 230 46
4 235 47
4 240 48
4 245 49
5 250 50
5 255 51
5 260 52
5 265 53
5 270 54
5 275 55
5 280 56
5 285 57
5 290 58
5 295 59
6 300 60
6 305 61
6 310 62
6 315 63
6 320 64
6 325 65
6 330 66
6 335 67
6 340 68
6 345 69
7 350 70
7 355 71
7 360 72
7 365 73
7 370 74
7 375 75
7 380 76
7 385 77
7 390 78
7 395 79
8 400 80
8 405 81
8 410 82
8 415 83
8 420 84
8 425 85
8 430 86
8 435 87
8 440 88
8 445 89
9 450 90
9 455 91
9 460 92
9 465 93
9 470 94
9 475 95
9 480 96
9 485 97
9 490 98
9 495 99
10 500 100
10 505 101
10 510 102
10 515 103
10 520 104
10 525 105


---


#| hide

Utility section


In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | hide
import nbdev

nbdev.nbdev_export()