In [2]:
# from pathlib import Path
# import cv2
# # Parameters
# img_dir     = Path("dataset/episode_1/img")
# output_path = Path("dataset/episode_1/episode_1.mp4")
# fps         = 20.0
# codec       = "avc1"  # H.264

# # Collect and sort the list of files
# img_paths = sorted(img_dir.glob("*.*"))
# if not img_paths:
#     raise RuntimeError(f"No images found in {img_dir}")

# # Read the first frame to determine the frame size
# first_frame = cv2.imread(str(img_paths[0]))
# if first_frame is None:
#     raise RuntimeError(f"Cannot read image {img_paths[0]}")
# height, width = first_frame.shape[:2]

# # Set up the VideoWriter
# fourcc = cv2.VideoWriter_fourcc(*codec)
# writer = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
# if not writer.isOpened():
#     raise RuntimeError(f"Cannot open video writer for {output_path}")

# # Iterate over all images and write them to the video
# for img_path in img_paths:
#     frame = cv2.imread(str(img_path))
#     if frame is None:
#         print(f"[WARN] skip unreadable image {img_path}")
#         continue
#     writer.write(frame)

# writer.release()
# print(f"Saved video: {output_path}")


In [None]:
# ECHO_READ_DATA_SIM = 0x5
# type(ECHO_READ_DATA_SIM)

int

In [8]:
# Pack the force value into exactly one byte; with a single byte the
# byte order has no effect on the result, "little" is kept for clarity.
leftArmHandleForce = 255
leftArmHandleForce.to_bytes(length=1, byteorder="little")


b'\xff'

In [1]:
import glob
import logging
from pathlib import Path
from PIL import Image
import av

In [6]:
def encode_video_frames(
    imgs_dir: Path | str,
    video_path: Path | str,
    fps: int,
    vcodec: str = "libsvtav1",
    pix_fmt: str = "yuv420p",
    g: int | None = 2,
    crf: int | None = 30,
    fast_decode: int = 0,
    log_level: int | None = av.logging.ERROR,
    overwrite: bool = False,
) -> None:
    """
    Encode a sequence of image files into a single video.

    :param imgs_dir:    folder with frames named like "frame_000001.png" (six digits)
    :param video_path:  where to save video (mp4, mkv, etc.)
    :param fps:         frames per second for output video
    :param vcodec:      video codec: "h264", "hevc" (H.265) or "libsvtav1" (AV1)
    :param pix_fmt:     color format: usually "yuv420p"
    :param g:           GOP size (number of frames between key frames); None to omit the option
    :param crf:         compression quality (lower — better quality, higher — stronger compression);
                        None to omit the option
    :param fast_decode: if >0, adds parameters for accelerated decoding
    :param log_level:   level applied to the "libav" Python logger while encoding
                        (None leaves the logging configuration untouched)
    :param overwrite:   if True, an already existing file at ``video_path`` is replaced;
                        if False, an existing file raises ``FileExistsError``

    :raises ValueError:        if ``vcodec`` is not one of the supported codecs
    :raises FileNotFoundError: if ``imgs_dir`` contains no matching frame files
    :raises FileExistsError:   if ``video_path`` already exists and ``overwrite`` is False
    :raises OSError:           if the output file is missing after encoding
    """
    # Checking arguments
    if vcodec not in ["h264", "hevc", "libsvtav1"]:
        raise ValueError(f"Unsupported video codec: {vcodec}. Supported codecs are: h264, hevc, libsvtav1.")

    video_path = Path(video_path)
    imgs_dir   = Path(imgs_dir)

    # Collect a list of input frames using the template frame_[0-9]{6}.png,
    # ordered by the numeric frame index
    template = "frame_" + ("[0-9]" * 6) + ".png"
    input_list = sorted(
        glob.glob(str(imgs_dir / template)),
        key=lambda frame_path: int(Path(frame_path).stem.split("_")[-1]),
    )

    # Check if there are any input frames (before touching the filesystem)
    if not input_list:
        raise FileNotFoundError(f"No images matching '{template}' found in {imgs_dir}")

    # Refuse to clobber an existing video unless explicitly allowed
    if video_path.exists() and not overwrite:
        raise FileExistsError(f"Video file already exists: {video_path}")

    # Create destination folder if needed; an existing folder is never an error
    video_path.parent.mkdir(parents=True, exist_ok=True)

    # Encoders/pixel formats incompatibility check
    if (vcodec in ("libsvtav1", "hevc")) and pix_fmt == "yuv444p":
        logging.warning(
            f"Incompatible pix_fmt=yuv444p for codec {vcodec}, switching to yuv420p"
        )
        pix_fmt = "yuv420p"

    # The first frame defines the video dimensions; close the file right after reading them
    with Image.open(input_list[0]) as first_image:
        width, height = first_image.size

    # Collect video options
    video_options = {}
    if g is not None:
        video_options["g"] = str(g)
    if crf is not None:
        video_options["crf"] = str(crf)
    if fast_decode:
        # libsvtav1 takes the setting through its own parameter string,
        # the other codecs through the generic "tune" option
        if vcodec == "libsvtav1":
            video_options["svtav1-params"] = f"fast-decode={fast_decode}"
        else:
            video_options["tune"] = "fastdecode"

    # Set logging level
    if log_level is not None:
        logging.getLogger("libav").setLevel(log_level)

    try:
        # Create and open output file
        with av.open(str(video_path), mode="w") as output:
            # Add a stream to the output file
            stream = output.add_stream(vcodec, rate=fps, options=video_options)
            stream.width   = width
            stream.height  = height
            stream.pix_fmt = pix_fmt

            # Loop through input frames and encode them
            for img_path in input_list:
                # Close each source image as soon as it has been copied into a frame
                with Image.open(img_path) as source_image:
                    input_frame = av.VideoFrame.from_image(source_image.convert("RGB"))
                for packet in stream.encode(input_frame):
                    if packet:
                        output.mux(packet)

            # Flush the encoder
            for packet in stream.encode():
                output.mux(packet)
    finally:
        # Reset logging even when encoding fails
        if log_level is not None:
            av.logging.restore_default_callback()

    # Checking for file existence
    if not video_path.exists():
        raise OSError(f"Encoding failed, file not found: {video_path}")


In [12]:
# Encode the frames of episode 1 into an H.264 MP4 placed next to the image folder.
encode_video_frames(
    imgs_dir=Path("dataset") / "episode_1" / "img",
    video_path=Path("dataset") / "episode_1" / "episode_1.mp4",
    fps=30.0,
    vcodec="h264",
    pix_fmt="yuv420p",
    g=2,             # key frame every 2 frames
    crf=20,
    fast_decode=0,   # no fast-decode tuning
    overwrite=True,
    log_level=logging.INFO,
)

