Add support for MCP list changed notifications#1789
…istChanged in initialize
Widen McpLifecycleBeginEx.capabilities to uint16 and add three new bits
(SERVER_TOOLS_LIST_CHANGED, SERVER_PROMPTS_LIST_CHANGED,
SERVER_RESOURCES_LIST_CHANGED) so the listChanged capability can flow
end-to-end through the lifecycle BEGIN_EX.
Server kind: replace the hardcoded "capabilities" JSON fragment with one
of 8 pre-computed variants selected by the listChanged bits in the
application's lifecycle reply. The reply is now opened immediately on
"initialize" so the JSON-RPC response can be deferred until the app
declares its capabilities.
Client kind: parse the upstream "initialize" result for
capabilities.{tools,prompts,resources}.listChanged and OR the matching
bits into the lifecycle BEGIN_EX. HttpInitializeRequest's decoder is now
decodeJsonRpc rather than decodeIgnore, with a new onResponseComplete
hook that runs after the result body is fully accumulated.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
…ed bits
When McpBindingConfig.cache is non-null, OR SERVER_TOOLS_LIST_CHANGED, SERVER_PROMPTS_LIST_CHANGED, and SERVER_RESOURCES_LIST_CHANGED into the binding's serverCapabilities. The cache's TTL refresh path is itself a listChanged mechanism, so the proxy can credibly emit list_changed notifications for any kind regardless of whether downstream advertises listChanged.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
McpProxyCacheHydrater.McpHydrateLifecycleStream gains a FlushFW branch in onLifecycleMessage that decodes the McpFlushEx, maps the toolsListChanged / promptsListChanged / resourcesListChanged kind to the corresponding KIND_*_LIST, and calls handler.onListChanged(kind). The new callback chain (McpProxyCacheHandler.onListChanged → McpProxyCacheListener.onListChanged → McpProxyCacheManager.onListChanged) cancels the pending TTL signal and invokes handler.hydrate(kind) — the same code path the TTL callback uses. The existing settle-arms-next-signal cycle re-arms the next TTL from "now" once the refresh completes, so the TTL countdown naturally resets. Per-kind lock semantics unchanged: only the hydrate-lock-winning worker receives notifications and runs the refresh, so there is no contention on the per-kind <kind>.lock from a notification. https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
…che refresh
When McpListCache.put receives a value whose CRC32 differs from the
previously cached value, the cache invokes onChanged(kind). The
McpProxyFactory.attach registers a broadcaster lambda on
McpProxyCache.onChanged that iterates the binding's open agent
sessions and calls McpProxySession.doNotifyListChanged(kind, traceId)
on each.
McpLifecycleServer (proxy-side, per-agent lifecycle) implements
doNotifyListChanged by building an McpFlushExFW with the matching
toolsListChanged / promptsListChanged / resourcesListChanged union
case, stamping a per-session monotonic id, and writing it on the
agent's lifecycle reply stream. The downstream MCP server kind then
echoes the id onto its outbound SSE GET event and emits a
notifications/{kind}/list_changed line.
The diff-gate ensures TTL refreshes that produce identical content
don't spam agents; a fresh worker that reads existing store content on
attach seeds its CRC from that value so subsequent puts diff against
the correct baseline.
Adds cache.refresh.tools.notify scenario (agent script only) and
McpProxyCacheIT.shouldNotifyToolsListChangedAfterRefresh to verify
end-to-end: initial hydrate populates with [get_weather], TTL fires,
refresh returns [get_weather, get_time] (differs), fan-out delivers
FlushEx-toolsListChanged with id=0 to the connected agent.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
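The diff-gate described in this commit can be sketched in isolation. This is a minimal illustration, not the actual McpListCache: the class name, the String payload, and the seed method are assumptions, and the first put is treated as a baseline that does not notify.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.IntConsumer;
import java.util.zip.CRC32;

// Hypothetical stand-in for a per-kind list cache; not the actual McpListCache.
final class DiffGatedCacheSketch
{
    private final int kind;
    private final IntConsumer onChanged;
    private boolean populated;
    private long lastChecksum;

    DiffGatedCacheSketch(
        int kind,
        IntConsumer onChanged)
    {
        this.kind = kind;
        this.onChanged = onChanged;
    }

    // Seeds the baseline from content already present in the store, without notifying.
    void seed(
        String value)
    {
        lastChecksum = checksum(value);
        populated = true;
    }

    // Records a refreshed value; notifies only when its CRC32 differs from the prior one.
    void put(
        String value)
    {
        final long newChecksum = checksum(value);
        final boolean changed = populated && newChecksum != lastChecksum;
        lastChecksum = newChecksum;
        populated = true;
        if (changed)
        {
            onChanged.accept(kind);
        }
    }

    static long checksum(
        String value)
    {
        final CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

A TTL refresh that returns identical bytes leaves the checksum unchanged, so no notification fires; only a put with different content reaches the callback.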
…-out
Adds cache.notify.tools.list.changed scenario:
- Downstream simulator (server.rpt): completes initial tools/list, then emits FlushEx-toolsListChanged on its lifecycle reply, then accepts a second tools/list call with updated content
- Agent simulator (client.rpt): opens a lifecycle session and verifies it receives FlushEx-toolsListChanged
McpProxyCacheIT.shouldRefreshToolsOnListChangedNotification wires these through Zilla under proxy.cache.yaml (no TTL) with MCP_HYDRATE_FILTER=tools, so the second tools/list must be triggered by the downstream's notification, and the diff-gated fan-out delivers the FlushEx to the connected agent only when content actually differs.
The peer-to-peer variant is intentionally omitted: notification-driven refresh requires the proxy mediating between hydrate and agent sessions (distinct session ids), which can't be replayed by a single pair of peer scripts.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
Resolves conflicts in McpProxyLifecycleFactory.java:
- Take union of FlushEx flyweight type imports (develop adds aggregate-id rewrite paths for elicitComplete/progress/resumable/list-changed variants on top of the basic McpFlushExFW import).
- Drop duplicate mcpFlushExRW builder field created by auto-merge accepting both sides' independently-added declaration.
Both sides' work is complementary: branch adds initialize-time listChanged advertisement + cache→list_changed wiring + diff-gated fan-out + ITs; develop (PR #1791) adds multi-route aggregate event IDs.
Verified all 182 binding-mcp tests pass.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
…nd, changed)
Cache callback is now fired on every successful put, not just when the CRC32 differs from the prior value. Signature changes from IntConsumer to a small @FunctionalInterface carrying both the kind and the changed flag, so downstream broadcasters can act on the no-change case (e.g., clear pending state without emitting). Broadcaster lambda in McpProxyFactory.attach gates emission on 'changed' to preserve current behavior.
Prepares the surface for the upcoming defer-list-changed-in-cache-mode work; no behavior change in this commit.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
…ycle when cache mode
Per-route lifecycle clients on agent-facing lifecycle servers (originId != routedId, the multi-route or single-route exit pattern) no longer forward toolsListChanged / promptsListChanged / resourcesListChanged FlushEx to the agent when the binding has cache enabled. The cache settle path (McpListCache.put → onSettled → broadcaster in McpProxyFactory.attach) is now the sole emission path for these notifications in cache mode.
Eliminates the duplicate notification that previously fired when an agent had invoked a per-route operation before an upstream list_changed event: once via the per-route forward, then again via the cache broadcaster after the refresh settled. With deferral the agent sees exactly one notification, and crucially it arrives after the cache holds the new value, so any immediate re-fetch by the agent sees fresh content.
Hydrater lifecycle servers (originId == routedId, self-loop) are unchanged: they continue to forward via doServerFlush so that McpHydrateLifecycleStream.onLifecycleFlush observes the upstream notification and triggers cache.hydrate(kind).
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
…dler.watch
McpProxyCacheManager.start() registers one watch per kind on the kind's store value key. When any worker (local or remote) updates the key, the watch listener fires; the manager re-reads the value via the cache's existing get path, which routes through McpListCache.checkGet where the CRC32 of the returned value is compared against the worker-local lastChecksum.
McpListCache.checkGet now fires onSettled(kind, changed) on every read, mirroring the put path. Initial loads and watch fires of identical content report changed=false (broadcaster matrix no-ops); watch fires after a remote worker's content-changing put report changed=true on this worker (broadcaster matrix emits to local sessions).
stop() unsubscribes via the Closeable handle returned at registration; the listener's get-on-fire is harmless if it races a concurrent stop because checkGet honours stopped=true upstream and onSettled is already gated on cache state. The IOException from Closeable.close is swallowed: unsubscribe is best-effort, paralleling the engine signaler-cancel pattern.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
New k3po scenario cache.notify.tools.list.changed.after.tools.call
exercises the duplicate-fix: the agent's per-route lifecycle client
(opened by the agent invoking tools/call) receives an upstream
toolsListChanged FlushEx with id="200", which McpLifecycleClient.
onClientFlush now defers (no doServerFlush forwarding) because the
binding has cache enabled and the lifecycle server is agent-facing.
The hydrater's per-route lifecycle client receives its own copy of the
notification with id="100", which is forwarded to the hydrater's
self-loop lifecycle server, triggering McpProxyCacheHydrater
.onLifecycleFlush -> McpProxyCacheManager.onListChanged ->
handler.hydrate -> cache.put. The put detects a CRC32 diff against
the initial tools list, fires onSettled(kind, true), and the
broadcaster in McpProxyFactory.attach emits a single
toolsListChanged FlushEx with synthetic id="0" to the agent.
Without the defer-in-cache-mode fix the agent would have read
id="200" first (the forwarded upstream id), failing the id("0")
match. With the fix the cache-broadcast wins and the agent sees
exactly one notification at id="0".
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
McpProxyCacheManager previously released the cache lifecycle lock the moment initial hydrate completed (in onCacheReady), opening a window where a different worker's reconnect retry would acquire the lock and also open its own lifecycle stream to upstream. The result was multiple workers redundantly subscribing to the upstream SSE for the same binding, which wasted resources and made it harder to reason about which worker drives TTL refreshes.
Move releaseLifecycle out of onCacheReady. The lifecycle lock is acquired once at binding attach (or after lifecycle abort + reconnect) and held until detach / engine shutdown. Loser workers keep retrying the acquire; their attempts only succeed if the holder dies and the lock TTL expires (or releases explicitly on detach). This matches MCP semantics: one worker per binding owns the upstream SSE; others serve their agents from the shared cache populated via the store-watch propagation path.
Migrate the per-kind and lifecycle locks in McpProxyCache from putIfAbsent/delete to lock/unlock from PR #1790. Ownership-checked unlock means a worker that never acquired the lock cannot accidentally release another worker's lock (which the old unauthenticated delete allowed). Token state is held on McpListCache and McpProxyCache; null token short-circuits release as a no-op.
Engine TestStoreHandler updated to share watchers and locks per storeConfig.id (mirroring how entries are already shared via TestStoreContext.supplyEntries). The watcher record carries the registering worker's signaler so cross-worker notify dispatches listener invocations onto the registering worker's I/O thread, matching the contract documented on StoreHandler. Without these fixes cross-worker watch propagation either didn't fire at all or fired on the wrong thread.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
Adds StoreHandler.renew(key, token, ttl, completion) to the engine SPI, following the same ownership-checked, async-completion contract as unlock. Callers that hold a coordination lock for longer than its initial TTL (e.g. a singleton worker that owns the cache lifecycle for the lifetime of a binding) schedule renewals at an interval shorter than the lease TTL. A failed renewal signals that ownership has been lost (the lock was reacquired by another holder after a TTL expiry), giving callers a deterministic cue to surrender state and let the new owner take over.
store-memory and the engine TestStoreHandler implement renew with an atomic ConcurrentMap.replace against the previously-observed LockEntry: if the token matches the unexpired current holder, the entry is replaced with a renewed expiresAt and the original token is returned; otherwise null is returned. Expired entries are evicted opportunistically, mirroring the unlock cleanup behaviour.
TestBindingFactory gains a renew assertion alongside the existing lock/unlock/watch ops. Lock now stashes the acquired token in a heldLockTokens map keyed by lock key, so subsequent renew (and unlock) assertions can target the same key without the YAML needing to surface the token. Explicit value: in the assertion still wins when set. store-memory ships an IT covering renew of an owned lock (success) and renew with a non-matching token (null).
McpProxyCache exposes renewLifecycle for the cache manager. McpProxyCacheManager schedules a renewal at leaseTtl / 3 once the hydrater opens, re-scheduling on each successful renewal so the cache owner holds its lifecycle lock uninterrupted while the node runs. On a failed renewal the manager treats it as a lifecycle loss: it stops the current handler, falls into the existing reconnect path, and the next race winner (possibly this worker, possibly another) takes over.
Combined with TTL-bounded recovery this gives uninterrupted ownership during normal operation and timely takeover when a holding node crashes. https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
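The ownership-checked renew can be illustrated with a small in-memory lock table. This is a sketch under assumptions: the class, method signatures, and explicit `now` parameter are hypothetical and synchronous, whereas the StoreHandler SPI described above completes asynchronously.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory lock table; names and signatures are illustrative only.
final class LeaseLockSketch
{
    // Identity equality is intentional: replace/remove succeed only against the observed entry.
    static final class LockEntry
    {
        final String token;
        final long expiresAt;

        LockEntry(String token, long expiresAt)
        {
            this.token = token;
            this.expiresAt = expiresAt;
        }
    }

    private final ConcurrentMap<String, LockEntry> locks = new ConcurrentHashMap<>();

    // Returns a fresh token when the key is free or its lease has expired, otherwise null.
    String lock(String key, long ttlMillis, long now)
    {
        final LockEntry next = new LockEntry(UUID.randomUUID().toString(), now + ttlMillis);
        final LockEntry prior = locks.get(key);
        if (prior == null)
        {
            return locks.putIfAbsent(key, next) == null ? next.token : null;
        }
        if (prior.expiresAt <= now)
        {
            return locks.replace(key, prior, next) ? next.token : null;
        }
        return null;
    }

    // Extends the lease only when the token matches the unexpired current holder.
    String renew(String key, String token, long ttlMillis, long now)
    {
        final LockEntry prior = locks.get(key);
        if (prior == null)
        {
            return null;
        }
        if (prior.expiresAt <= now)
        {
            locks.remove(key, prior); // opportunistic eviction of an expired entry
            return null;
        }
        if (!prior.token.equals(token))
        {
            return null;
        }
        final LockEntry renewed = new LockEntry(token, now + ttlMillis);
        return locks.replace(key, prior, renewed) ? token : null;
    }

    // Releases the lock only for the holder that presents the matching token.
    boolean unlock(String key, String token)
    {
        final LockEntry prior = locks.get(key);
        return prior != null && prior.token.equals(token) && locks.remove(key, prior);
    }
}
```

Because the replace is conditional on the previously observed entry, a renew that races with another worker's takeover fails and returns null, which is the cue to surrender ownership.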
…n notifications id-less
Initialize response now unconditionally advertises listChanged:true for prompts,
resources, and tools. MCP spec treats the bit as advisory (SHOULD), and the
zilla server delivers list_changed notifications in both cache-enabled mode
(via cache settle path) and passthrough mode (forwarded from upstream). The
8-variant byte-array selector and the pendingInitialize back-reference from
McpLifecycleStream to McpServer are removed; the initialize JSON is now
encoded synchronously on receipt of the JSON-RPC initialize request.
Cache-driven list_changed FlushEx no longer carries a synthetic monotonic id.
The id field is left as the absent sentinel (string16 length=-1), and the SSE
encoder skips the entire `id:` line when the supplied String16FW length is -1.
This preserves the agent's previously-anchored upstream Last-Event-ID across
cache-driven notifications, instead of overwriting it with a proxy-local
counter that no upstream could resume from.
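The id-skipping rule can be sketched with plain strings. This is an illustration only: the class and method are hypothetical, a null id stands in for the absent sentinel, and the real encoder operates on flyweights rather than Strings.

```java
// Hypothetical encoder; a null id models the absent sentinel (string16 length == -1).
final class SseEncodeSketch
{
    static String encode(
        String id,
        String data)
    {
        final StringBuilder sse = new StringBuilder();
        if (id != null)
        {
            // Only a present id produces an id: line; an absent id emits nothing at all.
            sse.append("id: ").append(id).append('\n');
        }
        sse.append("data: ").append(data).append("\n\n");
        return sse.toString();
    }
}
```

Omitting the line entirely matters because an SSE client keeps its last event ID when an event carries no id field, while an id field with an empty value resets it.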
The McpProxyCacheManager unconditional OR-in of SERVER_*_LIST_CHANGED bits on
the agent-facing BEGIN_EX is removed - the BEGIN_EX bits now reflect upstream's
actual capabilities. Tests that asserted the cache-enabled proxy adds those
bits are deleted alongside the now-collapsed 3-of-4 lifecycle.initialize.*.
list.changed scenarios; one shouldInitializeLifecycleAllListChanged scenario
remains in each of network/ and application/ to assert the unconditional JSON
shape.
McpFunctions adds null-tolerant matcher semantics for the three listChanged
FlushEx variants: .id("X") matches exact value, omitting .id(...) matches any
id (wildcard, unchanged), and .id(null) matches when the id field is absent
(length=-1) - distinct from wildcard.
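The three matcher states (wildcard, exact, absent) can be sketched as follows. The class is hypothetical and much simpler than the McpFunctions matcher; a null actual value models an id field that is absent on the wire.

```java
import java.util.Objects;

// Hypothetical tri-state id matcher: unset = wildcard, value = exact, null = must be absent.
final class IdMatcherSketch
{
    private boolean idSet;
    private String id;

    IdMatcherSketch id(
        String id)
    {
        this.id = id;
        this.idSet = true;
        return this;
    }

    // actual == null models an absent id field (length == -1).
    boolean matches(
        String actual)
    {
        return !idSet || Objects.equals(id, actual);
    }
}
```

Tracking "was id(...) called" separately from the value is what keeps `.id(null)` distinct from never calling `.id(...)`.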
…nged-support-epMnQ
…ed scenario
Since the initialize advertise is now unconditional (every initialize.* scenario already asserts the listChanged JSON output regardless of the input BEGIN_EX capability bits), the .all.list.changed scenario adds no regression coverage that the base lifecycle.initialize doesn't already provide. Any future change that re-couples input bits to output JSON would break the base scenario first.
Removes the scenario directory and its IT methods in network/ ApplicationIT, runtime McpClientIT, and runtime McpServerIT.
| prior -> completion.accept(prior == null)); | ||
| store.lock(STORE_LOCK_KEY_LIFECYCLE, leaseTtl, (k, t) -> | ||
| { | ||
| lifecycleLockToken = t; |
| lifecycleLockToken = t; | |
| lockToken = t; |
| @@ -131,14 +142,47 @@ public void register( | |||
| void acquireLifecycle( | |||
| void acquireLifecycle( | |
| void acquireLock( |
| }); | ||
| } | ||
|
|
||
| void releaseLifecycle( |
| void releaseLifecycle( | |
| void releaseLock( |
| lastChecksum = newChecksum; | ||
| store.put(storeKey, value, STORE_TTL_FOREVER, completion.andThen(this::checkPut).andThen(k -> | ||
| { | ||
| if (onSettled != null) |
Let's make onSettled null case a no-op instead, so we can avoid this null check and use andThen directly.
| void hydrate( | ||
| int kind); | ||
|
|
||
| void onListChanged( |
| void onListChanged( | |
| void onChanged( |
| if (onSettled != null) | ||
| { | ||
| onSettled.accept(kind, changed); | ||
| } |
Same here, no-op instead of null for onSettled, eliminate null check.
| final OctetsFW extension = flush.extension(); | ||
| if (extension == null || extension.sizeof() == 0) | ||
| { | ||
| return; | ||
| } | ||
| final McpFlushExFW flushEx = mcpFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()); | ||
| if (flushEx == null) | ||
| { | ||
| return; | ||
| } | ||
| final int listKind; | ||
| switch (flushEx.kind()) | ||
| { | ||
| case KIND_TOOLS_LIST_CHANGED: | ||
| listKind = KIND_TOOLS_LIST; | ||
| break; | ||
| case KIND_PROMPTS_LIST_CHANGED: | ||
| listKind = KIND_PROMPTS_LIST; | ||
| break; | ||
| case KIND_RESOURCES_LIST_CHANGED: | ||
| listKind = KIND_RESOURCES_LIST; | ||
| break; | ||
| default: | ||
| listKind = -1; | ||
| break; | ||
| } | ||
| if (listKind != -1) | ||
| { | ||
| handler.onListChanged(listKind); | ||
| } |
Assume flushEx is valid.
| final OctetsFW extension = flush.extension(); | |
| if (extension == null || extension.sizeof() == 0) | |
| { | |
| return; | |
| } | |
| final McpFlushExFW flushEx = mcpFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()); | |
| if (flushEx == null) | |
| { | |
| return; | |
| } | |
| final int listKind; | |
| switch (flushEx.kind()) | |
| { | |
| case KIND_TOOLS_LIST_CHANGED: | |
| listKind = KIND_TOOLS_LIST; | |
| break; | |
| case KIND_PROMPTS_LIST_CHANGED: | |
| listKind = KIND_PROMPTS_LIST; | |
| break; | |
| case KIND_RESOURCES_LIST_CHANGED: | |
| listKind = KIND_RESOURCES_LIST; | |
| break; | |
| default: | |
| listKind = -1; | |
| break; | |
| } | |
| if (listKind != -1) | |
| { | |
| handler.onListChanged(listKind); | |
| } | |
| final OctetsFW extension = flush.extension(); | |
| final McpFlushExFW flushEx = mcpFlushExRO.wrap(extension.buffer(), extension.offset(), extension.limit()); | |
| switch (flushEx.kind()) | |
| { | |
| case KIND_TOOLS_LIST_CHANGED: | |
| handler.onListChanged(KIND_TOOLS_LIST); | |
| break; | |
| case KIND_PROMPTS_LIST_CHANGED: | |
| handler.onListChanged(KIND_PROMPTS_LIST); | |
| break; | |
| case KIND_RESOURCES_LIST_CHANGED: | |
| handler.onListChanged(KIND_RESOURCES_LIST); | |
| break; | |
| default: | |
| break; | |
| } |
| final McpFlushExFW flushEx; | ||
| switch (kind) | ||
| { | ||
| case KIND_TOOLS_LIST: | ||
| flushEx = mcpFlushExRW | ||
| .wrap(codecBuffer, 0, codecBuffer.capacity()) | ||
| .typeId(mcpTypeId) | ||
| .toolsListChanged(b -> {}) | ||
| .build(); | ||
| break; | ||
| case KIND_PROMPTS_LIST: | ||
| flushEx = mcpFlushExRW | ||
| .wrap(codecBuffer, 0, codecBuffer.capacity()) | ||
| .typeId(mcpTypeId) | ||
| .promptsListChanged(b -> {}) | ||
| .build(); | ||
| break; | ||
| case KIND_RESOURCES_LIST: | ||
| flushEx = mcpFlushExRW | ||
| .wrap(codecBuffer, 0, codecBuffer.capacity()) | ||
| .typeId(mcpTypeId) | ||
| .resourcesListChanged(b -> {}) | ||
| .build(); | ||
| break; | ||
| default: | ||
| flushEx = null; | ||
| break; | ||
| } |
Use a switch expression instead of a switch statement.
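As a rough illustration of the requested shape, a switch expression yields the selected value directly instead of assigning it in each branch. The constants below are hypothetical placeholders (the real KIND_* values are not shown here), and the String result stands in for the built FlushEx.

```java
// Hypothetical sketch; constant values and the String result are illustrative only.
final class ListKindSketch
{
    static final int KIND_TOOLS_LIST = 1;
    static final int KIND_PROMPTS_LIST = 2;
    static final int KIND_RESOURCES_LIST = 3;

    static String notificationMethod(
        int kind)
    {
        // Each arm yields a value; there is no fall-through and no temporary to assign.
        return switch (kind)
        {
            case KIND_TOOLS_LIST -> "notifications/tools/list_changed";
            case KIND_PROMPTS_LIST -> "notifications/prompts/list_changed";
            case KIND_RESOURCES_LIST -> "notifications/resources/list_changed";
            default -> null;
        };
    }
}
```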
| String id) | ||
| { | ||
| this.id = new String16FW(id); | ||
| this.id = new String16FW(id, StandardCharsets.UTF_8); |
Use static import for UTF_8.
| SERVER_TOOLS_LIST_CHANGED(64), | ||
| SERVER_PROMPTS_LIST_CHANGED(128), | ||
| SERVER_RESOURCES_LIST_CHANGED(256) |
Move the SERVER_ capabilities together.
…aming
- Rename cache lock methods to drop redundant Lifecycle suffix: acquireLifecycle → acquireLock, releaseLifecycle → releaseLock, renewLifecycle → renewLock; lifecycleLockToken field → lockToken.
- Rename onListChanged → onChanged on McpProxyCacheHandler and McpProxyCacheListener; rename OnSettled SAM type to ListChangedListener and initialize the field to a no-op so callers can chain via andThen without null checks at every fire site.
- McpProxyCacheManager: define a NO_OP BiConsumer constant for the watch-fired get callback; use Agrona CloseHelper.quietClose for best-effort watch unsubscribe (drops the IOException import and the inline try/catch).
- McpProxyCacheHydrater.onLifecycleFlush: assume the extension wraps a valid McpFlushExFW (consistent with peer call sites); drop the null/size guards and inline the listKind temporary into a switch statement that calls handler.onChanged directly.
- McpProxyLifecycleFactory.doNotifyListChanged: switch expression instead of switch statement.
- McpListCache: move the boolean populated field below the private fields with a blank line separator.
- mcp.idl: group SERVER_ capabilities (TOOLS, PROMPTS, RESOURCES, and their LIST_CHANGED variants) together.
- McpFunctions: use static import for UTF_8.
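The no-op default for the settle listener might look roughly like this. The interface and holder class are hypothetical sketches; the actual ListChangedListener signature is assumed from the (kind, changed) description above.

```java
// Hypothetical listener type carrying both the kind and the changed flag.
@FunctionalInterface
interface ListChangedListenerSketch
{
    ListChangedListenerSketch NO_OP = (kind, changed) -> {};

    void accept(int kind, boolean changed);

    default ListChangedListenerSketch andThen(
        ListChangedListenerSketch after)
    {
        return (kind, changed) ->
        {
            accept(kind, changed);
            after.accept(kind, changed);
        };
    }
}

// Hypothetical holder: the field starts as NO_OP, so fire sites never need a null check.
final class SettleSketch
{
    private ListChangedListenerSketch onSettled = ListChangedListenerSketch.NO_OP;

    void register(
        ListChangedListenerSketch listener)
    {
        onSettled = onSettled.andThen(listener);
    }

    void settle(
        int kind,
        boolean changed)
    {
        onSettled.accept(kind, changed);
    }
}
```

A broadcaster registered this way can still gate emission on the changed flag, while the no-change case remains visible to it for clearing pending state.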
| this.refreshCancelId = NO_CANCEL_ID; | ||
| this.reconnectCancelId = NO_CANCEL_ID; | ||
| this.renewCancelId = NO_CANCEL_ID; |
| this.refreshCancelId = NO_CANCEL_ID; | |
| this.reconnectCancelId = NO_CANCEL_ID; | |
| this.renewCancelId = NO_CANCEL_ID; | |
| this.refreshId = NO_CANCEL_ID; | |
| this.reconnectId = NO_CANCEL_ID; | |
| this.renewId = NO_CANCEL_ID; |
| // renew at one third of the lease TTL so two consecutive renew failures still leave | ||
| // headroom before the lock would expire and let another worker take over | ||
| renewCancelId = signaler.signalAt( | ||
| Instant.now().plusMillis(cache.leaseTtl.toMillis() / 3L), 0, this::onLifecycleRenew); |
Move the calculation to cache.renewTtl, with the duration derived from cache.leaseTtl, then use it directly here via cache.renewTtl.toMillis().
Also remove the comment above.
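A minimal sketch of the suggested shape, assuming a holder class with both durations as fields (the class name is hypothetical; only leaseTtl and renewTtl come from the comment above):

```java
import java.time.Duration;

// Hypothetical holder; the renew interval is derived once from the lease TTL.
final class LeaseConfigSketch
{
    final Duration leaseTtl;
    final Duration renewTtl;

    LeaseConfigSketch(
        Duration leaseTtl)
    {
        this.leaseTtl = leaseTtl;
        this.renewTtl = leaseTtl.dividedBy(3);
    }
}
```

The call site can then read renewTtl.toMillis() without repeating the division inline.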
| SERVER_TOOLS(1), | ||
| SERVER_PROMPTS(2), | ||
| SERVER_RESOURCES(4), | ||
| SERVER_TOOLS_LIST_CHANGED(64), | ||
| SERVER_PROMPTS_LIST_CHANGED(128), | ||
| SERVER_RESOURCES_LIST_CHANGED(256), | ||
| CLIENT_ROOTS(8), | ||
| CLIENT_SAMPLING(16), | ||
| CLIENT_ELICITATION(32) |
Bitmask values should be in ascending order, with the SERVER_ capabilities grouped together before the CLIENT_ capabilities.
…, renumber capability bits
- McpProxyCache: add renewTtl as leaseTtl.dividedBy(3) so the renew schedule is derived once at construction; consumers use cache.renewTtl.toMillis() directly without inline arithmetic.
- McpProxyCacheManager: rename refreshCancelId / reconnectCancelId / renewCancelId fields to refreshId / reconnectId / renewId (drops the redundant Cancel infix); drop the inline comment over the renew schedule now that the duration is named.
- mcp.idl: renumber the McpCapabilities bitmask so values are ascending within and across groups: SERVER_* (1..32) before CLIENT_* (64..256). CLIENT_ROOTS / CLIENT_SAMPLING / CLIENT_ELICITATION shift from 8/16/32 to 64/128/256; SERVER_*_LIST_CHANGED shift from 64/128/256 to 8/16/32. No literal bit values are referenced from scripts or Java; all consumers go through the generated enum constants so the renumber is source-compatible.
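The renumbered layout can be written out as plain constants to show how the listChanged bits combine and why nine flags need more than eight bits. The class and helper methods are hypothetical; the generated enum in the binding is the real source of these values.

```java
// Hypothetical constants mirroring the renumbered McpCapabilities values listed above.
final class CapabilityBitsSketch
{
    static final int SERVER_TOOLS = 1;
    static final int SERVER_PROMPTS = 2;
    static final int SERVER_RESOURCES = 4;
    static final int SERVER_TOOLS_LIST_CHANGED = 8;
    static final int SERVER_PROMPTS_LIST_CHANGED = 16;
    static final int SERVER_RESOURCES_LIST_CHANGED = 32;
    static final int CLIENT_ROOTS = 64;
    static final int CLIENT_SAMPLING = 128;
    static final int CLIENT_ELICITATION = 256;

    // ORs together the listChanged bits for the kinds whose flag is set.
    static int listChangedBits(
        boolean tools,
        boolean prompts,
        boolean resources)
    {
        int bits = 0;
        bits |= tools ? SERVER_TOOLS_LIST_CHANGED : 0;
        bits |= prompts ? SERVER_PROMPTS_LIST_CHANGED : 0;
        bits |= resources ? SERVER_RESOURCES_LIST_CHANGED : 0;
        return bits;
    }

    // True when the mask would fit in an unsigned 8-bit field.
    static boolean fitsInUint8(
        int capabilities)
    {
        return (capabilities & ~0xFF) == 0;
    }
}
```

The ninth flag (value 256) falls outside an 8-bit field, which is consistent with the capabilities field being widened to uint16.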
…ushEx structs
McpToolsListChangedFlushEx, McpPromptsListChangedFlushEx, and
McpResourcesListChangedFlushEx each carried a non-nullable string16 id.
The generated builder asserts that all non-default fields are set, so a
caller that wanted to emit an id-less notification (the cache-driven
emit path in McpProxyLifecycleFactory.doNotifyListChanged after the
recent refactor) hit an AssertionError at flyweight build time when
invoked through .toolsListChanged(b -> {}).
The AssertionError propagated out of an EngineSignaler deferred lambda
inside the engine worker loop, terminated the worker via
AgentTerminationException, and left ring buffers and timer-wheel state
inconsistent. Teardown of the test engine then tried to drain those
buffers and crashed with SIGSEGV in C2-compiled UnsafeApi.getLongVolatile
— the apparent JVM crash was the symptom, not the root cause.
Adds = null as the IDL default for all three list_changed id fields,
mirroring how McpResumableFlushEx already declares its nullable id.
The builder now skips the assertion when id is not set, and the
flyweight serializes length() == -1 on the wire — which is exactly
what the SSE encoder relies on to skip the id: line for cache-driven
notifications.
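The absent sentinel can be pictured with a simplified length-prefixed codec. This is an assumption-laden sketch: the class is hypothetical, it uses ByteBuffer's default byte order, and it only handles values up to 32767 bytes, none of which is claimed to match the generated flyweight.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical string16 codec: a 16-bit length prefix, with -1 meaning "absent".
final class String16Sketch
{
    static void put(
        ByteBuffer buffer,
        String value)
    {
        if (value == null)
        {
            buffer.putShort((short) -1); // absent: length -1 and no payload bytes
        }
        else
        {
            final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) bytes.length);
            buffer.put(bytes);
        }
    }

    static String get(
        ByteBuffer buffer)
    {
        final int length = buffer.getShort();
        if (length == -1)
        {
            return null;
        }
        final byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

An absent value (length -1) stays distinguishable from an empty string (length 0), which is the distinction the SSE encoder relies on.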
Full binding-mcp IT suite now passes locally end-to-end
(McpAggregateEventIdTest, McpClientIT, McpProxyCacheIT, McpServerIT,
McpProxyIT, McpProxyLifecycleIT).
Description
End-to-end support for MCP notifications/{tools,prompts,resources}/list_changed across all three binding kinds (mcp · server, mcp · client, mcp · proxy), with cache-coordinated propagation in the cache-enabled proxy and resumability across multi-route deployments.
Behavior delivered
Initialize advertise. mcp · server unconditionally advertises capabilities.{tools,prompts,resources}.listChanged: true in the JSON-RPC initialize response. The MCP spec treats the flag as advisory (SHOULD), and Zilla in fact delivers notifications in every deployment shape: passthrough proxies forward upstream list_changed; cache-enabled proxies generate notifications from their refresh path. Encoding the bits unconditionally collapses an 8-variant byte-array selector to one constant and removes a back-reference from McpLifecycleStream to McpServer that previously had the initialize encode wait on the lifecycle reply.
Passthrough proxy. Upstream list_changed FlushEx flows through to the agent unchanged. Event IDs survive across multi-route deployments via the aggregate-event-ID scheme introduced in PR #1791.
Cache-enabled proxy: single-fire, fresh-content-only. When an upstream emits list_changed, the per-route lifecycle client (one-per-active-agent-route) does not forward it to the agent. Instead the proxy refreshes the cache and, once the refresh settles, broadcasts list_changed to each connected agent session, but only if the refreshed content differs from the previous cached value (CRC32 diff-gate). TTL-driven refreshes that produce identical content are silent.
This guarantees: (a) the agent never sees list_changed before the cache is fresh; (b) the agent never sees a notification for content that did not change; (c) exactly one notification per upstream signal that produces real content change, never two.
Cache-driven notifications carry no event id. The SSE encoder skips the entire id: line when the FlushEx id has the absent sentinel (length() == -1). Per the SSE spec, an empty id: line resets the client's lastEventId; only the absence of the field preserves it. This means the agent's prior upstream-anchored Last-Event-ID survives a cache-driven notification, so reconnect can still resume from upstream.
Cross-worker fan-out. The cache-enabled proxy elects a single "lifecycle owner" worker per binding via StoreHandler.lock (introduced in PR #1790) and holds the lease for the lifetime of the binding via StoreHandler.renew (PR #1792). The owner subscribes to upstream and writes refreshed content into the shared store; non-owner workers subscribe to StoreHandler.watch (PR #1790) on the same keys, so a settle on one worker fires the change-detection + per-session broadcaster on every worker, and every agent gets notified regardless of which worker holds the upstream stream.
Multi-route resumability. When the proxy has multiple routes (e.g. one upstream per toolkit), inbound FlushEx event IDs from each per-route lifecycle client are recorded keyed by routedId. Outbound FlushEx ids sent to the agent are minted as an aggregate using PR #1791's shortest-unique-prefix scheme over CRC32C(toolkit). On reconnect the agent sends Last-Event-ID: <prefix>=<id>;<prefix>=<id>, the proxy decodes it, and each per-route client issues its own McpResumeChallengeEx against that route's upstream id. Single-route proxies and the hydrater's self-loop bypass aggregation as a transparent relay.
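The aggregate header format above can be sketched as a simple encode/decode pair. This is illustrative only: the class is hypothetical, it assumes prefixes contain no `=` and ids contain no `;`, and it does not attempt the shortest-unique-prefix derivation from PR #1791.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical codec for an aggregate Last-Event-ID of the form <prefix>=<id>;<prefix>=<id>.
final class AggregateEventIdSketch
{
    static String encode(
        Map<String, String> idsByPrefix)
    {
        final StringJoiner joiner = new StringJoiner(";");
        idsByPrefix.forEach((prefix, id) -> joiner.add(prefix + "=" + id));
        return joiner.toString();
    }

    static Map<String, String> decode(
        String lastEventId)
    {
        final Map<String, String> idsByPrefix = new LinkedHashMap<>();
        for (String part : lastEventId.split(";"))
        {
            final int separatorAt = part.indexOf('=');
            if (separatorAt > 0)
            {
                // Everything before the first '=' is the route prefix, the rest is that route's id.
                idsByPrefix.put(part.substring(0, separatorAt), part.substring(separatorAt + 1));
            }
        }
        return idsByPrefix;
    }
}
```

After decoding, each entry would be handed to the matching per-route client so it can resume against its own upstream id.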
Engine dependencies (already merged to develop)
- PR #1790: StoreHandler.lock / unlock / watch, the coordination primitives consumed by this PR.
- PR #1791: aggregate event IDs for per-route resumability; this PR consumes it on the agent-facing side.
- PR #1792: StoreHandler.renew, which lets the lifecycle-owner worker hold its lock across the binding's lifetime with periodic lease renewals.
Key implementation files
Binding side (runtime/binding-mcp/.../stream/):
- McpServerFactory: unconditional INITIALIZE_RESPONSE_CAPABILITIES constant; SSE encoder encodeSseNotifyEvent skips the id: line when length is -1.
- McpClientFactory: parses upstream capabilities.{...}.listChanged and surfaces them on lifecycle BEGIN_EX; decodes inbound SSE notifications/.../list_changed into FlushEx.
- McpProxyLifecycleFactory: defers upstream list_changed in McpLifecycleClient.onClientFlush when cache is enabled; mints aggregate event IDs on outbound FlushEx; decodes inbound McpResumeChallengeEx and dispatches per-route resumes; doNotifyListChanged emits a FlushEx with the id field left as the absent sentinel.
- cache/McpProxyCache: CRC32 diff-detect on McpListCache.put; per-kind lock lifecycle via StoreHandler.lock/unlock; renewLifecycle() exposes the lifecycle lock to the manager for periodic renewal.
- cache/McpProxyCacheManager: lifecycle lock held for binding lifetime; scheduled renew at leaseTtl/3; settle dispatch matrix (pending, changed) → emit-or-drain.
- cache/McpProxyCacheHydrater: intercepts upstream list_changed and drives handler.onListChanged(kind) → cache refresh.
Config (runtime/binding-mcp/.../config/):
- McpBindingConfig: toolkit prefix map for aggregate-id encode/decode; validation that multi-route deployments specify with.toolkit per route.
Spec (specs/binding-mcp.spec/.../):
- McpFunctions matcher gains null-tolerant semantics on the three list_changed FlushEx variants: omitting .id(...) matches any id (wildcard, unchanged); .id("X") matches the exact value; .id(null) asserts the id field is absent (length=-1), distinct from wildcard.
- McpLifecycleBeginEx.capabilities widened to uint16; new capability bits SERVER_TOOLS_LIST_CHANGED(64), SERVER_PROMPTS_LIST_CHANGED(128), SERVER_RESOURCES_LIST_CHANGED(256).
Test coverage
Initialize advertise (no dedicated scenario needed):
The unconditional advertise is exercised implicitly by every existing lifecycle.initialize* scenario; each now asserts the new "prompts":{"listChanged":true},"resources":{"listChanged":true}, "tools":{"listChanged":true} JSON in its expected response. A regression that re-couples output to input bits would break those tests first.
Passthrough list_changed:
- lifecycle.notify.{tools,prompts,resources}.list.changed: single-route passthrough, upstream id flows through.
- lifecycle.notify.tools.list.changed.toolkit.multi and .toolkit.multi.prefixed: multi-route, aggregate event ID minting.
- lifecycle.events.resume.{aggregate,partial,partial.prefixed, aggregate.prefixed}: agent reconnect with aggregate Last-Event-ID; per-route resume challenge dispatch.
Cache-driven list_changed:
- cache.notify.tools.list.changed: upstream list_changed → cache refresh → agent sees one notification with id-absent (asserted via .id(null)).
- cache.notify.tools.list.changed.after.tools.call: agent first invokes tools/call (forces per-route lifecycle client to exist, exercising the defer-not-suppress path), then receives the cache-driven notification.
- cache.refresh.tools.notify: TTL-driven refresh with new content triggers fan-out; id-absent on the wire.
ITs:
- McpProxyCacheIT: shouldRefreshToolsOnListChangedNotification, shouldEmitOneListChangedAfterAgentInvokesToolsCall, plus the pre-existing cache contention and TTL refresh ITs continue passing.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg