diff --git a/common_utility/capture.py b/common_utility/capture.py index 14aeb30..64ab78e 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -1,7 +1,9 @@ from __future__ import annotations import argparse +import dataclasses import mmap +import shutil import struct import pathlib import os @@ -31,27 +33,34 @@ TRIGGER_FRAME_FLAG: int = 1 -def add_args(parser: argparse.ArgumentParser) -> None: +def add_args(parser: argparse.ArgumentParser, **defaults: dict[str, Any]) -> None: parser.add_argument( "--blob-capture-file", - default="capture.blob", - help="Name of the blob ring-buffer file, stored under the capture folder", + default=defaults.get("blob_capture_file", "capture.blob"), + help="Full path to the active blob ring-buffer file (e.g. a tmpfs location)", + type=pathlib.Path, + ) + parser.add_argument( + "--blob-capture-folder", + default=defaults.get("blob_capture_folder", "."), + help="Folder for the rotated blob capture files (created if absent)", + type=pathlib.Path, ) parser.add_argument( "--blob-num-slots", - default=60, + default=defaults.get("blob_num_slots", 60), type=int, help="Number of image slots in the blob ring buffer", ) parser.add_argument( "--blob-max-image-bytes", - default=4 * 1024 * 1024, + default=defaults.get("blob_max_image_bytes", 4 * 1024 * 1024), type=int, help="Maximum bytes per image slot in the blob ring buffer", ) parser.add_argument( "--blob-png-compression", - default=1, + default=defaults.get("blob_png_compression", 1), type=int, choices=range(10), metavar="0-9", @@ -60,14 +69,28 @@ def add_args(parser: argparse.ArgumentParser) -> None: class BlobCompletionHandler(ABC): - """Invoked when a rotated event blob has received all its follow-up writes.""" + """Invoked when a rotation event is complete. + + blob_path is the primary (pre-rotation) blob; additional_blobs contains the post blob + path when one was opened (empty for immediate rotations). + """ @abstractmethod - def __call__(self, event_id: str, blob: BlobFsCapture) -> None: ... + def __call__( + self, + event_id: str, + blob_path: pathlib.Path, + additional_blobs: list[pathlib.Path], + ) -> None: ... class NoOpBlobCompletionHandler(BlobCompletionHandler): - def __call__(self, event_id: str, blob: BlobFsCapture) -> None: + def __call__( + self, + event_id: str, + blob_path: pathlib.Path, + additional_blobs: list[pathlib.Path], + ) -> None: pass @@ -98,11 +121,13 @@ def __init__( num_slots: int = 60, max_image_bytes: int = 4 * 1024 * 1024, png_compression: int = 1, + capture_folder: Optional[pathlib.Path] = None, ) -> None: self._blob_path = blob_path self._num_slots = num_slots self._max_image_bytes = max_image_bytes self._png_compression = png_compression + self._capture_folder = capture_folder self._index_base = SUPERBLOCK_SIZE self._data_base = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE self._file_size = self._data_base + num_slots * max_image_bytes @@ -166,10 +191,8 @@ def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> No data_offset = self._slot_data_offset(slot) idx_offset = self._slot_index_offset(slot) - # Write image data into the slot's region self._mm[data_offset : data_offset + len(png_bytes)] = png_bytes - # Write index entry filename_padded = filename_bytes.ljust(MAX_FILENAME_BYTES, b"\x00") entry = struct.pack( INDEX_ENTRY_FORMAT, @@ -182,7 +205,6 @@ def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> No ) self._mm[idx_offset : idx_offset + INDEX_ENTRY_SIZE] = entry - # Advance write_head and persist to superblock self._write_head = (self._write_head + 1) % self._num_slots struct.pack_into(" None: self._mm.flush() def rotate(self, event_id: str, immediate: bool = False) -> BlobFsCapture: - """Rename the current blob to .blob and open a fresh one at the original path. - - The current file handle and mmap are transferred to a new BlobFsCapture wrapper (the - "old handle") which is returned to the caller. On Linux, renaming an open/mmap'd file - preserves the inode so the old handle remains valid after the rename. - """ - old: BlobFsCapture = object.__new__(BlobFsCapture) - old.__dict__.update(self.__dict__) + """Close the current blob, move it to capture_folder (or same dir) as .blob, then reopen fresh.""" + self.close() - renamed = self._blob_path.parent / f"{event_id}.blob" - os.rename(self._blob_path, renamed) - old._blob_path = renamed + if self._capture_folder is not None: + self._capture_folder.mkdir(parents=True, exist_ok=True) + dest = self._capture_folder / f"{event_id}.blob" + shutil.move(str(self._blob_path), str(dest)) + else: + dest = self._blob_path.parent / f"{event_id}.blob" + os.rename(self._blob_path, dest) - self._file = None - self._mm = None - self._write_head = 0 self._open() - - return old + return self def close(self) -> None: if self._mm is not None: @@ -242,25 +258,36 @@ def __exit__(self, *args: Any) -> None: class CompositeBlobCapture(ImageCaptureInterface): - """Composite capture backend: one primary blob + a list of recently-rotated event blobs. - - After rotation each event blob receives `follow_up_count` additional frames (flags=0), - then is closed and the completion handler is invoked. + """Composite capture backend: one primary blob on a fast path (e.g. tmpfs) + one optional + post-event blob in the capture folder receiving follow-up frames after rotation. + + On rotate(event_id): + - The primary blob is closed and moved to capture_folder/{event_id}.blob. + - A fresh primary blob is opened at the original blob_path. + - A new {event_id}-post.blob is opened in capture_folder to receive follow_up_count frames. + - At most one post blob is active at a time; a second rotation force-completes the existing one. """ def __init__( self, blob_path: pathlib.Path, + capture_folder: pathlib.Path, num_slots: int, max_image_bytes: int, follow_up_count: int, completion_handler: BlobCompletionHandler, png_compression: int = 1, ) -> None: - self._blob = BlobFsCapture(blob_path, num_slots, max_image_bytes, png_compression) + self._capture_folder = capture_folder + self._num_slots = num_slots + self._max_image_bytes = max_image_bytes + self._png_compression = png_compression + self._blob = BlobFsCapture( + blob_path, num_slots, max_image_bytes, png_compression, capture_folder=capture_folder + ) self._follow_up_count = follow_up_count self._completion_handler = completion_handler - self._active: list[tuple[BlobFsCapture, str, int]] = [] + self._post_blob: Optional[tuple[BlobFsCapture, str, int]] = None def set_capture_ts(self, ts: datetime.datetime) -> None: self._blob.set_capture_ts(ts) @@ -270,30 +297,48 @@ def get_last_filepath(self) -> Optional[pathlib.Path]: def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None: self._blob.capture(image, filename, flags) - next_active: list[tuple[BlobFsCapture, str, int]] = [] - for blob, event_id, remaining in self._active: - blob.capture(image, filename, 0) + if self._post_blob is not None: + post_b, post_event_id, remaining = self._post_blob + post_b.capture(image, filename, 0) remaining -= 1 if remaining > 0: - next_active.append((blob, event_id, remaining)) + self._post_blob = (post_b, post_event_id, remaining) else: - blob.close() - self._completion_handler(event_id, blob) - self._active = next_active + post_b.close() + self._completion_handler( + post_event_id, + self._capture_folder / f"{post_event_id}.blob", + [self._capture_folder / f"{post_event_id}-post.blob"], + ) + self._post_blob = None def rotate(self, event_id: str, immediate: bool = False) -> CompositeBlobCapture: - old = self._blob.rotate(event_id) + # Force-complete any active post blob before starting a new rotation + if self._post_blob is not None: + post_b, post_event_id, _ = self._post_blob + post_b.close() + self._completion_handler( + post_event_id, + self._capture_folder / f"{post_event_id}.blob", + [self._capture_folder / f"{post_event_id}-post.blob"], + ) + self._post_blob = None + + self._blob.rotate(event_id) + if immediate: - old.close() - self._completion_handler(event_id, old) + self._completion_handler(event_id, self._capture_folder / f"{event_id}.blob", []) else: - self._active.append((old, event_id, self._follow_up_count)) + post_path = self._capture_folder / f"{event_id}-post.blob" + post_blob = BlobFsCapture(post_path, self._num_slots, self._max_image_bytes, self._png_compression) + self._post_blob = (post_blob, event_id, self._follow_up_count) + return self def close(self) -> None: - for blob, _event_id, _remaining in self._active: - blob.close() - self._active = [] + if self._post_blob is not None: + self._post_blob[0].close() + self._post_blob = None self._blob.close() def active_blob_path(self) -> Optional[pathlib.Path]: @@ -303,71 +348,104 @@ def update_last_flags(self, flags: int) -> None: self._blob.update_last_flags(flags) +@dataclasses.dataclass +class _OpenBlob: + file: Optional[Any] + mm: Optional[mmap.mmap] + version: int + num_slots: int + write_head: int + max_image_bytes: int + index_entry_size: int + + class BlobExtractor: def __init__(self, blob_path: pathlib.Path) -> None: self._blob_path = blob_path - self._file: Optional[Any] = None - self._mm: Optional[mmap.mmap] = None + self._blobs: list[_OpenBlob] = [] self._tempdir: Optional[tempfile.TemporaryDirectory] = None # type: ignore[type-arg] - actual_path = self._resolve_blob_path(blob_path) - file = open(actual_path, "rb") - self._file = file + self._frame_flags: Optional[dict[str, int]] = None + try: + paths = self._resolve_blob_paths(blob_path) + for path in paths: + self._open_single_blob(path) + except Exception: + self.close() + raise + + # Backwards-compat public attributes pointing to the first blob + first = self._blobs[0] + self._file = first.file + self._mm = first.mm + self.version = first.version + self.num_slots = first.num_slots + self.write_head = first.write_head + self.max_image_bytes = first.max_image_bytes + self.index_entry_size = first.index_entry_size + + def _open_single_blob(self, path: pathlib.Path) -> None: + file = open(path, "rb") try: - file_size = os.path.getsize(actual_path) + file_size = os.path.getsize(path) if file_size < SUPERBLOCK_SIZE: raise ValueError("File too small to contain a valid superblock") - self._mm = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) + mm = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from( - SUPERBLOCK_FORMAT, self._mm, 0 + SUPERBLOCK_FORMAT, mm, 0 ) if magic != MAGIC: raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}") except Exception: - if self._mm is not None: - self._mm.close() file.close() - self._file = None - if self._tempdir is not None: - self._tempdir.cleanup() - self._tempdir = None raise - self.version = version - self.num_slots = num_slots - self.write_head = write_head - self.max_image_bytes = max_image_bytes - self.index_entry_size = index_entry_size - self._frame_flags: Optional[dict[str, int]] = None - - def _resolve_blob_path(self, blob_path: pathlib.Path) -> pathlib.Path: - if not tarfile.is_tarfile(blob_path): - return blob_path + self._blobs.append(_OpenBlob(file, mm, version, num_slots, write_head, max_image_bytes, index_entry_size)) + + def _resolve_blob_paths(self, blob_path: pathlib.Path) -> list[pathlib.Path]: + if tarfile.is_tarfile(blob_path): + return self._extract_tar_blobs(blob_path) + # Regular file: include the primary plus any {stem}-*.blob siblings + parent = blob_path.parent + stem = blob_path.stem + siblings = sorted(parent.glob(f"{stem}-*.blob")) + return [blob_path] + siblings + + def _extract_tar_blobs(self, blob_path: pathlib.Path) -> list[pathlib.Path]: tmpdir = tempfile.TemporaryDirectory() self._tempdir = tmpdir try: + paths: list[pathlib.Path] = [] with tarfile.open(blob_path) as tf: members = [m for m in tf.getmembers() if m.isfile()] blob_members = [m for m in members if m.name.endswith(".blob")] - member = blob_members[0] if blob_members else (members[0] if members else None) - if member is None: + targets = blob_members if blob_members else (members[:1] if members else []) + if not targets: raise ValueError(f"No regular file found in archive: {blob_path}") - extracted = tf.extractfile(member) - if extracted is None: - raise ValueError(f"Cannot extract member from archive: {member.name}") - actual_path = pathlib.Path(tmpdir.name) / pathlib.Path(member.name).name - actual_path.write_bytes(extracted.read()) - return actual_path + for member in targets: + extracted = tf.extractfile(member) + if extracted is None: + continue + actual_path = pathlib.Path(tmpdir.name) / pathlib.Path(member.name).name + actual_path.write_bytes(extracted.read()) + paths.append(actual_path) + if not paths: + raise ValueError(f"No regular file found in archive: {blob_path}") + return paths except Exception: tmpdir.cleanup() self._tempdir = None raise def close(self) -> None: - if self._mm is not None: - self._mm.close() - self._mm = None - if self._file is not None: - self._file.close() - self._file = None + for b in self._blobs: + if b.mm is not None: + b.mm.close() + b.mm = None + if b.file is not None: + b.file.close() + b.file = None + self._blobs = [] + self._mm = None + self._file = None if self._tempdir is not None: self._tempdir.cleanup() self._tempdir = None @@ -378,24 +456,22 @@ def __enter__(self) -> BlobExtractor: def __exit__(self, *_: Any) -> None: self.close() - def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: - assert self._mm is not None - dest_dir.mkdir(parents=True, exist_ok=True) - + def _extract_blob(self, blob: _OpenBlob, dest_dir: pathlib.Path) -> list[pathlib.Path]: + assert blob.mm is not None extracted: list[pathlib.Path] = [] - for i in range(self.num_slots): - idx_offset = SUPERBLOCK_SIZE + i * self.index_entry_size - if idx_offset + self.index_entry_size > len(self._mm): + for i in range(blob.num_slots): + idx_offset = SUPERBLOCK_SIZE + i * blob.index_entry_size + if idx_offset + blob.index_entry_size > len(blob.mm): break image_offset, image_size, _flags, filename_len, _reserved, filename_raw = struct.unpack_from( - INDEX_ENTRY_FORMAT, self._mm, idx_offset + INDEX_ENTRY_FORMAT, blob.mm, idx_offset ) - if image_size == 0 or image_offset + image_size > len(self._mm): + if image_size == 0 or image_offset + image_size > len(blob.mm): continue - png_bytes = bytes(self._mm[image_offset : image_offset + image_size]) + png_bytes = bytes(blob.mm[image_offset : image_offset + image_size]) arr = np.frombuffer(png_bytes, dtype=np.uint8) if cv2.imdecode(arr, cv2.IMREAD_COLOR) is None: continue @@ -415,28 +491,35 @@ def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: return extracted - def _ensure_frame_flags(self) -> dict[str, int]: - if self._frame_flags is None: - assert self._mm is not None - frame_flags: dict[str, int] = {} - for i in range(self.num_slots): - idx_offset = SUPERBLOCK_SIZE + i * self.index_entry_size - if idx_offset + self.index_entry_size > len(self._mm): - break - _, img_size, flags, fn_len, _, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, self._mm, idx_offset) - if img_size > 0: - raw_fn = fn_raw[:fn_len] - try: - fn = raw_fn.decode("utf-8") - except UnicodeDecodeError: - fn = raw_fn.decode("latin-1") - frame_flags[fn] = flags - self._frame_flags = frame_flags - return frame_flags - return self._frame_flags + def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: + dest_dir.mkdir(parents=True, exist_ok=True) + extracted: list[pathlib.Path] = [] + for blob in self._blobs: + extracted.extend(self._extract_blob(blob, dest_dir)) + return extracted + + def _flags_for_blob(self, blob: _OpenBlob, mask: int) -> set[str]: + assert blob.mm is not None + result: set[str] = set() + for i in range(blob.num_slots): + idx_offset = SUPERBLOCK_SIZE + i * blob.index_entry_size + if idx_offset + blob.index_entry_size > len(blob.mm): + break + _, img_size, flags, fn_len, _, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, blob.mm, idx_offset) + if img_size > 0 and flags & mask: + raw_fn = fn_raw[:fn_len] + try: + fn = raw_fn.decode("utf-8") + except UnicodeDecodeError: + fn = raw_fn.decode("latin-1") + result.add(fn) + return result def get_filenames_with_flags(self, mask: int) -> set[str]: - return {fn for fn, flags in self._ensure_frame_flags().items() if flags & mask} + result: set[str] = set() + for blob in self._blobs: + result |= self._flags_for_blob(blob, mask) + return result def main() -> None: @@ -524,13 +607,15 @@ def _fmt(n: int) -> str: def make_capture_backend( args: argparse.Namespace, - capture_folder: pathlib.Path, follow_up_count: int, completion_handler: Optional[BlobCompletionHandler] = None, ) -> ImageCaptureInterface: - blob_path = capture_folder / args.blob_capture_file + blob_path: pathlib.Path = args.blob_capture_file + capture_folder: pathlib.Path = args.blob_capture_folder num_slots: int = args.blob_num_slots max_image_bytes: int = args.blob_max_image_bytes png_compression: int = args.blob_png_compression handler = completion_handler if completion_handler is not None else NoOpBlobCompletionHandler() - return CompositeBlobCapture(blob_path, num_slots, max_image_bytes, follow_up_count, handler, png_compression) + return CompositeBlobCapture( + blob_path, capture_folder, num_slots, max_image_bytes, follow_up_count, handler, png_compression + ) diff --git a/tests/captureTest.py b/tests/captureTest.py index cff6716..6cb02a3 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -58,7 +58,6 @@ def test_open_on_existing_path_creates_fresh_blob(self, tmp_path: pathlib.Path) path = tmp_path / "cap.blob" blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) blob.close() - # Corrupt magic — new behaviour deletes the file and starts fresh raw = bytearray(path.read_bytes()) struct.pack_into(" None: blob.capture(img, fn) blob.close() - # write_head should have wrapped: after 4 writes into 3 slots, write_head == 1 data = (tmp_path / "cap.blob").read_bytes() _, _, _, write_head, _, _ = struct.unpack_from(SUPERBLOCK_FORMAT, data, 0) assert write_head == 1 - # Slot 0 should contain the 4th image (index 3) idx0_offset = SUPERBLOCK_SIZE img_offset, img_size, _flags, fn_len, _res, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, idx0_offset) fn = fn_raw[:fn_len].decode("utf-8") @@ -129,7 +126,6 @@ def test_open_on_size_mismatch_creates_fresh_blob(self, tmp_path: pathlib.Path) blob.close() with open(path, "ab") as f: f.write(b"\x00") - # New behaviour: size mismatch is resolved by deleting and recreating reopened = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) reopened.close() expected = SUPERBLOCK_SIZE + 4 * INDEX_ENTRY_SIZE + 4 * 256 * 1024 @@ -156,7 +152,6 @@ def test_open_always_starts_with_zero_write_head(self, tmp_path: pathlib.Path) - blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) blob.capture(_make_image(), "frame.png") blob.close() - # New behaviour: existing file is deleted; fresh blob always starts at write_head == 0 reopened = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) assert reopened._write_head == 0 reopened.close() @@ -173,10 +168,8 @@ def test_update_last_flags_targets_most_recent_slot(self, tmp_path: pathlib.Path blob.update_last_flags(TRIGGER_FRAME_FLAG) blob.close() data = (tmp_path / "cap.blob").read_bytes() - # slot 0 (first) must stay 0 _a, _b, flags0, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) assert flags0 == 0 - # slot 1 (second/last) must have TRIGGER_FRAME_FLAG _a, _b, flags1, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE) assert flags1 == TRIGGER_FRAME_FLAG @@ -242,7 +235,6 @@ def test_truncated_data_section_extracts_intact_images(self, tmp_path: pathlib.P blob.capture(_make_image(), name) blob.close() - # Keep the superblock, full index, and both first slots' data — cut 50 bytes into slot 2 data_section_start = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE path.write_bytes(path.read_bytes()[: data_section_start + 2 * max_bytes + 50]) @@ -268,7 +260,6 @@ def test_context_manager_and_truncated_index_breaks_early(self, tmp_path: pathli blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "frame.png") blob.close() - # Cut in the middle of the second index entry so the loop must break truncate_at = SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE + INDEX_ENTRY_SIZE // 2 path.write_bytes(path.read_bytes()[:truncate_at]) with BlobExtractor(path) as e: @@ -283,7 +274,6 @@ def test_zero_filename_len_uses_slot_index_fallback(self, tmp_path: pathlib.Path blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "real.png") blob.close() - # fn_len is at byte 24 within the slot-0 index entry raw = bytearray(path.read_bytes()) struct.pack_into(" None: raw = bytearray(path.read_bytes()) - # Scenario A: slot 3 — advance write_head in superblock but leave index entry zeroed - struct.pack_into(" None: assert len(extracted) == 3 assert {p.name for p in extracted} == {"valid_0.png", "valid_1.png", "valid_2.png"} + def test_sibling_blobs_combined_on_extract(self, tmp_path: pathlib.Path) -> None: + """A primary blob and a -post sibling are both extracted and combined transparently.""" + primary = tmp_path / "event-abc.blob" + post = tmp_path / "event-abc-post.blob" + + b1 = BlobFsCapture(primary, num_slots=4, max_image_bytes=512 * 1024) + b1.capture(_make_image(), "pre.png") + b1.close() + + b2 = BlobFsCapture(post, num_slots=4, max_image_bytes=512 * 1024) + b2.capture(_make_image(), "post.png") + b2.close() + + extracted = BlobExtractor(primary).extract(tmp_path / "out") + assert {p.name for p in extracted} == {"pre.png", "post.png"} + + def test_no_siblings_when_none_exist(self, tmp_path: pathlib.Path) -> None: + """When no sibling blobs exist, only the primary blob is extracted.""" + path = tmp_path / "solo.blob" + b = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + b.capture(_make_image(), "only.png") + b.close() + + extracted = BlobExtractor(path).extract(tmp_path / "out") + assert {p.name for p in extracted} == {"only.png"} + # --------------------------------------------------------------------------- # BlobExtractor — tar/tar.gz archive support @@ -379,8 +391,8 @@ def test_extract_from_tar_gz(self, tmp_path: pathlib.Path) -> None: assert len(extracted) == 2 assert {p.name for p in extracted} == set(names) - def test_tar_prefers_blob_extension_member(self, tmp_path: pathlib.Path) -> None: - """When archive contains multiple files, a member with .blob extension is preferred.""" + def test_tar_prefers_blob_extension_members(self, tmp_path: pathlib.Path) -> None: + """Archive with a non-blob file and a blob file: only the blob is extracted.""" blob_path = self._write_blob(tmp_path, [(_make_image(), "frame.png")]) dummy = tmp_path / "readme.txt" dummy.write_text("hello") @@ -393,6 +405,27 @@ def test_tar_prefers_blob_extension_member(self, tmp_path: pathlib.Path) -> None assert len(extracted) == 1 assert extracted[0].name == "frame.png" + def test_multiple_blobs_in_tar_combined(self, tmp_path: pathlib.Path) -> None: + """Tarball with two blob members: all images from both are combined.""" + pre_blob = tmp_path / "pre.blob" + post_blob = tmp_path / "post.blob" + + b1 = BlobFsCapture(pre_blob, num_slots=4, max_image_bytes=512 * 1024) + b1.capture(_make_image(), "pre.png") + b1.close() + + b2 = BlobFsCapture(post_blob, num_slots=4, max_image_bytes=512 * 1024) + b2.capture(_make_image(), "post.png") + b2.close() + + tar_path = tmp_path / "event.tar" + with tarfile.open(tar_path, "w") as tf: + tf.add(pre_blob, arcname="event.blob") + tf.add(post_blob, arcname="event-post.blob") + + extracted = BlobExtractor(tar_path).extract(tmp_path / "out") + assert {p.name for p in extracted} == {"pre.png", "post.png"} + def test_tar_tempdir_cleaned_up_on_close(self, tmp_path: pathlib.Path) -> None: blob_path = self._write_blob(tmp_path, [(_make_image(), "f.png")]) tar_path = tmp_path / "cap.tar.gz" @@ -451,56 +484,60 @@ def test_no_pattern_uses_literal_path(self, tmp_path: pathlib.Path) -> None: class RotateTest: - def test_blob_rotate_renames_file_to_event_id(self, tmp_path: pathlib.Path) -> None: - original = tmp_path / "cap-XXX.blob" - blob = BlobFsCapture(original, num_slots=4, max_image_bytes=512 * 1024) - resolved = blob._blob_path # e.g. cap-001.blob + def test_blob_rotate_moves_to_capture_folder(self, tmp_path: pathlib.Path) -> None: + cap_folder = tmp_path / "rotated" + blob_path = tmp_path / "active.blob" + blob = BlobFsCapture(blob_path, num_slots=4, max_image_bytes=512 * 1024, capture_folder=cap_folder) + blob.capture(_make_image(), "frame0.png") + blob.rotate("my-event-id") + blob.close() + assert (cap_folder / "my-event-id.blob").exists() + assert blob_path.exists() # fresh blob reopened at original path + + def test_blob_rotate_without_capture_folder_stays_local(self, tmp_path: pathlib.Path) -> None: + blob_path = tmp_path / "cap.blob" + blob = BlobFsCapture(blob_path, num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "frame0.png") - old = blob.rotate("my-event-id") - try: - # The original file was renamed to my-event-id.blob - assert (tmp_path / "my-event-id.blob").exists() - assert old._blob_path == tmp_path / "my-event-id.blob" - # The primary blob reopened at the original resolved path - assert blob._blob_path == resolved - assert resolved.exists() - finally: - old.close() - blob.close() - - def test_blob_rotate_old_handle_retains_data(self, tmp_path: pathlib.Path) -> None: - blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.rotate("event-local") + blob.close() + assert (tmp_path / "event-local.blob").exists() + assert blob_path.exists() + + def test_blob_rotate_data_is_in_capture_folder(self, tmp_path: pathlib.Path) -> None: + cap_folder = tmp_path / "rotated" + blob = BlobFsCapture( + tmp_path / "active.blob", num_slots=4, max_image_bytes=512 * 1024, capture_folder=cap_folder + ) blob.capture(_make_image(), "before_rotate.png") - old = blob.rotate("event-abc") - try: - blob.capture(_make_image(), "after_rotate.png") - blob.close() - old.close() - # Old handle (renamed to event-abc.blob): contains before_rotate.png - dest = tmp_path / "from_old" - extracted_old = BlobExtractor(tmp_path / "event-abc.blob").extract(dest) - assert {p.name for p in extracted_old} == {"before_rotate.png"} - # New blob (original path): contains after_rotate.png - dest2 = tmp_path / "from_new" - extracted_new = BlobExtractor(tmp_path / "cap-XXX.blob").extract(dest2) - assert {p.name for p in extracted_new} == {"after_rotate.png"} - finally: - pass # already closed above + blob.rotate("event-abc") + blob.capture(_make_image(), "after_rotate.png") + blob.close() + + extracted_old = BlobExtractor(cap_folder / "event-abc.blob").extract(tmp_path / "from_old") + assert {p.name for p in extracted_old} == {"before_rotate.png"} + + extracted_new = BlobExtractor(tmp_path / "active.blob").extract(tmp_path / "from_new") + assert {p.name for p in extracted_new} == {"after_rotate.png"} def test_blob_rotate_multiple_times(self, tmp_path: pathlib.Path) -> None: - blob = BlobFsCapture(tmp_path / "cap-XX.blob", num_slots=4, max_image_bytes=512 * 1024) + cap_folder = tmp_path / "rotated" + blob = BlobFsCapture( + tmp_path / "active.blob", num_slots=4, max_image_bytes=512 * 1024, capture_folder=cap_folder + ) event_ids = ["evt-a", "evt-b", "evt-c"] - handles = [] for i, eid in enumerate(event_ids): blob.capture(_make_image(), f"frame{i}.png") - handles.append(blob.rotate(eid)) + blob.rotate(eid) blob.close() - for h in handles: - h.close() - # Three event blobs renamed + the primary still exists at cap-01.blob for eid in event_ids: - assert (tmp_path / f"{eid}.blob").exists() - assert (tmp_path / "cap-XX.blob").exists() + assert (cap_folder / f"{eid}.blob").exists() + assert (tmp_path / "active.blob").exists() + + def test_blob_rotate_returns_self(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + result = blob.rotate("ev") + assert result is blob + blob.close() # --------------------------------------------------------------------------- @@ -514,11 +551,15 @@ def _make_composite( tmp_path: pathlib.Path, follow_up: int = 2, handler: BlobCompletionHandler | None = None, + cap_folder: pathlib.Path | None = None, ) -> CompositeBlobCapture: if handler is None: handler = NoOpBlobCompletionHandler() + if cap_folder is None: + cap_folder = tmp_path / "rotated" return CompositeBlobCapture( - tmp_path / "cap-XXX.blob", + tmp_path / "active.blob", + cap_folder, num_slots=8, max_image_bytes=512 * 1024, follow_up_count=follow_up, @@ -529,80 +570,81 @@ def test_capture_writes_to_primary(self, tmp_path: pathlib.Path) -> None: comp = self._make_composite(tmp_path) comp.capture(_make_image(), "frame.png") comp.close() - blobs = list(tmp_path.glob("*.blob")) - assert len(blobs) == 1 - extracted = BlobExtractor(blobs[0]).extract(tmp_path / "out") + extracted = BlobExtractor(tmp_path / "active.blob").extract(tmp_path / "out") assert len(extracted) == 1 - def test_rotate_renames_file_to_event_id(self, tmp_path: pathlib.Path) -> None: - comp = self._make_composite(tmp_path) + def test_rotate_moves_primary_to_capture_folder(self, tmp_path: pathlib.Path) -> None: + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png") comp.rotate("fire-uuid") comp.close() - assert (tmp_path / "fire-uuid.blob").exists() + assert (cap_folder / "fire-uuid.blob").exists() - def test_follow_up_writes_reach_rotated_blob(self, tmp_path: pathlib.Path) -> None: + def test_post_blob_receives_follow_up_frames(self, tmp_path: pathlib.Path) -> None: follow_up = 3 - comp = self._make_composite(tmp_path, follow_up=follow_up) + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=follow_up, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png") comp.rotate("ev1") for i in range(follow_up): comp.capture(_make_image(), f"followup{i}.png") comp.close() - # Event blob should contain: trigger frame + follow_up frames - extracted = BlobExtractor(tmp_path / "ev1.blob").extract(tmp_path / "out") - assert len(extracted) == 1 + follow_up - def test_completion_handler_called_with_event_id_and_blob(self, tmp_path: pathlib.Path) -> None: + # Post blob alone (ev1-post.blob has no -* sibling): only follow_up frames + post_extracted = BlobExtractor(cap_folder / "ev1-post.blob").extract(tmp_path / "post") + assert len(post_extracted) == follow_up + + # BlobExtractor on the primary automatically includes the post sibling: trigger + follow-ups + all_extracted = BlobExtractor(cap_folder / "ev1.blob").extract(tmp_path / "all") + assert len(all_extracted) == 1 + follow_up + + def test_completion_handler_called_with_event_id_and_paths(self, tmp_path: pathlib.Path) -> None: handler = MagicMock(spec=BlobCompletionHandler) follow_up = 2 - comp = self._make_composite(tmp_path, follow_up=follow_up, handler=handler) + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=follow_up, handler=handler, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png") comp.rotate("ev-done") for _ in range(follow_up): comp.capture(_make_image(), "extra.png") comp.close() handler.assert_called_once() - call_event_id, call_blob = handler.call_args[0] + call_event_id, call_blob_path, call_additional = handler.call_args[0] assert call_event_id == "ev-done" - assert isinstance(call_blob, BlobFsCapture) + assert call_blob_path == cap_folder / "ev-done.blob" + assert call_additional == [cap_folder / "ev-done-post.blob"] def test_no_completion_on_regular_close(self, tmp_path: pathlib.Path) -> None: handler = MagicMock(spec=BlobCompletionHandler) comp = self._make_composite(tmp_path, follow_up=5, handler=handler) comp.capture(_make_image(), "trigger.png") comp.rotate("ev-early") - # close without exhausting follow-up quota comp.capture(_make_image(), "one.png") comp.close() handler.assert_not_called() - def test_trigger_frame_flag_in_primary_pre_rotation(self, tmp_path: pathlib.Path) -> None: - comp = self._make_composite(tmp_path, follow_up=2) + def test_trigger_frame_flag_in_pre_rotation_blob(self, tmp_path: pathlib.Path) -> None: + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=2, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) - primary_path = comp._blob._blob_path comp.rotate("ev-flag") comp.close() - # The trigger frame is in the event blob (the file that was the primary before rotation) - data = (tmp_path / "ev-flag.blob").read_bytes() + data = (cap_folder / "ev-flag.blob").read_bytes() _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) assert flags == TRIGGER_FRAME_FLAG - # New primary blob has no entries yet (fresh) - primary_data = primary_path.read_bytes() - _magic2, _ver2, _slots2, wh2, *_ = struct.unpack_from(SUPERBLOCK_FORMAT, primary_data, 0) - assert wh2 == 0 def test_follow_up_entries_have_zero_flags(self, tmp_path: pathlib.Path) -> None: follow_up = 2 - comp = self._make_composite(tmp_path, follow_up=follow_up) + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=follow_up, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) comp.rotate("ev-flags") for i in range(follow_up): comp.capture(_make_image(), f"followup{i}.png") comp.close() - data = (tmp_path / "ev-flags.blob").read_bytes() - # Check all follow-up entries have flags == 0 - for slot in range(1, 1 + follow_up): + data = (cap_folder / "ev-flags-post.blob").read_bytes() + for slot in range(follow_up): offset = SUPERBLOCK_SIZE + slot * INDEX_ENTRY_SIZE _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, offset) assert flags == 0 @@ -622,42 +664,34 @@ def test_active_blob_path_returns_primary_path(self, tmp_path: pathlib.Path) -> assert comp.active_blob_path() == comp._blob._blob_path comp.close() - def test_active_blob_path_updates_after_rotate(self, tmp_path: pathlib.Path) -> None: + def test_active_blob_path_unchanged_after_rotate(self, tmp_path: pathlib.Path) -> None: comp = self._make_composite(tmp_path) original_primary = comp._blob._blob_path comp.capture(_make_image(), "trigger.png") comp.rotate("ev-rotate") - # After rotation the primary blob has been reopened at the original path assert comp.active_blob_path() == original_primary comp.close() def test_rotate_immediate_calls_handler_right_away(self, tmp_path: pathlib.Path) -> None: handler = MagicMock(spec=BlobCompletionHandler) - comp = self._make_composite(tmp_path, follow_up=3, handler=handler) + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=3, handler=handler, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png") comp.rotate("ev-imm", immediate=True) - # Handler must be called immediately — no follow-up frames needed handler.assert_called_once() - call_event_id, call_blob = handler.call_args[0] + call_event_id, call_blob_path, call_additional = handler.call_args[0] assert call_event_id == "ev-imm" - assert isinstance(call_blob, BlobFsCapture) + assert call_blob_path == cap_folder / "ev-imm.blob" + assert call_additional == [] comp.close() - def test_rotate_immediate_does_not_add_to_active(self, tmp_path: pathlib.Path) -> None: - comp = self._make_composite(tmp_path, follow_up=3) + def test_rotate_immediate_no_post_blob_opened(self, tmp_path: pathlib.Path) -> None: + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=3, cap_folder=cap_folder) comp.capture(_make_image(), "trigger.png") comp.rotate("ev-imm2", immediate=True) - assert len(comp._active) == 0 - comp.close() - - def test_set_capture_ts_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: - comp = self._make_composite(tmp_path) - comp.set_capture_ts(MagicMock()) - comp.close() - - def test_get_last_filepath_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: - comp = self._make_composite(tmp_path) - assert comp.get_last_filepath() is None + assert comp._post_blob is None + assert not (cap_folder / "ev-imm2-post.blob").exists() comp.close() def test_rotate_immediate_does_not_deliver_follow_up_frames(self, tmp_path: pathlib.Path) -> None: @@ -665,12 +699,37 @@ def test_rotate_immediate_does_not_deliver_follow_up_frames(self, tmp_path: path comp = self._make_composite(tmp_path, follow_up=3, handler=handler) comp.capture(_make_image(), "trigger.png") comp.rotate("ev-imm3", immediate=True) - # Additional frames go only to the new primary — handler stays called exactly once for i in range(3): comp.capture(_make_image(), f"extra{i}.png") comp.close() handler.assert_called_once() + def test_rotate_when_post_blob_active_completes_existing(self, tmp_path: pathlib.Path) -> None: + """A second rotation force-completes the existing post blob before starting a new one.""" + handler = MagicMock(spec=BlobCompletionHandler) + cap_folder = tmp_path / "rotated" + comp = self._make_composite(tmp_path, follow_up=5, handler=handler, cap_folder=cap_folder) + comp.capture(_make_image(), "trigger1.png") + comp.rotate("ev-first") + # Only 1 follow-up before the second rotation + comp.capture(_make_image(), "followup1.png") + comp.rotate("ev-second") + # ev-first's post blob should have been force-completed + handler.assert_called_once() + call_event_id, call_blob_path, call_additional = handler.call_args[0] + assert call_event_id == "ev-first" + comp.close() + + def test_set_capture_ts_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.set_capture_ts(MagicMock()) + comp.close() + + def test_get_last_filepath_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + assert comp.get_last_filepath() is None + comp.close() + # --------------------------------------------------------------------------- # add_args / make_capture_backend @@ -682,7 +741,8 @@ def test_add_args_registers_defaults(self) -> None: parser = argparse.ArgumentParser() add_args(parser) args = parser.parse_args([]) - assert args.blob_capture_file == "capture.blob" + assert args.blob_capture_file == pathlib.Path("capture.blob") + assert args.blob_capture_folder == pathlib.Path(".") assert args.blob_num_slots == 60 assert args.blob_max_image_bytes == 4 * 1024 * 1024 assert args.blob_png_compression == 1 @@ -693,24 +753,37 @@ def test_add_args_allows_overrides(self) -> None: args = parser.parse_args( ["--blob-capture-file", "my.blob", "--blob-num-slots", "10", "--blob-max-image-bytes", "1024"] ) - assert args.blob_capture_file == "my.blob" + assert args.blob_capture_file == pathlib.Path("my.blob") assert args.blob_num_slots == 10 assert args.blob_max_image_bytes == 1024 def test_make_capture_backend_returns_composite(self, tmp_path: pathlib.Path) -> None: parser = argparse.ArgumentParser() add_args(parser) - args = parser.parse_args([]) - backend = make_capture_backend(args, tmp_path, follow_up_count=5) + cap_folder = tmp_path / "rotated" + args = parser.parse_args( + [ + f"--blob-capture-file={tmp_path / 'active.blob'}", + f"--blob-capture-folder={cap_folder}", + ] + ) + backend = make_capture_backend(args, follow_up_count=5) assert isinstance(backend, CompositeBlobCapture) backend.close() def test_make_capture_backend_uses_completion_handler(self, tmp_path: pathlib.Path) -> None: parser = argparse.ArgumentParser() add_args(parser) - args = parser.parse_args([]) + cap_folder = tmp_path / "rotated" + cap_folder.mkdir() + args = parser.parse_args( + [ + f"--blob-capture-file={tmp_path / 'active.blob'}", + f"--blob-capture-folder={cap_folder}", + ] + ) handler = MagicMock(spec=BlobCompletionHandler) - backend = make_capture_backend(args, tmp_path, follow_up_count=1, completion_handler=handler) + backend = make_capture_backend(args, follow_up_count=1, completion_handler=handler) assert isinstance(backend, CompositeBlobCapture) backend.capture(_make_image(), "t.png") backend.rotate("ev")