Add WsClient: resilient WebSocket subscriptions with auto-reconnect#51
Add WsClient: resilient WebSocket subscriptions with auto-reconnect#51
Conversation
WsClient wraps WsTransport with three features production bots need: transparent reconnect with exponential backoff and jitter, multiplexed subscriptions on a single socket, and application-layer ping keepalive. Subscription handles stay valid across reconnects -- the underlying server-side sub-id is swapped automatically on each resubscribe. - src/ws_client.zig: new WsClient + Subscription (handle-style), Opts/Event/Error, single-threaded reconnect+resubscribe state machine, pending-notification queue for request/notify multiplexing. - src/ws_transport.zig: add readFrameDeadline + pollReadable for deadline-aware reads, plus sendPing and a frames_received counter so WsClient can detect liveness via control frames. - src/subscription.zig: promote extractResultString, isSubscriptionNotification, getNotificationResult to pub; lift nextBlock/nextLog/nextTxHash bodies into free parseBlockFromNotification / parseLogFromNotification / parseTxHashFromNotification so WsClient (and others) can reuse them without holding a Subscription. Add getSubscriptionId and extractResponseId helpers. - tests: 8 new unit tests in ws_client.zig (backoff, jitter, params clone, dispatch, queue ordering, server_id remap), 5 new tests in subscription.zig for the new helpers, 3 new integration tests against Anvil (newHeads, multiplexed subs, unsubscribe). - Drive-by: fix three pre-existing parseEther usages in tests/integration_tests.zig that no longer compiled (parseEther returns ?u256, callers had unwrapped it inconsistently). The integration test file was uncompilable on origin/main. Closes #35.
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
📝 WalkthroughWalkthroughA new resilient WebSocket client ( Changes
Sequence Diagram(s)sequenceDiagram
participant App as Application
participant Client as WsClient
participant Transport as WsTransport
participant Server as WebSocket Server
Note over App,Server: Connect & Subscribe
App->>Client: connect(url, opts)
Client->>Transport: create/open connection
Transport->>Server: TCP + WS upgrade
Server-->>Transport: upgrade response
App->>Client: subscribe(params)
Client->>Transport: sendFrame(eth_subscribe)
Transport->>Server: send JSON-RPC
Server-->>Transport: response (subscription_id)
Client->>Client: store handle & server_id
Note over App,Server: Event Dispatch
Server-->>Transport: subscription notification
Transport->>Client: deliver frame
Client->>Client: map server_id -> Subscription
Client-->>App: Event{sub, payload} via next()
Note over Client,Transport: Keepalive / Reconnect
Client->>Transport: sendPing() if idle
Transport->>Server: ping
alt Pong received
Server-->>Transport: pong
Client->>Client: reset timers
else Pong timeout
Client->>Client: computeBackoffMs + applyJitter
Client->>Client: deinit & reconnect loop
Client->>Transport: new connection
loop for each active subscription
Client->>Transport: sendFrame(eth_subscribe original params)
Server-->>Transport: new subscription_id
Client->>Client: update server_id mapping
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Review rate limit: 3/5 reviews remaining, refill in 18 minutes and 13 seconds. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/subscription.zig (1)
344-405:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMake the fast-path JSON scanners whitespace-tolerant.
extractResultString(),getSubscriptionId(), andextractResponseId()only recognize minified payloads like"result":"...","subscription":"...", and"id":42. Valid JSON-RPC responses may include spaces around the colon, and thensubscribe(),resubscribeAll(), request matching, and notification dispatch all start failing even though the payload is valid. This is a protocol-compatibility break at the root helper layer.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/subscription.zig` around lines 344 - 405, The three fast-path scanners (extractResultString, getSubscriptionId, extractResponseId) fail on JSON with spaces around the colon; update each to locate the key (e.g., "\"result\"", "\"subscription\"", "\"id\""), advance past the key, then skip optional whitespace, require and skip a ':' character, skip optional whitespace again, then proceed to parse the value (for extractResultString and getSubscriptionId expect a starting '"' and find the matching closing '"' while handling bounds, and for extractResponseId parse consecutive digits starting at the first digit); keep existing bounds checks and error/null returns, and reference the same function names (extractResultString, getSubscriptionId, extractResponseId) when making the changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ws_client.zig`:
- Around line 256-281: The request() function currently resends the exact same
JSON-RPC payload after beginReconnect(), which can double-execute non-idempotent
methods; modify WsClient.request to avoid transparent replay by default: add a
per-request option (e.g., allow_replay: bool) or check the method name against a
whitelist of safe/idempotent methods before resending, and if replay is not
permitted return a new error (e.g., error.RetryNotAllowed) after
beginReconnect() instead of calling sendOrReconnect(req); update call sites to
opt-in when safe (or pass allow_replay=true for eth_subscribe/read-only RPCs)
and reference WsClient.request, sendOrReconnect, beginReconnect,
readFrameWithKeepalive, and next_id when making these changes.
- Around line 199-224: The unsubscribe function currently frees a Subscription
without removing queued Event entries that contain the raw *Subscription
pointer, causing dangling pointers; update pub fn unsubscribe(self: *WsClient,
sub: *Subscription) to scan and purge any queued events in self.pending (or the
pending queue structure) whose Event.sub == sub before calling
self.freeSubscription(sub), ensuring you remove or drop those Event values
safely (free any owned memory) and only then proceed with orderedRemove on
self.subs and the eth_unsubscribe request so next() cannot return a dangling sub
pointer.
---
Outside diff comments:
In `@src/subscription.zig`:
- Around line 344-405: The three fast-path scanners (extractResultString,
getSubscriptionId, extractResponseId) fail on JSON with spaces around the colon;
update each to locate the key (e.g., "\"result\"", "\"subscription\"",
"\"id\""), advance past the key, then skip optional whitespace, require and skip
a ':' character, skip optional whitespace again, then proceed to parse the value
(for extractResultString and getSubscriptionId expect a starting '"' and find
the matching closing '"' while handling bounds, and for extractResponseId parse
consecutive digits starting at the first digit); keep existing bounds checks and
error/null returns, and reference the same function names (extractResultString,
getSubscriptionId, extractResponseId) when making the changes.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c0a1980a-d328-4385-9493-98e94654bf06
📒 Files selected for processing (6)
README.mdsrc/root.zigsrc/subscription.zigsrc/ws_client.zigsrc/ws_transport.zigtests/integration_tests.zig
Three independent fixes: 1. Whitespace-tolerant JSON scanners. extractResultString, getSubscriptionId, and extractResponseId previously matched only minified payloads (e.g. "result":"..."). JSON-RPC permits whitespace around the colon, and some proxies and test harnesses produce it. Factor a shared findKeyValueStart helper that walks past optional ws + ':' + optional ws after locating the key, then resumes value parsing. Also covers the case where the key text appears inside a value -- the helper now keeps searching past false matches. 2. Stop auto-replaying WsClient.request on disconnect. The previous code re-sent the same JSON-RPC payload after a mid-flight reconnect, which would double-execute non-idempotent methods (e.g. eth_send- RawTransaction). Public request() now returns error.RequestInterrupted on post-send disconnect; an internal requestReplay() retains the old replay-safe behavior and is used by subscribe()/unsubscribe(), both idempotent at the protocol level. Pre-send retries (the request never reached the wire) remain transparent in both paths. 3. Purge pending queue on unsubscribe. Previously, unsubscribe() freed the Subscription while leaving Events in the pending FIFO that referenced it -- a subsequent next() would return a dangling pointer. Add dropPending(*Subscription) to remove and free those queued events before tearing down the handle. Tests: - 4 new tests in subscription.zig (whitespace cases for all three scanners + a key-found-inside-a-value case). - 1 new test in ws_client.zig for dropPending. - All 1300+ unit tests pass. - 18/18 integration tests against Anvil pass.
CodeRabbit's second-pass review flagged that findKeyValueStart could be fooled by an error message whose value literally contains text like \"result\":\"fake\". The previous fix already handled the case where the scanner found a misplaced key followed by something other than a colon, but it could still be fooled by a fake key-value pair embedded inside an open string. Add isInsideString(json, idx): walks json char-by-char, tracking string state with escape handling, and returns true if idx is inside an open string. findKeyValueStart now consults it on each candidate match and skips any candidate whose opening quote is inside a string. Two new tests: - extractResultString skips a fake \"result\":\"fake\" embedded in an error message and finds the real result key after it. - isInsideString basic correctness with plain strings, escaped quotes, and stray backslashes. The walk is O(idx) per candidate match, which is fine for our typical < 4KB JSON-RPC payloads -- and only runs when there is an actual false match earlier in the document.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/subscription.zig (1)
182-199: 💤 Low valueDouble-parsing overhead in
parseBlockFromNotificationis acceptable but worth noting.The current flow (parse JSON → stringify result → wrap in
{"result":...}→ parse again inparseBlockHeader) allocates and parses the result twice. This reuses existing infrastructure cleanly, so it's a reasonable tradeoff for now. If block notification throughput becomes a bottleneck, consider extendingparseBlockHeaderto accept astd.json.Valuedirectly.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/subscription.zig` around lines 182 - 199, parseBlockFromNotification currently parses JSON to std.json.Value then stringifies/wraps and reparses via provider_mod.parseBlockHeader, causing double parsing and allocations; to fix, add an overload or new function in provider_mod named parseBlockHeaderFromValue (or modify parseBlockHeader to accept a std.json.Value) that takes allocator and the parsed std.json.Value (the result_val) and parses the block header directly, then change parseBlockFromNotification to call that new function with result_val and remove the stringifyAlloc/allocPrint/allocator.free steps and wrapped/result_json variables; ensure the new provider_mod function uses the same ownership contract for allocated fields (extra_data) as the existing parseBlockHeader.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/subscription.zig`:
- Around line 182-199: parseBlockFromNotification currently parses JSON to
std.json.Value then stringifies/wraps and reparses via
provider_mod.parseBlockHeader, causing double parsing and allocations; to fix,
add an overload or new function in provider_mod named parseBlockHeaderFromValue
(or modify parseBlockHeader to accept a std.json.Value) that takes allocator and
the parsed std.json.Value (the result_val) and parses the block header directly,
then change parseBlockFromNotification to call that new function with result_val
and remove the stringifyAlloc/allocPrint/allocator.free steps and
wrapped/result_json variables; ensure the new provider_mod function uses the
same ownership contract for allocated fields (extra_data) as the existing
parseBlockHeader.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 052dcfca-9dfd-4484-b284-d007282d2d21
📒 Files selected for processing (2)
src/subscription.zigsrc/ws_client.zig
🚧 Files skipped from review as they are similar to previous changes (1)
- src/ws_client.zig
Summary
Adds
ws_client.WsClient: a resilient WebSocket client that wraps the existingws_transport.WsTransportwith the three features production bots need (issue #35):next()andrequest()rebuild the transport and re-issue every activeeth_subscribebefore returning. Subscription handles stay valid across reconnects -- the underlying server-side sub-id is swapped automatically.Subscriptionhandles share one connection; notifications are dispatched to the right handle. Fixes a latent bug wheresubscription.Subscription.next()silently dropped notifications destined for other subs on the same transport.ping_interval_ms. If no pong arrives withinpong_timeout_ms, the connection is treated as dead and reconnect is triggered.Single-threaded by design -- all I/O happens synchronously inside
next()/request()/subscribe()/unsubscribe(). No background threads, locks, or callbacks. Matches the existing I/O model in eth.zig and avoids the Zig 0.16 mutex-copy pitfalls flagged in CLAUDE.md.The lower-level
WsTransportandconnectWithReconnectAPI stays unchanged.Notable changes
src/ws_client.zig--WsClient, handle-styleSubscription,Opts/Event/Error, reconnect+resubscribe state machine, pending-notification queue.src/ws_transport.zig--readFrameDeadline+pollReadable(poll-based deadline reads),sendPing, and aframes_receivedcounter soWsClientcan detect liveness via control frames.src/subscription.zig-- promotedextractResultString,isSubscriptionNotification,getNotificationResulttopub. LiftednextBlock/nextLog/nextTxHashbodies into freeparseBlockFromNotification/parseLogFromNotification/parseTxHashFromNotificationsoWsClientreuses them without needing aSubscriptioninstance. AddedgetSubscriptionIdandextractResponseIdhelpers.README.md-- new "Resilient WebSocket subscriptions" Quick Start example.Drive-by
tests/integration_tests.zighad 3 pre-existing compile errors onorigin/main(callers ofparseEther-- which returns?u256-- forgot to unwrap). The file was uncompilable. Fixed inline so the new integration tests can run.Test plan
make cipasses (build +zig fmt --check src/ tests/+ 1300+ unit tests)zig build integration-testagainst a local Anvil: 18/18 tests pass, including:WsClient subscribe newHeads receives a fresh blockWsClient multiplexes two subscriptions on one connectionWsClient unsubscribe frees handle and removes from registrycoderabbit review --prompt-only --type committed --base origin/main: no findingsstd.debug.printin library code, no emojisOut of scope
SO_KEEPALIVE(application ping is sufficient and portable)Closes #35.
Summary by CodeRabbit
New Features
Documentation
Tests