diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamDebouncer.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamDebouncer.kt new file mode 100644 index 0000000..5bb0b7b --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamDebouncer.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.internal.processing.StreamDebouncerImpl +import kotlinx.coroutines.CoroutineScope + +/** + * Debounces a bursty stream of values so that only the **last** value is delivered after a quiet + * period. + * + * Each call to [submit] replaces the pending value and resets the delay timer. When the timer + * expires without a new submission, the registered [onValue] handler is invoked with the most + * recent value. This is useful for coalescing rapid state changes (network flaps, lifecycle + * transitions) into a single settled action. + * + * ### Semantics + * - **Last-write-wins:** Only the most recently submitted value is delivered. + * - **Timer reset:** Each [submit] cancels any pending delivery and restarts the delay. + * - **Thread-safety:** All functions are safe to call from multiple coroutines. + * - **Cancellation:** [cancel] discards the pending value and stops the timer without delivery. + */ +@StreamInternalApi +public interface StreamDebouncer { + /** + * Registers the handler invoked when the debounce window elapses. + * + * @param callback Suspend function called with the settled value. + */ + public fun onValue(callback: suspend (T) -> Unit) + + /** + * Submits a new value, cancelling any pending delivery and restarting the delay timer. + * + * @param value The value to deliver after the debounce window. + */ + public fun submit(value: T) + + /** Cancels any pending delivery without invoking the handler. */ + public fun cancel() +} + +/** + * Creates a new [StreamDebouncer] instance. + * + * @param T The type of value being debounced. + * @param scope Coroutine scope for launching the delayed delivery job. + * @param logger Logger for diagnostics. + * @param delayMs Time in milliseconds to wait after the last [StreamDebouncer.submit] before + * delivering the value. Defaults to 300ms. + * @return A new [StreamDebouncer] instance. + */ +@StreamInternalApi +public fun StreamDebouncer( + scope: CoroutineScope, + logger: StreamLogger, + delayMs: Long = 300L, +): StreamDebouncer = StreamDebouncerImpl(scope = scope, logger = logger, delayMs = delayMs) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt index db9633d..999ed05 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt @@ -27,6 +27,7 @@ import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleS import io.getstream.android.core.api.model.connection.network.StreamNetworkState import io.getstream.android.core.api.model.connection.recovery.Recovery import io.getstream.android.core.api.model.value.StreamToken +import io.getstream.android.core.api.processing.StreamDebouncer import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator @@ -46,7 +47,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.launch internal class StreamClientImpl( private val user: StreamUser, @@ -69,6 +69,8 @@ internal class StreamClientImpl( private var socketSessionHandle: StreamSubscription? = null private var networkAndLifecycleMonitorHandle: StreamSubscription? = null + private val recoveryDebouncer: StreamDebouncer> = + StreamDebouncer(scope = scope, logger = logger, delayMs = 500L) override val connectionState: StateFlow get() = mutableConnectionState.asStateFlow() @@ -111,22 +113,23 @@ internal class StreamClientImpl( if (networkAndLifecycleMonitorHandle == null) { logger.v { "[connect] Setup network and lifecycle monitor callback" } + recoveryDebouncer.onValue { (networkState, lifecycleState) -> + val connectionState = mutableConnectionState.value + val recovery = + connectionRecoveryEvaluator.evaluate( + connectionState, + lifecycleState, + networkState, + ) + recoveryEffect(recovery) + } val networkAndLifecycleMonitorListener = object : StreamNetworkAndLifecycleMonitorListener { override fun onNetworkAndLifecycleState( networkState: StreamNetworkState, lifecycleState: StreamLifecycleState, ) { - scope.launch { - val connectionState = mutableConnectionState.value - val recovery = - connectionRecoveryEvaluator.evaluate( - connectionState, - lifecycleState, - networkState, - ) - recoveryEffect(recovery) - } + recoveryDebouncer.submit(networkState to lifecycleState) } } networkAndLifecycleMonitorHandle = @@ -159,6 +162,7 @@ internal class StreamClientImpl( override suspend fun disconnect(): Result = singleFlight.run(disconnectKey) { logger.d { "Disconnecting from socket" } + recoveryDebouncer.cancel() mutableConnectionState.update(StreamConnectionState.Disconnected()) connectionIdHolder.clear() socketSession.disconnect() diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamDebouncerImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamDebouncerImpl.kt new file mode 100644 index 0000000..2b6fb35 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamDebouncerImpl.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.processing + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.processing.StreamDebouncer +import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +internal class StreamDebouncerImpl( + private val scope: CoroutineScope, + private val logger: StreamLogger, + private val delayMs: Long, +) : StreamDebouncer { + private val pendingJob = AtomicReference(null) + private val pendingValue = AtomicReference(null) + private var callback: suspend (T) -> Unit = {} + + override fun onValue(callback: suspend (T) -> Unit) { + this.callback = callback + } + + override fun submit(value: T) { + pendingValue.set(value) + val oldJob = pendingJob.get() + val newJob = + scope.launch { + delay(delayMs) + val settled = pendingValue.getAndSet(null) ?: return@launch + logger.v { "[debounce] Delivering settled value: $settled" } + callback(settled) + } + if (!pendingJob.compareAndSet(oldJob, newJob)) { + newJob.cancel() + pendingValue.get()?.let { submit(it) } + return + } + oldJob?.cancel() + } + + override fun cancel() { + pendingJob.getAndSet(null)?.cancel() + pendingValue.set(null) + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt index 611bb15..1a68bb0 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt @@ -182,6 +182,7 @@ internal class StreamSocketSession( suspend fun connect(data: ConnectUserData): Result = suspendCancellableCoroutine { continuation -> var handshakeSubscription: StreamSubscription? = null + val pendingMessages = mutableListOf() // Ensure we clean up if the caller cancels the connect coroutine continuation.invokeOnCancellation { cause -> @@ -282,8 +283,38 @@ internal class StreamSocketSession( } // Success/Failure continuations - val success: (StreamConnectedUser, String) -> Unit = { user, connectionId -> + val success: (StreamConnectedUser, String) -> Unit = success@{ user, connectionId -> handshakeSubscription?.cancel() + + // Subscribe eventListener for steady-state operation + val eventSubscription = + internalSocket.subscribe(eventListener).getOrElse { err -> + logger.e(err) { "[success] Failed to subscribe event listener" } + disconnect(err) + if (continuation.isActive) { + continuation.resume(Result.failure(err)) + } + return@success + } + socketSubscription = eventSubscription + + // Replay messages buffered during handshake + for (message in pendingMessages) { + if (!batcher.offer(message)) { + val err = + IllegalStateException( + "Failed to replay buffered message during handshake transition" + ) + logger.e(err) { "[success] Buffered message replay failed" } + disconnect(err) + if (continuation.isActive) { + continuation.resume(Result.failure(err)) + } + return@success + } + } + pendingMessages.clear() + if (continuation.isActive) { healthMonitor.start() val connected = StreamConnectionState.Connected(user, connectionId) @@ -310,15 +341,9 @@ internal class StreamSocketSession( // Notify listeners: Opening (block until dispatched) notifyState(StreamConnectionState.Connecting.Opening(data.userId)) - // Subscribe for socket events - val socketSubRes = internalSocket.subscribe(eventListener) - if (socketSubRes.isFailure) { - failure(socketSubRes.exceptionOrNull()!!) - return@suspendCancellableCoroutine - } - socketSubscription = socketSubRes.getOrNull() - - // Temporary listener to handle the initial open/auth handshake + // Only the handshake listener is subscribed initially. + // eventListener subscribes after handshake succeeds (in the success callback) + // to avoid dual-listener processing of the same message. val connectListener = object : StreamWebSocketListener { override fun onOpen(response: Response) { @@ -368,16 +393,16 @@ internal class StreamSocketSession( override fun onMessage(text: String) { logger.d { "[onMessage] Socket message (string): $text" } + healthMonitor.acknowledgeHeartbeat() eventParser .deserialize(text) - .map { it.core } - .map { authResponse -> - when (authResponse) { + .onSuccess { event -> + when (val coreEvent = event.core) { is StreamClientConnectedEvent -> { logger.v { - "[onMessage] Handling connected event: $authResponse" + "[onMessage] Handling connected event: $coreEvent" } - val me = authResponse.me + val me = coreEvent.me val connectedUser = StreamConnectedUser( me.createdAt, @@ -394,19 +419,26 @@ internal class StreamSocketSession( me.lastActive, me.name, ) - streamClientConnectedEvent = authResponse - success(connectedUser, authResponse.connectionId) + streamClientConnectedEvent = coreEvent + success(connectedUser, coreEvent.connectionId) } is StreamClientConnectionErrorEvent -> { logger.e { - "[onMessage] Socket connection recoverable error: $authResponse" + "[onMessage] Socket connection recoverable error: $coreEvent" + } + apiFailure(coreEvent.error) + } + + else -> { + logger.v { + "[onMessage] Buffering non-auth message during handshake" } - apiFailure(authResponse.error) + pendingMessages.add(text) } } } - .recover { + .onFailure { logger.e(it) { "[onMessage] Failed to deserialize socket message. ${it.message}" } @@ -452,6 +484,7 @@ internal class StreamSocketSession( return@suspendCancellableCoroutine } handshakeSubscription = hsRes.getOrNull() + socketSubscription = handshakeSubscription // Open socket val openRes = internalSocket.open(config) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt index 3d7d98c..1c8aa15 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt @@ -18,6 +18,8 @@ package io.getstream.android.core.internal.socket.monitor import io.getstream.android.core.api.log.StreamLogger import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference import kotlin.time.Clock import kotlin.time.ExperimentalTime import kotlinx.coroutines.CoroutineScope @@ -48,8 +50,8 @@ internal class StreamHealthMonitorImpl( const val ALIVE_THRESHOLD = 60_000L } - private var monitorJob: Job? = null - private var lastAck: Long = clock.now().toEpochMilliseconds() + private val monitorJob = AtomicReference(null) + private val lastAck = AtomicLong(clock.now().toEpochMilliseconds()) // callbacks default to no-op private var onIntervalCallback: suspend () -> Unit = {} @@ -64,23 +66,24 @@ internal class StreamHealthMonitorImpl( } override fun acknowledgeHeartbeat() { - lastAck = clock.now().toEpochMilliseconds() + lastAck.set(clock.now().toEpochMilliseconds()) } - /** Starts (or restarts) the periodic health-check loop */ + /** Starts (or restarts) the periodic health-check loop. */ override fun start() = runCatching { - logger.d { "[start] Staring health monitor" } - if (monitorJob?.isActive == true) { + logger.d { "[start] Starting health monitor" } + val current = monitorJob.get() + if (current?.isActive == true) { logger.d { "Health monitor already running" } return@runCatching } - monitorJob = + val newJob = scope.launch { while (isActive) { delay(interval) val now = clock.now().toEpochMilliseconds() - if (now - lastAck >= livenessThreshold) { + if (now - lastAck.get() >= livenessThreshold) { logger.d { "Liveness threshold reached" } onLivenessThresholdCallback() } else { @@ -89,12 +92,15 @@ internal class StreamHealthMonitorImpl( } } } + if (!monitorJob.compareAndSet(current, newJob)) { + newJob.cancel() + } } - /** Stops the health-check loop */ + /** Stops the health-check loop. */ override fun stop() = runCatching { - logger.d { "[stop] Stopping heath monitor" } - monitorJob?.cancel() + logger.d { "[stop] Stopping health monitor" } + monitorJob.getAndSet(null)?.cancel() Unit } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamDebouncerImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamDebouncerImplTest.kt new file mode 100644 index 0000000..e4cfcb2 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamDebouncerImplTest.kt @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + +package io.getstream.android.core.internal.processing + +import io.getstream.android.core.api.processing.StreamDebouncer +import io.mockk.mockk +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Test + +class StreamDebouncerImplTest { + + private fun debouncer(delayMs: Long = 200L): StreamDebouncer = + StreamDebouncerImpl( + scope = + kotlinx.coroutines.CoroutineScope( + kotlinx.coroutines.SupervisorJob() + + kotlinx.coroutines.test.StandardTestDispatcher( + kotlinx.coroutines.test.TestCoroutineScheduler() + ) + ), + logger = mockk(relaxed = true), + delayMs = delayMs, + ) + + @Test + fun `delivers value after delay elapses`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("hello") + advanceTimeBy(50) + assertTrue(delivered.isEmpty()) + + advanceTimeBy(51) + advanceUntilIdle() + assertEquals(listOf("hello"), delivered) + } + + @Test + fun `last value wins when multiple submitted within window`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("first") + advanceTimeBy(50) + debouncer.submit("second") + advanceTimeBy(50) + debouncer.submit("third") + + advanceTimeBy(101) + advanceUntilIdle() + + assertEquals(listOf("third"), delivered) + } + + @Test + fun `timer resets on each submit`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("a") + advanceTimeBy(80) + assertTrue(delivered.isEmpty()) + + debouncer.submit("b") + advanceTimeBy(80) + assertTrue(delivered.isEmpty()) + + advanceTimeBy(21) + advanceUntilIdle() + assertEquals(listOf("b"), delivered) + } + + @Test + fun `cancel prevents delivery`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("value") + advanceTimeBy(50) + debouncer.cancel() + + advanceTimeBy(200) + advanceUntilIdle() + assertTrue(delivered.isEmpty()) + } + + @Test + fun `submit after cancel works normally`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("cancelled") + debouncer.cancel() + debouncer.submit("delivered") + + advanceTimeBy(101) + advanceUntilIdle() + assertEquals(listOf("delivered"), delivered) + } + + @Test + fun `delivers each settled value separately when gaps exceed delay`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("first") + advanceTimeBy(101) + advanceUntilIdle() + + debouncer.submit("second") + advanceTimeBy(101) + advanceUntilIdle() + + assertEquals(listOf("first", "second"), delivered) + } + + @Test + fun `rapid fire burst delivers only the last value`() = runTest { + val delivered = mutableListOf() + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + debouncer.onValue { delivered.add(it) } + + for (i in 1..50) { + debouncer.submit(i) + } + + advanceTimeBy(101) + advanceUntilIdle() + + assertEquals(listOf(50), delivered) + } + + @Test + fun `no delivery when no callback registered`() = runTest { + val debouncer = + StreamDebouncerImpl( + scope = backgroundScope, + logger = mockk(relaxed = true), + delayMs = 100, + ) + + debouncer.submit("orphan") + advanceTimeBy(200) + advanceUntilIdle() + // No crash, no delivery — default no-op callback + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt index 054f5a0..0456350 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt @@ -57,6 +57,7 @@ import okhttp3.Protocol import okhttp3.Request import okhttp3.Response import okhttp3.ResponseBody.Companion.toResponseBody +import org.junit.Assert.assertFalse import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -451,11 +452,8 @@ class StreamSocketSessionTest { @Test fun `connect fails when handshake subscribe fails - no open`() = runTest { - val lifecycleSub = mockk(relaxed = true) val boom = RuntimeException("handshake sub failed") - every { socket.subscribe(any()) } returns - Result.success(lifecycleSub) every { socket.subscribe(any(), any()) } returns Result.failure(boom) every { socket.open(config) } answers @@ -466,7 +464,7 @@ class StreamSocketSessionTest { val result = session.connect(connectUserData()) assertTrue(result.isFailure) - verify(exactly = 1) { socket.subscribe(any()) } + verify(exactly = 0) { socket.subscribe(any()) } verify(exactly = 1) { socket.subscribe(any(), any()) } verify(exactly = 0) { socket.open(any()) } verify { subs.forEach(any()) } // Opening + Disconnected.Error dispatched @@ -474,11 +472,8 @@ class StreamSocketSessionTest { @Test fun `connect returns failure when open fails - no health start, no auth send`() = runTest { - val lifecycleSub = mockk(relaxed = true) val handshakeSub = mockk(relaxed = true) - every { socket.subscribe(any()) } returns - Result.success(lifecycleSub) every { socket.subscribe(any(), any()) } returns Result.success(handshakeSub) @@ -488,7 +483,8 @@ class StreamSocketSessionTest { val result = session.connect(connectUserData()) assertTrue(result.isFailure) - verify(exactly = 2) { socket.subscribe(any(), any()) } + verify(exactly = 0) { socket.subscribe(any()) } + verify(exactly = 1) { socket.subscribe(any(), any()) } verify(exactly = 1) { socket.open(config) } verify(exactly = 0) { socket.send(any()) } verify(exactly = 0) { health.start() } @@ -497,13 +493,10 @@ class StreamSocketSessionTest { @Test fun `handshake onOpen non-101 causes failure - no auth send`() = runTest { - val lifecycleSub = mockk(relaxed = true) val handshakeSub = mockk(relaxed = true) var hsListener: StreamWebSocketListener? = null - every { socket.subscribe(any()) } returns - Result.success(lifecycleSub) every { socket.subscribe(any(), any()) } answers { hsListener = firstArg() @@ -534,13 +527,10 @@ class StreamSocketSessionTest { @Test fun `handshake onOpen 101 but auth serialize fails - connect fails`() = runTest { - val lifecycleSub = mockk(relaxed = true) val handshakeSub = mockk(relaxed = true) var hsListener: StreamWebSocketListener? = null - every { socket.subscribe(any()) } returns - Result.success(lifecycleSub) every { socket.subscribe(any(), any()) } answers { hsListener = firstArg() @@ -572,34 +562,32 @@ class StreamSocketSessionTest { } @Test - fun `connect fails when lifecycle subscribe fails - no handshake subscribe, no open`() = - runTest { - every { socket.subscribe(any(), any()) } returns - Result.failure(RuntimeException("lifecycle sub failed")) + fun `connect fails when handshake subscribe fails - no open, no event listener`() = runTest { + every { socket.subscribe(any(), any()) } returns + Result.failure(RuntimeException("handshake sub failed")) - val result = session.connect(connectUserData()) + val result = session.connect(connectUserData()) - assertTrue(result.isFailure) - verify(exactly = 1) { socket.subscribe(any(), any()) } - verify(exactly = 0) { socket.open(any()) } - verify(exactly = 0) { socket.send(any()) } - verify(exactly = 0) { health.start() } - verify(atLeast = 1) { subs.forEach(any()) } - } + assertTrue(result.isFailure) + verify(exactly = 0) { socket.subscribe(any()) } + verify(exactly = 1) { socket.subscribe(any(), any()) } + verify(exactly = 0) { socket.open(any()) } + verify(exactly = 0) { socket.send(any()) } + verify(exactly = 0) { health.start() } + verify(atLeast = 1) { subs.forEach(any()) } + } @Test fun `connect fails when open fails - no health start, no auth send`() = runTest { - every { socket.subscribe(any(), any()) } returnsMany - listOf( - Result.success(mockk(relaxed = true)), // lifecycle - Result.success(mockk(relaxed = true)), // handshake - ) + every { socket.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) every { socket.open(config) } returns Result.failure(RuntimeException("open failed")) val result = session.connect(connectUserData()) assertTrue(result.isFailure) - verify(exactly = 2) { socket.subscribe(any(), any()) } + verify(exactly = 0) { socket.subscribe(any()) } + verify(exactly = 1) { socket.subscribe(any(), any()) } verify(exactly = 1) { socket.open(config) } verify(exactly = 0) { socket.send(any()) } verify(exactly = 0) { health.start() } @@ -619,13 +607,12 @@ class StreamSocketSessionTest { val job = async { session.connect(connectUserData()) } advanceUntilIdle() - verify { socket.subscribe(any()) } verify { socket.subscribe(any(), any()) } job.cancelAndJoin() advanceUntilIdle() - verify(atLeast = 2) { sub.cancel() } + verify(atLeast = 1) { sub.cancel() } verify { socket.close(any(), any()) } verify { health.stop() } verify { debounce.stop() } @@ -637,10 +624,10 @@ class StreamSocketSessionTest { every { health.onHeartbeat(any()) } answers { heartbeatCb = arg(0) } every { health.onUnhealthy(any()) } just Runs - every { socket.subscribe(any()) } returns - Result.success(mockk(relaxed = true)) every { socket.subscribe(any(), any()) } returns Result.success(mockk(relaxed = true)) + every { socket.subscribe(any()) } returns + Result.success(mockk(relaxed = true)) every { socket.open(config) } returns Result.success(Unit) every { socket.close(any(), any()) } returns Result.success(Unit) @@ -693,10 +680,10 @@ class StreamSocketSessionTest { every { health.onHeartbeat(any()) } answers { heartbeatCb = arg(0) } every { health.onUnhealthy(any()) } just Runs - every { socket.subscribe(any()) } returns - Result.success(mockk(relaxed = true)) every { socket.subscribe(any(), any()) } returns Result.success(mockk(relaxed = true)) + every { socket.subscribe(any()) } returns + Result.success(mockk(relaxed = true)) every { socket.open(config) } returns Result.success(Unit) every { socket.close(any(), any()) } returns Result.success(Unit) @@ -1184,7 +1171,7 @@ class StreamSocketSessionTest { } @Test - fun `connect fails when handshake subscribe fails - cancels lifecycle, no open`() = runTest { + fun `connect fails when handshake subscribe fails - emits disconnected, no open`() = runTest { val states = mutableListOf() every { subs.forEach(any()) } answers { @@ -1431,6 +1418,198 @@ class StreamSocketSessionTest { return result } + @Test + fun `handshake buffers non-auth message and replays it exactly once after eventListener installed`() = + runTest { + val offeredMessages = mutableListOf() + every { debounce.offer(any()) } answers + { + offeredMessages.add(firstArg()) + true + } + + every { health.onHeartbeat(any()) } just Runs + every { health.onUnhealthy(any()) } just Runs + every { debounce.onBatch(any()) } just Runs + + val hsSub = mockk(relaxed = true) + val eventSub = mockk(relaxed = true) + var hsListener: StreamWebSocketListener? = null + var eventListenerInstalled = false + + every { socket.subscribe(any(), any()) } answers + { + hsListener = firstArg() + Result.success(hsSub) + } + every { socket.subscribe(any()) } answers + { + eventListenerInstalled = true + Result.success(eventSub) + } + + val connectedEvt = + mockk(relaxed = true).also { + every { it.connectionId } returns "conn-1" + } + + val productEvent = mockk(relaxed = true) + + every { parser.deserialize(any()) } answers + { + val msg = firstArg() + when (msg) { + "PRODUCT_EVENT" -> + Result.success(StreamCompositeSerializationEvent.internal(productEvent)) + "CONNECTED_JSON" -> + Result.success(StreamCompositeSerializationEvent.internal(connectedEvt)) + else -> Result.failure(RuntimeException("unknown")) + } + } + + every { socket.open(config) } answers + { + // Product event arrives BEFORE auth response — must be buffered, not batched + assertFalse(eventListenerInstalled) + hsListener!!.onMessage("PRODUCT_EVENT") + // Nothing should be offered to batcher during handshake + assertTrue(offeredMessages.isEmpty()) + + // Auth response completes handshake → triggers eventListener subscribe + replay + hsListener!!.onMessage("CONNECTED_JSON") + Result.success(Unit) + } + + every { socket.close(any(), any()) } returns Result.success(Unit) + every { socket.send(any()) } returns Result.success("Unit") + + val result = async { session.connect(connectUserData()) }.await() + + assertTrue(result.isSuccess) + // eventListener must be installed before replay + assertTrue(eventListenerInstalled) + // Buffered message replayed exactly once through batcher + assertEquals(1, offeredMessages.count { it == "PRODUCT_EVENT" }) + // No other messages leaked to batcher (connection.ok is NOT replayed) + assertEquals(1, offeredMessages.size) + verify { health.start() } + } + + @Test + fun `handshake acknowledges heartbeat for all messages including non-auth`() = runTest { + every { health.onHeartbeat(any()) } just Runs + every { health.onUnhealthy(any()) } just Runs + every { debounce.onBatch(any()) } just Runs + + var hsListener: StreamWebSocketListener? = null + every { socket.subscribe(any(), any()) } answers + { + hsListener = firstArg() + Result.success(mockk(relaxed = true)) + } + every { socket.open(config) } returns Result.success(Unit) + every { socket.close(any(), any()) } returns Result.success(Unit) + + // Non-auth event that won't match + every { parser.deserialize("SOME_EVENT") } returns + Result.success( + StreamCompositeSerializationEvent.internal( + mockk(relaxed = true) + ) + ) + + val job = async { session.connect(connectUserData()) } + advanceUntilIdle() + + hsListener!!.onMessage("SOME_EVENT") + + verify { health.acknowledgeHeartbeat() } + job.cancelAndJoin() + } + + @Test + fun `connect fails when eventListener subscribe fails after handshake`() = runTest { + every { health.onHeartbeat(any()) } just Runs + every { health.onUnhealthy(any()) } just Runs + every { debounce.onBatch(any()) } just Runs + + val hsSub = mockk(relaxed = true) + var hsListener: StreamWebSocketListener? = null + + every { socket.subscribe(any(), any()) } answers + { + hsListener = firstArg() + Result.success(hsSub) + } + // eventListener subscribe fails + every { socket.subscribe(any()) } returns + Result.failure(RuntimeException("event sub failed")) + every { socket.close(any(), any()) } returns Result.success(Unit) + + val connectedEvt = + mockk(relaxed = true).also { + every { it.connectionId } returns "conn-1" + } + every { parser.deserialize("OK") } returns + Result.success(StreamCompositeSerializationEvent.internal(connectedEvt)) + + every { socket.open(config) } answers + { + hsListener!!.onMessage("OK") + Result.success(Unit) + } + + val result = async { session.connect(connectUserData()) }.await() + + assertTrue(result.isFailure) + // Should tear down instead of continuing + verify { socket.close(any(), any()) } + verify(exactly = 0) { health.start() } + } + + @Test + fun `connect fails when buffered message replay fails`() = runTest { + every { health.onHeartbeat(any()) } just Runs + every { health.onUnhealthy(any()) } just Runs + every { debounce.onBatch(any()) } just Runs + every { debounce.offer(any()) } returns false // replay will fail + + val hsSub = mockk(relaxed = true) + val eventSub = mockk(relaxed = true) + var hsListener: StreamWebSocketListener? = null + + every { socket.subscribe(any(), any()) } answers + { + hsListener = firstArg() + Result.success(hsSub) + } + every { socket.subscribe(any()) } returns Result.success(eventSub) + every { socket.close(any(), any()) } returns Result.success(Unit) + + val productEvt = mockk(relaxed = true) + val connectedEvt = + mockk(relaxed = true).also { + every { it.connectionId } returns "conn-1" + } + every { parser.deserialize("BUFFERED") } returns + Result.success(StreamCompositeSerializationEvent.internal(productEvt)) + every { parser.deserialize("OK") } returns + Result.success(StreamCompositeSerializationEvent.internal(connectedEvt)) + + every { socket.open(config) } answers + { + hsListener!!.onMessage("BUFFERED") // gets buffered + hsListener!!.onMessage("OK") // triggers success → replay fails + Result.success(Unit) + } + + val result = async { session.connect(connectUserData()) }.await() + + assertTrue(result.isFailure) + verify { socket.close(any(), any()) } + verify(exactly = 0) { health.start() } + } + private fun connectUserData(): ConnectUserData = ConnectUserData("u", "t", null, null, false, null, emptyMap()) } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImplTest.kt index 1ecf7bd..56eb1f8 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImplTest.kt @@ -18,7 +18,7 @@ package io.getstream.android.core.internal.socket.monitor import io.getstream.android.core.api.log.StreamLogger import io.mockk.mockk -import kotlin.test.assertNotNull +import java.util.concurrent.atomic.AtomicReference import kotlin.test.assertNotSame import kotlin.test.assertSame import kotlin.time.Clock @@ -170,16 +170,16 @@ class StreamHealthMonitorImplTest { monitor.start() val jobField = monitor.javaClass.getDeclaredField("monitorJob").apply { isAccessible = true } - val firstJob = jobField.get(monitor) as Job + @Suppress("UNCHECKED_CAST") val jobRef = jobField.get(monitor) as AtomicReference + val firstJob = jobRef.get()!! - // Stop it → cancels the job, so !isActive but not null + // Stop it → cancels the job and sets ref to null monitor.stop() assertFalse(firstJob.isActive) - assertNotNull(jobField.get(monitor)) // Restart should create a NEW active job monitor.start() - val secondJob = jobField.get(monitor) as Job + val secondJob = jobRef.get()!! assertTrue(secondJob.isActive) assertNotSame(firstJob, secondJob) } @@ -213,19 +213,17 @@ class StreamHealthMonitorImplTest { ) monitor.start() - val firstJob = + @Suppress("UNCHECKED_CAST") + val jobRef = monitor.javaClass .getDeclaredField("monitorJob") .apply { isAccessible = true } - .get(monitor) + .get(monitor) as AtomicReference + val firstJob = jobRef.get() monitor.start() // second time should no-op - val secondJob = - monitor.javaClass - .getDeclaredField("monitorJob") - .apply { isAccessible = true } - .get(monitor) + val secondJob = jobRef.get() // Same job instance, not restarted assertSame(firstJob, secondJob) @@ -242,13 +240,16 @@ class StreamHealthMonitorImplTest { livenessThreshold = 100, ) - // No start() => monitorJob is null + // No start() => monitorJob ref holds null monitor.stop() - val jobField = monitor.javaClass.getDeclaredField("monitorJob") - jobField.isAccessible = true - val job = jobField.get(monitor) as Job? - assertTrue(job == null) + @Suppress("UNCHECKED_CAST") + val jobRef = + monitor.javaClass + .getDeclaredField("monitorJob") + .apply { isAccessible = true } + .get(monitor) as AtomicReference + assertTrue(jobRef.get() == null) } @Test @@ -268,9 +269,13 @@ class StreamHealthMonitorImplTest { // Start the monitor, job is created but suspended at delay() monitor.start() - val jobField = - monitor.javaClass.getDeclaredField("monitorJob").apply { isAccessible = true } - val job = jobField.get(monitor) as Job + @Suppress("UNCHECKED_CAST") + val jobRef = + monitor.javaClass + .getDeclaredField("monitorJob") + .apply { isAccessible = true } + .get(monitor) as AtomicReference + val job = jobRef.get()!! // Cancel the job before advancing time so that `isActive == false` job.cancel() @@ -287,6 +292,7 @@ class StreamHealthMonitorImplTest { private fun getJob(monitor: StreamHealthMonitorImpl): Job? { val field = StreamHealthMonitorImpl::class.java.getDeclaredField("monitorJob") field.isAccessible = true - return field.get(monitor) as Job? + @Suppress("UNCHECKED_CAST") val ref = field.get(monitor) as AtomicReference + return ref.get() } }