Skip to content

a2a-rs v0.4.0

Choose a tag to compare

@github-actions github-actions released this 05 Jun 06:15
· 7 commits to master since this release
00444d9

Added

  • (a2a-rs) Add runnable jsonrpc_client example
  • (a2a-agents) MCP server over Streamable HTTP transport
  • (0.4) Typed error details, task versioning, call interceptors, streaming wiring + doc audit

Changed

  • (a2a-rs) Remove unused synchronous port traits

Documentation

  • Doc-comment audit, add ROADMAP, retire stale planning docs

Fixed

  • Fix json rpc

Feat

  • (a2a-rs) Client Transport port + JSON-RPC 2.0 client + card negotiation

Refactor

  • (a2a-rs) Split streaming & push out of storage adapters (Phase 4 final)

Added

  • impl AsyncStreamingHandler for Arc<dyn AsyncStreamingHandler> — a
    forwarding blanket impl so a type-erased, shared streaming backend can be
    passed wherever an impl AsyncStreamingHandler is expected (e.g.
    TaskService::with_streaming_handler). This lets one streaming instance be
    injected into both a message handler and a transport adapter without naming
    its concrete type, so handler broadcasts and SSE subscribers share a registry.

Breaking Changes — Port capability decomposition

The server-side AsyncTaskManager port trait carried 17 methods spanning four
distinct capabilities. It has been removed and split into focused capability
traits. All consumers are in-workspace; there is no deprecation shim.

Task ports

  • Removed AsyncTaskManager.
  • Added AsyncTaskLifecycle — per-task CRUD: create, get, update_status,
    cancel, exists.
  • Added AsyncTaskQuery — cross-task listing: list.
  • Added AsyncTaskLifecycleExt (blanket-implemented) — validation
    conveniences: get_validated, cancel_validated.
  • Express requirements at the use site (e.g. T: AsyncTaskLifecycle + AsyncTaskQuery);
    there is no umbrella trait.

Method renames (the noun-prefix is redundant once the trait carries it):

Old (AsyncTaskManager) New
create_task(id, ctx) AsyncTaskLifecycle::create
get_task(id, hist) AsyncTaskLifecycle::get
update_task_status(...) AsyncTaskLifecycle::update_status
cancel_task(id) AsyncTaskLifecycle::cancel
task_exists(id) AsyncTaskLifecycle::exists
list_tasks_v3(params) AsyncTaskQuery::list
  • Removed the dead get_task_metadata and legacy list_tasks(context, limit)
    methods (never called).

Push-notification ports

  • The four v1.0.0 push-config methods moved off AsyncTaskManager and were
    reconciled into AsyncNotificationManager, now expressed in terms of the
    richer multi-config model: set_config, get_config, list_configs,
    delete_config.
  • Added AsyncNotificationManagerExt (blanket-implemented): validate_config,
    set_validated.
  • Removed the drifting single-config methods (set_task_notification,
    get_task_notification, remove_task_notification, has_task_notification,
    send_test_notification) and the unused notify_task_status_update /
    notify_task_artifact_update stubs from the async trait. The synchronous
    NotificationManager trait is unchanged.

Strongly-typed identifiers

  • Added TaskId, ContextId, PushConfigId newtypes (domain::ids,
    re-exported from the crate root). Each validates non-emptiness on construction
    (FromStr/TryFrom), making argument-order mix-ups a compile error. They
    appear in the new port signatures; conversion from wire strings happens once at
    the RPC boundary. #[serde(transparent)] deserialization bypasses validation
    by design (validated at the boundary).

Dispatch — ports held as Arc<dyn …> at the composition edge

The composition-edge structs no longer carry viral generic parameters; they hold
their ports as Arc<dyn …> trait objects. Dispatch goes through the vtable —
one indirect call per RPC, negligible on the I/O-bound port boundary — and the
generic noise disappears from every type that holds a processor or handler.

  • DefaultRequestProcessor lost its five generic parameters
    (<M, T, N, A, S>). It is now a plain non-generic struct with
    Arc<dyn AsyncMessageHandler>, Arc<dyn AsyncTaskLifecycle>,
    Arc<dyn AsyncTaskQuery>, Arc<dyn AsyncNotificationManager>,
    Arc<dyn AgentInfoProvider>, and Arc<dyn AsyncStreamingHandler> fields.
    Constructors (new, with_handler, with_streaming_handler) now take
    impl Trait arguments, so call sites are unchanged.
  • DefaultMessageHandler lost its <T> parameter; it holds
    Arc<dyn AsyncTaskLifecycle> and its constructor takes
    impl AsyncTaskLifecycle + 'static.
  • ReimbursementHandler (in a2a-agents) lost its <T> parameter; it holds
    Arc<dyn AsyncTaskLifecycle> + Arc<dyn AsyncStreamingHandler>. The Clone
    bound it forced on storage is gone (cloning an Arc<dyn …> is a refcount bump).

Migration

  • Construction is source-compatible: the de-generic'd constructors accept the
    same arguments via impl Trait, so existing DefaultRequestProcessor::new(…)
    / ReimbursementHandler::new(…) call sites compile unchanged.
  • Code that named the processor's generic parameters
    (DefaultRequestProcessor<M, T, N, A, S>) must drop the type arguments — the
    type is now non-generic.
  • The HTTP client API (HttpClient::get_task, cancel_task, etc.) is
    unaffected — those names belong to the client surface, not the server port.

Added — cross-port TaskStatusBroadcast mixin

The capability-mixin pattern from .claude/rules/hexagonal_architecture.md §9,
applied at the port boundary (application::task_status_broadcast, behind the
server feature):

  • Added accessor ingredients HasTaskLifecycle and HasStreaming — each
    hands out a &dyn port, never a concrete adapter.
  • Added TaskStatusBroadcast, a blanket-implemented mixin giving any host
    that exposes both ingredients an update_and_broadcast ("commit the status
    through the lifecycle port, then announce it through the streaming port")
    method for free. A host exposing only one ingredient does not get the method —
    a compile_fail doc test pins that guarantee.
  • TaskService implements both accessors (see below), so it gains
    update_and_broadcast without coupling its lifecycle and streaming ports.

This is additive (no behavior change to existing call paths). Consuming it in
the request flow — and shedding the storage adapter's internal self-broadcast —
is deferred (REFACTORING_PLAN.md §4.0.2).

Added — application/transport split (REFACTORING_PLAN.md §4.2)

DefaultRequestProcessor previously did two jobs: orchestrating the ports and
serving as the ConnectRPC transport adapter. Those layers are now separated.

  • Added application::TaskService (behind the server feature) — the inner
    application service. It owns the six ports as Arc<dyn …> and holds all
    use-case orchestration (send_message, send_streaming_message, get,
    list, cancel, subscribe, push-config CRUD, extended_agent_card),
    speaking only domain types and A2AError. It hosts the HasTaskLifecycle /
    HasStreaming accessors, so it owns update_and_broadcast.
  • DefaultRequestProcessor is now a thin ConnectRPC transport adapter that
    decodes buffa wire views, delegates to a TaskService, and re-encodes the
    results. Its public constructors (new, with_handler,
    with_streaming_handler) are unchanged, so all call sites compile as before.
    map_* helpers and NoopStreamingHandler remain transport-side.

Changed — storage no longer self-broadcasts (REFACTORING_PLAN.md §4.0.2)

Persistence and streaming are now decoupled in the adapters; "commit then
announce" is owned by the orchestration layer via the TaskStatusBroadcast
mixin.

  • InMemoryTaskStorage / SqlxTaskStorage update_status and cancel are
    now persistence-only — they no longer call broadcast_status_update as a side
    effect. (Both structs still implement AsyncStreamingHandler; that is where
    streaming subscribers live. Shedding that role entirely is a later struct
    split, not done here.)
  • Added TaskStatusBroadcast::cancel_and_broadcast, the cancellation
    counterpart to update_and_broadcast. TaskService::cancel now routes through
    it, so cancellations still reach subscribers.
  • DefaultMessageHandler now hosts the broadcast mixin: it holds a streaming
    port in addition to the lifecycle port and routes every transition in
    process_message through update_and_broadcast. Breaking: its
    constructor takes a streaming port (and a responder — see below); use
    DefaultMessageHandler::echo(lifecycle, streaming) for the previous behavior.
  • ReimbursementHandler (in a2a-agents) implements HasTaskLifecycle /
    HasStreaming and broadcasts at all five transition sites, including the
    background AI worker — its updates and push notifications no longer depend on a
    storage side effect.
  • Behavioral note: an agent that drives update_status/cancel directly on
    storage no longer streams as a side effect. To announce transitions, host the
    TaskStatusBroadcast mixin (hold both ports) or use DefaultMessageHandler.

Breaking — storage/streaming/push struct-split (REFACTORING_PLAN.md §4.3, final)

The storage adapters shed their two non-persistence jobs. InMemoryTaskStorage
and SqlxTaskStorage previously implemented persistence and streaming
fan-out and fired push notifications inside their broadcast helpers. Each of
those is now its own adapter behind its own port, wired at the composition edge.

  • Removed the AsyncStreamingHandler impl (and the internal subscribers
    map) from InMemoryTaskStorage and SqlxTaskStorage. They now implement only
    AsyncTaskLifecycle + AsyncTaskQuery + AsyncNotificationManager
    (persistence and push-config CRUD).
  • Added adapter::streaming::InMemoryStreamingHandler — the in-memory
    subscriber registry and broadcast fan-out, extracted out of the storage
    structs. Re-exported from the crate root.
  • Added the AsyncPushNotifier port (port::notification_manager) — the
    out-of-band webhook delivery capability, separate from config CRUD
    (AsyncNotificationManager) and from streaming. PushNotificationRegistry
    implements it (the PushNotificationSender trait remains the pluggable backend
    seam: HTTP, no-op, custom). Added NoopPushNotifier, and a deref-forwarding
    impl so Arc<dyn AsyncPushNotifier> satisfies impl AsyncPushNotifier.
  • Added InMemoryTaskStorage::push_notifier() / SqlxTaskStorage::push_notifier()
    returning the store's registry as an Arc<dyn AsyncPushNotifier> — so a config
    written via set_config is visible to the notifier at the composition edge.
  • TaskStatusBroadcast gained a third ingredient HasPushNotifier: the
    mixin now fires push delivery (best-effort, logged on failure) alongside the
    streaming broadcast, and gained a broadcast_artifact method. Every host
    (TaskService, ReimbursementMessageHandler, ResponderMessageHandler) now
    also exposes HasPushNotifier.
  • Breaking constructors: TaskService::new/with_handler,
    ResponderMessageHandler::new/echo, and ReimbursementHandler::new/with_llm
    take a separate impl AsyncPushNotifier; ResponderMessageHandler and
    ReimbursementHandler also take the streaming port separately (no longer
    requiring the storage to be the streaming handler). The transport adapters
    (ConnectRpcAdapter, JsonRpcAdapter) default to NoopPushNotifier and gained
    a with_push_notifier builder method.
  • Behavior change — no replay on subscribe: add_status_subscriber /
    add_artifact_subscriber no longer replay the task's current state to a new
    subscriber (the streaming adapter has no task access). This is spec-compliant —
    the initial Task snapshot is delivered by TaskService::subscribe /
    send_streaming_message and emitted by the transport before stream items.

Added — injected Responder on DefaultMessageHandler

DefaultMessageHandler now separates lifecycle/streaming plumbing from the
business decision of what to reply.

  • Added the Responder trait (adapter::business) —
    async fn respond(&self, message, task) -> Result<(Message, TaskState)>. The
    handler does create-if-absent, history append, and broadcasting; the responder
    only decides the reply and the resulting state, getting streaming for free.
  • Added EchoResponder, the reference implementation (echoes the input,
    stays Working).
  • DefaultMessageHandler::new(lifecycle, streaming, responder) takes a
    custom responder; DefaultMessageHandler::echo(lifecycle, streaming) wires
    EchoResponder. Agents needing "ack now, finish later" semantics still
    implement AsyncMessageHandler directly.