Skip to content

fix(tape): persist allocated entry ids in FileTapeStore#122

Merged
frostming merged 1 commit into bubbuild:main from
Andy963:fix/tape-file-entry-id-persistence-clean
Mar 17, 2026
Merged

fix(tape): persist allocated entry ids in FileTapeStore#122
frostming merged 1 commit into bubbuild:main from
Andy963:fix/tape-file-entry-id-persistence-clean

Conversation

@Andy963
Copy link
Copy Markdown
Contributor

@Andy963 Andy963 commented Mar 17, 2026

  • What / Why
    • TapeFile.append() allocated a new next_id for the in-memory cache, but persisted the incoming entry (with the original entry.id) to disk.
    • This could produce duplicate/non-monotonic ids in the JSONL tape file (especially when merging forked entries), breaking assumptions after restart.
  • Changes
    • Persist the allocated id by writing the stored entry (and preserve date).
    • Add regression test ensuring ids remain monotonic when merging forked entries into FileTapeStore.
    • Add reproduction document with a runnable script.

You can reproduce the issue with this code:

from __future__ import annotations

import argparse
import asyncio
import contextlib
import json
import tempfile
from dataclasses import asdict
from pathlib import Path
from typing import Iterator

from adapter.bub.bootstrap import ensure_local_bub_source

# Must run before the `bub`/`republic` imports below: it puts the local bub
# checkout on sys.path so the probe exercises the in-repo implementation.
ensure_local_bub_source()

from bub.builtin.store import FileTapeStore, ForkTapeStore, TapeFile
from republic import TapeEntry


def parse_args() -> argparse.Namespace:
    """Build and evaluate the probe script's command-line interface.

    Returns a namespace with ``directory`` (Path | None) and
    ``simulate_bug`` (bool).
    """
    cli = argparse.ArgumentParser(
        description="Probe the FileTapeStore fork/merge-back ID mismatch bug.",
    )
    cli.add_argument(
        "--directory",
        type=Path,
        help="Optional directory to keep the generated tape JSONL file.",
    )
    cli.add_argument(
        "--simulate-bug",
        action="store_true",
        help="Monkeypatch TapeFile.append to the buggy implementation for this run.",
    )
    return cli.parse_args()


@contextlib.contextmanager
def patched_buggy_append(enabled: bool) -> Iterator[None]:
    """Temporarily swap ``TapeFile.append`` for the pre-fix (buggy) version.

    When *enabled* is False this is a no-op context. Otherwise the patched
    method allocates a fresh id for the in-memory cache but persists the
    caller's original entry (with its old id) to disk — the exact defect
    this probe is designed to surface. The original method is always
    restored on exit, even if the body raises.
    """
    if not enabled:
        yield
        return

    original = TapeFile.append

    def buggy_append(self: TapeFile, entry: TapeEntry) -> None:
        # Mirrors TapeFile.append's private protocol: take the lock, sync
        # the cache from disk, then append under the same lock.
        with self._lock:
            self._read_locked()
            with self.path.open("a", encoding="utf-8") as handle:
                next_id = self._next_id()
                stored = TapeEntry(next_id, entry.kind, dict(entry.payload), dict(entry.meta))
                # BUG (intentional reproduction): `entry` — with its stale
                # id — is written to disk, while `stored` — with the newly
                # allocated id — is cached, so cache and file diverge.
                handle.write(json.dumps(asdict(entry), ensure_ascii=False) + "\n")
                self._read_entries.append(stored)
                self._read_offset = handle.tell()

    TapeFile.append = buggy_append
    try:
        yield
    finally:
        TapeFile.append = original


def read_disk_payloads(path: Path) -> list[dict[str, object]]:
    """Parse *path* as JSONL and return every top-level JSON object found.

    Blank lines are skipped, a missing file yields an empty list, and only
    lines that decode to a dict are kept.
    """
    if not path.exists():
        return []
    stripped = (raw.strip() for raw in path.read_text(encoding="utf-8").splitlines())
    decoded = (json.loads(line) for line in stripped if line)
    return [item for item in decoded if isinstance(item, dict)]


def entry_summary(entries: list[TapeEntry]) -> list[tuple[int, str | None]]:
    """Condense tape entries into ``(id, payload name)`` pairs for printing."""
    pairs: list[tuple[int, str | None]] = []
    for item in entries:
        pairs.append((item.id, item.payload.get("name")))
    return pairs


def strict_increasing(ids: list[int]) -> bool:
    """Return True when every id is strictly greater than its predecessor.

    Empty and single-element lists are vacuously increasing.
    """
    previous = None
    for value in ids:
        if previous is not None and previous >= value:
            return False
        previous = value
    return True


async def run_probe(directory: Path, *, simulate_bug: bool) -> int:
    """Run two fork/merge-back cycles and check id consistency across layers.

    Compares three views of the same tape: the parent store's in-memory
    cache, the raw JSONL payloads on disk, and a freshly constructed store
    that re-reads the file (simulating a restart). Returns 1 when the views
    diverge or disk ids are duplicated/non-monotonic (bug reproduced),
    0 when all three agree.
    """
    tape = "repro__fork_merge_back_id_bug"
    tape_file = directory / f"{tape}.jsonl"

    # Optionally run the writes against the pre-fix append implementation.
    with patched_buggy_append(simulate_bug):
        parent = FileTapeStore(directory=directory)
        store = ForkTapeStore(parent)

        # Two separate fork/merge-back cycles: merging the second fork's
        # entries back is where duplicate/stale ids can reach the file.
        async with store.fork(tape, merge_back=True):
            await store.append(tape, TapeEntry.event(name="first", data={}))

        async with store.fork(tape, merge_back=True):
            await store.append(tape, TapeEntry.event(name="second", data={}))

        cached_entries = parent.read(tape) or []
        cached_ids = [entry.id for entry in cached_entries]

    # View 2: the ids that actually landed in the JSONL file.
    disk_payloads = read_disk_payloads(tape_file)
    disk_ids = [int(payload["id"]) for payload in disk_payloads if isinstance(payload.get("id"), int)]

    # View 3: what a restarted process would see after re-reading the file.
    reloaded_store = FileTapeStore(directory=directory)
    reloaded_entries = reloaded_store.read(tape) or []
    reloaded_ids = [entry.id for entry in reloaded_entries]

    # Any divergence between the views, or non-monotonic/duplicate ids on
    # disk, counts as a reproduction of the defect.
    bug_reproduced = (
        disk_ids != cached_ids
        or reloaded_ids != cached_ids
        or not strict_increasing(disk_ids)
        or len(disk_ids) != len(set(disk_ids))
    )

    print(f"Workdir: {directory}")
    print(f"Mode: {'simulated-bug' if simulate_bug else 'current-implementation'}")
    print(f"Tape file: {tape_file}")
    print(f"Cached entries:   {entry_summary(cached_entries)}")
    print(f"Disk payload ids: {disk_ids}")
    print(f"Reloaded entries: {entry_summary(reloaded_entries)}")
    print("Raw JSONL:")
    for payload in disk_payloads:
        print(json.dumps(payload, ensure_ascii=False))

    if bug_reproduced:
        print("Result: BUG_REPRODUCED")
        print("Reason: disk ids diverge from cached ids or are not strictly increasing.")
        return 1

    print("Result: NO_BUG_REPRODUCED")
    print("Reason: cached ids, disk ids, and reloaded ids stay aligned and strictly increasing.")
    return 0


def main() -> int:
    """Entry point: run the probe in the requested directory, or a temp one.

    Returns the probe's exit status (1 = bug reproduced, 0 = clean).
    """
    options = parse_args()
    target = options.directory
    if target is None:
        # No directory requested: work in a throwaway temp dir.
        with tempfile.TemporaryDirectory(prefix="file-tape-store-id-bug-") as scratch:
            return asyncio.run(run_probe(Path(scratch), simulate_bug=options.simulate_bug))
    target.mkdir(parents=True, exist_ok=True)
    return asyncio.run(run_probe(target, simulate_bug=options.simulate_bug))


# Allow running the reproduction directly; exit with the probe's status code.
if __name__ == "__main__":
    raise SystemExit(main())
  • Verification
    • uv run ruff check .
    • uv run mypy src
    • uv run pytest -q

@frostming frostming merged commit edcf6f8 into bubbuild:main Mar 17, 2026
5 checks passed
@Andy963 Andy963 deleted the fix/tape-file-entry-id-persistence-clean branch March 19, 2026 02:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants