From 599925ce8e3c28ef1101c2e2de4543ded1f22ed7 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Fri, 14 Nov 2025 00:38:02 +0800 Subject: [PATCH] fix(issue #27): added an optional limit parameter to spawn_stdio_transport and stdio_streams Signed-off-by: Chojan Shang --- README.md | 2 ++ src/acp/stdio.py | 26 ++++++++++++------ src/acp/transports.py | 31 +++++++++++++++------ tests/real_user/test_stdio_limits.py | 41 ++++++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 17 deletions(-) create mode 100644 tests/real_user/test_stdio_limits.py diff --git a/README.md b/README.md index 51f8cfc..1793246 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,8 @@ if __name__ == "__main__": Full example with streaming and lifecycle hooks lives in [examples/echo_agent.py](examples/echo_agent.py). +Python's asyncio applies a 64KB default line buffer for stdio. If your agent streams larger JSON lines, pass `limit=10 * 1024 * 1024` (or similar) to `stdio_streams()` and `spawn_stdio_transport()` to raise the cap. + ## Examples - `examples/echo_agent.py`: the canonical streaming agent with lifecycle hooks diff --git a/src/acp/stdio.py b/src/acp/stdio.py index 40aa5a8..88917c9 100644 --- a/src/acp/stdio.py +++ b/src/acp/stdio.py @@ -96,8 +96,11 @@ def get_extra_info(self, name: str, default=None): # type: ignore[override] return default -async def _windows_stdio_streams(loop: asyncio.AbstractEventLoop) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: - reader = asyncio.StreamReader() +async def _windows_stdio_streams( + loop: asyncio.AbstractEventLoop, + limit: int | None = None, +) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: + reader = asyncio.StreamReader(limit=limit) if limit is not None else asyncio.StreamReader() _ = asyncio.StreamReaderProtocol(reader) _start_stdin_feeder(loop, reader) @@ -108,9 +111,12 @@ async def _windows_stdio_streams(loop: asyncio.AbstractEventLoop) -> tuple[async return reader, writer -async def _posix_stdio_streams(loop: asyncio.AbstractEventLoop) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: +async def _posix_stdio_streams( + loop: asyncio.AbstractEventLoop, + limit: int | None = None, +) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: # Reader from stdin - reader = asyncio.StreamReader() + reader = asyncio.StreamReader(limit=limit) if limit is not None else asyncio.StreamReader() reader_protocol = asyncio.StreamReaderProtocol(reader) await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin) @@ -121,12 +127,16 @@ async def _posix_stdio_streams(loop: asyncio.AbstractEventLoop) -> tuple[asyncio return reader, writer -async def stdio_streams() -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: - """Create stdio asyncio streams; on Windows use a thread feeder + custom stdout transport.""" +async def stdio_streams(limit: int | None = None) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: + """Create stdio asyncio streams; on Windows use a thread feeder + custom stdout transport. + + Args: + limit: Optional buffer limit for the stdin reader. + """ loop = asyncio.get_running_loop() if platform.system() == "Windows": - return await _windows_stdio_streams(loop) - return await _posix_stdio_streams(loop) + return await _windows_stdio_streams(loop, limit=limit) + return await _posix_stdio_streams(loop, limit=limit) @asynccontextmanager diff --git a/src/acp/transports.py b/src/acp/transports.py index be2a002..31a704e 100644 --- a/src/acp/transports.py +++ b/src/acp/transports.py @@ -51,6 +51,7 @@ async def spawn_stdio_transport( env: Mapping[str, str] | None = None, cwd: str | Path | None = None, stderr: int | None = aio_subprocess.PIPE, + limit: int | None = None, shutdown_timeout: float = 2.0, ) -> AsyncIterator[tuple[asyncio.StreamReader, asyncio.StreamWriter, aio_subprocess.Process]]: """Launch a subprocess and expose its stdio streams as asyncio transports. @@ -62,15 +63,27 @@ async def spawn_stdio_transport( if env: merged_env.update(env) - process = await asyncio.create_subprocess_exec( - command, - *args, - stdin=aio_subprocess.PIPE, - stdout=aio_subprocess.PIPE, - stderr=stderr, - env=merged_env, - cwd=str(cwd) if cwd is not None else None, - ) + if limit is None: + process = await asyncio.create_subprocess_exec( + command, + *args, + stdin=aio_subprocess.PIPE, + stdout=aio_subprocess.PIPE, + stderr=stderr, + env=merged_env, + cwd=str(cwd) if cwd is not None else None, + ) + else: + process = await asyncio.create_subprocess_exec( + command, + *args, + stdin=aio_subprocess.PIPE, + stdout=aio_subprocess.PIPE, + stderr=stderr, + env=merged_env, + cwd=str(cwd) if cwd is not None else None, + limit=limit, + ) if process.stdout is None or process.stdin is None: process.kill() diff --git a/tests/real_user/test_stdio_limits.py b/tests/real_user/test_stdio_limits.py new file mode 100644 index 0000000..3de0ef9 --- /dev/null +++ b/tests/real_user/test_stdio_limits.py @@ -0,0 +1,41 @@ +import sys +import textwrap + +import pytest + +from acp.transports import spawn_stdio_transport + +LARGE_LINE_SIZE = 70 * 1024 + + +def _large_line_script(size: int = LARGE_LINE_SIZE) -> str: + return textwrap.dedent( + f""" + import sys + sys.stdout.write("X" * {size}) + sys.stdout.write("\\n") + sys.stdout.flush() + """ + ).strip() + + +@pytest.mark.asyncio +async def test_spawn_stdio_transport_hits_default_limit() -> None: + script = _large_line_script() + async with spawn_stdio_transport(sys.executable, "-c", script) as (reader, writer, _process): + # readline() re-raises LimitOverrunError as ValueError on CPython 3.12+. + with pytest.raises(ValueError): + await reader.readline() + + +@pytest.mark.asyncio +async def test_spawn_stdio_transport_custom_limit_handles_large_line() -> None: + script = _large_line_script() + async with spawn_stdio_transport( + sys.executable, + "-c", + script, + limit=LARGE_LINE_SIZE * 2, + ) as (reader, writer, _process): + line = await reader.readline() + assert len(line) == LARGE_LINE_SIZE + 1