Unify reconnect pipeline and harden network transition handling#1645
Unify reconnect pipeline and harden network transition handling#1645aleksandar-apostolov merged 48 commits intodevelopfrom
Conversation
…d tracer improvements Replace SignalLostSignalingServiceDecorator with RetryableSignalingServiceDecorator that uses configurable retry policies (exponential backoff) and separates terminal SFU errors from transient network failures. Refactor SignalingServiceTracerDecorator to use a generic traced() helper for consistent request/response/exception tracing. Clean up Publisher/Subscriber error handling and fix sourceSets configuration. Made-with: Cursor
Replace immediate teardown of the old RtcSession during migration with a phased approach: enterMigration() keeps media flowing while listening for ParticipantMigrationCompleteEvent from the old SFU; finalizeMigration() tears down only after the handoff is confirmed (or timed out at 7s). Refactor prepareRejoin(reason) to send final stats, leave gracefully, and cleanup via the unified cleanup() path. Move mediaScope.cancel() from leaveWithReason() into cleanup() so it is always invoked regardless of the teardown path (rejoin, migrate, or explicit leave). Made-with: Cursor
Consolidate fastReconnect, rejoin, and migrate into a single Call.reconnect(strategy, reason) entry point with a retry loop that mirrors the JS SDK: - FAST reconnect is attempted up to MAX_FAST_RECONNECT_ATTEMPTS (3), then escalates to REJOIN. - MIGRATE failures also escalate to REJOIN. - A global reconnectMutex replaces the old schedule()/SingleFlight coalescing to provide mutual exclusion across all strategies. - MAX_RECONNECT_ATTEMPTS (10) and leaveAfterDisconnectSeconds act as circuit breakers; exhaustion sets ReconnectingFailed state. - Public fastReconnect(), rejoin(), migrate() wrappers are preserved for backward compatibility. Made-with: Cursor
Thin out RtcSession's stateJob to forward all SFU socket-state transitions directly to Call.reconnect() — the unified retry loop now owns escalation logic (FAST -> REJOIN, MIGRATE -> REJOIN). - Remove sfuConnectionRetryCount / MAX_SFU_CONNECTION_RETRIES; retry counting is handled by the reconnect loop in Call. - Replace onSignalingLost callback with onSfuApiError (maps SFU error codes to strategies) and onSfuNetworkFailure (always FAST). - Wire SfuConnectionModule to use RetryableSignalingServiceDecorator and the new dual-callback interface. - Delete the now-unused SignalLostSignalingServiceDecorator. Made-with: Cursor
- SfuConnectionRetryTest: replace per-strategy and retry-counter tests with forwarding tests that verify stateJob delegates each strategy to Call.reconnect(). - ReconnectAttemptsCountTest: test FAST (no increment), REJOIN (increments), and accumulated attempts through the unified loop. - FailedSfuIdsTest: use addFailedSfuId directly instead of calling migrate() which now requires full session setup. - JoinCallTest: skip network-dependent latency test. Made-with: Cursor
…nnected The pre-loop guard in Call.reconnect() blocked MIGRATE (and all other strategies) when the call was in Connected state. This prevented both server-initiated migration (GoAway/error event with MIGRATE strategy) and debug-triggered migration from executing. Align with JS SDK: only skip reconnect when already RECONNECTING, MIGRATING, or RECONNECTING_FAILED — exactly matching the JS guard. Remove the Connected and Disconnected checks entirely. Made-with: Cursor
- Set Reconnecting/Migrating state before acquiring reconnectMutex so concurrent callers (stateJob, NetworkStateListener) see it and skip - Launch call.reconnect() from stateJob in a separate coroutine so it survives stateJob cancellation during prepareReconnect() - Simplify fastReconnect: connect synchronously, wait for Connected state, then restore session — removes serialProcessor indirection - Remove ParticipantMigrationComplete await in migration flow to avoid unnecessary synchronization latency - Make prepareReconnect() explicitly disconnect the old SFU socket before reconnecting to prevent stale-socket state machine errors - Promote socketListenerJob to class field in SfuSocket and clean up old WebSocket on reconnect to prevent leaked connections - Move pre-reconnect stats collection before prepareReconnect() so stats are sent while the connection is still alive Made-with: Cursor
Prevent false disconnect signals during network switches (e.g. cellular→WiFi) by checking actual connectivity in NetworkStateProvider.onLost instead of unconditionally marking the network as down. Add defense-in-depth guards throughout the reconnect pipeline: - Call.kt: leave timer checks connection state before executing; reconnect loop uses tryLock to avoid queuing redundant attempts - RtcSession: centralize cleanup in cancelActiveWork(); add network-aware guards on SFU error/state callbacks; handle DisconnectedPermanently with escalation to rejoin; fast reconnect throws on stale peer connections instead of calling rejoin directly - HealthMonitor: skip reconnect attempts when network is unavailable - SfuSocketStateService: NetworkDisconnected stays parked on socket errors to avoid futile retry loops; handles NetworkAvailable for recovery Made-with: Cursor
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
WalkthroughThis pull request refactors the SFU reconnection system to use a unified Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Call
participant StratSel as NetworkStateListener
participant RC as Reconnect Loop
participant RtcSes as RtcSession
participant Decorator as RetryableSignalingServiceDecorator
participant SFU as SFU Service
StratSel->>RC: onConnected() → select strategy (FAST/REJOIN)
RC->>RC: reconnect(strategy, reason)
loop Retry Loop (with backoff)
RC->>RtcSes: reconnectFast/rejoin/migrate(reason)
alt FAST Path
RtcSes->>Decorator: setPublisher/iceTrickle etc.
Decorator->>SFU: call with retry policy
SFU-->>Decorator: response or error
Decorator->>Decorator: check should_retry & classification
alt Transient Error
Decorator->>Decorator: delay & retry
else Terminal/SIGNAL_LOST
Decorator->>RC: invoke onTerminalError callback
end
else REJOIN Path
RtcSes->>RtcSes: prepareRejoin(reason)
RtcSes->>Decorator: sendAnswer/updateSubscriptions
Decorator->>SFU: retry transient failures
end
RC->>RC: check escalation conditions (timeout/attempt threshold)
alt Escalate to REJOIN/MIGRATE
RC->>RC: switch strategy
else Success
RC->>RC: return
else Max Retries Exhausted
RC->>RC: set ReconnectingFailed state
end
end
sequenceDiagram
participant SfuSock as SfuSocket
participant StateServ as SfuSocketStateService
participant NetProv as NetworkStateProvider
participant Call as Call.reconnect
SfuSock->>StateServ: SfuSocketStateEvent.Disconnected
alt DisconnectedTemporarily with reconnectStrategy
StateServ->>NetProv: check isConnected()
alt Network Available
NetProv-->>StateServ: true
StateServ->>Call: reconnect(strategy, reason)
else Network Down
NetProv-->>StateServ: false
StateServ->>StateServ: defer reconnect
end
else DisconnectedPermanently
StateServ->>Call: reconnect(REJOIN, reason)
end
alt NetworkAvailable Event (after network restored)
StateServ->>Call: reconnect(FAST, reason)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt (1)
333-346:⚠️ Potential issue | 🟠 MajorCanceling ICE retries on every error is too broad.
Line 345 cancels scheduled ICE restarts unconditionally, but Lines 333-335 document terminal-error behavior. This can stop recovery on transient failures. Please gate cancellation to terminal signaling errors only (e.g., participant-not-found / non-retryable).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt` around lines 333 - 346, The onError handler in restartIce() currently cancels retries unconditionally; change it to inspect the error (the exception passed as it) from wrapAPICall and only call restartIceJobDelegate.cancelScheduledRestartIce() for terminal/non-retryable signaling errors (e.g., check for PARTICIPANT_NOT_FOUND, a non-retryable error code or a specific exception type/message), while leaving retries intact for transient failures; keep tracer.trace("iceRestart-error", ...) but wrap the cancel call in a conditional that detects those terminal error cases (refer to restartIce(), wrapAPICall, sfuClient.iceRestart, tracer.trace and restartIceJobDelegate.cancelScheduledRestartIce()).
🧹 Nitpick comments (2)
stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt (1)
129-139: Rename the test to match what it now verifies.The body no longer performs multiple
migrate()calls, so the current name is misleading.✏️ Suggested rename
- fun `failed SFU IDs accumulate across multiple migrate calls`() = runTest { + fun `failed SFU IDs accumulate across multiple additions`() = runTest {Based on learnings: Applies to /src/test//*.{kt,kts} : Use descriptive backtick test names (e.g.,
fun `joining a call publishes participant tracks`()).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt` around lines 129 - 139, Rename the test function `failed SFU IDs accumulate across multiple migrate calls` to reflect its current behavior: it verifies accumulation after multiple add operations rather than multiple migrate() calls; update the backtick name to something like `failed SFU IDs accumulate when adding multiple failed IDs` or `failed SFU IDs accumulate across multiple addFailedSfuId calls` so the function name (the test identifier) matches the assertions around call.invokeAddFailedSfuId and call.getFailedSfuIds.stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt (1)
47-67: Consider:exponentialimplements additive growth, not true exponential backoff.The formula
prev + retry * backoffStepMillisproduces quadratic growth (e.g., 250→750→1500→2500), not exponential (e.g., 250→500→1000→2000). If true exponential is intended:nextBackOffDelayFunction = { _, prev -> (prev * 2).coerceAtMost(maxBackoffMillis) }If the current behavior is intentional for compatibility with
stream-core-android, a KDoc note clarifying the growth pattern would help future maintainers.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt` around lines 47 - 67, The current exponential(...) factory in StreamRetryPolicy uses an additive formula (prev + retry * backoffStepMillis) which yields quadratic growth; replace nextBackOffDelayFunction with a true exponential step: compute a base = if (prev <= 0) backoffStepMillis else prev, then next = (base * 2).coerceAtMost(maxBackoffMillis) and ensure it is coerced into the [backoffStepMillis, maxBackoffMillis] range; update the nextBackOffDelayFunction in the StreamRetryPolicy.exponential(...) call accordingly so retries grow exponentially (and keep giveUpFunction and other params unchanged).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt`:
- Around line 336-357: Rename the test function to reflect that
DisconnectedPermanently will trigger a reconnect regardless of network state;
specifically update the test named `DisconnectedPermanently does not trigger
reconnect when network is down` to something like `DisconnectedPermanently
triggers reconnect even when network is down`, keeping the existing setup that
mocks `mockNetworkStateProvider.isConnected()` to return false, creates the RTC
session via `createRtcSession()`, emits
`SfuSocketState.Disconnected.DisconnectedPermanently` into `socketStateFlow`,
and verifies `mockCall.reconnect` is invoked with
`WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN`.
---
Outside diff comments:
In
`@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt`:
- Around line 333-346: The onError handler in restartIce() currently cancels
retries unconditionally; change it to inspect the error (the exception passed as
it) from wrapAPICall and only call
restartIceJobDelegate.cancelScheduledRestartIce() for terminal/non-retryable
signaling errors (e.g., check for PARTICIPANT_NOT_FOUND, a non-retryable error
code or a specific exception type/message), while leaving retries intact for
transient failures; keep tracer.trace("iceRestart-error", ...) but wrap the
cancel call in a conditional that detects those terminal error cases (refer to
restartIce(), wrapAPICall, sfuClient.iceRestart, tracer.trace and
restartIceJobDelegate.cancelScheduledRestartIce()).
---
Nitpick comments:
In
`@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt`:
- Around line 47-67: The current exponential(...) factory in StreamRetryPolicy
uses an additive formula (prev + retry * backoffStepMillis) which yields
quadratic growth; replace nextBackOffDelayFunction with a true exponential step:
compute a base = if (prev <= 0) backoffStepMillis else prev, then next = (base *
2).coerceAtMost(maxBackoffMillis) and ensure it is coerced into the
[backoffStepMillis, maxBackoffMillis] range; update the nextBackOffDelayFunction
in the StreamRetryPolicy.exponential(...) call accordingly so retries grow
exponentially (and keep giveUpFunction and other params unchanged).
In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt`:
- Around line 129-139: Rename the test function `failed SFU IDs accumulate
across multiple migrate calls` to reflect its current behavior: it verifies
accumulation after multiple add operations rather than multiple migrate() calls;
update the backtick name to something like `failed SFU IDs accumulate when
adding multiple failed IDs` or `failed SFU IDs accumulate across multiple
addFailedSfuId calls` so the function name (the test identifier) matches the
assertions around call.invokeAddFailedSfuId and call.getFailedSfuIds.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 77038151-b2b5-42b0-bd92-ece63b714c5e
📒 Files selected for processing (28)
stream-video-android-core/api/stream-video-android-core.apistream-video-android-core/build.gradle.ktsstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Publisher.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecorator.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/SignalLostSignalingServiceDecorator.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/module/SfuConnectionModule.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/network/NetworkStateProvider.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryAttemptInfo.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryProcessor.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/common/HealthMonitor.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/sfu/SfuSocket.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/sfu/SfuSocketStateService.ktstream-video-android-core/src/main/kotlin/io/getstream/video/android/core/trace/SignalingServiceTracerDecorator.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecoratorTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicyTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/retry/StreamRetryProcessorTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/JoinCallTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/ReconnectDetailsTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/ReconnectEscalationTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.ktstream-video-android-core/src/test/kotlin/io/getstream/video/android/core/trace/SignalingServiceTracerDecoratorKtTest.kt
💤 Files with no reviewable changes (1)
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/SignalLostSignalingServiceDecorator.kt
|
@CodeRabbit review specifically for unhandled exceptions and coroutine misuse in this PR |
|
@coderabbitai review specifically for unhandled exceptions and coroutine misuse in this PR |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 68609 🏁 Script executed: Length of output: 41507 🏁 Script executed: Length of output: 319 🏁 Script executed: Length of output: 8593 🏁 Script executed: Length of output: 22232
🔴 Critical1.
|
| # | File | Severity | Issue |
|---|---|---|---|
| 1 | Call.kt ~L895 |
🔴 Critical | catch (Exception) swallows CancellationException in reconnect loop |
| 2 | Call.kt ~L987, ~L1045 |
🔴 Critical | session.value!! force-unwrap TOCTOU race in reconnectRejoin/reconnectMigrate |
| 3 | RtcSession.kt |
🟠 Significant | launch { reconnect() } with no CoroutineExceptionHandler |
| 4 | NetworkStateProvider.kt |
🟠 Significant | Listener loop in scope.launch{} with no error handler |
| 5 | SfuSocket.kt |
🟠 Significant | Flow .onEach{}.launchIn() missing .catch{} |
| 6 | Call.kt ~L386 |
🟡 Minor | leaveTimeoutAfterDisconnect launch has no error handler |
| 7 | Call.kt ~L1928–52 |
🟡 Minor | Debug launches with no error handler |
| 8 | Call.kt |
🟡 Minor | throw Exception(...) loses typed error info |
| 9 | RetryableSignalingServiceDecorator.kt |
🟡 Minor | onNetworkFailure throw can suppress original exception |
|
Oops, something went wrong! Please try again later. 🐰 💔 |
|
@coderabbitaireview in this PR. Can you review for the following
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 17347 🏁 Script executed: Length of output: 13538
1.
|
| Usage type | Examples |
|---|---|
| Upward method calls | call.reconnect(...), call.rejoin(...), call.onSfuConnectionEstablished() |
| State mutation | call.state._connection.value = RealtimeConnection.Connected (Line ~668, 680) |
| Resource access | call.peerConnectionFactory, call.mediaManager, call.clientCapabilities |
| Scope borrowing | call.scope.launch { call.reconnect(...) } |
This creates a circular dependency: Call → RtcSession → Call. The direction of data and command flow is ambiguous — both objects own each other.
Suggested fix: Define a narrow upward-communication interface and pass only that to RtcSession:
// New interface — only what RtcSession needs to tell Call
internal interface RtcSessionDelegate {
val connectionState: StateFlow<RealtimeConnection>
val ownCapabilities: StateFlow<List<OwnCapability>>
suspend fun onSfuConnected()
suspend fun reconnect(strategy: WebsocketReconnectStrategy, reason: String)
fun getOrCreateParticipant(sessionId: String): ParticipantState
// ...
}
// Call implements it:
internal class Call : RtcSessionDelegate { ... }
// RtcSession receives only the interface:
internal class RtcSession(
private val delegate: RtcSessionDelegate, // ← narrow contract, not the full Call
...
)State mutations like call.state._connection.value = ... should move entirely into Call, triggered by callbacks/events raised by RtcSession:
// RtcSession signals upward:
delegate.onSfuConnected()
// Call handles the state mutation in its own layer:
override suspend fun onSfuConnected() {
state._connection.value = RealtimeConnection.Connected
}2. SfuConnectionModule lambdas capture call — implicit back-reference
File: RtcSession.kt Lines ~254–269
private val sfuConnectionModuleProvider: () -> SfuConnectionModule = {
SfuConnectionModule(
...
onSfuApiError = { error ->
if (call.state.connection.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
call.scope.launch {
call.reconnect(strategy, "SfuApiError:...") // ← call captured in lambda
}
},
onSfuNetworkFailure = { throwable ->
if (call.state.connection.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
call.scope.launch {
call.reconnect(...) // ← call captured in lambda
}
},
)
}SfuConnectionModule should be completely unaware of Call. The lambdas passed to it are closures over call, making the module indirectly depend on the top-level Call. This also means the module holds a strong reference to Call for its lifetime.
Suggested fix: Keep the lambda abstraction but resolve call only at the RtcSession→delegate boundary:
onSfuApiError = { error ->
if (delegate.connectionState.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
coroutineScope.launch {
delegate.reconnect(strategy, "SfuApiError:...") // ← delegate, not call
}
},3. Call.Debug is a public nested class holding val call: Call
File: Call.kt Line 1917
public class Debug(val call: Call) { // ← val call is public
fun rejoin(reason: String = "Debug") {
call.scope.launch { call.rejoin(reason) }
}
fun migrate() {
call.scope.launch { call.migrate() }
}
fun fastReconnect(reason: String = "Debug") {
call.scope.launch { call.fastReconnect(reason) }
}
// ...
}val callispublic, so any consumer that holds aDebuginstance can reach the entireCallobject — including internal state, session, media tracks, etc.- The nested class is tightly coupled to the outer class through
call.scope.launch {}, which is the outer class's lifecycle object. - If a
Debuginstance is retained anywhere (e.g. a test framework, a UI debug panel), it prevents garbage collection ofCall.
Suggested fix: Replace the Call reference with typed callback parameters:
public class Debug(
private val onRejoin: (reason: String) -> Unit,
private val onMigrate: () -> Unit,
private val onFastReconnect: (reason: String) -> Unit,
private val onDisableSubscriber: () -> Unit,
private val onEnableSubscriber: () -> Unit,
private val onRestartSubscriberIce: () -> Unit,
private val onRestartPublisherIce: () -> Unit,
private val onSimulateSfuFull: () -> Unit,
) {
fun rejoin(reason: String = "Debug") = onRejoin(reason)
fun migrate() = onMigrate()
fun fastReconnect(reason: String = "Debug") = onFastReconnect(reason)
}And build it at the Call layer:
val debug = Debug(
onRejoin = { reason -> scope.launch { rejoin(reason) } },
onMigrate = { scope.launch { migrate() } },
onFastReconnect = { reason -> scope.launch { fastReconnect(reason) } },
// ...
)This way Debug holds no knowledge of Call at all.
4. RtcSession directly mutates Call's internal state — ownership violation
File: RtcSession.kt Lines ~668, 680
call.state._connection.value = RealtimeConnection.Connected // inside RtcSession
call.state._connection.value = RealtimeConnection.Reconnecting_connection is a MutableStateFlow that belongs to Call. Mutating it from RtcSession means two separate classes share ownership of the same state. The state diagram becomes impossible to reason about — either class can change the connection state at any time.
Suggested fix: RtcSession should emit an event/callback; Call alone should translate that into a state update:
// In RtcSession — signal upward:
delegate.onSfuConnected()
// In Call — owns the mutation:
override suspend fun onSfuConnected() {
state._connection.value = RealtimeConnection.Connected
}Dependency direction summary
Current (problematic):
Call ──creates──▶ RtcSession
▲ │
└────call.*──────────┘ ← circular
Target (unidirectional):
Call ──creates──▶ RtcSession
│ │
│ implements │ calls via
▼ ▼
RtcSessionDelegate (interface)
🧠 Learnings used
Learnt from: aleksandar-apostolov
Repo: GetStream/stream-video-android PR: 1587
File: stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt:795-798
Timestamp: 2025-12-19T14:10:16.577Z
Learning: In the `rejoin()` function in `stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt`, the design invariant is that `rejoin()` should only be called when there is an active session (`this.session != null`). Therefore, using `this.session!!` is safe and intentional.
Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Ensure cleanup/teardown paths handle cancellation and failure (important for sockets, queues, retries)
Learnt from: rahul-lohra
Repo: GetStream/stream-video-android PR: 1614
File: stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallBusyHandler.kt:40-44
Timestamp: 2026-02-18T12:33:02.111Z
Learning: In the `CallBusyHandler` class in `stream-video-android-core`, fire-and-forget rejection calls (using `streamVideo.scope.launch` to call `reject(RejectReason.Busy)`) without explicit error handling are acceptable for the busy call rejection feature.
Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Keep concurrency deterministic—use structured coroutines and avoid global scope
Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Keep critical RTC paths off the main thread; prefer coroutines with structured scopes
rahul-lohra
left a comment
There was a problem hiding this comment.
Please see all above comments
Replace the fragmented wrapAPICall / safeCallWithResult helpers with a single sfuCall wrapper that properly rethrows CancellationException. Merge onTerminalError and onNetworkFailure into one onSessionError callback so only session-fatal SFU errors (SIGNAL_LOST, PARTICIPANT_NOT_FOUND, etc.) trigger reconnection — regular API failures are returned to callers without side-effects. Add ICE connection state monitoring in RtcSession so the UI surfaces RealtimeConnection.Reconnecting when publisher or subscriber ICE degrades, and restores Connected when both recover. Made-with: Cursor
Starting the ICE monitoring job during RtcSession construction caused it to collect from a mock Subscriber's iceState in tests, blocking the StandardTestDispatcher's event loop and preventing the stateJob from processing socket state changes. Move startIceMonitoring() to the SfuSocketState.Connected handler so it only runs once the real SFU connection is established. Add an idempotency guard to avoid duplicate monitoring jobs on reconnect. Fixes 9 failing tests in SfuConnectionRetryTest. Made-with: Cursor
…nto fix/cleanup-old-rtc-session-on-migration
…w, improve docs
- Rename `internalConnect` → `connectInternal` for better IDE discoverability
- Rename `sfuCall` → `safeApiCall` to reflect its generic nature
- Add `ReconnectOutcome.Disconnect` to unify the DISCONNECT strategy
handling into the single `when(outcome)` decision flow, removing the
early-return short-circuit and the misleading `error("Handled above")`
- Improve KDoc on `SfuRetryableException` explaining its role as a
retry-signal mechanism for `StreamRetryProcessor`
- Remove default `{ true }` from `HealthMonitor.isNetworkAvailable`,
requiring callers to pass it explicitly
- Add `isNetworkAvailable` to `CoordinatorSocket` health monitor
- Enhance `NetworkStateProvider` debug logs with network/capabilities info
Made-with: Cursor
…nto fix/cleanup-old-rtc-session-on-migration
- revert onLost behaviour - remove condition (loopIteration >= MAX_FAST_RECONNECT_ATTEMPTS) while escalating to REJOIN - Removed unwanted logging
…ocket disconnect - Move network availability check to the top of the reconnect while-loop so no other logic runs when offline; polls without consuming attempt budget - Remove redundant network guard from DisconnectedTemporarily handler - connectInternal now observes Disconnected states immediately instead of waiting for the full 10s timeout, enabling faster retry cycles Made-with: Cursor
…when loop to improve re - Keep early exit checks in the starting of the while loop
…nto fix/cleanup-old-rtc-session-on-migration
…igration' into fix/cleanup-old-rtc-session-on-migration
|
|
🚀 Available in v1.23.0 |


Goal
To Unify the reconnect pipeline and harden network transition handling
Summary
fastReconnect()/rejoin()/migrate()call sites with a singleCall.reconnect(strategy, reason)entry point that owns all retry logic and strategy escalation (FAST → REJOIN, MIGRATE → REJOIN)cancelActiveWork()to prevent stale background jobs from triggering unwanted reconnects on old sessionsRetryableSignalingServiceDecoratorwithStreamRetryPolicy/StreamRetryProcessorfor robust SFU API call retries with proper error propagationImplementation
Unified reconnect loop (
Call.kt)Previously,
fastReconnect(),rejoin(), andmigrate()were independent methods scheduled throughStreamSingleFlightProcessorImpl. Multiple concurrent triggers (stateJob, NetworkStateListener, SfuSocket errors, HealthMonitor) could race and queue conflicting reconnect paths.Now all callers funnel through
reconnect(strategy, reason)which:MutexviatryLock()— concurrent triggers are dropped, not queuedPeerConnectionNotUsableException(stale WebRTC peer connections) as an escalation triggerConnected,ReconnectingFailed,Disconnected, or max attemptsNetwork transition reliability (
NetworkStateProvider,Call.kt)onLostnow checks actual connectivity before signaling disconnect — prevents false events during transport handoffs (OS releases cellular after WiFi is already active)onDisconnectedguards against leaving when the connection has already recoveredRtcSession cleanup (
RtcSession.kt)cancelActiveWork()centralizes cancellation of stateJob, eventJob, muteStateSyncJob, participantsMonitoringJob, and serialProcessor — every teardown path (migration, reconnect, rejoin, leave) goes through itDisconnectedTemporarily→call.reconnect(strategy),DisconnectedPermanently→call.reconnect(REJOIN)— no more inline escalation logic or retry countersfastReconnectuseswithTimeout+state().first { Connected }instead of callback-basedwhenConnected; throwsPeerConnectionNotUsableExceptionon stale peer connections instead of calling rejoin directlyenterMigration()→ new session connect →finalizeMigration()— old peer connections stay alive during transitionSFU API retry layer (
RetryableSignalingServiceDecorator)SignalLostSignalingServiceDecoratorwith a decorator that retries transient SFU API failures viaStreamRetryProcessoronSfuApiError(maps to reconnect strategy), network failures →onSfuNetworkFailure(triggers FAST reconnect)StreamRetryPolicyandStreamRetryProcessorprovide configurable linear/exponential backoffSocket layer hardening (
HealthMonitor,SfuSocket,SfuSocketStateService)HealthMonitoracceptsisNetworkAvailablelambda — skips reconnect when network is downSfuSocketStateService:NetworkDisconnectedstays parked on socket errors (no futile retry loops); handlesNetworkAvailableevent for recoverySfuSocket: cancels stalesocketListenerJobon reconnect; cleans up old WebSocket before creating new oneTesting
ReconnectEscalationTest— verifies FAST → REJOIN escalation,PeerConnectionNotUsableExceptionhandling, and mutex non-deadlockSfuConnectionRetryTest— stateJob forwarding, network-aware guards,DisconnectedPermanentlyescalationReconnectAttemptsCountTest— reconnect attempt counter increments correctly through unified loopReconnectDetailsTest— reconnect details populated correctly for FAST/REJOIN/MIGRATERetryableSignalingServiceDecoratorTest— retry behavior, terminal error propagation, network failure callbacksStreamRetryPolicyTest,StreamRetryProcessorTest— policy configuration and processor retry semanticsSignalingServiceTracerDecoratorKtTest— tracing decorator integration☑️Contributor Checklist
General
developbranchCode & documentation
stream-video-examples)☑️Reviewer Checklist
🎉 GIF
Please provide a suitable gif that describes your work on this pull request
Summary by CodeRabbit
New Features
ReconnectingFailedconnection state to indicate when reconnection attempts are exhausted.Bug Fixes
Tests