SOL-24c: Add WebSocketTransactionStream (free adapter)#91
SOL-24c: Add WebSocketTransactionStream (free adapter)#91Puneethkumarck merged 3 commits intomainfrom
Conversation
Implements the free-tier WebSocket adapter for the TransactionStream port, mirroring the paid Yellowstone gRPC sibling. Uses JDK java.net.http.WebSocket with no external dependency. On connect, sends a blockSubscribe JSON-RPC with commitment=confirmed and encoding=jsonParsed. Accumulates text fragments until last=true, parses the resulting frame, extracts the block notification value, and routes transactions through the existing BlockNotificationParser. Vote transactions are filtered client-side by matching Vote111... against both the direct programId field and the programIdIndex-resolved accountKeys entry. The run loop follows the gRPC sibling's pattern: virtual thread with AtomicBoolean lifecycle, CountDownLatch per connection attempt, and ReconnectHandler for exponential backoff with resetIfStable on clean disconnects. HttpClient is built with a connect timeout so close() during a hanging connect does not stall the shutdown path. Consumer exceptions and parser failures are isolated per-call so one bad handler cannot kill the stream. A package-private WebSocketFactory seam is introduced to let unit tests stub HttpClient.newWebSocketBuilder() interactions without mocking a builder chain. Closes #26 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Caution Review failedPull request was closed or merged during review 📝 WalkthroughWalkthroughAdds a package-private Changes
Sequence DiagramsequenceDiagram
participant App as Application
participant Stream as WebSocketTransactionStream
participant Factory as WebSocketFactory
participant WS as WebSocket
participant Server as Solana RPC
participant Parser as BlockNotificationParser
participant Consumer as Consumers
App->>Stream: subscribe(txConsumer, acctConsumer)
Stream->>Factory: connect(endpoint, listener)
Factory-->>Stream: CompletableFuture<WebSocket>
Stream->>WS: (onOpen) send blockSubscribe JSON-RPC
loop Message Processing
Server->>WS: text frame (block notification)
WS->>Stream: onText(fragment)
Stream->>Stream: buffer / assemble frame
Stream->>Parser: parse block JSON
Parser-->>Stream: transactions + fee payer accounts
Stream->>Stream: filter out Vote111... transactions
Stream->>Consumer: txConsumer.accept(tx) (non-vote)
Stream->>Consumer: acctConsumer.accept(account)
end
Server->>WS: close / error
WS->>Stream: onClose/onError
Stream->>Stream: record duration, compute reconnection delay
Stream->>Factory: connect(endpoint, listener) (retry)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@prism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.java`:
- Around line 136-139: The code installs a just-completed connection
unconditionally which can register a stale socket when close() or another
connect happened concurrently; after the join of
webSocketFactory.connect(endpoint, listener) in WebSocketTransactionStream,
instead of unconditionally calling currentSocket.set(socket) and
socket.sendText(...), atomically check that the stream is still accepting a new
socket (e.g., use currentSocket.compareAndSet(null, socket) or check a closed
flag) and only send BLOCK_SUBSCRIBE_PAYLOAD if the CAS/check succeeds; if it
fails (another socket or closed), immediately cleanly abort/close the newly
connected socket so it cannot become an orphaned duplicate.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: dcace92a-1b76-4169-8ee8-4f0ae04ee79b
📒 Files selected for processing (3)
prism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketFactory.javaprism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.javaprism/src/test/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStreamTest.java
...rc/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.java
Show resolved
Hide resolved
Prevent a stale WebSocket from being installed when close() or a close-then-subscribe cycle fires while the connect future is still in flight. After join() returns, verify the stream is still running and the current thread owns the loop; otherwise close the just-connected socket and abort the iteration. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
prism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.java (1)
136-146:⚠️ Potential issue | 🔴 CriticalMake socket registration atomic with shutdown.
The new stale-socket guard still leaves a race window: if
close()flipsrunningafter Line 137 but before Line 145,close()seescurrentSocket == null, then this method installs the just-connected socket and sends the subscribe payload anyway. That can still leave an orphaned live subscription after shutdown or a fast close→subscribe cycle.Use an atomic install step here (for example, a CAS-based ownership check, paired with clearing
currentSocketin the close/error callbacks) before sendingBLOCK_SUBSCRIBE_PAYLOAD.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@prism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.java` around lines 136 - 146, The connection install has a race where close() can flip running between the running/loopThread check and installing/sending on currentSocket, leaving an orphaned subscription; modify WebSocketTransactionStream to atomically install the newly opened socket into currentSocket (use a CAS like currentSocket.compareAndSet(null, socket) or an ownership token) immediately after connect and before calling socket.sendText(BLOCK_SUBSCRIBE_PAYLOAD, true), and if the CAS fails (or running is false) close the unowned socket; also ensure close() clears currentSocket and that error/close callbacks only act on the socket instance currently held in currentSocket to avoid clearing someone else’s socket.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@prism/src/test/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStreamTest.java`:
- Line 123: Add a fragmented-frame regression test in
WebSocketTransactionStreamTest that exercises the buffering branch of
BlockSubscribeListener.onText by calling listener.onText(webSocket, frame,
false) for the initial fragment(s) and then listener.onText(webSocket,
finalFrame, true) for the final fragment; assert that parsing/handling (the code
path that processes a complete block notification) only occurs after the final
fragment is delivered. Update the other two similar test spots (the cases around
the comments at the other occurrences) to include an equivalent multi-fragment
case so onText is exercised with last=false followed by last=true and verify no
parse/handle side-effects occur until the final fragment.
---
Duplicate comments:
In
`@prism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.java`:
- Around line 136-146: The connection install has a race where close() can flip
running between the running/loopThread check and installing/sending on
currentSocket, leaving an orphaned subscription; modify
WebSocketTransactionStream to atomically install the newly opened socket into
currentSocket (use a CAS like currentSocket.compareAndSet(null, socket) or an
ownership token) immediately after connect and before calling
socket.sendText(BLOCK_SUBSCRIBE_PAYLOAD, true), and if the CAS fails (or running
is false) close the unowned socket; also ensure close() clears currentSocket and
that error/close callbacks only act on the socket instance currently held in
currentSocket to avoid clearing someone else’s socket.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 66c6da84-8ff7-44e0-8304-22620975e450
📒 Files selected for processing (2)
prism/src/main/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStream.javaprism/src/test/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStreamTest.java
| .formatted(SOME_SIGNATURE_BASE58, SOME_VOTE_AUTHORITY_BASE58)); | ||
|
|
||
| // when | ||
| listener.onText(webSocket, frame, true); |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add a fragmented-frame regression test.
Every onText(...) call here uses last = true, so the new buffering path in BlockSubscribeListener.onText(...) is still untested. Add one case that delivers a block notification across multiple fragments and asserts parsing happens only after the final fragment.
Also applies to: 171-171, 212-212
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@prism/src/test/java/com/stablebridge/prism/infrastructure/websocket/WebSocketTransactionStreamTest.java`
at line 123, Add a fragmented-frame regression test in
WebSocketTransactionStreamTest that exercises the buffering branch of
BlockSubscribeListener.onText by calling listener.onText(webSocket, frame,
false) for the initial fragment(s) and then listener.onText(webSocket,
finalFrame, true) for the final fragment; assert that parsing/handling (the code
path that processes a complete block notification) only occurs after the final
fragment is delivered. Update the other two similar test spots (the cases around
the comments at the other occurrences) to include an equivalent multi-fragment
case so onText is exercised with last=false followed by last=true and verify no
parse/handle side-effects occur until the final fragment.
Extend the WebSocket stream with an idle-timeout watchdog that forces a reconnect when no frames arrive for a sustained period, preventing the runLoop from hanging on a silent half-open socket. Refactor the lifecycle so the socket is always closed and the connected duration always recorded after each iteration, eliminating the orphan-socket path on partial connect failure. Log asynchronous sendText failures via whenComplete so write errors surface instead of being silently discarded. Add regression coverage for connect-attempt failure, explicit close, and the idle-timeout reconnect, and strengthen the existing reconnect assertion to verify the second subscribe payload. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Review feedback addressedCodeRabbit #discussion_r3067994490 — stale connect completion race Additional lifecycle hardening (e65f325)
Full build green: |
Summary
Adds
WebSocketTransactionStream, the free-tier adapter implementing theTransactionStreamport, paired with the paidYellowstoneTransactionStreamgRPC sibling. The adapter:java.net.http.WebSocketonly — no new build dependencyblockSubscribeJSON-RPC payload per the spec (commitment=confirmed,encoding=jsonParsed,transactionDetails=full,maxSupportedTransactionVersion=0)onTextfragments in aStringBuilderuntillast=true, then parses the frame via Jackson, extractsparams.result.value, logs[SLOT n], and routes non-vote transactions through the existingBlockNotificationParserprogramIdfield and theprogramIdIndex-resolvedaccountKeysentry againstVote111111111111111111111111111111111111111(supports textualaccountKeysand thejsonParsed{pubkey}object form)ReconnectHandler(exponential backoff,resetIfStableon clean disconnects)AtomicBoolean+CountDownLatch+ thread interrupt — nosynchronized, matching the gRPC sibling's lifecycleThe
HttpClientindefaultFactory()is built with a connect timeout soclose()during a hanging connect attempt does not stall the shutdown path. A package-privateWebSocketFactoryseam is introduced so unit tests can stubHttpClient.newWebSocketBuilder()without mocking a builder chain.Test plan
blockSubscribepayload sent exactly as specified on connect — asserted via a literal payload match onsendText(..., last=true)programIdstringprogramIdIndexpointing intoaccountKeysusingRecursiveComparisonover a composite(txs, feePayers)recordonClosetriggers reconnect — verified by asserting a second listener registration and aReconnectHandler.nextDelay()invocation./gradlew build— green (compile + Spotless + unit + integration + ArchUnit)synchronized; domain layer untouched; no new entries inbuild.gradle.ktsorlibs.versions.tomlNotes
WebSocketFactorystub that captures registered listeners into aCopyOnWriteArrayList, allowing deterministic assertions without network I/OReconnectHandleris spied and itsnextDelay()stubbed to0Lso reconnect tests don't sleepTransactionStreamport as the gRPC sibling; no changes to domain or application layers required for the alternative streaming modeCloses #26
Summary by CodeRabbit
New Features
Tests