Resolve race conditions in HealthMonitor and SocketSession#50
Resolve race conditions in HealthMonitor and SocketSession#50VelikovPetar merged 7 commits intodevelopfrom
HealthMonitor and SocketSession#50Conversation
HealthMonitor: Replace plain `var lastAck: Long` with `AtomicLong` to guarantee memory visibility between the OkHttp callback thread (acknowledgeHeartbeat) and the monitor coroutine that reads it. SocketSession: Eliminate dual-listener race during WebSocket handshake. Previously both eventListener and connectListener were subscribed before socket open, causing the auth response to be processed by both. Now only the handshake listener is subscribed initially. It handles auth responses directly and buffers any other messages. On success, eventListener subscribes and buffered messages are replayed through the batcher — ensuring zero message loss and no duplicate processing.
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
WalkthroughThe changes refactor the socket subscription lifecycle to buffer non-authentication messages during handshake, defer Changes
Sequence DiagramsequenceDiagram
participant Client
participant SocketSession as StreamSocketSession
participant Socket as WebSocket
participant Batcher
participant HealthMonitor
Client->>SocketSession: connect()
activate SocketSession
SocketSession->>SocketSession: pendingMessages = []
SocketSession->>Socket: subscribe(handshakeListener)
activate Socket
Socket-->>SocketSession: onOpen()
deactivate Socket
Socket-->>SocketSession: onMessage(authResponse)
activate SocketSession
SocketSession->>HealthMonitor: acknowledgeHeartbeat()
SocketSession->>SocketSession: deserialize & validate auth
alt Authentication Success
SocketSession->>Socket: subscribe(eventListener)
SocketSession->>Batcher: offer(bufferedMessage) [replay]
SocketSession->>SocketSession: pendingMessages.clear()
else Non-Auth Message During Handshake
SocketSession->>SocketSession: pendingMessages.append(message)
end
deactivate SocketSession
deactivate SocketSession
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt (1)
564-594: Please add a direct regression test for the buffered-handshake path.These additions cover the failure-side subscription counts, but the new behavior in
StreamSocketSession.connectis buffering non-auth frames untilconnection.okand replaying them afterward. Add one test that delivers a normal event beforeCONNECTED_JSONand assert it is forwarded exactly once after the steady-state listener is installed.Based on learnings: "Maintain idempotency and guard callback ordering in connection state, batch processing, and retry flows that feed StreamSubscriptionManager."
Also applies to: 1173-1223
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt` around lines 564 - 594, Add a regression test in StreamSocketSessionTest that verifies the buffered-handshake path: call session.connect(...) as in existing tests but simulate the handshake listener receiving a non-auth event (a normal event frame) before emitting CONNECTED_JSON, then emit CONNECTED_JSON and assert that the normal event is forwarded exactly once to the steady-state subscription handling (i.e., after the handshake listener is replaced by the steady listener). Use the same mocking style as the file: stub socket.subscribe(...) and socket.open(...) to succeed, push a normal event into the initial StreamWebSocketListener callback before sending the CONNECTED_JSON frame, then send CONNECTED_JSON and verify the subscription forward path invoked once (use the same subs.forEach(...) / subscription manager verifications or the steady listener verification used elsewhere) to ensure the buffered event is replayed exactly one time.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt`:
- Around line 52-53: The monitorJob field is racy: ensure start/stop are atomic
by replacing the mutable monitorJob with an AtomicReference<Job?> or by guarding
the check/assign/cancel sequence with a mutex/@Synchronized; update
StreamHealthMonitorImpl.start() to use compare-and-set (CAS) on the
AtomicReference when storing the newly launched Job and change stop() to
atomically getAndSet(null) and cancel the returned Job, referencing monitorJob,
start(), and stop() in your changes (lastAck can remain as-is).
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt`:
- Around line 286-303: The success lambda handling post-handshake must treat
subscription or replay failures as fatal: if
internalSocket.subscribe(eventListener) fails (socketSubRes.isFailure) or any
batcher.offer(message) returns false, cancel handshakeSubscription, avoid
starting healthMonitor or calling connect() success, tear down the session
(stop/close the socket, clear socketSubscription, stop healthMonitor) and
propagate/return a failed connect result instead; also sanitize the replay
failure log so it does not print raw message payloads (use a redacted/summary
message or message id). Ensure you update logic around success,
socketSubscription, pendingMessages, batcher.offer, handshakeSubscription and
healthMonitor to implement this behavior.
---
Nitpick comments:
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt`:
- Around line 564-594: Add a regression test in StreamSocketSessionTest that
verifies the buffered-handshake path: call session.connect(...) as in existing
tests but simulate the handshake listener receiving a non-auth event (a normal
event frame) before emitting CONNECTED_JSON, then emit CONNECTED_JSON and assert
that the normal event is forwarded exactly once to the steady-state subscription
handling (i.e., after the handshake listener is replaced by the steady
listener). Use the same mocking style as the file: stub socket.subscribe(...)
and socket.open(...) to succeed, push a normal event into the initial
StreamWebSocketListener callback before sending the CONNECTED_JSON frame, then
send CONNECTED_JSON and verify the subscription forward path invoked once (use
the same subs.forEach(...) / subscription manager verifications or the steady
listener verification used elsewhere) to ensure the buffered event is replayed
exactly one time.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 682ba919-9bf8-4ed8-80f9-accfb74c0774
📒 Files selected for processing (3)
stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.ktstream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.ktstream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt
...e/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt
Outdated
Show resolved
Hide resolved
...-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt
Outdated
Show resolved
Hide resolved
…ke failure handling HealthMonitor: Replace plain `var monitorJob` with `AtomicReference<Job?>` and use CAS in `start()` / `getAndSet(null)` in `stop()` to prevent the start/stop race where a stop landing between the active-check and assignment in start() could miss the newly launched job. SocketSession: Treat eventListener subscribe failure and buffered message replay failure as fatal — tear down the session and return a failed connect result instead of silently continuing to Connected state. Also remove raw message payload from replay failure logs. Add tests for handshake message buffering/replay, heartbeat ack during handshake, eventListener subscribe failure, and replay failure.
Verify that a non-auth message arriving during handshake is: - NOT offered to the batcher during handshake phase - eventListener is installed BEFORE replay - replayed exactly once through the batcher after success - connection.ok itself is NOT replayed
Last-write-wins debouncer for coalescing bursty signals. Each submit replaces the pending value and resets the delay timer. When the timer expires, the registered callback fires with the settled value. Thread-safe via AtomicReference<Job?> for the pending job and AtomicReference<T?> for the pending value. Follows the same factory function pattern as StreamBatcher, StreamSerialProcessingQueue, etc. Primary use case: debouncing network/lifecycle state changes before evaluating reconnect decisions, preventing reconnect storms from rapid network flaps.
Replace the raw scope.launch in the network/lifecycle listener with a StreamDebouncer (300ms). Rapid network flaps now settle before the recovery evaluator runs, preventing reconnect storms from intermediate state transitions. The debouncer is cancelled on explicit disconnect to avoid stale recovery attempts after teardown.
Revert the injectable debouncer approach. The debouncer is created internally in StreamClientImpl with a 500ms delay using Pair instead of a dedicated data class. Not exposed in the factory — product SDKs don't need to configure this.
On CAS failure, the losing thread was retrying with its own (possibly stale) value instead of reading the latest from pendingValue. This could cause an older network state to overwrite a newer one.
|
HealthMonitor and SocketSession
|
🚀 Available in v3.0.1 |



Goal
Fix two race conditions identified in the core socket infrastructure that could cause incorrect behavior under concurrent access.
Implementation
1. HealthMonitor
lastAckatomicityvar lastAck: Long→AtomicLong. The field is written from OkHttp's callback thread (acknowledgeHeartbeat) and read from the monitor coroutine. Without volatile or atomic ops, the reader could see stale values and falsely trigger the liveness threshold, causing premature socket disconnection.2. SocketSession dual-listener race
During WebSocket handshake, both
eventListenerandconnectListenerwere subscribed simultaneously — the auth response (connection.ok) was processed by both listeners. Now only the handshake listener is subscribed initially. It handles auth responses and buffers any other messages that arrive during handshake. On success,eventListenersubscribes and buffered messages are replayed through the batcher. Zero message loss, no duplicate processing.Testing
Existing unit tests updated to reflect the new single-listener handshake flow. Full suite passes (all SocketSession + HealthMonitor tests).
Summary by CodeRabbit