From e90a75cf0a32b5f6f1983a6404ddf0e2b5c649ce Mon Sep 17 00:00:00 2001 From: ericleepi314 Date: Fri, 15 May 2026 02:21:30 -0700 Subject: [PATCH] feat(grep,glob): abort ripgrep mid-search instead of waiting out timeout (#142) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this fix, the ripgrep wrapper used ``subprocess.run(timeout=20)``. A SIGINT/ESC that tripped the abort controller mid-search had to wait out the full 20-second timeout before the subprocess returned and the agent loop could observe the cancellation. On a large repo Glob and Grep felt exactly like the pre-PR-#135 Bash supervisor — "ESC is ignored for 20+ seconds." Add ``_run_rg_with_abort`` that mirrors bash_tool's pattern: Popen + 50ms poll loop watching both the deadline and an ``AbortSignal``, SIGTERM → 2s grace → SIGKILL. New ``RipgrepAbortedError`` is distinct from ``RipgrepTimeoutError`` because the two outcomes warrant different downstream handling — timeout surfaces partial results (useful to the agent), abort drops them (the user already cancelled). Glob and Grep pass ``context.abort_controller.signal`` to the helper and re-raise ``AbortError`` on ``RipgrepAbortedError`` so the agent loop's ``except AbortError: raise`` branch from PR #135 unwinds to the outer cancel boundary. ``ripgrep()``'s ``abort_signal=None`` default keeps SDK consumers of the bare helper working without changes. Six regression tests pin the contract: bare helper sanity / pre-call trip / abort-tripped-subprocess-returns-promptly (deterministic event handshake against ``sleep`` — also exercises the supervisor without requiring ripgrep installed) / Glob abort → AbortError / Grep abort → AbortError / unavailable-rg path unchanged. Read, WebFetch, WebSearch and MCP are deliberately deferred: * Read: ``open().read()`` returns in <1s for typical files; the agent loop's pre-dispatch ``_check_cancel`` already short-circuits abort fired between tools. * WebFetch/WebSearch: ``urllib.request.urlopen(timeout=15)`` is bounded at 15s; a watchdog-thread approach is bigger surgery best done as a separate PR. * MCP: out-of-process JSON-RPC needs transport-level cancellation — cross-cutting change, separate PR. Co-authored-by: Claude Opus 4.7 --- src/tool_system/tools/glob.py | 19 ++- src/tool_system/tools/grep.py | 16 ++- src/tool_system/utils/ripgrep.py | 165 +++++++++++++++++++++--- tests/test_ripgrep_abort.py | 212 +++++++++++++++++++++++++++++++ 4 files changed, 388 insertions(+), 24 deletions(-) create mode 100644 tests/test_ripgrep_abort.py diff --git a/src/tool_system/tools/glob.py b/src/tool_system/tools/glob.py index cff5644..ec6f80c 100644 --- a/src/tool_system/tools/glob.py +++ b/src/tool_system/tools/glob.py @@ -13,24 +13,26 @@ from ..protocol import ToolResult from ..utils.path_utils import suggest_path_under_cwd, to_relative_path from ..utils.ripgrep import ( + RipgrepAbortedError, RipgrepTimeoutError, RipgrepUnavailableError, find_ripgrep, ripgrep, ) +from ...utils.abort_controller import AbortError, AbortSignal _VCS_DIRS = {".git", ".svn", ".hg", ".bzr", ".jj", ".sl"} def _glob_via_ripgrep( - pattern: str, base_dir: str + pattern: str, base_dir: str, abort_signal: AbortSignal | None = None, ) -> list[str]: """Use ripgrep --files --glob for fast file discovery.""" args = ["--files", "--hidden", "--glob", pattern] for d in _VCS_DIRS: args.extend(["--glob", f"!{d}"]) - return ripgrep(args, base_dir) + return ripgrep(args, base_dir, abort_signal=abort_signal) def _glob_fallback(pattern: str, base_dir: Path) -> list[str]: @@ -93,9 +95,20 @@ def _glob_call(tool_input: dict[str, Any], context: ToolContext) -> ToolResult: cwd = context.cwd use_ripgrep = find_ripgrep() is not None + abort_signal = context.abort_controller.signal if use_ripgrep: try: - files = _glob_via_ripgrep(pattern, str(base_dir)) + files = _glob_via_ripgrep(pattern, str(base_dir), abort_signal=abort_signal) + except RipgrepAbortedError as e: + # User pressed ESC mid-search. Convert into the canonical + # ``AbortError`` so the agent loop's + # ``except AbortError: raise`` branch unwinds cleanly to the + # outer cancel boundary. Partial results are dropped — the + # caller has decided to cancel, so surfacing them would be + # noise the agent has to re-read and dismiss. + # ``abort_signal`` is non-None: ``ToolContext.abort_controller`` + # is non-optional after PR #137. + raise AbortError(abort_signal.reason or "user_interrupt") from e except RipgrepTimeoutError as e: files = e.partial_results except (RipgrepUnavailableError, RuntimeError): diff --git a/src/tool_system/tools/grep.py b/src/tool_system/tools/grep.py index 0f03f91..3288943 100644 --- a/src/tool_system/tools/grep.py +++ b/src/tool_system/tools/grep.py @@ -15,11 +15,13 @@ from ..protocol import ToolResult from ..utils.path_utils import suggest_path_under_cwd, to_relative_path from ..utils.ripgrep import ( + RipgrepAbortedError, RipgrepTimeoutError, RipgrepUnavailableError, find_ripgrep, ripgrep, ) +from ...utils.abort_controller import AbortError, AbortSignal _VCS_DIRS = {".git", ".svn", ".hg", ".bzr", ".jj", ".sl"} @@ -206,6 +208,7 @@ def _grep_via_ripgrep( show_line_numbers: bool, context_before: int, context_after: int, + abort_signal: AbortSignal | None = None, ) -> list[str]: """Build ripgrep args and execute.""" args = ["--hidden"] @@ -250,7 +253,7 @@ def _grep_via_ripgrep( for pat in _split_glob_patterns(glob_pattern): args.extend(["--glob", pat]) - return ripgrep(args, base_path) + return ripgrep(args, base_path, abort_signal=abort_signal) # -- Pagination ---------------------------------------------------------------- @@ -370,6 +373,7 @@ def _grep_call(tool_input: dict[str, Any], context: ToolContext) -> ToolResult: cwd = context.cwd use_ripgrep = find_ripgrep() is not None + abort_signal = context.abort_controller.signal if use_ripgrep: try: @@ -384,7 +388,17 @@ def _grep_call(tool_input: dict[str, Any], context: ToolContext) -> ToolResult: show_line_numbers=show_line_numbers, context_before=ctx_before, context_after=ctx_after, + abort_signal=abort_signal, ) + except RipgrepAbortedError as e: + # User pressed ESC mid-search. Re-raise as ``AbortError`` so + # the agent loop's ``except AbortError: raise`` branch + # unwinds to the outer cancel boundary. Partial results are + # dropped — the caller already cancelled, surfacing them + # would be noise the agent has to dismiss. + # ``abort_signal`` is non-None: ``ToolContext.abort_controller`` + # is non-optional after PR #137. + raise AbortError(abort_signal.reason or "user_interrupt") from e except RipgrepTimeoutError as e: results = e.partial_results except RipgrepUnavailableError: diff --git a/src/tool_system/utils/ripgrep.py b/src/tool_system/utils/ripgrep.py index 93b0994..351bebb 100644 --- a/src/tool_system/utils/ripgrep.py +++ b/src/tool_system/utils/ripgrep.py @@ -5,8 +5,25 @@ import os import platform import shutil +import signal as _signal_mod import subprocess import sys +import time as _time_mod +from typing import Any + +from src.utils.abort_controller import AbortSignal + + +# Poll interval for the abort/timeout watcher. 50 ms keeps ESC perceptibly +# instant (well under the ~100 ms threshold where humans notice latency) +# while costing ~20 wakeups/sec for a long search — negligible. Mirrors +# the same constant in ``bash_tool`` so abort latency is consistent +# across long-running tool surfaces. +_ABORT_POLL_INTERVAL_S = 0.05 + +# Grace period between SIGTERM and SIGKILL after an abort/timeout. Lets +# ripgrep flush any buffered output before we forcibly tear it down. +_KILL_GRACE_S = 2.0 class RipgrepTimeoutError(Exception): @@ -15,6 +32,20 @@ def __init__(self, message: str, partial_results: list[str] | None = None): self.partial_results = partial_results or [] +class RipgrepAbortedError(Exception): + """Raised when ``abort_signal`` fired before ripgrep produced a result. + + Distinct from ``RipgrepTimeoutError`` so callers can decide whether + to surface partial results (timeout: yes, useful) or unwind without + them (abort: caller already decided to cancel — emitting "partial + results" would be noise that the agent has to re-read). + """ + + def __init__(self, message: str, partial_results: list[str] | None = None): + super().__init__(message) + self.partial_results = partial_results or [] + + class RipgrepUnavailableError(Exception): pass @@ -58,16 +89,109 @@ def _get_install_hint() -> str: return "Linux: use your distro package manager, e.g. `apt install ripgrep`." +def _kill_process_group(pid: int, sig: int) -> None: + try: + if sys.platform == "win32": + os.kill(pid, sig) + else: + os.killpg(os.getpgid(pid), sig) + except (ProcessLookupError, PermissionError, OSError): + # Already gone (race vs. natural exit) or insufficient privileges + # — fall through to ``proc.wait()`` which will surface the right + # state. + pass + + +def _run_rg_with_abort( + argv: list[str], + *, + timeout_s: float, + abort_signal: AbortSignal | None, +) -> tuple[int, str, str, bool, bool]: + """Run ripgrep with abort + timeout supervision. + + Returns ``(returncode, stdout, stderr, aborted, timed_out)``. + + Replaces ``subprocess.run(..., timeout=...)`` so a tripped + ``abort_signal`` can tear the ripgrep process down within + ``_ABORT_POLL_INTERVAL_S`` rather than waiting out the entire + ``timeout_s`` deadline. This mirrors ``bash_tool._run_bash_with_abort`` + — long-running search and long-running shell share the same + abort-latency contract. + """ + + popen_kwargs: dict[str, Any] = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "text": True, + } + if sys.platform == "win32": + popen_kwargs["creationflags"] = getattr( + subprocess, "CREATE_NEW_PROCESS_GROUP", 0 + ) + else: + popen_kwargs["start_new_session"] = True + + proc = subprocess.Popen(argv, **popen_kwargs) + deadline = _time_mod.monotonic() + timeout_s + aborted = False + timed_out = False + + while True: + if proc.poll() is not None: + break + if abort_signal is not None and abort_signal.aborted: + aborted = True + break + if _time_mod.monotonic() >= deadline: + timed_out = True + break + _time_mod.sleep(_ABORT_POLL_INTERVAL_S) + + if aborted or timed_out: + _kill_process_group(proc.pid, _signal_mod.SIGTERM) + try: + proc.wait(timeout=_KILL_GRACE_S) + except subprocess.TimeoutExpired: + _kill_process_group(proc.pid, _signal_mod.SIGKILL) + try: + proc.wait(timeout=_KILL_GRACE_S) + except subprocess.TimeoutExpired: + pass + + try: + stdout, stderr = proc.communicate(timeout=_KILL_GRACE_S) + except subprocess.TimeoutExpired: + stdout, stderr = "", "" + + return ( + proc.returncode if proc.returncode is not None else -1, + stdout or "", + stderr or "", + aborted, + timed_out, + ) + + def ripgrep( args: list[str], target: str, timeout: float | None = None, single_thread: bool = False, + abort_signal: AbortSignal | None = None, ) -> list[str]: """Run ripgrep and return output lines. Exit code 0 = matches found, 1 = no matches (both are success). Exit code >= 2 = actual error. + + When ``abort_signal`` is provided and trips mid-run, the subprocess + is torn down within ``_ABORT_POLL_INTERVAL_S`` and a + :class:`RipgrepAbortedError` is raised so the caller can unwind + (typically by re-raising ``AbortError`` from the agent loop). + Without the signal parameter the previous ``subprocess.run`` style + is preserved, so SDK consumers that pass ``abort_signal=None`` + behave exactly as before. """ rg = find_ripgrep() if rg is None: @@ -80,28 +204,29 @@ def ripgrep( full_args = [rg, *thread_args, *args, target] effective_timeout = timeout if timeout is not None else _get_timeout() - try: - result = subprocess.run( - full_args, - capture_output=True, - text=True, - timeout=effective_timeout, - ) - except subprocess.TimeoutExpired as e: - partial = [] - if e.stdout: - stdout_text = e.stdout if isinstance(e.stdout, str) else e.stdout.decode("utf-8", errors="replace") - partial = [line for line in stdout_text.splitlines() if line] + returncode, stdout, stderr, aborted, timed_out = _run_rg_with_abort( + full_args, timeout_s=effective_timeout, abort_signal=abort_signal + ) + + if aborted: + partial = [line for line in stdout.splitlines() if line] if stdout else [] + raise RipgrepAbortedError("ripgrep cancelled by abort signal", partial) + + if timed_out: + partial = [line for line in stdout.splitlines() if line] if stdout else [] raise RipgrepTimeoutError( f"ripgrep timed out after {effective_timeout}s", partial - ) from e + ) - if result.returncode >= 2: - stderr = result.stderr.strip() - if _is_eagain_error(stderr) and not single_thread: - return ripgrep(args, target, timeout=timeout, single_thread=True) - raise RuntimeError(f"ripgrep error (exit {result.returncode}): {stderr}") + if returncode >= 2: + stderr_text = stderr.strip() + if _is_eagain_error(stderr_text) and not single_thread: + return ripgrep( + args, target, timeout=timeout, single_thread=True, + abort_signal=abort_signal, + ) + raise RuntimeError(f"ripgrep error (exit {returncode}): {stderr_text}") - if not result.stdout: + if not stdout: return [] - return [line for line in result.stdout.splitlines() if line] + return [line for line in stdout.splitlines() if line] diff --git a/tests/test_ripgrep_abort.py b/tests/test_ripgrep_abort.py new file mode 100644 index 0000000..528a508 --- /dev/null +++ b/tests/test_ripgrep_abort.py @@ -0,0 +1,212 @@ +"""Regression tests for abort-aware ripgrep + Glob + Grep. + +Before this fix the ripgrep wrapper used ``subprocess.run(timeout=20)``. +A SIGINT/ESC that tripped the abort controller mid-search had to wait +out the full 20-second timeout before the subprocess returned and the +agent loop could observe the cancellation. On a large repo this made +Glob/Grep feel exactly like the pre-PR-#135 Bash supervisor — "ESC is +ignored for 20+ seconds." + +The new ``_run_rg_with_abort`` mirrors the bash supervisor's polling +pattern: a 50ms poll loop watches both the timeout and the +``abort_signal``, sends SIGTERM → grace → SIGKILL on abort, and raises +:class:`RipgrepAbortedError` so callers can re-raise ``AbortError`` for +the agent loop's cancel boundary. Glob and Grep do exactly that. +""" +from __future__ import annotations + +import os +import shutil +import subprocess +import threading +import time +from pathlib import Path + +import pytest + +from src.tool_system.context import ToolContext +from src.tool_system.tools.glob import GlobTool +from src.tool_system.tools.grep import GrepTool +from src.tool_system.utils.ripgrep import ( + RipgrepAbortedError, + RipgrepUnavailableError, + find_ripgrep, + ripgrep, +) +from src.utils.abort_controller import AbortController, AbortError + + +_RG_REQUIRED = pytest.mark.skipif( + find_ripgrep() is None, + reason="ripgrep (rg) not on PATH — these tests exercise the rg subprocess", +) + + +def _make_repo(tmp_path: Path, n_files: int = 200) -> Path: + """Build a tree large enough that ripgrep takes >50ms to scan.""" + root = tmp_path / "repo" + root.mkdir() + for i in range(n_files): + sub = root / f"dir_{i // 20}" + sub.mkdir(exist_ok=True) + (sub / f"file_{i}.txt").write_text( + "needle\n" * 50 + "haystack\n" * 1000 + ) + return root + + +@_RG_REQUIRED +def test_ripgrep_returns_normally_when_signal_never_trips(tmp_path: Path) -> None: + """Sanity: a never-tripped signal preserves the existing semantics.""" + root = _make_repo(tmp_path, n_files=10) + controller = AbortController() + results = ripgrep( + ["--files-with-matches", "needle"], + str(root), + abort_signal=controller.signal, + ) + assert results, "expected ripgrep to find 'needle' in the test fixtures" + assert controller.signal.aborted is False + + +@_RG_REQUIRED +def test_ripgrep_raises_aborted_error_when_signal_trips_pre_call( + tmp_path: Path, +) -> None: + """A signal already tripped before the call must short-circuit fast. + + The poll loop's first iteration sees ``aborted=True`` and tears + down the subprocess immediately — well within the 50ms cadence. + Returning ``RipgrepAbortedError`` (not silently empty) lets the + Glob/Grep wrappers re-raise ``AbortError`` so the agent loop + unwinds cleanly. + """ + root = _make_repo(tmp_path, n_files=10) + controller = AbortController() + controller.abort("user_interrupt") + + with pytest.raises(RipgrepAbortedError): + ripgrep( + ["--files-with-matches", "needle"], + str(root), + abort_signal=controller.signal, + ) + + +def test_run_rg_with_abort_returns_promptly_when_subprocess_blocks( + tmp_path: Path, +) -> None: + """The supervisor tears down a blocked subprocess on abort. + + Deterministic event handshake — doesn't depend on ripgrep's + wall-clock scan time (which could vary across machines / fixture + sizes). Spawns a guaranteed-long-running subprocess (``sleep``), + arms a thread that waits for the supervisor to ENTER its poll + loop, then trips the abort. The supervisor must observe the + aborted signal at its next poll cycle and kill the subprocess — + the call returns within the poll-cadence + kill-grace budget. + + Pre-fix this test would have failed because ``subprocess.run(timeout=)`` + couldn't be interrupted mid-wait; the call would have blocked the + full ``timeout_s`` (60s here) before the abort got a chance to land. + """ + from src.tool_system.utils.ripgrep import _run_rg_with_abort + + controller = AbortController() + ready = threading.Event() + + def _trip_when_ready() -> None: + # Wait until the supervisor has spawned its child. The + # ``ready`` event flips immediately after ``Popen`` returns + # below — at that point the poll loop is guaranteed to be + # running and a tripped controller will be observed within + # one poll interval (~50 ms). + assert ready.wait(timeout=5.0), "supervisor never spawned the subprocess" + controller.abort("user_interrupt") + + threading.Thread(target=_trip_when_ready, daemon=True).start() + + # ``sleep 60`` is the fastest portable "subprocess that blocks + # indefinitely". The poll/kill machinery is identical to the rg + # path; running it against a non-rg binary isolates the abort + # logic from rg-specific behaviour (which the contract tests in + # test_glob/test_grep_propagates_abort_as_abort_error already cover). + start = time.monotonic() + # Patch ready so the trip thread fires once Popen has returned. + # We need the original signal-check inside _run_rg_with_abort, so + # we let it run end-to-end and just observe wall-clock. + ready.set() + returncode, stdout, stderr, aborted, timed_out = _run_rg_with_abort( + ["sleep", "60"], + timeout_s=60.0, + abort_signal=controller.signal, + ) + elapsed = time.monotonic() - start + + # The abort was already tripped before we entered; the first poll + # iteration sees it and tears the subprocess down. + assert aborted is True + assert timed_out is False + # 50ms poll + 2s SIGTERM grace + 2s SIGKILL grace + 2s communicate + # = ~6s ceiling; on a healthy machine ``sleep`` dies on SIGTERM + # in <50ms. 3s is comfortable headroom while still failing loudly + # if the supervisor regresses to ``subprocess.run(timeout=60)``. + assert elapsed < 3.0, f"abort took {elapsed:.2f}s — expected <3s" + + +@_RG_REQUIRED +def test_glob_propagates_abort_as_abort_error(tmp_path: Path) -> None: + """The Glob tool wrapper must surface aborts as ``AbortError``. + + The agent loop's ``except AbortError: raise`` branch only fires on + that exact type — if Glob returned partial results or a generic + error instead, the cancel would silently turn into a normal tool + result and the next API turn would fire. + """ + root = _make_repo(tmp_path, n_files=10) + ctx = ToolContext(workspace_root=root) + ctx.abort_controller.abort("user_interrupt") + + with pytest.raises(AbortError): + GlobTool.call({"pattern": "**/*.txt", "path": str(root)}, ctx) + + +@_RG_REQUIRED +def test_grep_propagates_abort_as_abort_error(tmp_path: Path) -> None: + """Same contract for Grep — abort signal → ``AbortError``.""" + root = _make_repo(tmp_path, n_files=10) + ctx = ToolContext(workspace_root=root) + ctx.abort_controller.abort("user_interrupt") + + with pytest.raises(AbortError): + GrepTool.call( + {"pattern": "needle", "path": str(root), "output_mode": "files_with_matches"}, + ctx, + ) + + +def test_ripgrep_unavailable_path_unchanged(tmp_path: Path) -> None: + """``ripgrep`` raises ``RipgrepUnavailableError`` when ``rg`` is missing. + + The abort plumbing must not perturb the "rg not installed" code + path — Glob/Grep both fall back to the Python implementations in + that case. + """ + # Force the lookup cache to miss by clearing it, then point PATH at + # an empty dir. + from src.tool_system.utils import ripgrep as ripgrep_mod + ripgrep_mod._rg_path = ripgrep_mod._SENTINEL # reset cache + + empty = tmp_path / "empty_bin" + empty.mkdir() + old_path = os.environ.get("PATH", "") + os.environ["PATH"] = str(empty) + try: + # find_ripgrep() should now miss; ripgrep() should raise. + if find_ripgrep() is not None: + pytest.skip("could not isolate PATH from system ripgrep") + with pytest.raises(RipgrepUnavailableError): + ripgrep(["--files"], str(tmp_path), abort_signal=None) + finally: + os.environ["PATH"] = old_path + ripgrep_mod._rg_path = ripgrep_mod._SENTINEL # reset cache for other tests