This sample illustrates video decoding organized as a producer-consumer pattern.

- FFmpeg is launched in a subprocess (the producer), which does the following:
    - Takes the input (H.264 or HEVC video).
    - Demuxes the video stream.
    - Writes the Annex B elementary bitstream to a pipe.
- `PyDecoder` (the consumer) reads the bitstream from the pipe and decodes it.
- Decoded frames are converted to RGB and encoded to JPEG.
- The resulting frames are displayed in the notebook.

In [1]:
class StopExecution(Exception):
    """Exception raised to abort the current cell without a visible traceback."""

    def _render_traceback_(self):
        """Return an empty list of traceback lines.

        NOTE(review): presumably consulted by IPython when it renders the
        exception, so that nothing is printed -- confirm against IPython docs.
        """
        no_traceback_lines = []
        return no_traceback_lines

In [2]:
from typing import Dict
import python_vali as vali
import numpy as np

from io import BytesIO
import subprocess
import json

from PIL import Image
from IPython.display import display

In [3]:
def get_stream_params(url: str) -> Dict:
    """
    Extract parameters of the first video stream of the input using ffprobe.

    Args:
        url (str): input file URL.

    Raises:
        ValueError: if codec or pixel format are not supported.

    Returns:
        Dict: dictionary with the keys ``width``, ``height``, ``framerate``,
        ``codec_name``, ``format``, ``color_range`` and ``color_space``.
        An empty dictionary is returned if ffprobe can not be launched,
        its output can not be parsed or the input has no video stream.
    """

    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        url,
    ]

    # A missing / non-executable ffprobe is reported the same way as an
    # unreadable input: by an empty dictionary the caller already handles.
    try:
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, check=False)
    except OSError as err:
        print(f"Can not run ffprobe: {err}")
        return {}

    # ffprobe may print nothing (or garbage) when it fails to open the input.
    try:
        json_out = json.loads(proc.stdout)
    except ValueError:
        return {}

    streams = json_out.get("streams") if isinstance(json_out, dict) else None
    if not streams:
        return {}

    for stream in streams:
        if stream.get("codec_type") != "video":
            continue

        params = {
            "width": stream["width"],
            "height": stream["height"],
            # The rate is parsed by hand: ffprobe output is external data
            # and must never be passed to eval().
            "framerate": _parse_frame_rate(stream.get("avg_frame_rate", "0/0")),
        }

        codec_name = stream.get("codec_name", "unknown")
        if codec_name not in ("h264", "hevc"):
            raise ValueError(
                f"Unsupported codec: {codec_name}"
                ". Only H.264 and HEVC are supported in this sample."
            )
        params["codec_name"] = codec_name

        # YUVJ420P and YUVJ444P are deprecated but still wide spread, so handle
        # them as well. They also indicate JPEG color range.
        pix_fmt = stream.get("pix_fmt", "unknown")
        if pix_fmt in ("yuv420p", "yuvj420p"):
            params["format"] = vali.PixelFormat.NV12
        elif pix_fmt in ("yuv444p", "yuvj444p"):
            params["format"] = vali.PixelFormat.YUV444
        else:
            raise ValueError(
                f"Unsupported pixel format: {pix_fmt}"
                ". Only YUV420 and YUV444 are supported in this sample."
            )

        # Color range is JPEG if either the pixel format or the explicit
        # stream attribute says so, MPEG otherwise.
        is_jpeg_range = pix_fmt in ("yuvj420p", "yuvj444p") or stream.get(
            "color_range"
        ) in ("pc", "jpeg")
        params["color_range"] = (
            vali.ColorRange.JPEG if is_jpeg_range else vali.ColorRange.MPEG
        )

        # Color space defaults to BT.601 unless the stream reports BT.709.
        params["color_space"] = (
            vali.ColorSpace.BT_709
            if stream.get("color_space") == "bt709"
            else vali.ColorSpace.BT_601
        )

        # Only the first video stream is described.
        return params

    return {}


def _parse_frame_rate(rate: str) -> float:
    """Convert a rational string such as "30000/1001" to float, 0.0 if unknown."""
    numerator, _, denominator = str(rate).partition("/")
    try:
        if denominator:
            return float(numerator) / float(denominator)
        return float(numerator)
    except (ValueError, ZeroDivisionError):
        # E.g. "0/0", which would otherwise raise on division.
        return 0.0

In [4]:
def rtsp_client(gpu_id: int, url: str) -> None:
    """
    This function launches RTSP client which decodes video and
    presents it to user as series of decoded frames.

    FFmpeg runs in a subprocess and writes the Annex B elementary bitstream
    to a pipe; the decoder reads from that pipe. Every decoded frame is
    converted to RGB, compressed to JPEG and displayed in the notebook.
    The FFmpeg subprocess is always stopped when the function exits.

    Args:
        gpu_id (int): GPU ordinal. Used for the decoder, the color converter,
            the JPEG encoder and the surfaces.
        url (str): input file URL.

    Raises:
        StopExecution: if stream parameters can not be obtained, or if the
            decoder, the converter or the JPEG encoder report a failure.
    """
    params = get_stream_params(url)

    if not params:
        print("Can not get " + url + " streams params")
        raise StopExecution

    # Prepare ffmpeg arguments
    codec_name = params["codec_name"]
    bsf_name = codec_name + "_mp4toannexb,dump_extra=all"

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "fatal",
        "-i",
        url,
        "-c:v",
        "copy",
        "-bsf:v",
        bsf_name,
        "-f",
        codec_name,
        "pipe:1",
    ]

    # Run ffmpeg in subprocess and redirect its output to pipe
    ffmpeg_proc = subprocess.Popen(args=cmd, stdout=subprocess.PIPE)

    try:
        # Create HW decoder class which takes input from pipe
        py_dec = vali.PyDecoder(ffmpeg_proc.stdout, {}, gpu_id)

        # GPU-accelerated converter
        py_cvt = vali.PySurfaceConverter(
            py_dec.Format, vali.PixelFormat.RGB, gpu_id=gpu_id
        )

        # GPU-accelerated JPEG encoder.
        # It's faster to encode Surface on GPU and show JPEG in widget.
        py_jpeg = vali.PyNvJpegEncoder(gpu_id=gpu_id)
        py_jpeg_ctx = py_jpeg.Context(100, vali.PixelFormat.RGB)

        # Allocate surface for decoder to output
        surf_src = vali.Surface.Make(
            py_dec.Format, py_dec.Width, py_dec.Height, gpu_id=gpu_id
        )

        # Raw Surface, converted to RGB
        surf_dst = vali.Surface.Make(
            vali.PixelFormat.RGB, py_dec.Width, py_dec.Height, gpu_id=gpu_id
        )

        # Main decoding loop.
        success = True
        while success:
            try:
                # Decode single Surface
                success, details = py_dec.DecodeSingleSurface(surf_src)
                if not success:
                    print(details)
                    raise StopExecution

                # Convert to RGB
                success, details = py_cvt.Run(surf_src, surf_dst)
                if not success:
                    print(details)
                    raise StopExecution

                # Compress to JPEG
                buffers, details = py_jpeg.Run(py_jpeg_ctx, [surf_dst])
                if len(buffers) != 1:
                    print(details)
                    raise StopExecution

                # Display in notebook.
                # The same picture is shown twice for some reason.
                jpeg_bytes = BytesIO(buffers[0].tobytes())
                display(Image.open(jpeg_bytes), display_id="decoded_frame")

            # A deliberate stop must reach the caller. StopExecution derives
            # from Exception, so without this clause the handler below would
            # swallow it and respawn the decoder instead of stopping.
            except StopExecution:
                raise

            # Handle HW exceptions in simplest possible way by decoder respawn
            except Exception as e:
                print(e)
                py_dec = vali.PyDecoder(ffmpeg_proc.stdout, {}, gpu_id)
                continue
    finally:
        # Do not leave the producer running (or its pipe open) once the
        # consumer is done, whatever the reason.
        ffmpeg_proc.terminate()
        ffmpeg_proc.wait()
        if ffmpeg_proc.stdout is not None:
            ffmpeg_proc.stdout.close()

In [None]:
# Limit FFmpeg logging to the FATAL level.
vali.SetFFMpegLogLevel(vali.FfmpegLogLevel.FATAL)

# Decode the test clip from the VALI repository on GPU 0 and show the frames.
rtsp_client(
    url="https://github.com/RomanArzumanyan/VALI/raw/refs/heads/main/tests/data/test.mp4",
    gpu_id=0,
)