Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 47 additions & 19 deletions src/adcp/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncRes
client initialization.

Protocol Detection:
- A2A Task: Has "artifacts" field (terminated statuses: completed, failed)
- A2A Task: Has "artifacts" field (terminated statuses: completed, failed, canceled, rejected)
- A2A TaskStatusUpdateEvent: Has nested "status.message" structure (intermediate statuses)
- MCP: Has "result" field directly

Expand Down Expand Up @@ -505,9 +505,10 @@ def create_a2a_webhook_payload(
Create A2A webhook payload (Task or TaskStatusUpdateEvent).

Per A2A specification:
- Terminated statuses (completed, failed): Returns Task with artifacts[].parts[]
- Intermediate statuses (working, input-required, submitted): Returns TaskStatusUpdateEvent
with status.message.parts[]
- Terminated statuses (completed, failed, canceled, rejected): Returns Task
with artifacts[].parts[]
- Intermediate statuses (working, input-required, submitted, auth-required):
Returns TaskStatusUpdateEvent with status.message.parts[]

This function helps agent implementations construct properly formatted A2A webhook
payloads for sending to clients.
Expand Down Expand Up @@ -564,28 +565,47 @@ def create_a2a_webhook_payload(
timestamp_proto = _isoformat_to_proto_timestamp(timestamp_str) if timestamp_str else None

# Map GeneratedTaskStatus to A2A TaskState enum value.
status_value = status.value if hasattr(status, "value") else str(status)
# GeneratedTaskStatus is always an Enum so .value is guaranteed.
status_value = status.value
adcp_to_task_state: dict[str, int] = {
"completed": pb.TaskState.TASK_STATE_COMPLETED,
"failed": pb.TaskState.TASK_STATE_FAILED,
"canceled": pb.TaskState.TASK_STATE_CANCELED,
"rejected": pb.TaskState.TASK_STATE_REJECTED,
"working": pb.TaskState.TASK_STATE_WORKING,
"submitted": pb.TaskState.TASK_STATE_SUBMITTED,
# GeneratedTaskStatus enum values are hyphenated ("input-required",
# "auth-required"). The underscore forms are accepted as a convenience
# for callers passing raw strings rather than enum members.
"input_required": pb.TaskState.TASK_STATE_INPUT_REQUIRED,
# Tolerate the hyphenated form servers may echo back.
"input-required": pb.TaskState.TASK_STATE_INPUT_REQUIRED,
"auth_required": pb.TaskState.TASK_STATE_AUTH_REQUIRED,
"auth-required": pb.TaskState.TASK_STATE_AUTH_REQUIRED,
}
if status_value not in adcp_to_task_state:
# Falling back to TASK_STATE_UNSPECIFIED would normalize to the
# string ``"unspecified"`` on the wire, which is not a valid A2A
# v0.3 ``TaskState`` — buyer receivers validating against the
# spec reject the webhook. Fail loud at the builder boundary
# instead of producing a silently-broken envelope.
task_state_enum = adcp_to_task_state.get(status_value)
if task_state_enum is None:
# Falling back to TASK_STATE_UNSPECIFIED (proto3 zero) would be
# silently omitted by MessageToDict, producing an invalid wire
# shape ``{"status": {}}`` that A2A v0.3 receivers reject as
# missing the required ``state`` field. Fail loud at the builder
# boundary so callers can't ship a broken envelope.
known = [
"submitted",
"working",
"input-required",
"completed",
"canceled",
"failed",
"rejected",
"auth-required",
]
raise ValueError(
f"Unknown AdCP task status {status_value!r}; expected one of "
f"{sorted(set(adcp_to_task_state))}. AdCP→A2A status mapping is "
"closed — an unknown value indicates a caller bug."
f"create_a2a_webhook_payload: unknown status {status_value!r}. "
f"Known AdCP→A2A states: {known}. "
"Note: 'unknown' has no a2a-sdk 1.0 protobuf constant; build a "
"Task manually and pass it through to_wire_dict if you need to "
"emit that state."
)
task_state_enum = adcp_to_task_state[status_value]

# Build parts for the message/artifact.
parts: list[pb.Part] = []
Expand All @@ -600,8 +620,14 @@ def create_a2a_webhook_payload(
ParseDict(result_dict, value)
parts.append(pb.Part(data=value))

# Determine if this is a terminated status (Task) or intermediate (TaskStatusUpdateEvent)
is_terminated = status in [GeneratedTaskStatus.completed, GeneratedTaskStatus.failed]
# Determine if this is a terminated status (Task) or intermediate (TaskStatusUpdateEvent).
# canceled and rejected are terminal: the task will not continue.
is_terminated = status in (
GeneratedTaskStatus.completed,
GeneratedTaskStatus.failed,
GeneratedTaskStatus.canceled,
GeneratedTaskStatus.rejected,
)

if is_terminated:
status_kwargs: dict[str, Any] = {"state": task_state_enum}
Expand Down Expand Up @@ -1111,7 +1137,9 @@ def _normalize_a2a_task_state_to_v03(payload: dict[str, Any]) -> None:
state = status.get("state")
if isinstance(state, str) and state.startswith("TASK_STATE_"):
remainder = state[len("TASK_STATE_") :].lower()
# Spec uses hyphens for multi-word states.
# Spec uses hyphens for multi-word states (e.g. "auth-required").
# Note: TASK_STATE_UNSPECIFIED (0) is the proto3 default and is
# silently omitted by MessageToDict, so it never reaches this branch.
status["state"] = remainder.replace("_", "-")
message = status.get("message")
if isinstance(message, dict):
Expand Down
94 changes: 94 additions & 0 deletions tests/test_a2a_webhook_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Tests for create_a2a_webhook_payload status mapping correctness.

Guards against issue #603: previously, statuses not in the AdCP→A2A map
(canceled, rejected, auth_required) silently fell back to TASK_STATE_UNSPECIFIED
(proto3 zero value), which MessageToDict omits — producing an invalid
``{"status": {}}`` wire shape with no ``state`` field. Unknown statuses now
raise ValueError instead.
"""

from __future__ import annotations

import types

import pytest
from a2a.types import Task, TaskStatusUpdateEvent
from google.protobuf.json_format import MessageToDict

from adcp.types import GeneratedTaskStatus
from adcp.webhooks import create_a2a_webhook_payload


def _wire_state(obj: Task | TaskStatusUpdateEvent) -> str:
"""Serialize to wire dict and return the normalized status.state string."""
d = MessageToDict(obj, preserving_proto_field_name=False)
state = d.get("status", {}).get("state", "")
if isinstance(state, str) and state.startswith("TASK_STATE_"):
state = state[len("TASK_STATE_") :].lower().replace("_", "-")
return state


# --- terminal statuses return Task ---


def test_canceled_returns_task_with_canceled_state() -> None:
payload = create_a2a_webhook_payload(
task_id="t1",
status=GeneratedTaskStatus.canceled,
context_id="ctx1",
result={},
)
assert isinstance(payload, Task)
assert _wire_state(payload) == "canceled"


def test_rejected_returns_task_with_rejected_state() -> None:
payload = create_a2a_webhook_payload(
task_id="t2",
status=GeneratedTaskStatus.rejected,
context_id="ctx2",
result={},
)
assert isinstance(payload, Task)
assert _wire_state(payload) == "rejected"


# --- intermediate statuses return TaskStatusUpdateEvent ---


def test_auth_required_returns_event_with_auth_required_state() -> None:
payload = create_a2a_webhook_payload(
task_id="t3",
status=GeneratedTaskStatus.auth_required,
context_id="ctx3",
result={},
)
assert isinstance(payload, TaskStatusUpdateEvent)
assert _wire_state(payload) == "auth-required"


# --- unknown status raises ValueError ---


def test_unknown_status_value_raises_value_error() -> None:
# Simulate a caller passing an enum-like object whose .value is not in
# the AdCP→A2A map (e.g. a future enum member not yet supported).
fake_status = types.SimpleNamespace(value="bogus_status")
with pytest.raises(ValueError, match="unknown status"):
create_a2a_webhook_payload(
task_id="t4",
status=fake_status, # type: ignore[arg-type]
context_id="ctx4",
result={},
)


def test_unknown_status_error_message_names_known_states() -> None:
fake_status = types.SimpleNamespace(value="bogus_status")
with pytest.raises(ValueError, match="canceled"):
create_a2a_webhook_payload(
task_id="t5",
status=fake_status, # type: ignore[arg-type]
context_id="ctx5",
result={},
)
14 changes: 0 additions & 14 deletions tests/test_webhooks_to_wire_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,3 @@ def test_unsupported_type_raises_type_error() -> None:
"""Silent fallthrough would mask integration bugs — fail loud."""
with pytest.raises(TypeError, match="Unsupported webhook payload type"):
to_wire_dict("not a payload") # type: ignore[arg-type]


def test_a2a_unknown_status_raises_value_error() -> None:
"""Unknown AdCP status must fail at the builder, not produce an
invalid-on-the-wire ``"unspecified"`` TaskState that buyer receivers
reject. (Issue #603.)
"""
with pytest.raises(ValueError, match="Unknown AdCP task status"):
create_a2a_webhook_payload(
task_id="task_123",
status="not-a-real-status", # type: ignore[arg-type]
context_id="ctx_456",
result={"media_buy_id": "mb_1"},
)
Loading