Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/actions/setup-python-env/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ runs:
uses: astral-sh/setup-uv@v6
with:
version: ${{ inputs.uv-version }}
enable-cache: 'true'
enable-cache: "true"
cache-suffix: ${{ matrix.python-version }}

- name: Install Python dependencies
run: uv sync --frozen
run: uv sync --all-extras --all-groups --frozen
shell: bash
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ dev = [
"python-dotenv>=1.1.1",
]

[project.optional-dependencies]
logfire = ["logfire>=0.14", "opentelemetry-sdk>=1.28.0"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Expand Down
3 changes: 3 additions & 0 deletions src/acp/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .connection import AgentSideConnection

__all__ = ["AgentSideConnection"]
137 changes: 137 additions & 0 deletions src/acp/agent/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import Any

from ..connection import Connection, MethodHandler
from ..interfaces import Agent
from ..meta import CLIENT_METHODS
from ..schema import (
CreateTerminalRequest,
CreateTerminalResponse,
KillTerminalCommandRequest,
KillTerminalCommandResponse,
ReadTextFileRequest,
ReadTextFileResponse,
ReleaseTerminalRequest,
ReleaseTerminalResponse,
RequestPermissionRequest,
RequestPermissionResponse,
SessionNotification,
TerminalOutputRequest,
TerminalOutputResponse,
WaitForTerminalExitRequest,
WaitForTerminalExitResponse,
WriteTextFileRequest,
WriteTextFileResponse,
)
from ..terminal import TerminalHandle
from ..utils import notify_model, request_model, request_optional_model
from .router import build_agent_router

__all__ = ["AgentSideConnection"]

_AGENT_CONNECTION_ERROR = "AgentSideConnection requires asyncio StreamWriter/StreamReader"


class AgentSideConnection:
"""Agent-side connection wrapper that dispatches JSON-RPC messages to a Client implementation."""

def __init__(
self,
to_agent: Callable[[AgentSideConnection], Agent],
input_stream: Any,
output_stream: Any,
) -> None:
agent = to_agent(self)
handler = self._create_handler(agent)

if not isinstance(input_stream, asyncio.StreamWriter) or not isinstance(output_stream, asyncio.StreamReader):
raise TypeError(_AGENT_CONNECTION_ERROR)
self._conn = Connection(handler, input_stream, output_stream)

def _create_handler(self, agent: Agent) -> MethodHandler:
router = build_agent_router(agent)

async def handler(method: str, params: Any | None, is_notification: bool) -> Any:
if is_notification:
await router.dispatch_notification(method, params)
return None
return await router.dispatch_request(method, params)

return handler

async def sessionUpdate(self, params: SessionNotification) -> None:
await notify_model(self._conn, CLIENT_METHODS["session_update"], params)

async def requestPermission(self, params: RequestPermissionRequest) -> RequestPermissionResponse:
return await request_model(
self._conn,
CLIENT_METHODS["session_request_permission"],
params,
RequestPermissionResponse,
)

async def readTextFile(self, params: ReadTextFileRequest) -> ReadTextFileResponse:
return await request_model(
self._conn,
CLIENT_METHODS["fs_read_text_file"],
params,
ReadTextFileResponse,
)

async def writeTextFile(self, params: WriteTextFileRequest) -> WriteTextFileResponse | None:
return await request_optional_model(
self._conn,
CLIENT_METHODS["fs_write_text_file"],
params,
WriteTextFileResponse,
)

async def createTerminal(self, params: CreateTerminalRequest) -> TerminalHandle:
create_response = await request_model(
self._conn,
CLIENT_METHODS["terminal_create"],
params,
CreateTerminalResponse,
)
return TerminalHandle(create_response.terminalId, params.sessionId, self._conn)

async def terminalOutput(self, params: TerminalOutputRequest) -> TerminalOutputResponse:
return await request_model(
self._conn,
CLIENT_METHODS["terminal_output"],
params,
TerminalOutputResponse,
)

async def releaseTerminal(self, params: ReleaseTerminalRequest) -> ReleaseTerminalResponse | None:
return await request_optional_model(
self._conn,
CLIENT_METHODS["terminal_release"],
params,
ReleaseTerminalResponse,
)

async def waitForTerminalExit(self, params: WaitForTerminalExitRequest) -> WaitForTerminalExitResponse:
return await request_model(
self._conn,
CLIENT_METHODS["terminal_wait_for_exit"],
params,
WaitForTerminalExitResponse,
)

async def killTerminal(self, params: KillTerminalCommandRequest) -> KillTerminalCommandResponse | None:
return await request_optional_model(
self._conn,
CLIENT_METHODS["terminal_kill"],
params,
KillTerminalCommandResponse,
)

async def extMethod(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
return await self._conn.send_request(f"_{method}", params)

async def extNotification(self, method: str, params: dict[str, Any]) -> None:
await self._conn.send_notification(f"_{method}", params)
76 changes: 76 additions & 0 deletions src/acp/agent/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

from typing import Any

from ..exceptions import RequestError
from ..interfaces import Agent
from ..meta import AGENT_METHODS
from ..router import MessageRouter, RouterBuilder
from ..schema import (
AuthenticateRequest,
CancelNotification,
InitializeRequest,
LoadSessionRequest,
NewSessionRequest,
PromptRequest,
SetSessionModelRequest,
SetSessionModeRequest,
)
from ..utils import normalize_result

__all__ = ["build_agent_router"]


def build_agent_router(agent: Agent) -> MessageRouter:
builder = RouterBuilder()

builder.request_attr(AGENT_METHODS["initialize"], InitializeRequest, agent, "initialize")
builder.request_attr(AGENT_METHODS["session_new"], NewSessionRequest, agent, "newSession")
builder.request_attr(
AGENT_METHODS["session_load"],
LoadSessionRequest,
agent,
"loadSession",
adapt_result=normalize_result,
)
builder.request_attr(
AGENT_METHODS["session_set_mode"],
SetSessionModeRequest,
agent,
"setSessionMode",
adapt_result=normalize_result,
)
builder.request_attr(AGENT_METHODS["session_prompt"], PromptRequest, agent, "prompt")
builder.request_attr(
AGENT_METHODS["session_set_model"],
SetSessionModelRequest,
agent,
"setSessionModel",
adapt_result=normalize_result,
)
builder.request_attr(
AGENT_METHODS["authenticate"],
AuthenticateRequest,
agent,
"authenticate",
adapt_result=normalize_result,
)

builder.notification_attr(AGENT_METHODS["session_cancel"], CancelNotification, agent, "cancel")

async def handle_extension_request(name: str, payload: dict[str, Any]) -> Any:
ext = getattr(agent, "extMethod", None)
if ext is None:
raise RequestError.method_not_found(f"_{name}")
return await ext(name, payload)

async def handle_extension_notification(name: str, payload: dict[str, Any]) -> None:
ext = getattr(agent, "extNotification", None)
if ext is None:
return
await ext(name, payload)

return builder.build(
request_extensions=handle_extension_request,
notification_extensions=handle_extension_notification,
)
3 changes: 3 additions & 0 deletions src/acp/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .connection import ClientSideConnection

__all__ = ["ClientSideConnection"]
129 changes: 129 additions & 0 deletions src/acp/client/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import Any

from ..connection import Connection, MethodHandler
from ..interfaces import Agent, Client
from ..meta import AGENT_METHODS
from ..schema import (
AuthenticateRequest,
AuthenticateResponse,
CancelNotification,
InitializeRequest,
InitializeResponse,
LoadSessionRequest,
LoadSessionResponse,
NewSessionRequest,
NewSessionResponse,
PromptRequest,
PromptResponse,
SetSessionModelRequest,
SetSessionModelResponse,
SetSessionModeRequest,
SetSessionModeResponse,
)
from ..utils import (
notify_model,
request_model,
request_model_from_dict,
)
from .router import build_client_router

__all__ = ["ClientSideConnection"]

_CLIENT_CONNECTION_ERROR = "ClientSideConnection requires asyncio StreamWriter/StreamReader"


class ClientSideConnection:
"""Client-side connection wrapper that dispatches JSON-RPC messages to an Agent implementation."""

def __init__(
self,
to_client: Callable[[Agent], Client],
input_stream: Any,
output_stream: Any,
) -> None:
if not isinstance(input_stream, asyncio.StreamWriter) or not isinstance(output_stream, asyncio.StreamReader):
raise TypeError(_CLIENT_CONNECTION_ERROR)

client = to_client(self) # type: ignore[arg-type]
handler = self._create_handler(client)
self._conn = Connection(handler, input_stream, output_stream)

def _create_handler(self, client: Client) -> MethodHandler:
router = build_client_router(client)

async def handler(method: str, params: Any | None, is_notification: bool) -> Any:
if is_notification:
await router.dispatch_notification(method, params)
return None
return await router.dispatch_request(method, params)

return handler

async def initialize(self, params: InitializeRequest) -> InitializeResponse:
return await request_model(
self._conn,
AGENT_METHODS["initialize"],
params,
InitializeResponse,
)

async def newSession(self, params: NewSessionRequest) -> NewSessionResponse:
return await request_model(
self._conn,
AGENT_METHODS["session_new"],
params,
NewSessionResponse,
)

async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse:
return await request_model_from_dict(
self._conn,
AGENT_METHODS["session_load"],
params,
LoadSessionResponse,
)

async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse:
return await request_model_from_dict(
self._conn,
AGENT_METHODS["session_set_mode"],
params,
SetSessionModeResponse,
)

async def setSessionModel(self, params: SetSessionModelRequest) -> SetSessionModelResponse:
return await request_model_from_dict(
self._conn,
AGENT_METHODS["session_set_model"],
params,
SetSessionModelResponse,
)

async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse:
return await request_model_from_dict(
self._conn,
AGENT_METHODS["authenticate"],
params,
AuthenticateResponse,
)

async def prompt(self, params: PromptRequest) -> PromptResponse:
return await request_model(
self._conn,
AGENT_METHODS["session_prompt"],
params,
PromptResponse,
)

async def cancel(self, params: CancelNotification) -> None:
await notify_model(self._conn, AGENT_METHODS["session_cancel"], params)

async def extMethod(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
return await self._conn.send_request(f"_{method}", params)

async def extNotification(self, method: str, params: dict[str, Any]) -> None:
await self._conn.send_notification(f"_{method}", params)
Loading