Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 219 additions & 3 deletions src/adcp/server/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,20 @@ def serve(
(from ``adcp_server()``). Builders are auto-converted via ``build_handler()``.

This is the simplest way to run an ADCP agent. Set ``transport="a2a"``
to serve over the A2A protocol instead of MCP.
to serve over the A2A protocol instead of MCP, or ``transport="both"``
to serve both protocols on the same port (MCP at ``/mcp``, A2A at
``/``).

Args:
handler: An ADCPHandler subclass instance with your tool implementations.
name: Server name shown to clients / in the A2A agent card.
port: Port to listen on. Defaults to PORT env var, then 3001.
transport: ``"streamable-http"`` (default, MCP) or ``"a2a"``.
transport: ``"streamable-http"`` (default, MCP), ``"a2a"``, or
``"both"`` (one Starlette binary serving MCP at ``/mcp``
and A2A at ``/``). Use ``"both"`` when you want adopters
on either protocol to reach the same handler with shared
``context_factory`` + ``middleware`` wiring — JS hosts both
on one Express app; this is the Python parity.
instructions: Optional system instructions for the agent (MCP only).
test_controller: Optional TestControllerStore instance for storyboard testing.
context_factory: Optional factory that builds a :class:`ToolContext`
Expand Down Expand Up @@ -551,8 +558,25 @@ async def force_account_status(self, account_id, status):
max_request_size=max_request_size,
streaming_responses=streaming_responses,
)
elif transport == "both":
_serve_mcp_and_a2a(
handler,
name=name,
port=port,
host=host,
instructions=instructions,
test_controller=test_controller,
context_factory=context_factory,
task_store=task_store,
push_config_store=push_config_store,
middleware=middleware,
message_parser=message_parser,
advertise_all=advertise_all,
max_request_size=max_request_size,
streaming_responses=streaming_responses,
)
else:
valid = ", ".join(sorted(("a2a", "streamable-http", "sse", "stdio")))
valid = ", ".join(sorted(("a2a", "both", "streamable-http", "sse", "stdio")))
raise ValueError(f"Unknown transport {transport!r}. Valid: {valid}")


Expand Down Expand Up @@ -845,6 +869,198 @@ async def _serve() -> None:
sock.close()


def _build_mcp_and_a2a_app(
handler: ADCPHandler[Any],
*,
name: str,
port: int,
host: str,
instructions: str | None,
test_controller: TestControllerStore | None,
context_factory: ContextFactory | None = None,
task_store: TaskStore | None = None,
push_config_store: PushNotificationConfigStore | None = None,
middleware: Sequence[SkillMiddleware] | None = None,
message_parser: MessageParser | None = None,
advertise_all: bool = False,
max_request_size: int | None = None,
streaming_responses: bool = False,
) -> Any:
"""Build the unified MCP+A2A ASGI app without starting a server.

Split out from :func:`_serve_mcp_and_a2a` so tests can route
requests through Starlette's ``TestClient`` against the same
dispatcher production uses.

Returns the size-limit-wrapped ASGI app. Wire to uvicorn /
Starlette / your test harness as you would any other ASGI app.
"""
import contextlib

from starlette.applications import Starlette
from starlette.types import ASGIApp, Receive, Scope, Send

from adcp.server.a2a_server import create_a2a_server

# MCP app — FastMCP registers its streamable-http endpoint at
# ``streamable_http_path`` (default ``/mcp``). The dispatcher
# below preserves the full request path when routing to MCP, so
# the inner Starlette router matches ``/mcp`` directly without
# needing a Mount-based prefix strip.
mcp = create_mcp_server(
handler,
name=name,
port=port,
host=host,
instructions=instructions,
include_test_controller=test_controller is not None,
context_factory=context_factory,
middleware=middleware,
advertise_all=advertise_all,
streaming_responses=streaming_responses,
)
if test_controller is not None:
from adcp.server.test_controller import register_test_controller

register_test_controller(mcp, test_controller, context_factory=context_factory)
mcp_inner = mcp.streamable_http_app()
# Wrap with the standard trailing-slash normalizer so ``/mcp/``
# and ``/mcp`` resolve to the same FastMCP endpoint. Keep the
# unwrapped ``mcp_inner`` reference so the lifespan composer
# below can reach ``.router.lifespan_context``.
mcp_app = _wrap_with_path_normalize(mcp_inner)

# A2A app — built via the a2a-sdk wrapper. It mounts at the root
# of its own app and handles ``/.well-known/agent.json``, ``/``,
# and the message / push-notif endpoints.
a2a_app = create_a2a_server(
handler,
name=name,
port=port,
test_controller=test_controller,
context_factory=context_factory,
task_store=task_store,
push_config_store=push_config_store,
middleware=middleware,
message_parser=message_parser,
advertise_all=advertise_all,
)

# Lifespan composition: FastMCP's session manager initializes a
# task group on startup; a2a-sdk's stores have their own init.
# Compose both inner lifespans on a parent Starlette; the
# dispatcher routes ``lifespan`` scope events to the parent so
# both initializers run before any request lands.
@contextlib.asynccontextmanager
async def _composed_lifespan(_app): # type: ignore[no-untyped-def]
async with mcp_inner.router.lifespan_context(mcp_inner):
async with a2a_app.router.lifespan_context(a2a_app):
yield

parent = Starlette(lifespan=_composed_lifespan)

async def _dispatch(scope: Scope, receive: Receive, send: Send) -> None:
"""Path-based ASGI dispatcher.

``/mcp`` and ``/mcp/...`` route to the FastMCP streamable-http
app with the full original path preserved (FastMCP's inner
route is at ``/mcp``). Everything else goes to A2A. Lifespan
events route to the parent Starlette which composes both
inner lifespans.
"""
if scope["type"] == "http":
path = scope.get("path", "")
if path == "/mcp" or path.startswith("/mcp/"):
await mcp_app(scope, receive, send)
return
await a2a_app(scope, receive, send)
return
if scope["type"] == "lifespan":
await parent(scope, receive, send)
return
# Websocket and other scopes: route to A2A by default. MCP
# streamable-http doesn't use websockets; A2A doesn't either
# in the default a2a-sdk shape, but if either grows that
# surface the dispatcher needs an explicit branch.
await a2a_app(scope, receive, send)

app: ASGIApp = _dispatch
return _wrap_with_size_limit(app, max_request_size)


def _serve_mcp_and_a2a(
handler: ADCPHandler[Any],
*,
name: str,
port: int | None,
host: str | None = None,
instructions: str | None,
test_controller: TestControllerStore | None,
context_factory: ContextFactory | None = None,
task_store: TaskStore | None = None,
push_config_store: PushNotificationConfigStore | None = None,
middleware: Sequence[SkillMiddleware] | None = None,
message_parser: MessageParser | None = None,
advertise_all: bool = False,
max_request_size: int | None = None,
streaming_responses: bool = False,
) -> None:
"""Serve MCP and A2A on a single port via path dispatch.

JS sellers host both transports on one Express/Hono app; this is
the Python parity. Build both apps independently with the same
handler, ``context_factory``, ``middleware``, etc., then route
by URL path: ``/mcp`` and ``/mcp/...`` go to the MCP streamable-http
app, everything else (``/``, ``/.well-known/agent.json``,
A2A push endpoints) goes to the A2A app.

Both apps see the same ``ToolContext`` and middleware chain because
they share the same ``handler`` instance — adopters writing audit
or rate-limit middleware get one wiring point that applies to both
transports automatically.
"""
import anyio
import uvicorn

resolved_port = port or int(os.environ.get("PORT", "3001"))
resolved_host = host or os.environ.get("ADCP_HOST", "0.0.0.0")
log_level = "info"

app = _build_mcp_and_a2a_app(
handler,
name=name,
port=resolved_port,
host=resolved_host,
instructions=instructions,
test_controller=test_controller,
context_factory=context_factory,
task_store=task_store,
push_config_store=push_config_store,
middleware=middleware,
message_parser=message_parser,
advertise_all=advertise_all,
max_request_size=max_request_size,
streaming_responses=streaming_responses,
)

sock = _bind_reusable_socket(resolved_host, resolved_port)
try:
logger.info(
"MCP+A2A unified listening on http://%s:%s " "(MCP at /mcp, A2A at /)",
resolved_host,
resolved_port,
)
config = uvicorn.Config(app, log_level=log_level)
server = uvicorn.Server(config)

async def _serve() -> None:
await server.serve(sockets=[sock])

anyio.run(_serve)
finally:
sock.close()


def create_mcp_server(
handler: ADCPHandler[Any],
*,
Expand Down
Loading
Loading