def add_args(parser: argparse.ArgumentParser) -> None:
    """Register the blob ring-buffer command-line options on ``parser``.

    Adds ``--blob-capture-file``, ``--blob-num-slots``, ``--blob-max-image-bytes`` and
    ``--blob-png-compression``; argparse exposes them as ``blob_capture_file``,
    ``blob_num_slots``, ``blob_max_image_bytes`` and ``blob_png_compression``.
    """
    option_table: tuple[tuple[str, dict[str, Any]], ...] = (
        (
            "--blob-capture-file",
            {
                "default": "capture.blob",
                "help": "Name of the blob ring-buffer file, stored under the capture folder",
            },
        ),
        (
            "--blob-num-slots",
            {
                "default": 60,
                "type": int,
                "help": "Number of image slots in the blob ring buffer",
            },
        ),
        (
            "--blob-max-image-bytes",
            {
                "default": 4 * 1024 * 1024,
                "type": int,
                "help": "Maximum bytes per image slot in the blob ring buffer",
            },
        ),
        (
            "--blob-png-compression",
            {
                "default": 1,
                "type": int,
                "choices": range(10),
                "metavar": "0-9",
                "help": "PNG compression level for captured images (0=none, 9=max); default 1 favours throughput",
            },
        ),
    )
    # Options are registered in table order so the generated --help keeps its ordering.
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)


class BlobCompletionHandler(ABC):
    """Callback invoked when a rotated event blob has received all its follow-up writes."""

    @abstractmethod
    def __call__(self, event_id: str, blob: BlobFsCapture) -> None:
        """Handle the completed ``blob`` that was rotated out for ``event_id``."""


class NoOpBlobCompletionHandler(BlobCompletionHandler):
    """Completion handler that ignores the notification."""

    def __call__(self, event_id: str, blob: BlobFsCapture) -> None:
        return None


class ImageCaptureInterface(ABC):
    """Abstract contract implemented by the image capture backends of this module."""

    @abstractmethod
    def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None:
        """Store ``image`` under ``filename`` together with the ``flags`` value."""

    @abstractmethod
    def get_last_filepath(self) -> Optional[pathlib.Path]:
        """Return the path of the last stored image, or ``None`` when there is none."""

    @abstractmethod
    def set_capture_ts(self, ts: datetime.datetime) -> None:
        """Provide the capture timestamp ``ts`` to the backend."""

    @abstractmethod
    def rotate(self, event_id: str, immediate: bool = False) -> ImageCaptureInterface:
        """Rotate the current capture storage on behalf of ``event_id``."""

    @abstractmethod
    def active_blob_path(self) -> Optional[pathlib.Path]:
        """Return the path of the blob currently written to, if any."""

    @abstractmethod
    def update_last_flags(self, flags: int) -> None:
        """Replace the flags stored for the most recently captured image."""
class BlobFsCapture(ImageCaptureInterface):
    """Ring buffer of PNG-encoded frames kept in one fixed-size, memory-mapped file.

    File layout, front to back:

    * superblock of ``SUPERBLOCK_SIZE`` bytes (magic, version, geometry, write head);
    * ``num_slots`` index entries of ``INDEX_ENTRY_SIZE`` bytes each;
    * ``num_slots`` data regions of ``max_image_bytes`` bytes each.

    ``capture`` writes into the slot named by the write head and then advances the head
    modulo ``num_slots``, so the oldest frame is overwritten once every slot is in use.
    """

    def __init__(
        self,
        blob_path: pathlib.Path,
        num_slots: int = 60,
        max_image_bytes: int = 4 * 1024 * 1024,
        png_compression: int = 1,
    ) -> None:
        """Create the blob at ``blob_path``, or reopen it when the file already exists.

        Args:
            blob_path: Location of the blob file; missing parent directories are created.
            num_slots: Number of image slots in the ring.
            max_image_bytes: Size of each slot's data region in bytes.
            png_compression: Value passed to OpenCV as ``IMWRITE_PNG_COMPRESSION``.

        Raises:
            ValueError: If the geometry is not positive, or an existing file does not match
                the requested geometry or carries an invalid superblock.
        """
        # A zero slot count would only fail later with ZeroDivisionError in capture().
        if num_slots <= 0:
            raise ValueError(f"num_slots must be positive, got {num_slots}")
        if max_image_bytes <= 0:
            raise ValueError(f"max_image_bytes must be positive, got {max_image_bytes}")
        self._blob_path = blob_path
        self._num_slots = num_slots
        self._max_image_bytes = max_image_bytes
        self._png_compression = png_compression
        self._index_base = SUPERBLOCK_SIZE
        self._data_base = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE
        self._file_size = self._data_base + num_slots * max_image_bytes
        self._file: Optional[Any] = None
        self._mm: Optional[mmap.mmap] = None
        self._write_head = 0
        self._open()

    def _open(self) -> None:
        """Create the blob file when it is missing, otherwise reopen the existing one."""
        if self._blob_path.exists():
            self._reopen()
        else:
            self._create()

    def _map_file(self) -> tuple[Any, mmap.mmap]:
        """Open the blob read/write and map it; the file is closed again if mapping fails."""
        file = open(self._blob_path, "r+b")
        try:
            return file, mmap.mmap(file.fileno(), self._file_size)
        except Exception:
            file.close()
            raise

    def _require_mm(self) -> mmap.mmap:
        """Return the live mapping, raising ``ValueError`` once the blob has been closed."""
        # An explicit check instead of ``assert`` so the guard survives ``python -O``.
        if self._mm is None:
            raise ValueError("Blob is closed")
        return self._mm

    def _create(self) -> None:
        """Allocate a zero-filled file of the full blob size and write a fresh superblock."""
        self._blob_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self._blob_path, "wb") as f:
            # Writing one byte at the last offset extends the file to its final size.
            f.seek(self._file_size - 1)
            f.write(b"\x00")
        self._file, self._mm = self._map_file()
        self._write_head = 0
        self._flush_superblock()

    def _reopen(self) -> None:
        """Map an existing blob and restore the write head after validating its superblock.

        Raises:
            ValueError: On a file-size mismatch, wrong magic, unsupported version, geometry
                that differs from the constructor arguments, or an out-of-range write head.
        """
        actual_size = os.path.getsize(self._blob_path)
        if actual_size != self._file_size:
            raise ValueError(f"Blob file size mismatch: expected {self._file_size}, got {actual_size}")
        file, mm = self._map_file()
        try:
            magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from(
                SUPERBLOCK_FORMAT, mm, 0
            )
            if magic != MAGIC:
                raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}")
            if version != VERSION:
                raise ValueError(f"Unsupported blob version: {version}, expected {VERSION}")
            # Slot offsets are derived from the constructor arguments, so a blob written with
            # another geometry would be silently corrupted by the next capture.
            stored = (num_slots, max_image_bytes, index_entry_size)
            expected = (self._num_slots, self._max_image_bytes, INDEX_ENTRY_SIZE)
            if stored != expected:
                raise ValueError(
                    "Blob geometry mismatch: file has (num_slots, max_image_bytes, index_entry_size)="
                    f"{stored}, expected {expected}"
                )
            if write_head >= self._num_slots:
                raise ValueError(f"Invalid blob write head: {write_head}, num_slots is {self._num_slots}")
        except Exception:
            mm.close()
            file.close()
            raise
        self._file = file
        self._mm = mm
        self._write_head = write_head

    def _flush_superblock(self) -> None:
        """Serialise magic, version, geometry and the write head into the superblock."""
        data = struct.pack(
            SUPERBLOCK_FORMAT,
            MAGIC,
            VERSION,
            self._num_slots,
            self._write_head,
            self._max_image_bytes,
            INDEX_ENTRY_SIZE,
        )
        self._require_mm()[0:SUPERBLOCK_SIZE] = data

    def _slot_index_offset(self, slot: int) -> int:
        """Return the file offset of the index entry belonging to ``slot``."""
        return self._index_base + slot * INDEX_ENTRY_SIZE

    def _slot_data_offset(self, slot: int) -> int:
        """Return the file offset of the data region belonging to ``slot``."""
        return self._data_base + slot * self._max_image_bytes

    def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None:
        """PNG-encode ``image`` and store it in the slot at the write head.

        Raises:
            ValueError: If the blob is closed, the UTF-8 encoded filename exceeds
                ``MAX_FILENAME_BYTES``, encoding fails, or the PNG exceeds ``max_image_bytes``.
        """
        mm = self._require_mm()
        filename_bytes = filename.encode("utf-8")
        if len(filename_bytes) > MAX_FILENAME_BYTES:
            raise ValueError(f"Filename too long: {len(filename_bytes)} bytes, max {MAX_FILENAME_BYTES}")

        ok, buf = cv2.imencode(".png", image, [cv2.IMWRITE_PNG_COMPRESSION, self._png_compression])
        if not ok:
            raise ValueError("cv2.imencode failed to encode image as PNG")
        png_bytes = buf.tobytes()

        if len(png_bytes) > self._max_image_bytes:
            raise ValueError(f"PNG too large: {len(png_bytes)} bytes, max {self._max_image_bytes}")

        slot = self._write_head
        data_offset = self._slot_data_offset(slot)
        idx_offset = self._slot_index_offset(slot)

        # Order matters: image data first, then the index entry describing it, and the write
        # head last, so the head only ever moves past fully described slots.
        mm[data_offset : data_offset + len(png_bytes)] = png_bytes

        entry = struct.pack(
            INDEX_ENTRY_FORMAT,
            data_offset,
            len(png_bytes),
            flags,
            len(filename_bytes),
            0,
            filename_bytes.ljust(MAX_FILENAME_BYTES, b"\x00"),
        )
        mm[idx_offset : idx_offset + INDEX_ENTRY_SIZE] = entry

        self._write_head = (slot + 1) % self._num_slots
        # Re-serialising the whole superblock persists the new head without relying on a
        # hand-computed byte offset of the write_head field.
        self._flush_superblock()

    def get_last_filepath(self) -> Optional[pathlib.Path]:
        """Always return ``None``: frames live inside the blob, not in individual files."""
        return None

    def set_capture_ts(self, ts: datetime.datetime) -> None:
        """Accept the capture timestamp for interface compatibility; it is not used."""

    def active_blob_path(self) -> Optional[pathlib.Path]:
        """Return the path of the blob file this instance writes to."""
        return self._blob_path

    def update_last_flags(self, flags: int) -> None:
        """Overwrite the flags of the slot written most recently (the one before the head).

        Raises:
            ValueError: If the blob is closed.
        """
        mm = self._require_mm()
        last_slot = (self._write_head - 1) % self._num_slots
        idx_offset = self._slot_index_offset(last_slot)
        # Unpack and repack the entry so that only the flags field changes, whatever its
        # byte position inside INDEX_ENTRY_FORMAT is.
        image_offset, image_size, _old_flags, filename_len, reserved, filename_raw = struct.unpack_from(
            INDEX_ENTRY_FORMAT, mm, idx_offset
        )
        struct.pack_into(
            INDEX_ENTRY_FORMAT, mm, idx_offset, image_offset, image_size, flags, filename_len, reserved, filename_raw
        )

    def rotate(self, event_id: str, immediate: bool = False) -> BlobFsCapture:
        """Rename the current blob to <event_id>.blob and open a fresh one at the original path.

        The current file handle and mmap are transferred to a new BlobFsCapture wrapper (the
        "old handle") which is returned to the caller. On Linux, renaming an open/mmap'd file
        preserves the inode so the old handle remains valid after the rename.

        ``immediate`` is accepted for interface compatibility and is not used here.
        """
        # Bypass __init__ so the existing file handle and mapping are shared, not reopened.
        old: BlobFsCapture = object.__new__(BlobFsCapture)
        old.__dict__.update(self.__dict__)

        renamed = self._blob_path.parent / f"{event_id}.blob"
        os.rename(self._blob_path, renamed)
        old._blob_path = renamed

        # Ownership of the handles moved to ``old``; start over with a fresh blob.
        self._file = None
        self._mm = None
        self._write_head = 0
        self._open()

        return old

    def close(self) -> None:
        """Flush and release the mapping and the file handle; safe to call repeatedly."""
        if self._mm is not None:
            self._mm.flush()
            self._mm.close()
            self._mm = None
        if self._file is not None:
            self._file.close()
            self._file = None

    def __enter__(self) -> BlobFsCapture:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()
class BlobExtractor:
    """Read-only reader for a blob ring-buffer file.

    The superblock fields are exposed as ``version``, ``num_slots``, ``write_head``,
    ``max_image_bytes`` and ``index_entry_size``. Use as a context manager or call
    ``close()`` to release the mapping and the file handle.
    """

    def __init__(self, blob_path: pathlib.Path) -> None:
        """Open and map ``blob_path`` read-only and parse its superblock.

        Raises:
            ValueError: If the file is smaller than a superblock, the magic is wrong, or the
                stored index entry size is too small to hold an index entry.
        """
        self._blob_path = blob_path
        self._file: Optional[Any] = None
        self._mm: Optional[mmap.mmap] = None
        self._frame_flags: Optional[dict[str, int]] = None

        file = open(blob_path, "rb")
        mm: Optional[mmap.mmap] = None
        try:
            if os.path.getsize(blob_path) < SUPERBLOCK_SIZE:
                raise ValueError("File too small to contain a valid superblock")
            mm = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
            magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from(
                SUPERBLOCK_FORMAT, mm, 0
            )
            if magic != MAGIC:
                raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}")
            # The stored size is used as the stride between entries; a smaller value would
            # make entries overlap (or, at 0, make every slot read the same bytes).
            if index_entry_size < INDEX_ENTRY_SIZE:
                raise ValueError(
                    f"Invalid index entry size: {index_entry_size}, expected at least {INDEX_ENTRY_SIZE}"
                )
        except Exception:
            if mm is not None:
                mm.close()
            file.close()
            raise

        self._file = file
        self._mm = mm
        self.version = version
        self.num_slots = num_slots
        self.write_head = write_head
        self.max_image_bytes = max_image_bytes
        self.index_entry_size = index_entry_size

    def close(self) -> None:
        """Release the mapping and the file handle; safe to call repeatedly."""
        if self._mm is not None:
            self._mm.close()
            self._mm = None
        if self._file is not None:
            self._file.close()
            self._file = None

    def __enter__(self) -> BlobExtractor:
        return self

    def __exit__(self, *_: Any) -> None:
        self.close()

    def _require_mm(self) -> mmap.mmap:
        """Return the live mapping, raising ``ValueError`` once the extractor is closed."""
        if self._mm is None:
            raise ValueError("BlobExtractor is closed")
        return self._mm

    @staticmethod
    def _decode_filename(raw: bytes) -> str:
        """Decode a stored filename as UTF-8, falling back to latin-1 for invalid bytes."""
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.decode("latin-1")

    @staticmethod
    def _safe_filename(filename: str, slot: int) -> str:
        """Return ``filename`` if it stays below the destination, else ``slot_NNNN.png``.

        The name comes from the blob file, so it is treated as untrusted: empty names, names
        with NUL bytes, absolute paths and paths containing ``..`` fall back to the slot name.
        """
        fallback = f"slot_{slot:04d}.png"
        if not filename or "\x00" in filename:
            return fallback
        candidate = pathlib.PurePath(filename)
        if candidate.is_absolute() or ".." in candidate.parts or not candidate.name:
            return fallback
        return filename

    def _read_entries(self) -> list[tuple[int, int, int, int, str]]:
        """Return ``(slot, image_offset, image_size, flags, filename)`` for occupied slots.

        Scanning stops at the first index entry that is not fully contained in the file, so
        a truncated blob yields only the entries that are intact.
        """
        mm = self._require_mm()
        entries: list[tuple[int, int, int, int, str]] = []
        for slot in range(self.num_slots):
            idx_offset = SUPERBLOCK_SIZE + slot * self.index_entry_size
            if idx_offset + self.index_entry_size > len(mm):
                break
            image_offset, image_size, flags, filename_len, _reserved, filename_raw = struct.unpack_from(
                INDEX_ENTRY_FORMAT, mm, idx_offset
            )
            if image_size == 0:
                continue
            filename = self._decode_filename(filename_raw[:filename_len])
            entries.append((slot, image_offset, image_size, flags, filename))
        return entries

    def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]:
        """Write every decodable image of the blob into ``dest_dir`` and return the paths.

        Slots whose data lies outside the file or does not decode as an image are skipped.
        Stored filenames that would escape ``dest_dir`` are replaced by ``slot_NNNN.png``.

        Raises:
            ValueError: If the extractor has been closed.
        """
        mm = self._require_mm()
        dest_dir.mkdir(parents=True, exist_ok=True)

        extracted: list[pathlib.Path] = []
        for slot, image_offset, image_size, _flags, filename in self._read_entries():
            if image_offset + image_size > len(mm):
                continue

            png_bytes = bytes(mm[image_offset : image_offset + image_size])
            # Decoding is only a validity check; the stored PNG bytes are written unchanged.
            if cv2.imdecode(np.frombuffer(png_bytes, dtype=np.uint8), cv2.IMREAD_COLOR) is None:
                continue

            dest_path = dest_dir / self._safe_filename(filename, slot)
            # A stored name may contain sub-directories below dest_dir.
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            dest_path.write_bytes(png_bytes)
            extracted.append(dest_path)

        return extracted

    def _ensure_frame_flags(self) -> dict[str, int]:
        """Build, once, the mapping from stored filename to flags for all occupied slots."""
        if self._frame_flags is None:
            self._frame_flags = {
                filename: flags for _slot, _offset, _size, flags, filename in self._read_entries()
            }
        return self._frame_flags

    def get_filenames_with_flags(self, mask: int) -> set[str]:
        """Return the stored filenames whose flags have at least one bit of ``mask`` set."""
        return {fn for fn, flags in self._ensure_frame_flags().items() if flags & mask}
def make_capture_backend(
    args: argparse.Namespace,
    capture_folder: pathlib.Path,
    follow_up_count: int,
    completion_handler: Optional[BlobCompletionHandler] = None,
) -> ImageCaptureInterface:
    """Build the composite blob capture backend from parsed command-line options.

    Reads ``blob_capture_file``, ``blob_num_slots``, ``blob_max_image_bytes`` and
    ``blob_png_compression`` from ``args``; the blob file is placed under ``capture_folder``.
    When no ``completion_handler`` is supplied, a ``NoOpBlobCompletionHandler`` is used.
    """
    target = capture_folder / args.blob_capture_file
    slots: int = args.blob_num_slots
    slot_bytes: int = args.blob_max_image_bytes
    compression: int = args.blob_png_compression
    if completion_handler is None:
        completion_handler = NoOpBlobCompletionHandler()
    return CompositeBlobCapture(target, slots, slot_bytes, follow_up_count, completion_handler, compression)
dependencies = [ "pydantic", "jinja2", "netifaces", + "opencv-python-headless", + "numpy", "python-context-logger @ git+https://github.com/EffectiveRange/python-context-logger.git@latest", ] dynamic = ["version"] +[project.optional-dependencies] +dev = [ + "mypy", + "flake8", + "pytest-cov", +] + +[project.scripts] +er-blob-extract = "common_utility.capture:main" + [tool.setuptools] package-dir = {"" = "."} packages = ["common_utility", "test_utility"] @@ -32,6 +44,9 @@ build-backend = "setuptools.build_meta" version_scheme = "guess-next-dev" local_scheme = "node-and-date" +[tool.black] +line-length = 120 + [tool.pytest.ini_options] addopts = ["--verbose", "--capture=no"] python_files = ["*Test.py"] diff --git a/setup.cfg b/setup.cfg index 5bea32d..fecb071 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,15 +5,16 @@ packaging = [mypy] packages = common_utility -strict = True +strict = true [flake8] exclude = build,dist,.eggs,.venv max-line-length = 120 max-complexity = 10 -count = True -statistics = True -show-source = True +extend-ignore = E203 +count = true +statistics = true +show-source = true per-file-ignores = # F401: imported but unused # F403: import * used; unable to detect undefined names @@ -26,7 +27,7 @@ python_classes = *Test [coverage:run] relative_files = true -branch = True +branch = true source = common_utility [coverage:report] @@ -47,8 +48,8 @@ exclude_also = ; Don't complain about abstract methods, they aren't run: @(abc\.)?abstractmethod -ignore_errors = True -skip_empty = True +ignore_errors = true +skip_empty = true [coverage:html] directory = coverage/html diff --git a/tests/captureTest.py b/tests/captureTest.py new file mode 100644 index 0000000..a4b57a2 --- /dev/null +++ b/tests/captureTest.py @@ -0,0 +1,709 @@ +import argparse +import io +import pathlib +import struct + +import numpy as np +import pytest + +from unittest.mock import MagicMock, patch + +from common_utility.capture import ( + MAGIC, + SUPERBLOCK_FORMAT, + SUPERBLOCK_SIZE, + 
INDEX_ENTRY_FORMAT, + INDEX_ENTRY_SIZE, + TRIGGER_FRAME_FLAG, + BlobCompletionHandler, + NoOpBlobCompletionHandler, + CompositeBlobCapture, + BlobFsCapture, + BlobExtractor, + MAX_FILENAME_BYTES, + add_args, + make_capture_backend, + main as blob_extract_main, +) + + +def _make_image(h: int = 64, w: int = 64) -> np.ndarray: + rng = np.random.default_rng(42) + return rng.integers(0, 255, (h, w, 3), dtype=np.uint8) + + +# --------------------------------------------------------------------------- +# BlobFsCapture +# --------------------------------------------------------------------------- + + +class BlobFsCaptureTest: + def test_creates_blob_file_with_correct_size(self, tmp_path: pathlib.Path) -> None: + num_slots = 4 + max_bytes = 512 * 1024 + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=num_slots, max_image_bytes=max_bytes) + blob.close() + expected = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE + num_slots * max_bytes + assert (tmp_path / "cap.blob").stat().st_size == expected + + def test_superblock_magic_on_new_file(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + magic = struct.unpack_from(" None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.close() + # Corrupt magic + raw = bytearray(path.read_bytes()) + struct.pack_into(" None: + num_slots = 3 + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=num_slots, max_image_bytes=512 * 1024) + images = [_make_image() for _ in range(num_slots + 1)] + filenames = [f"frame_{i}.png" for i in range(num_slots + 1)] + for img, fn in zip(images, filenames): + blob.capture(img, fn) + blob.close() + + # write_head should have wrapped: after 4 writes into 3 slots, write_head == 1 + data = (tmp_path / "cap.blob").read_bytes() + _, _, _, write_head, _, _ = struct.unpack_from(SUPERBLOCK_FORMAT, data, 0) + assert 
write_head == 1 + + # Slot 0 should contain the 4th image (index 3) + idx0_offset = SUPERBLOCK_SIZE + img_offset, img_size, _flags, fn_len, _res, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, idx0_offset) + fn = fn_raw[:fn_len].decode("utf-8") + assert fn == "frame_3.png" + + def test_filename_too_long_raises(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + with pytest.raises(ValueError, match="too long"): + blob.capture(_make_image(), "x" * (MAX_FILENAME_BYTES + 1)) + blob.close() + + def test_image_too_large_raises(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=1) + with pytest.raises(ValueError, match="too large"): + blob.capture(_make_image(), "frame.png") + blob.close() + + def test_context_manager(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + with BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) as blob: + blob.capture(_make_image(), "frame.png") + assert path.exists() + + def test_flags_stored_in_index(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame.png", flags=0xCAFEBABE) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + _img_off, _img_sz, flags, _fn_len, _res, _fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == 0xCAFEBABE + + def test_update_last_flags_overwrites_flags_field(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame.png", flags=0) + blob.update_last_flags(TRIGGER_FRAME_FLAG) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + _img_off, _img_sz, flags, _fn_len, _res, _fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == TRIGGER_FRAME_FLAG + 
+ def test_reopen_size_mismatch_raises(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.close() + with open(path, "ab") as f: + f.write(b"\x00") + with pytest.raises(ValueError, match="size mismatch"): + BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + + def test_get_last_filepath_returns_none(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + assert blob.get_last_filepath() is None + blob.close() + + def test_set_capture_ts_is_a_no_op(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.set_capture_ts(MagicMock()) + blob.close() + + def test_active_blob_path_returns_blob_path(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + assert blob.active_blob_path() == path + blob.close() + + def test_reopen_existing_blob_restores_write_head(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.capture(_make_image(), "frame.png") + expected_head = blob._write_head + blob.close() + reopened = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + assert reopened._write_head == expected_head + reopened.close() + + def test_double_close_is_safe(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + blob.close() + + def test_update_last_flags_targets_most_recent_slot(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "first.png", flags=0) + blob.capture(_make_image(), "second.png", flags=0) + blob.update_last_flags(TRIGGER_FRAME_FLAG) + 
blob.close() + data = (tmp_path / "cap.blob").read_bytes() + # slot 0 (first) must stay 0 + _a, _b, flags0, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags0 == 0 + # slot 1 (second/last) must have TRIGGER_FRAME_FLAG + _a, _b, flags1, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE) + assert flags1 == TRIGGER_FRAME_FLAG + + +# --------------------------------------------------------------------------- +# BlobExtractor +# --------------------------------------------------------------------------- + + +class BlobExtractorTest: + def _write_blob(self, tmp_path: pathlib.Path, images_and_names: list[tuple]) -> pathlib.Path: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=5, max_image_bytes=512 * 1024) + for img, fn in images_and_names: + blob.capture(img, fn) + blob.close() + return path + + def test_extract_valid_images(self, tmp_path: pathlib.Path) -> None: + names = ["a.png", "b.png", "c.png"] + path = self._write_blob(tmp_path, [(_make_image(), n) for n in names]) + dest = tmp_path / "out" + extracted = BlobExtractor(path).extract(dest) + assert len(extracted) == 3 + assert {p.name for p in extracted} == set(names) + for p in extracted: + assert p.exists() + + def test_partial_blob_is_decodable(self, tmp_path: pathlib.Path) -> None: + """A blob with fewer written entries than its slot capacity must still decode correctly.""" + num_slots = 5 + written_names = ["frame_0.png", "frame_1.png", "frame_2.png"] + path = tmp_path / "partial.blob" + + blob = BlobFsCapture(path, num_slots=num_slots, max_image_bytes=512 * 1024) + for name in written_names: + blob.capture(_make_image(), name) + blob.close() + + extracted = BlobExtractor(path).extract(tmp_path / "out") + assert len(extracted) == len(written_names) + assert {p.name for p in extracted} == set(written_names) + + def test_extract_creates_dest_dir(self, tmp_path: pathlib.Path) -> None: + path = self._write_blob(tmp_path, 
[(_make_image(), "a.png")]) + dest = tmp_path / "new" / "subdir" + BlobExtractor(path).extract(dest) + assert dest.exists() + + def test_extract_invalid_magic_raises(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "bad.blob" + path.write_bytes(b"\xff" * 64) + with pytest.raises(ValueError, match="magic"): + BlobExtractor(path).extract(tmp_path / "out") + + def test_truncated_data_section_extracts_intact_images(self, tmp_path: pathlib.Path) -> None: + """Blob truncated partway into the third slot's data: first two images are still extracted.""" + num_slots = 3 + max_bytes = 512 * 1024 + path = tmp_path / "truncated.blob" + blob = BlobFsCapture(path, num_slots=num_slots, max_image_bytes=max_bytes) + for name in ["a.png", "b.png", "c.png"]: + blob.capture(_make_image(), name) + blob.close() + + # Keep the superblock, full index, and both first slots' data — cut 50 bytes into slot 2 + data_section_start = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE + path.write_bytes(path.read_bytes()[: data_section_start + 2 * max_bytes + 50]) + + extracted = BlobExtractor(path).extract(tmp_path / "out") + assert len(extracted) == 2 + assert {p.name for p in extracted} == {"a.png", "b.png"} + + def test_double_close_is_safe(self, tmp_path: pathlib.Path) -> None: + path = self._write_blob(tmp_path, [(_make_image(), "frame.png")]) + e = BlobExtractor(path) + e.close() + e.close() + + def test_file_too_small_raises(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "tiny.blob" + path.write_bytes(b"\x00" * (SUPERBLOCK_SIZE - 1)) + with pytest.raises(ValueError, match="too small"): + BlobExtractor(path) + + def test_context_manager_and_truncated_index_breaks_early(self, tmp_path: pathlib.Path) -> None: + """Index truncated mid-second-entry: extract() and get_filenames_with_flags() both break early.""" + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame.png") + blob.close() + # Cut in 
the middle of the second index entry so the loop must break + truncate_at = SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE + INDEX_ENTRY_SIZE // 2 + path.write_bytes(path.read_bytes()[:truncate_at]) + with BlobExtractor(path) as e: + extracted = e.extract(tmp_path / "out") + trigger_names = e.get_filenames_with_flags(TRIGGER_FRAME_FLAG) + assert extracted == [] + assert trigger_names == set() + assert e._mm is None # context manager closed it + + def test_zero_filename_len_uses_slot_index_fallback(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "real.png") + blob.close() + # fn_len is at byte 24 within the slot-0 index entry + raw = bytearray(path.read_bytes()) + struct.pack_into(" None: + """Filename bytes invalid in UTF-8 but valid in latin-1 are decoded correctly in both paths.""" + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "placeholder.png") + blob.close() + # Patch slot-0 index: set a latin-1 filename (0xE9 = é, not valid UTF-8) + latin1_name = b"caf\xe9.png" + raw = bytearray(path.read_bytes()) + struct.pack_into(" None: + """Simulates two interrupted writes; BlobExtractor must extract the 3 valid images.""" + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=5, max_image_bytes=512 * 1024) + for i in range(3): + blob.capture(_make_image(), f"valid_{i}.png") + blob.close() + + raw = bytearray(path.read_bytes()) + + # Scenario A: slot 3 — advance write_head in superblock but leave index entry zeroed + struct.pack_into(" None: + path = tmp_path / "literal.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.close() + assert path.exists() + + +# --------------------------------------------------------------------------- +# rotate() +# --------------------------------------------------------------------------- + + +class 
RotateTest: + def test_blob_rotate_renames_file_to_event_id(self, tmp_path: pathlib.Path) -> None: + original = tmp_path / "cap-XXX.blob" + blob = BlobFsCapture(original, num_slots=4, max_image_bytes=512 * 1024) + resolved = blob._blob_path # e.g. cap-001.blob + blob.capture(_make_image(), "frame0.png") + old = blob.rotate("my-event-id") + try: + # The original file was renamed to my-event-id.blob + assert (tmp_path / "my-event-id.blob").exists() + assert old._blob_path == tmp_path / "my-event-id.blob" + # The primary blob reopened at the original resolved path + assert blob._blob_path == resolved + assert resolved.exists() + finally: + old.close() + blob.close() + + def test_blob_rotate_old_handle_retains_data(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "before_rotate.png") + old = blob.rotate("event-abc") + try: + blob.capture(_make_image(), "after_rotate.png") + blob.close() + old.close() + # Old handle (renamed to event-abc.blob): contains before_rotate.png + dest = tmp_path / "from_old" + extracted_old = BlobExtractor(tmp_path / "event-abc.blob").extract(dest) + assert {p.name for p in extracted_old} == {"before_rotate.png"} + # New blob (original path): contains after_rotate.png + dest2 = tmp_path / "from_new" + extracted_new = BlobExtractor(tmp_path / "cap-XXX.blob").extract(dest2) + assert {p.name for p in extracted_new} == {"after_rotate.png"} + finally: + pass # already closed above + + def test_blob_rotate_multiple_times(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap-XX.blob", num_slots=4, max_image_bytes=512 * 1024) + event_ids = ["evt-a", "evt-b", "evt-c"] + handles = [] + for i, eid in enumerate(event_ids): + blob.capture(_make_image(), f"frame{i}.png") + handles.append(blob.rotate(eid)) + blob.close() + for h in handles: + h.close() + # Three event blobs renamed + the primary still exists at cap-01.blob + 
for eid in event_ids: + assert (tmp_path / f"{eid}.blob").exists() + assert (tmp_path / "cap-XX.blob").exists() + + +# --------------------------------------------------------------------------- +# CompositeBlobCapture +# --------------------------------------------------------------------------- + + +class CompositeBlobCaptureTest: + def _make_composite( + self, + tmp_path: pathlib.Path, + follow_up: int = 2, + handler: BlobCompletionHandler | None = None, + ) -> CompositeBlobCapture: + if handler is None: + handler = NoOpBlobCompletionHandler() + return CompositeBlobCapture( + tmp_path / "cap-XXX.blob", + num_slots=8, + max_image_bytes=512 * 1024, + follow_up_count=follow_up, + completion_handler=handler, + ) + + def test_capture_writes_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.capture(_make_image(), "frame.png") + comp.close() + blobs = list(tmp_path.glob("*.blob")) + assert len(blobs) == 1 + extracted = BlobExtractor(blobs[0]).extract(tmp_path / "out") + assert len(extracted) == 1 + + def test_rotate_renames_file_to_event_id(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.capture(_make_image(), "trigger.png") + comp.rotate("fire-uuid") + comp.close() + assert (tmp_path / "fire-uuid.blob").exists() + + def test_follow_up_writes_reach_rotated_blob(self, tmp_path: pathlib.Path) -> None: + follow_up = 3 + comp = self._make_composite(tmp_path, follow_up=follow_up) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev1") + for i in range(follow_up): + comp.capture(_make_image(), f"followup{i}.png") + comp.close() + # Event blob should contain: trigger frame + follow_up frames + extracted = BlobExtractor(tmp_path / "ev1.blob").extract(tmp_path / "out") + assert len(extracted) == 1 + follow_up + + def test_completion_handler_called_with_event_id_and_blob(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + follow_up = 2 + comp = 
self._make_composite(tmp_path, follow_up=follow_up, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-done") + for _ in range(follow_up): + comp.capture(_make_image(), "extra.png") + comp.close() + handler.assert_called_once() + call_event_id, call_blob = handler.call_args[0] + assert call_event_id == "ev-done" + assert isinstance(call_blob, BlobFsCapture) + + def test_no_completion_on_regular_close(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + comp = self._make_composite(tmp_path, follow_up=5, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-early") + # close without exhausting follow-up quota + comp.capture(_make_image(), "one.png") + comp.close() + handler.assert_not_called() + + def test_trigger_frame_flag_in_primary_pre_rotation(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path, follow_up=2) + comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) + primary_path = comp._blob._blob_path + comp.rotate("ev-flag") + comp.close() + # The trigger frame is in the event blob (the file that was the primary before rotation) + data = (tmp_path / "ev-flag.blob").read_bytes() + _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == TRIGGER_FRAME_FLAG + # New primary blob has no entries yet (fresh) + primary_data = primary_path.read_bytes() + _magic2, _ver2, _slots2, wh2, *_ = struct.unpack_from(SUPERBLOCK_FORMAT, primary_data, 0) + assert wh2 == 0 + + def test_follow_up_entries_have_zero_flags(self, tmp_path: pathlib.Path) -> None: + follow_up = 2 + comp = self._make_composite(tmp_path, follow_up=follow_up) + comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) + comp.rotate("ev-flags") + for i in range(follow_up): + comp.capture(_make_image(), f"followup{i}.png") + comp.close() + data = (tmp_path / "ev-flags.blob").read_bytes() + # Check all follow-up 
entries have flags == 0 + for slot in range(1, 1 + follow_up): + offset = SUPERBLOCK_SIZE + slot * INDEX_ENTRY_SIZE + _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, offset) + assert flags == 0 + + def test_update_last_flags_sets_flag_on_primary_blob(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.capture(_make_image(), "frame.png", flags=0) + comp.update_last_flags(TRIGGER_FRAME_FLAG) + primary_path = comp._blob._blob_path + comp.close() + data = primary_path.read_bytes() + _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == TRIGGER_FRAME_FLAG + + def test_active_blob_path_returns_primary_path(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + assert comp.active_blob_path() == comp._blob._blob_path + comp.close() + + def test_active_blob_path_updates_after_rotate(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + original_primary = comp._blob._blob_path + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-rotate") + # After rotation the primary blob has been reopened at the original path + assert comp.active_blob_path() == original_primary + comp.close() + + def test_rotate_immediate_calls_handler_right_away(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + comp = self._make_composite(tmp_path, follow_up=3, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-imm", immediate=True) + # Handler must be called immediately — no follow-up frames needed + handler.assert_called_once() + call_event_id, call_blob = handler.call_args[0] + assert call_event_id == "ev-imm" + assert isinstance(call_blob, BlobFsCapture) + comp.close() + + def test_rotate_immediate_does_not_add_to_active(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path, follow_up=3) + comp.capture(_make_image(), 
"trigger.png") + comp.rotate("ev-imm2", immediate=True) + assert len(comp._active) == 0 + comp.close() + + def test_set_capture_ts_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.set_capture_ts(MagicMock()) + comp.close() + + def test_get_last_filepath_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + assert comp.get_last_filepath() is None + comp.close() + + def test_rotate_immediate_does_not_deliver_follow_up_frames(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + comp = self._make_composite(tmp_path, follow_up=3, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-imm3", immediate=True) + # Additional frames go only to the new primary — handler stays called exactly once + for i in range(3): + comp.capture(_make_image(), f"extra{i}.png") + comp.close() + handler.assert_called_once() + + +# --------------------------------------------------------------------------- +# add_args / make_capture_backend +# --------------------------------------------------------------------------- + + +class CaptureFactoryTest: + def test_add_args_registers_defaults(self) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args([]) + assert args.blob_capture_file == "capture.blob" + assert args.blob_num_slots == 60 + assert args.blob_max_image_bytes == 4 * 1024 * 1024 + assert args.blob_png_compression == 1 + + def test_add_args_allows_overrides(self) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args( + ["--blob-capture-file", "my.blob", "--blob-num-slots", "10", "--blob-max-image-bytes", "1024"] + ) + assert args.blob_capture_file == "my.blob" + assert args.blob_num_slots == 10 + assert args.blob_max_image_bytes == 1024 + + def test_make_capture_backend_returns_composite(self, tmp_path: pathlib.Path) -> None: + parser = 
argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args([]) + backend = make_capture_backend(args, tmp_path, follow_up_count=5) + assert isinstance(backend, CompositeBlobCapture) + backend.close() + + def test_make_capture_backend_uses_completion_handler(self, tmp_path: pathlib.Path) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args([]) + handler = MagicMock(spec=BlobCompletionHandler) + backend = make_capture_backend(args, tmp_path, follow_up_count=1, completion_handler=handler) + assert isinstance(backend, CompositeBlobCapture) + backend.capture(_make_image(), "t.png") + backend.rotate("ev") + backend.capture(_make_image(), "f.png") + backend.close() + handler.assert_called_once() + + +# --------------------------------------------------------------------------- +# blob-extract CLI — trigger frame annotation +# --------------------------------------------------------------------------- + + +def _run_extract_cli(blob_path: pathlib.Path, dest: pathlib.Path, info: bool = False) -> str: + """Run the extract CLI and return captured stdout.""" + stdout = io.StringIO() + argv = ["blob-extract", f"--blob-capture-file={str(blob_path)}", str(dest)] + if info: + argv.append("--info") + with patch("sys.argv", argv), patch("sys.stdout", stdout), patch("sys.stderr", io.StringIO()): + blob_extract_main() + return stdout.getvalue() + + +class BlobExtractCliTest: + def _write_blob(self, tmp_path: pathlib.Path) -> pathlib.Path: + path = tmp_path / "test.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "before.png", flags=0) + blob.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) + blob.capture(_make_image(), "after.png", flags=0) + blob.close() + return path + + def test_trigger_frame_line_is_annotated(self, tmp_path: pathlib.Path) -> None: + blob_path = self._write_blob(tmp_path) + output = _run_extract_cli(blob_path, tmp_path / "out") + lines = 
{line.strip() for line in output.splitlines() if line.strip()} + assert any("trigger.png" in line and "[TRIGGER_FRAME_FLAG]" in line for line in lines) + + def test_non_trigger_lines_have_no_annotation(self, tmp_path: pathlib.Path) -> None: + blob_path = self._write_blob(tmp_path) + output = _run_extract_cli(blob_path, tmp_path / "out") + for line in output.splitlines(): + if "before.png" in line or "after.png" in line: + assert "[TRIGGER_FRAME_FLAG]" not in line + + def test_no_trigger_frame_no_annotation(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "plain.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame0.png", flags=0) + blob.capture(_make_image(), "frame1.png", flags=0) + blob.close() + output = _run_extract_cli(path, tmp_path / "out") + assert "[TRIGGER_FRAME_FLAG]" not in output + + def test_missing_blob_exits_with_error(self, tmp_path: pathlib.Path) -> None: + stderr = io.StringIO() + argv = ["blob-extract", f"--blob-capture-file={tmp_path / 'nonexistent.blob'}", str(tmp_path / "out")] + with patch("sys.argv", argv), patch("sys.stderr", stderr): + with pytest.raises(SystemExit) as exc: + blob_extract_main() + assert exc.value.code == 1 + assert "not found" in stderr.getvalue() + + def test_invalid_blob_exits_with_error(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "bad.blob" + path.write_bytes(b"\xff" * SUPERBLOCK_SIZE) + stderr = io.StringIO() + argv = ["blob-extract", f"--blob-capture-file={path}", str(tmp_path / "out")] + with patch("sys.argv", argv), patch("sys.stderr", stderr): + with pytest.raises(SystemExit) as exc: + blob_extract_main() + assert exc.value.code == 1 + assert "error" in stderr.getvalue() + + def test_blob_info(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "info.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame0.png", flags=0) + blob.capture(_make_image(), "frame1.png", 
flags=TRIGGER_FRAME_FLAG) + blob.close() + output = _run_extract_cli(path, tmp_path / "out", info=True) + + assert f"blob: {path}" in output + assert f"magic: {MAGIC:#010x}" in output + assert "version: 1" in output + assert "num_slots: 4 (write_head=2)" in output + assert "max_image_bytes: 512.0 KiB (524288 bytes)" in output + assert "occupied_slots: 2/4" in output + assert "trigger_frame: frame1.png" in output + assert not (tmp_path / "out").exists()