Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/tool_system/tools/glob.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,26 @@
from ..protocol import ToolResult
from ..utils.path_utils import suggest_path_under_cwd, to_relative_path
from ..utils.ripgrep import (
RipgrepAbortedError,
RipgrepTimeoutError,
RipgrepUnavailableError,
find_ripgrep,
ripgrep,
)
from ...utils.abort_controller import AbortError, AbortSignal


_VCS_DIRS = {".git", ".svn", ".hg", ".bzr", ".jj", ".sl"}


def _glob_via_ripgrep(
pattern: str, base_dir: str
pattern: str, base_dir: str, abort_signal: AbortSignal | None = None,
) -> list[str]:
"""Use ripgrep --files --glob for fast file discovery."""
args = ["--files", "--hidden", "--glob", pattern]
for d in _VCS_DIRS:
args.extend(["--glob", f"!{d}"])
return ripgrep(args, base_dir)
return ripgrep(args, base_dir, abort_signal=abort_signal)


def _glob_fallback(pattern: str, base_dir: Path) -> list[str]:
Expand Down Expand Up @@ -93,9 +95,20 @@ def _glob_call(tool_input: dict[str, Any], context: ToolContext) -> ToolResult:
cwd = context.cwd
use_ripgrep = find_ripgrep() is not None

abort_signal = context.abort_controller.signal
if use_ripgrep:
try:
files = _glob_via_ripgrep(pattern, str(base_dir))
files = _glob_via_ripgrep(pattern, str(base_dir), abort_signal=abort_signal)
except RipgrepAbortedError as e:
# User pressed ESC mid-search. Convert into the canonical
# ``AbortError`` so the agent loop's
# ``except AbortError: raise`` branch unwinds cleanly to the
# outer cancel boundary. Partial results are dropped — the
# caller has decided to cancel, so surfacing them would be
# noise the agent has to re-read and dismiss.
# ``abort_signal`` is non-None: ``ToolContext.abort_controller``
# is non-optional after PR #137.
raise AbortError(abort_signal.reason or "user_interrupt") from e
except RipgrepTimeoutError as e:
files = e.partial_results
except (RipgrepUnavailableError, RuntimeError):
Expand Down
16 changes: 15 additions & 1 deletion src/tool_system/tools/grep.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
from ..protocol import ToolResult
from ..utils.path_utils import suggest_path_under_cwd, to_relative_path
from ..utils.ripgrep import (
RipgrepAbortedError,
RipgrepTimeoutError,
RipgrepUnavailableError,
find_ripgrep,
ripgrep,
)
from ...utils.abort_controller import AbortError, AbortSignal


_VCS_DIRS = {".git", ".svn", ".hg", ".bzr", ".jj", ".sl"}
Expand Down Expand Up @@ -206,6 +208,7 @@ def _grep_via_ripgrep(
show_line_numbers: bool,
context_before: int,
context_after: int,
abort_signal: AbortSignal | None = None,
) -> list[str]:
"""Build ripgrep args and execute."""
args = ["--hidden"]
Expand Down Expand Up @@ -250,7 +253,7 @@ def _grep_via_ripgrep(
for pat in _split_glob_patterns(glob_pattern):
args.extend(["--glob", pat])

return ripgrep(args, base_path)
return ripgrep(args, base_path, abort_signal=abort_signal)


# -- Pagination ----------------------------------------------------------------
Expand Down Expand Up @@ -370,6 +373,7 @@ def _grep_call(tool_input: dict[str, Any], context: ToolContext) -> ToolResult:

cwd = context.cwd
use_ripgrep = find_ripgrep() is not None
abort_signal = context.abort_controller.signal

if use_ripgrep:
try:
Expand All @@ -384,7 +388,17 @@ def _grep_call(tool_input: dict[str, Any], context: ToolContext) -> ToolResult:
show_line_numbers=show_line_numbers,
context_before=ctx_before,
context_after=ctx_after,
abort_signal=abort_signal,
)
except RipgrepAbortedError as e:
# User pressed ESC mid-search. Re-raise as ``AbortError`` so
# the agent loop's ``except AbortError: raise`` branch
# unwinds to the outer cancel boundary. Partial results are
# dropped — the caller already cancelled, surfacing them
# would be noise the agent has to dismiss.
# ``abort_signal`` is non-None: ``ToolContext.abort_controller``
# is non-optional after PR #137.
raise AbortError(abort_signal.reason or "user_interrupt") from e
except RipgrepTimeoutError as e:
results = e.partial_results
except RipgrepUnavailableError:
Expand Down
165 changes: 145 additions & 20 deletions src/tool_system/utils/ripgrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,25 @@
import os
import platform
import shutil
import signal as _signal_mod
import subprocess
import sys
import time as _time_mod
from typing import Any

from src.utils.abort_controller import AbortSignal


# Poll interval for the abort/timeout watcher. 50 ms keeps ESC perceptibly
# instant (well under the ~100 ms threshold where humans notice latency)
# while costing ~20 wakeups/sec for a long search — negligible. Mirrors
# the same constant in ``bash_tool`` so abort latency is consistent
# across long-running tool surfaces.
_ABORT_POLL_INTERVAL_S = 0.05

# Grace period between SIGTERM and SIGKILL after an abort/timeout. Lets
# ripgrep flush any buffered output before we forcibly tear it down.
_KILL_GRACE_S = 2.0


class RipgrepTimeoutError(Exception):
Expand All @@ -15,6 +32,20 @@ def __init__(self, message: str, partial_results: list[str] | None = None):
self.partial_results = partial_results or []


class RipgrepAbortedError(Exception):
"""Raised when ``abort_signal`` fired before ripgrep produced a result.

Distinct from ``RipgrepTimeoutError`` so callers can decide whether
to surface partial results (timeout: yes, useful) or unwind without
them (abort: caller already decided to cancel — emitting "partial
results" would be noise that the agent has to re-read).
"""

def __init__(self, message: str, partial_results: list[str] | None = None):
super().__init__(message)
self.partial_results = partial_results or []


class RipgrepUnavailableError(Exception):
pass

Expand Down Expand Up @@ -58,16 +89,109 @@ def _get_install_hint() -> str:
return "Linux: use your distro package manager, e.g. `apt install ripgrep`."


def _kill_process_group(pid: int, sig: int) -> None:
try:
if sys.platform == "win32":
os.kill(pid, sig)
else:
os.killpg(os.getpgid(pid), sig)
except (ProcessLookupError, PermissionError, OSError):
# Already gone (race vs. natural exit) or insufficient privileges
# — fall through to ``proc.wait()`` which will surface the right
# state.
pass


def _run_rg_with_abort(
argv: list[str],
*,
timeout_s: float,
abort_signal: AbortSignal | None,
) -> tuple[int, str, str, bool, bool]:
"""Run ripgrep with abort + timeout supervision.

Returns ``(returncode, stdout, stderr, aborted, timed_out)``.

Replaces ``subprocess.run(..., timeout=...)`` so a tripped
``abort_signal`` can tear the ripgrep process down within
``_ABORT_POLL_INTERVAL_S`` rather than waiting out the entire
``timeout_s`` deadline. This mirrors ``bash_tool._run_bash_with_abort``
— long-running search and long-running shell share the same
abort-latency contract.
"""

popen_kwargs: dict[str, Any] = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"text": True,
}
if sys.platform == "win32":
popen_kwargs["creationflags"] = getattr(
subprocess, "CREATE_NEW_PROCESS_GROUP", 0
)
else:
popen_kwargs["start_new_session"] = True

proc = subprocess.Popen(argv, **popen_kwargs)
deadline = _time_mod.monotonic() + timeout_s
aborted = False
timed_out = False

while True:
if proc.poll() is not None:
break
if abort_signal is not None and abort_signal.aborted:
aborted = True
break
if _time_mod.monotonic() >= deadline:
timed_out = True
break
_time_mod.sleep(_ABORT_POLL_INTERVAL_S)

if aborted or timed_out:
_kill_process_group(proc.pid, _signal_mod.SIGTERM)
try:
proc.wait(timeout=_KILL_GRACE_S)
except subprocess.TimeoutExpired:
_kill_process_group(proc.pid, _signal_mod.SIGKILL)
try:
proc.wait(timeout=_KILL_GRACE_S)
except subprocess.TimeoutExpired:
pass

try:
stdout, stderr = proc.communicate(timeout=_KILL_GRACE_S)
except subprocess.TimeoutExpired:
stdout, stderr = "", ""

return (
proc.returncode if proc.returncode is not None else -1,
stdout or "",
stderr or "",
aborted,
timed_out,
)


def ripgrep(
args: list[str],
target: str,
timeout: float | None = None,
single_thread: bool = False,
abort_signal: AbortSignal | None = None,
) -> list[str]:
"""Run ripgrep and return output lines.

Exit code 0 = matches found, 1 = no matches (both are success).
Exit code >= 2 = actual error.

When ``abort_signal`` is provided and trips mid-run, the subprocess
is torn down within ``_ABORT_POLL_INTERVAL_S`` and a
:class:`RipgrepAbortedError` is raised so the caller can unwind
(typically by re-raising ``AbortError`` from the agent loop).
Without the signal parameter the previous ``subprocess.run`` style
is preserved, so SDK consumers that pass ``abort_signal=None``
behave exactly as before.
"""
rg = find_ripgrep()
if rg is None:
Expand All @@ -80,28 +204,29 @@ def ripgrep(
full_args = [rg, *thread_args, *args, target]
effective_timeout = timeout if timeout is not None else _get_timeout()

try:
result = subprocess.run(
full_args,
capture_output=True,
text=True,
timeout=effective_timeout,
)
except subprocess.TimeoutExpired as e:
partial = []
if e.stdout:
stdout_text = e.stdout if isinstance(e.stdout, str) else e.stdout.decode("utf-8", errors="replace")
partial = [line for line in stdout_text.splitlines() if line]
returncode, stdout, stderr, aborted, timed_out = _run_rg_with_abort(
full_args, timeout_s=effective_timeout, abort_signal=abort_signal
)

if aborted:
partial = [line for line in stdout.splitlines() if line] if stdout else []
raise RipgrepAbortedError("ripgrep cancelled by abort signal", partial)

if timed_out:
partial = [line for line in stdout.splitlines() if line] if stdout else []
raise RipgrepTimeoutError(
f"ripgrep timed out after {effective_timeout}s", partial
) from e
)

if result.returncode >= 2:
stderr = result.stderr.strip()
if _is_eagain_error(stderr) and not single_thread:
return ripgrep(args, target, timeout=timeout, single_thread=True)
raise RuntimeError(f"ripgrep error (exit {result.returncode}): {stderr}")
if returncode >= 2:
stderr_text = stderr.strip()
if _is_eagain_error(stderr_text) and not single_thread:
return ripgrep(
args, target, timeout=timeout, single_thread=True,
abort_signal=abort_signal,
)
raise RuntimeError(f"ripgrep error (exit {returncode}): {stderr_text}")

if not result.stdout:
if not stdout:
return []
return [line for line in result.stdout.splitlines() if line]
return [line for line in stdout.splitlines() if line]
Loading