Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/adcp/protocols/_adcp_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Shared ``adcp_error`` envelope validation for protocol adapters.

The shape and size caps are spec-defined (transport-errors.mdx §The
``adcp_error`` Object): ``code`` is a non-empty string ≤ 64 chars,
total serialized envelope ≤ 4 KB. Both MCP and A2A transports project
their wire-specific error envelopes onto the same shape, so the
validation lives here rather than in either transport.
"""

from __future__ import annotations

import json
from typing import Any

MAX_ERROR_CODE_LEN = 64
MAX_ERROR_SIZE_BYTES = 4096


def validate_adcp_error(err: Any) -> dict[str, Any] | None:
"""Return ``err`` if it's a spec-shaped ``adcp_error`` envelope, else ``None``.

Spec rules: ``code`` is a non-empty string ≤ 64 chars; the whole
serialized envelope is ≤ 4 KB. Non-dict input, missing code, oversize
payloads, and non-serializable values all fail closed (returns ``None``).
"""
if not isinstance(err, dict):
return None
code = err.get("code")
if not isinstance(code, str) or not (0 < len(code) <= MAX_ERROR_CODE_LEN):
return None
try:
if len(json.dumps(err)) > MAX_ERROR_SIZE_BYTES:
return None
except (TypeError, ValueError):
return None
return err
34 changes: 32 additions & 2 deletions src/adcp/protocols/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
IdempotencyConflictError,
IdempotencyExpiredError,
)
from adcp.protocols._adcp_errors import validate_adcp_error
from adcp.protocols.base import ProtocolAdapter
from adcp.signing.autosign import current_operation as _signing_operation
from adcp.types.core import AgentConfig, DebugInfo, TaskResult, TaskStatus
Expand Down Expand Up @@ -646,11 +647,26 @@ def _process_task_response(
debug_info=debug_info,
)
elif task_state == pb.TaskState.TASK_STATE_FAILED:
# Protocol-level failure - extract error message from TextPart
error_msg = self._extract_text_from_task(task) or "Task failed"
# Per transport-errors.mdx §A2A Binding: a failed task carries
# an ``adcp_error`` DataPart alongside the human-readable
# TextPart. The structured envelope lands on
# ``TaskResult.adcp_error`` for programmatic branching; the
# text stays on ``error`` for humans. When the seller omits
# the TextPart, fall back to the structured envelope's
# ``message`` / ``code`` so adopters don't see the
# ``"Task failed"`` placeholder mask a real diagnostic.
text_msg = self._extract_text_from_task(task)
adcp_error = self._extract_adcp_error_from_task(task)
if text_msg:
error_msg: str | None = text_msg
elif adcp_error:
error_msg = adcp_error.get("message") or adcp_error.get("code")
else:
error_msg = "Task failed"
return TaskResult[Any](
status=TaskStatus.FAILED,
error=error_msg,
adcp_error=adcp_error,
success=False,
debug_info=debug_info,
)
Expand Down Expand Up @@ -722,6 +738,20 @@ def _extract_result_from_task(self, task: pb.Task) -> Any:

return data

def _extract_adcp_error_from_task(self, task: pb.Task) -> dict[str, Any] | None:
"""Extract a spec-shaped ``adcp_error`` DataPart from a failed task.

Per transport-errors.mdx §A2A Binding the failed task's artifact
carries a DataPart wrapping ``{"adcp_error": {...}}``. Returns the
validated envelope or ``None`` if no spec-shaped payload is present
(spec-permitted: graceful sellers MAY omit the structured envelope,
in which case adopters fall back to ``TaskResult.error``).
"""
data = self._extract_result_from_task(task)
if not isinstance(data, dict):
return None
return validate_adcp_error(data.get("adcp_error"))

def _extract_text_from_task(self, task: pb.Task) -> str | None:
"""Extract human-readable message from TextPart if present."""
if not task.artifacts:
Expand Down
20 changes: 2 additions & 18 deletions src/adcp/protocols/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
IdempotencyConflictError,
IdempotencyExpiredError,
)
from adcp.protocols._adcp_errors import validate_adcp_error as _validate_adcp_error
from adcp.protocols.base import ProtocolAdapter
from adcp.signing.autosign import current_operation as _signing_operation
from adcp.types.core import DebugInfo, TaskResult, TaskStatus
Expand All @@ -66,8 +67,6 @@
# Spec-defined limits from docs/building/implementation/mcp-response-extraction.mdx
# and docs/building/implementation/transport-errors.mdx.
_MAX_TEXT_SIZE_BYTES = 1_048_576 # 1MB cap on text items before JSON.parse
_MAX_ERROR_SIZE_BYTES = 4096 # total adcp_error JSON-serialized size
_MAX_ERROR_CODE_LEN = 64


def _make_signing_http_factory(
Expand Down Expand Up @@ -155,22 +154,6 @@ def extract_adcp_success(result: Any) -> dict[str, Any] | None:
return None


def _validate_adcp_error(err: Any) -> dict[str, Any] | None:
"""Per transport-errors.mdx: ``code`` must be a non-empty string ≤ 64 chars,
total serialized size ≤ 4KB. Returns the validated error or None."""
if not isinstance(err, dict):
return None
code = err.get("code")
if not isinstance(code, str) or not (0 < len(code) <= _MAX_ERROR_CODE_LEN):
return None
try:
if len(json.dumps(err)) > _MAX_ERROR_SIZE_BYTES:
return None
except (TypeError, ValueError):
return None
return err


def extract_adcp_error(result: Any) -> dict[str, Any] | None:
"""Extract and validate an AdCP ``adcp_error`` object from an MCP result.

Expand Down Expand Up @@ -611,6 +594,7 @@ async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskRe
return TaskResult[Any](
status=TaskStatus.FAILED,
error=error_message,
adcp_error=adcp_error,
success=False,
debug_info=debug_info,
idempotency_key=idempotency_key,
Expand Down
6 changes: 6 additions & 0 deletions src/adcp/types/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ class TaskResult(BaseModel, Generic[T]):
submitted: SubmittedInfo | None = None
needs_input: NeedsInputInfo | None = None
error: str | None = None
# Structured AdCP error per transport-errors.mdx (``adcp_error`` object:
# ``code``, ``message``, ``detail``, ``field_path``, ``recovery`` ...).
# Always populated on the MCP FAILED path when the seller returned a
# spec-shaped ``adcp_error`` — independent of ``debug``. Callers should
# branch on ``adcp_error.code`` rather than regex-matching ``error``.
adcp_error: dict[str, Any] | None = None
success: bool = Field(default=True)
metadata: dict[str, Any] | None = None
debug_info: DebugInfo | None = None
Expand Down
46 changes: 18 additions & 28 deletions tests/test_mcp_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@

import pytest

from adcp.protocols._adcp_errors import (
MAX_ERROR_SIZE_BYTES as _MAX_ERROR_SIZE_BYTES,
)
from adcp.protocols._adcp_errors import (
validate_adcp_error as _validate_adcp_error,
)
from adcp.protocols.mcp import (
_MAX_ERROR_SIZE_BYTES,
_MAX_TEXT_SIZE_BYTES,
_validate_adcp_error,
extract_adcp_error,
extract_adcp_success,
)
Expand Down Expand Up @@ -86,19 +90,15 @@ def _mcp_tool_error_vectors() -> list[dict[str, Any]]:
class TestErrorVectorsFromSpec:
"""Replay the MCP-relevant subset of the 29 transport-error vectors."""

@pytest.mark.parametrize(
"vector", _mcp_tool_error_vectors(), ids=lambda v: v["id"]
)
@pytest.mark.parametrize("vector", _mcp_tool_error_vectors(), ids=lambda v: v["id"])
def test_vector(self, vector: dict[str, Any]) -> None:
response = _response_shim(vector["response"])
expected = vector.get("expected_error")
got = extract_adcp_error(response)
if expected is None:
assert got is None, f"vector {vector['id']}: expected None, got {got!r}"
else:
assert got == expected, (
f"vector {vector['id']}: expected {expected!r}, got {got!r}"
)
assert got == expected, f"vector {vector['id']}: expected {expected!r}, got {got!r}"


class TestSuccessExtractionEdgeCases:
Expand All @@ -115,14 +115,12 @@ def test_is_error_short_circuits_structured_content(self) -> None:

def test_oversized_text_item_skipped(self) -> None:
oversized = "x" * (_MAX_TEXT_SIZE_BYTES + 1)
resp = _response_shim(
{"content": [{"type": "text", "text": oversized}]}
)
resp = _response_shim({"content": [{"type": "text", "text": oversized}]})
assert extract_adcp_success(resp) is None

def test_oversized_text_skipped_then_smaller_used(self) -> None:
oversized = "{\"ok\":true," + "x" * _MAX_TEXT_SIZE_BYTES + "}"
small = "{\"status\":\"completed\"}"
oversized = '{"ok":true,' + "x" * _MAX_TEXT_SIZE_BYTES + "}"
small = '{"status":"completed"}'
resp = _response_shim(
{
"content": [
Expand All @@ -147,25 +145,23 @@ def test_non_text_content_items_skipped(self) -> None:
{
"content": [
{"type": "image", "data": "..."},
{"type": "text", "text": "{\"status\":\"completed\"}"},
{"type": "text", "text": '{"status":"completed"}'},
]
}
)
assert extract_adcp_success(resp) == {"status": "completed"}

def test_adcp_error_only_structured_content_returns_none(self) -> None:
# Error response missing the isError flag — spec says treat as None.
resp = _response_shim(
{"structuredContent": {"adcp_error": {"code": "INTERNAL_ERROR"}}}
)
resp = _response_shim({"structuredContent": {"adcp_error": {"code": "INTERNAL_ERROR"}}})
assert extract_adcp_success(resp) is None

def test_adcp_error_only_text_skipped_then_valid_used(self) -> None:
resp = _response_shim(
{
"content": [
{"type": "text", "text": "{\"adcp_error\":{\"code\":\"X\"}}"},
{"type": "text", "text": "{\"status\":\"completed\"}"},
{"type": "text", "text": '{"adcp_error":{"code":"X"}}'},
{"type": "text", "text": '{"status":"completed"}'},
]
}
)
Expand Down Expand Up @@ -206,9 +202,7 @@ def test_structured_content_preferred(self) -> None:
resp = _response_shim(
{
"isError": True,
"content": [
{"type": "text", "text": "{\"adcp_error\":{\"code\":\"OTHER\"}}"}
],
"content": [{"type": "text", "text": '{"adcp_error":{"code":"OTHER"}}'}],
"structuredContent": {
"adcp_error": {"code": "RATE_LIMITED", "recovery": "transient"}
},
Expand Down Expand Up @@ -238,9 +232,7 @@ def test_not_is_error_returns_none(self) -> None:
resp = _response_shim(
{
"isError": False,
"structuredContent": {
"adcp_error": {"code": "WOULD_BE_ERROR"}
},
"structuredContent": {"adcp_error": {"code": "WOULD_BE_ERROR"}},
}
)
assert extract_adcp_error(resp) is None
Expand All @@ -250,9 +242,7 @@ def test_error_path_enforces_pre_parse_size_limit(self) -> None:
# multi-MB text blob must NOT be json.loads'd into memory.
padding = "y" * _MAX_TEXT_SIZE_BYTES
oversized = '{"adcp_error":{"code":"X","pad":"' + padding + '"}}'
resp = _response_shim(
{"isError": True, "content": [{"type": "text", "text": oversized}]}
)
resp = _response_shim({"isError": True, "content": [{"type": "text", "text": oversized}]})
assert extract_adcp_error(resp) is None

def test_invalid_error_passes_through_to_none(self) -> None:
Expand Down
Loading
Loading