feat(publisher): structured StorageACK declines instead of stream resets #559
branarakic wants to merge 5 commits into
| ]); | ||
|
|
||
| if (collected.length < REQUIRED_ACKS) { | ||
| let detail = ''; |
🔴 Bug: This only adds decline detail after every peer request has settled. In the mixed case this PR is trying to improve (for example 2 peers decline and 1 remaining peer hangs), collect() still waits the full ACK_TIMEOUT_MS and returns storage_ack_timeout, so the new per-peer decline reasons never surface. Track how many peers are still capable of producing an ACK and fail fast with storage_ack_insufficient once quorum becomes impossible.
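The fast-fail condition described in this comment can be sketched as a small counter. This is a minimal illustration only: `QuorumTracker`, `settle()` and `state()` are hypothetical names, not taken from `ack-collector.ts`, and the real collector may track pending peers differently.

```typescript
// Hypothetical helper: tracks whether quorum is reached, still possible, or impossible.
type QuorumState = 'reached' | 'impossible' | 'waiting';

class QuorumTracker {
  private acks = 0;
  private pending: number;
  private readonly required: number;

  constructor(peerCount: number, required: number) {
    this.pending = peerCount;
    this.required = required;
  }

  // Call once per peer request when it settles (ACK, decline, or transport failure).
  settle(gotAck: boolean): QuorumState {
    if (this.pending <= 0) {
      throw new Error('settle() called more times than there are peers');
    }
    this.pending -= 1;
    if (gotAck) this.acks += 1;
    return this.state();
  }

  state(): QuorumState {
    if (this.acks >= this.required) return 'reached';
    // Even if every pending peer ACKs, quorum cannot be met: fail fast.
    if (this.acks + this.pending < this.required) return 'impossible';
    return 'waiting';
  }
}
```

With three peers and a required quorum of two (assumed numbers), two declines move the state to `impossible` while the third peer is still pending, so the collector would not need to wait for the hung peer.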
| * Encode a structured decline response. Used in place of `throw` for | ||
| * the subset of failures that represent "I as a core legitimately | ||
| * cannot ACK this request right now" — most importantly the | ||
| * `<contextGraphsServed>` mismatch that GitHub issue #541 stalls on, |
🟡 Issue: this docstring now says <contextGraphsServed> mismatches are handled via typed declines, but the implementation below never checks whether the peer actually serves the requested context graph and never emits NOT_HOSTED. As written, an unserved graph still collapses into NO_DATA_IN_SWM, which loses the distinction this PR is trying to add. Either wire the host-check in this PR or defer documenting/exporting that decline code until it can be produced.
| expect(new Uint8Array(decoded.coreNodeSignatureR).length).toBe(0); | ||
| }); | ||
|
|
||
| it('a new decoder reading bytes from an old encoder still yields a valid ACK (forward compat)', () => { |
🟡 Issue: this test title claims forward compatibility with an old encoder, but it still generates the wire bytes with the current schema and simply leaves the new fields unset. That will pass even if a future protobufjs/schema change breaks decoding of historical payloads. Consider pinning a literal pre-change byte sequence (or generating one with the old schema) so the compatibility guarantee is actually exercised.
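One way to pin a literal byte sequence, sketched without any protobuf library: hard-code the hex of a payload and walk the wire format to confirm which field numbers it contains. The hex below is an illustrative two-field message, not a real `StorageACK` payload, and `hexToBytes` / `listFieldNumbers` are hypothetical helpers. A real test would pin bytes captured from the pre-change encoder and feed them to `decodeStorageACK`.

```typescript
// Illustrative pinned payload (NOT the real StorageACK schema):
// field 1 = length-delimited "ab", field 2 = varint 5.
const PINNED_OLD_SHAPE_HEX = '0a0261621005';

function hexToBytes(hex: string): Uint8Array {
  const out = new Uint8Array(hex.length / 2);
  for (let i = 0; i < out.length; i++) {
    out[i] = parseInt(hex.slice(i * 2, i * 2 + 2), 16);
  }
  return out;
}

// Walks the protobuf wire format and returns the field numbers in order.
function listFieldNumbers(bytes: Uint8Array): number[] {
  const fields: number[] = [];
  let pos = 0;
  const readVarint = (): number => {
    let result = 0;
    let shift = 0;
    for (;;) {
      if (pos >= bytes.length) throw new Error('truncated varint');
      const b = bytes[pos++];
      result += (b & 0x7f) * 2 ** shift;
      shift += 7;
      if ((b & 0x80) === 0) return result;
    }
  };
  while (pos < bytes.length) {
    const tag = readVarint();
    const wireType = tag % 8;
    fields.push(Math.floor(tag / 8));
    if (wireType === 0) {
      readVarint(); // varint value
    } else if (wireType === 1) {
      pos += 8; // fixed 64-bit
    } else if (wireType === 2) {
      const len = readVarint(); // length-delimited
      pos += len;
    } else if (wireType === 5) {
      pos += 4; // fixed 32-bit
    } else {
      throw new Error(`unsupported wire type ${wireType}`);
    }
    if (pos > bytes.length) throw new Error('truncated field');
  }
  return fields;
}

// An old-shape payload must not carry the new decline fields (6 and 7).
const oldFields = listFieldNumbers(hexToBytes(PINNED_OLD_SHAPE_HEX));
const hasDeclineFields = oldFields.includes(6) || oldFields.includes(7);
```

Because the bytes are a literal, the check keeps exercising the historical encoding even if the current schema or encoder changes.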
| return this.encodeDecline( | ||
| cgId, | ||
| STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM, | ||
| `No data found in SWM graph ${swmGraphUri} for entities: ${intent.rootEntities.join(', ')}`, |
🟡 Issue: this decline message includes every requested root entity. On large publishFromSharedMemory batches that can make the /storage-ack reply itself very large, which risks turning the new in-band decline back into a transport-level failure and bringing back the retry/timeout behavior this PR is trying to avoid. Please cap/summarize the entity list here (for example count + first few IDs).
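A capped summary along the lines suggested here could look like the following. `summarizeEntities` and its default of three shown IDs are assumptions for illustration; the handler's actual cap is not shown in this thread.

```typescript
// Hypothetical helper: render at most `maxShown` entity IDs plus a total count.
function summarizeEntities(entities: readonly string[], maxShown = 3): string {
  if (entities.length <= maxShown) return entities.join(', ');
  const shown = entities.slice(0, maxShown).join(', ');
  return `${entities.length} entities (first ${maxShown}: ${shown}, ...)`;
}
```

The decline message would then interpolate `summarizeEntities(intent.rootEntities)` instead of joining the full list, which bounds the number of IDs in the reply. Individual IDs can still be long, so a length cap on the final message may also be worth considering.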
|
|
||
| if (isStorageACKDecline(ack)) { | ||
| const code = ack.declineCode ?? 'UNKNOWN'; | ||
| const declineMessage = ack.declineMessage ?? ''; |
🟡 Issue: declineMessage is new peer-controlled protocol input, but we keep it verbatim and later splice it into logs and thrown errors. A buggy or malicious peer can inject newlines/control characters or force very large exception strings here. Please sanitize and truncate the message before storing/logging it.
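A minimal sketch of the sanitize-and-truncate step, assuming a 256-character cap. `sanitizeDeclineMessage` and `MAX_DECLINE_MESSAGE_LENGTH` are hypothetical names, and the limit is an arbitrary illustration rather than the value used in the PR.

```typescript
// Assumed cap; pick whatever fits the log pipeline.
const MAX_DECLINE_MESSAGE_LENGTH = 256;

// Hypothetical helper: neutralise control characters and bound the length
// of a peer-supplied string before it reaches logs or error messages.
function sanitizeDeclineMessage(
  raw: string | undefined,
  maxLength: number = MAX_DECLINE_MESSAGE_LENGTH,
): string {
  const cleaned = (raw ?? '')
    .replace(/[\u0000-\u001f\u007f-\u009f]+/g, ' ') // C0/C1 control chars, incl. newlines
    .replace(/\s+/g, ' ')
    .trim();
  return cleaned.length > maxLength ? `${cleaned.slice(0, maxLength)}...` : cleaned;
}
```

Applying this once, at the point where the decline is decoded, means every later consumer (per-peer log line, aggregated error) sees the same bounded single-line string.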
| const response = await this.deps.sendP2P(peerId, PROTOCOL_STORAGE_ACK, intentBytes); | ||
| const ack: StorageACKMsg = decodeStorageACK(response); | ||
|
|
||
| if (isStorageACKDecline(ack)) { |
🔴 Bug: This treats every typed decline as permanent, but the new SWM-state codes (NO_DATA_IN_SWM / MERKLE_MISMATCH_IN_SWM) can be transient while gossip replication catches up. Combined with the fast-fail path, a peer that would ACK on the next retry now permanently reduces the quorum pool, so publishes can fail even though enough cores become ready seconds later. Keep retries for replication-state declines, or split decline codes into retryable vs permanent.
| contextGraphIdBigInt = BigInt(cgId); | ||
| } catch { | ||
| throw new Error( | ||
| return this.encodeDecline( |
🔴 Bug: CG_ID_INVALID is a malformed publish request, not a peer-local refusal. Returning it as a normal decline makes the publisher fan out to every core and eventually report storage_ack_insufficient, which masks the actual caller error and can still wait on unrelated peers. Either keep throwing here, or teach the collector to treat this code as an immediate fatal publish error.
Today, when a core node legitimately can't ACK a publish (it doesn't host the CG, its SWM is missing or stale, or its operational signer was just rotated), the StorageACK handler `throw`s. The libp2p stream is reset and the publisher only sees a generic IO error, so the ACK collector retries the same peer 3× with exponential backoff before giving up, and the publisher's final error is opaque (`stream reset`) with no per-peer reason. This is the surface shape of GitHub issue #541, where one core failed to mirror a `replicationPolicy=full` CG and the publisher kept dialling it.

Add a typed-decline path:

* Extend the StorageACK protobuf with two optional fields: `declineCode: string` (field 6) and `declineMessage: string` (field 7). The change is strictly additive: old encoders never populate them, old decoders silently ignore them, so cross-version traffic is byte-identical to today. New decoders inspect `declineCode` first; non-empty means decline (signature/merkleRoot are unset). The decline codes (`NOT_HOSTED`, `NO_DATA_IN_SWM`, `MERKLE_MISMATCH_IN_SWM`, `SIGNER_NOT_REGISTERED`, `CG_ID_INVALID`) live in `STORAGE_ACK_DECLINE_CODES` and are part of the wire contract.
* Convert four graceful-throw paths in `packages/publisher/src/storage-ack-handler.ts` to decline returns: no-data-in-SWM, merkle mismatch *from SWM* (the inline-staging mismatch stays a throw; that's a publisher bug, not a network state mismatch), non-numeric / non-positive cgId, and the `isSignerRegistered=false` path. True protocol violations (oversize / unparseable staging, kaCount mismatch, leaf-count mismatch, edge-node-tried-to-ACK, identity > 2^64) keep `throw` so the connection still resets on actually-bad input.
* Update `packages/publisher/src/ack-collector.ts` to recognise declines: per-peer log, no retry against a declining peer (decline is permanent for the request), and a per-peer decline summary appended to the final `storage_ack_insufficient` error so operators see *why* each core declined from a single log line.

Tests:

* `packages/core/test/v10-proto.test.ts`: three new round-trip cases (decline-only message, old-shape forward compat, old-encoder bytes decode without spurious decline fields).
* `packages/publisher/test/v10-ack-edge-cases.test.ts`: four new decline-shape assertions on the handler (NO_DATA_IN_SWM, MERKLE_MISMATCH_IN_SWM, two CG_ID_INVALID variants, SIGNER_NOT_REGISTERED), one assertion that inline-staging mismatch *still* throws, and four collector-side cases covering quorum-with-declines, decline reasons surfaced in the final error, unknown decline codes (forward compat), and empty-message declines.
* `packages/publisher/test/storage-ack-handler.test.ts` and `v10-protocol-operations.test.ts`: three pre-existing tests updated to assert the new decline-vs-throw boundary.

Co-authored-by: Cursor <cursoragent@cursor.com>
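The "inspect `declineCode` first; non-empty means decline" rule can be sketched as a type-agnostic predicate. The `StorageACKLike` interface below is a simplified stand-in introduced for this example, not the generated message type, and the body is an assumption about how `isStorageACKDecline` might be written.

```typescript
// Simplified stand-in for the decoded message; only the two new fields matter here.
interface StorageACKLike {
  declineCode?: string | null;
  declineMessage?: string | null;
}

// Treats both "field absent" (undefined/null) and "field unset" (empty string)
// as a normal ACK; any non-empty code marks the message as a decline.
function isStorageACKDecline(msg: StorageACKLike): boolean {
  return typeof msg.declineCode === 'string' && msg.declineCode.length > 0;
}
```

Checking for a non-empty string rather than a known code keeps unknown future codes classified as declines, which matches the forward-compat test case listed above.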
ack-collector now fast-fails with `storage_ack_insufficient` (and the full per-peer decline detail) the moment the still-pending peer pool can no longer satisfy `REQUIRED_ACKS`, instead of waiting out the full ACK_TIMEOUT_MS for a hung peer that, by that point, couldn't change the outcome.

The check is conservative: it only fires once `collected.length + still_pending < REQUIRED_ACKS`, so a publish that could still reach quorum is never aborted early. Every requestACK settles via a `try/finally`, so transport-error retries also contribute to the impossibility detection (covered by the existing "transport errors exhaust enough peers" test, which previously expected a slow `storage_ack_timeout`).

Also factor the decline-formatting into `formatDeclineDetail()` so the timeout / fast-fail / final-throw paths all surface the same diagnostic. Drop the stale `PR#557` reference in the comment block.

Co-authored-by: Cursor <cursoragent@cursor.com>
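A shared formatter of the kind this commit describes might look like the sketch below. The output shape follows the `Declines: peerX→CODE (message); ...` example in the PR description; the `DeclineRecord` type and the function body are assumptions, not the code from `ack-collector.ts`.

```typescript
// Assumed shape of the per-peer record kept by the collector.
interface DeclineRecord {
  code: string;
  message: string;
}

// Renders all recorded declines as one line; returns '' when there are none.
function formatDeclineDetail(declines: ReadonlyMap<string, DeclineRecord>): string {
  if (declines.size === 0) return '';
  const parts: string[] = [];
  declines.forEach((record, peerId) => {
    // Omit the parentheses entirely when the peer sent no message.
    parts.push(
      record.message
        ? `${peerId}→${record.code} (${record.message})`
        : `${peerId}→${record.code}`,
    );
  });
  return `Declines: ${parts.join('; ')}`;
}
```

Routing the timeout, fast-fail and final-throw paths through one function keeps the three error messages textually identical for the same set of declines.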
49757d9 to 8f6a83e
|
Rebased onto current main ( Local verification:
The four commits on the branch (feat + Codex fixup + 2 follow-up tightenings) were preserved as-is. Happy to squash to a single commit on request — the PR uses a merge-commit strategy on main, so I left history intact. Rebased HEAD: |
| throw new Error( | ||
| return this.encodeDecline( | ||
| cgId, | ||
| STORAGE_ACK_DECLINE_CODES.CG_ID_INVALID, |
🔴 Bug: CG_ID_INVALID is a malformed publish request, not a peer-specific decline. Returning it as a typed decline makes the publisher report storage_ack_insufficient after querying multiple peers instead of surfacing the real contract error immediately, which hides the root cause and changes the failure mode for callers. Keep invalid/non-positive contextGraphId on the stream-error path (throw) and reserve typed declines for cases where a well-formed request cannot be ACKed because of this peer's local state.
Two last-round Codex bugs from the May 17 and May 20 review passes.

1. CG_ID_INVALID is a malformed-publish error, not a peer-local decline. Returning it as a typed decline made the publisher fan out to every other core looking for a different answer and report `storage_ack_insufficient` only after the full retry budget, masking the real caller error. Reverted both code-paths in the handler to `throw new Error(...)` (libp2p stream reset) and dropped the `CG_ID_INVALID` member from `STORAGE_ACK_DECLINE_CODES`. The two `declines (CG_ID_INVALID)` tests are rewritten as `expect(...).rejects.toThrow(...)`.

2. The collector treated every typed decline as permanent, but `NO_DATA_IN_SWM` / `MERKLE_MISMATCH_IN_SWM` can be transient while gossip replication catches up. Combined with the fast-fail path on impossible quorum, a core that would have ACKed seconds later was being permanently removed from the quorum pool the moment its SWM trailed the publish by even one gossip cycle. Added `TRANSIENT_STORAGE_ACK_DECLINE_CODES` + an `isTransientStorageACKDeclineCode()` helper next to the existing `STORAGE_ACK_DECLINE_CODES` enum, exported through `proto/index.ts`. The collector now distinguishes transient declines (retry through the normal transport backoff) from permanent ones (return null immediately) and clears any stored decline reason on success so stale decline reasons do not leak into `storage_ack_insufficient` error messages if quorum later fails for unrelated reasons.

Updated test #1 to exercise SIGNER_NOT_REGISTERED (permanent semantics, sendCount=1 preserved) and added two new tests covering the transient path:

- peer dialled MAX_RETRIES times on persistent transient decline
- peer that resolves to a valid ACK on retry counts toward quorum

Existing decline tests that hit transient codes legitimately take ~3s now (1s + 2s of backoff), so their per-test timeouts were bumped to 15s; the fast-fail-on-hang test's elapsed bound moved from <5s to <15s (still two orders of magnitude under ACK_TIMEOUT_MS).

Local verification: dkg-core 905/905, dkg-publisher 951/951 (+1 skip).

Co-authored-by: Cursor <cursoragent@cursor.com>
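The transient-versus-permanent split can be sketched as follows. The two exported names come from the commit message above; their bodies, the `actionForDecline` helper, and the choice to treat unknown codes as permanent are assumptions made for this illustration.

```typescript
// Codes the commit message describes as potentially transient during gossip catch-up.
const TRANSIENT_STORAGE_ACK_DECLINE_CODES: ReadonlySet<string> = new Set([
  'NO_DATA_IN_SWM',
  'MERKLE_MISMATCH_IN_SWM',
]);

function isTransientStorageACKDeclineCode(code: string): boolean {
  return TRANSIENT_STORAGE_ACK_DECLINE_CODES.has(code);
}

type DeclineAction = 'retry' | 'give_up';

// Hypothetical decision helper: `attempt` is 1-based, `maxRetries` is the retry budget.
function actionForDecline(code: string, attempt: number, maxRetries: number): DeclineAction {
  if (isTransientStorageACKDeclineCode(code) && attempt < maxRetries) return 'retry';
  // Permanent codes, unknown codes (assumption), and exhausted budgets all stop here.
  return 'give_up';
}
```

A collector loop would call `actionForDecline` after each decoded decline and only re-enter the backoff when it returns `'retry'`, so a `SIGNER_NOT_REGISTERED` peer is still dialled exactly once.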
|
Addressed the two open 🔴 items from Codex rounds 4 + 5 in What landed in
|
| # | Item | File | Severity | Status |
|---|---|---|---|---|
| 1 | Fast-fail when quorum impossible | `ack-collector.ts` | 🔴 | ✅ 979c232d |
| 2 | Docstring references `NOT_HOSTED` that's never produced | `storage-ack-handler.ts` | 🟡 | ✅ 93ba4b6e |
| 3 | Forward-compat test must pin literal pre-change bytes | `v10-proto.test.ts` | 🟡 | ✅ 93ba4b6e |
| 4 | Decline message must cap entity list | `storage-ack-handler.ts` | 🟡 | ✅ 8f6a83e5 |
| 5 | `declineMessage` is peer-controlled: sanitize + truncate | `ack-collector.ts` | 🟡 | ✅ 8f6a83e5 |
| 6 | Transient SWM-state declines must retry | `ack-collector.ts` | 🔴 | ✅ e3452dc4 (this push) |
| 7 | `CG_ID_INVALID` should throw, not decline | `storage-ack-handler.ts` | 🔴 | ✅ e3452dc4 (this push) |
| 8 | Round-5 reaffirmation of #7 after rebase | `storage-ack-handler.ts` | 🔴 | ✅ e3452dc4 (this push) |
Local verification
* `pnpm --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-publisher build`: green.
* `dkg-core` tests: 905/905 passed.
* `dkg-publisher` tests: 951 passed, 1 skipped (+2 vs prior runs: the two new transient-retry tests).
HEAD: e3452dc4.
|
|
||
| if (swmQuads.length === 0) { | ||
| throw new Error(`No data found in SWM graph ${swmGraphUri} for entities: ${intent.rootEntities.join(', ')}`); | ||
| return this.encodeDecline( |
🔴 Bug: returning a typed decline on the existing /dkg/10.0.0/storage-ack contract is not backward-compatible with older publishers. They ignore fields 6/7, see the empty signature fields in this payload, and treat it as an invalid ACK instead of following the old retry-on-stream-reset path. That means mixed-version rollouts can still lose quorum on transient SWM lag. Please gate declines behind a new protocol/capability or keep the legacy throw behavior until every publisher in the cluster understands decline messages. The same issue applies to the other encodeDecline(...) returns below.
| // final `storage_ack_insufficient` error. Overwriting any | ||
| // prior entry is intentional — operators care most about | ||
| // why the peer ultimately could not ACK. | ||
| declines.set(peerId, { code, message: declineMessage }); |
🟡 Issue: this per-peer decline record is only cleared on a later valid ACK. If the next retry fails with a transport error, the final storage_ack_insufficient message will still report the stale decline even though the terminal reason was connection/reset/timeout. Clear or overwrite the map entry in the catch/final-failure path so the aggregated diagnostics reflect the last observed outcome for that peer.
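One way to keep the diagnostics aligned with the last observed outcome is to store declines and transport failures in the same per-peer slot, so whichever happens last wins. `PeerFailureLog` and its method names are hypothetical; the collector in this PR uses a plain `declines` map.

```typescript
// Hypothetical record type: the most recent reason a peer failed to ACK.
type PeerFailure =
  | { kind: 'decline'; code: string; message: string }
  | { kind: 'transport'; error: string };

class PeerFailureLog {
  private readonly last = new Map<string, PeerFailure>();

  recordDecline(peerId: string, code: string, message: string): void {
    this.last.set(peerId, { kind: 'decline', code, message });
  }

  // Overwrites any earlier decline so the terminal reason is what gets reported.
  recordTransportError(peerId: string, error: string): void {
    this.last.set(peerId, { kind: 'transport', error });
  }

  // Call on a valid ACK: the peer no longer has a failure to report.
  clear(peerId: string): void {
    this.last.delete(peerId);
  }

  get(peerId: string): PeerFailure | undefined {
    return this.last.get(peerId);
  }
}
```

With this shape, a peer that declines on the first attempt and then hits a stream reset on the retry would be reported with the transport error rather than the stale decline.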
Summary
Today, when a core node legitimately can't ACK a publish (it doesn't host the CG, its SWM is missing or stale, or its operational signer was just rotated), the StorageACK handler `throw`s. libp2p resets the stream, the publisher sees a generic IO error, and the ACK collector retries the same peer 3× with exponential backoff before giving up. The publisher's final error reads as a stream-reset / timeout with no per-peer reason; that's the surface shape behind GitHub issue #541.

This PR introduces a typed-decline path so the receive side can say "I can't ACK this, here's why" in band, and the publisher can deselect that peer without retries and surface the reason to the operator.
Companion to PR #556 (publisher-side hosting filter); independently useful even if #556 doesn't land.
What's in this PR
Wire: `packages/core/src/proto/storage-ack.ts`
* Adds two optional fields to the `StorageACK` proto: `declineCode` (field 6) and `declineMessage` (field 7).
* Decline codes (`NOT_HOSTED`, `NO_DATA_IN_SWM`, `MERKLE_MISMATCH_IN_SWM`, `SIGNER_NOT_REGISTERED`, `CG_ID_INVALID`) live in `STORAGE_ACK_DECLINE_CODES` and are exported from `@origintrail-official/dkg-core`.
* Helper `isStorageACKDecline(msg)` codifies the empty-string-vs-undefined check protobufjs uses for unset string fields.

Receive side: `packages/publisher/src/storage-ack-handler.ts`
Converts four graceful-throw paths to `return encodeDecline(...)`:
* `NO_DATA_IN_SWM`: SWM CONSTRUCT returned no quads (the literal #541 case, "RepNet CG15 publish cannot collect identity 1 ACK while public graph can")
* `MERKLE_MISMATCH_IN_SWM`: core has data, publisher's merkle root doesn't match (decline so the publisher tries another core; inline-staging mismatch keeps `throw` because it's a publisher bug, not a network mismatch)
* `CG_ID_INVALID`: non-numeric or non-positive cgId
* `SIGNER_NOT_REGISTERED`: `isSignerRegistered=false`

True protocol violations keep `throw` (oversize / unparseable staging, kaCount mismatch, leaf-count mismatch, edge-node-tried-to-ACK, identity > 2^64). Those still reset the stream so we don't keep the connection open on bad input.

Send side: `packages/publisher/src/ack-collector.ts`
* After `decodeStorageACK`, checks `isStorageACKDecline`. If declining: log per-peer reason, no retry, return null.
* Appends a `Declines: peerX→NO_DATA_IN_SWM (no swm data for repnet-v2-official); peerY→MERKLE_MISMATCH_IN_SWM (...)` summary to the final `storage_ack_insufficient` error so operators see exactly why quorum failed from a single log line.

Backward compatibility
Every cross-version pair stays correct:
`recoverACKSigner` returns null, the ACK is rejected as invalid (same as today's failure mode for these cases), and the collector moves on. No regression.

Test plan
* `pnpm --filter @origintrail-official/dkg-core build` + `test`: green (new round-trip + forward-compat tests for the proto change)
* `pnpm --filter @origintrail-official/dkg-publisher build`: green
* `pnpm --filter @origintrail-official/dkg-publisher test`: 918 passed / 1 skipped (62 files), all green
* `pnpm --filter @origintrail-official/dkg-agent build`: green
* `pnpm --filter "./packages/cli" build`: green
* `packages/core/test/v10-proto.test.ts`:
* `packages/publisher/test/v10-ack-edge-cases.test.ts` + `storage-ack-handler.test.ts` + `v10-protocol-operations.test.ts`:
  * `NO_DATA_IN_SWM` decline returned (was throw)
  * `MERKLE_MISMATCH_IN_SWM` decline returned (was throw)
  * `CG_ID_INVALID` decline returned (two variants: non-numeric, "0")
  * `SIGNER_NOT_REGISTERED` decline returned (was throw); `onSignerUnregistered` callback still fires
* `packages/publisher/test/v10-ack-edge-cases.test.ts`:
  * `storage_ack_insufficient` error names every per-peer reason with both code and message
  * Empty `declineMessage` produces clean error formatting (no `()` artefact)

Related
* `<contextGraphsServed>` #556 (publisher-side hosting filter; also addresses #541, "RepNet CG15 publish cannot collect identity 1 ACK while public graph can")

Made with Cursor