Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.processing

import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.log.StreamLogger
import io.getstream.android.core.internal.processing.StreamDebouncerImpl
import kotlinx.coroutines.CoroutineScope

/**
* Debounces a bursty stream of values so that only the **last** value is delivered after a quiet
* period.
*
* Each call to [submit] replaces the pending value and resets the delay timer. When the timer
* expires without a new submission, the registered [onValue] handler is invoked with the most
* recent value. This is useful for coalescing rapid state changes (network flaps, lifecycle
* transitions) into a single settled action.
*
* ### Semantics
* - **Last-write-wins:** Only the most recently submitted value is delivered.
* - **Timer reset:** Each [submit] cancels any pending delivery and restarts the delay.
* - **Thread-safety:** All functions are safe to call from multiple coroutines.
* - **Cancellation:** [cancel] discards the pending value and stops the timer without delivery.
*/
@StreamInternalApi
public interface StreamDebouncer<T> {
/**
* Registers the handler invoked when the debounce window elapses.
*
* @param callback Suspend function called with the settled value.
*/
public fun onValue(callback: suspend (T) -> Unit)

/**
* Submits a new value, cancelling any pending delivery and restarting the delay timer.
*
* @param value The value to deliver after the debounce window.
*/
public fun submit(value: T)

/** Cancels any pending delivery without invoking the handler. */
public fun cancel()
}

/**
* Creates a new [StreamDebouncer] instance.
*
* @param T The type of value being debounced.
* @param scope Coroutine scope for launching the delayed delivery job.
* @param logger Logger for diagnostics.
* @param delayMs Time in milliseconds to wait after the last [StreamDebouncer.submit] before
* delivering the value. Defaults to 300ms.
* @return A new [StreamDebouncer] instance.
*/
@StreamInternalApi
public fun <T> StreamDebouncer(
scope: CoroutineScope,
logger: StreamLogger,
delayMs: Long = 300L,
): StreamDebouncer<T> = StreamDebouncerImpl(scope = scope, logger = logger, delayMs = delayMs)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleS
import io.getstream.android.core.api.model.connection.network.StreamNetworkState
import io.getstream.android.core.api.model.connection.recovery.Recovery
import io.getstream.android.core.api.model.value.StreamToken
import io.getstream.android.core.api.processing.StreamDebouncer
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
Expand All @@ -46,7 +47,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch

internal class StreamClientImpl<T>(
private val user: StreamUser,
Expand All @@ -69,6 +69,8 @@ internal class StreamClientImpl<T>(

private var socketSessionHandle: StreamSubscription? = null
private var networkAndLifecycleMonitorHandle: StreamSubscription? = null
private val recoveryDebouncer: StreamDebouncer<Pair<StreamNetworkState, StreamLifecycleState>> =
StreamDebouncer(scope = scope, logger = logger, delayMs = 500L)
override val connectionState: StateFlow<StreamConnectionState>
get() = mutableConnectionState.asStateFlow()

Expand Down Expand Up @@ -111,22 +113,23 @@ internal class StreamClientImpl<T>(

if (networkAndLifecycleMonitorHandle == null) {
logger.v { "[connect] Setup network and lifecycle monitor callback" }
recoveryDebouncer.onValue { (networkState, lifecycleState) ->
val connectionState = mutableConnectionState.value
val recovery =
connectionRecoveryEvaluator.evaluate(
connectionState,
lifecycleState,
networkState,
)
recoveryEffect(recovery)
}
val networkAndLifecycleMonitorListener =
object : StreamNetworkAndLifecycleMonitorListener {
override fun onNetworkAndLifecycleState(
networkState: StreamNetworkState,
lifecycleState: StreamLifecycleState,
) {
scope.launch {
val connectionState = mutableConnectionState.value
val recovery =
connectionRecoveryEvaluator.evaluate(
connectionState,
lifecycleState,
networkState,
)
recoveryEffect(recovery)
}
recoveryDebouncer.submit(networkState to lifecycleState)
}
}
networkAndLifecycleMonitorHandle =
Expand Down Expand Up @@ -159,6 +162,7 @@ internal class StreamClientImpl<T>(
override suspend fun disconnect(): Result<Unit> =
singleFlight.run(disconnectKey) {
logger.d { "Disconnecting from socket" }
recoveryDebouncer.cancel()
mutableConnectionState.update(StreamConnectionState.Disconnected())
connectionIdHolder.clear()
socketSession.disconnect()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.internal.processing

import io.getstream.android.core.api.log.StreamLogger
import io.getstream.android.core.api.processing.StreamDebouncer
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

internal class StreamDebouncerImpl<T>(
private val scope: CoroutineScope,
private val logger: StreamLogger,
private val delayMs: Long,
) : StreamDebouncer<T> {
private val pendingJob = AtomicReference<Job?>(null)
private val pendingValue = AtomicReference<T?>(null)
private var callback: suspend (T) -> Unit = {}

override fun onValue(callback: suspend (T) -> Unit) {
this.callback = callback
}

override fun submit(value: T) {
pendingValue.set(value)
val oldJob = pendingJob.get()
val newJob =
scope.launch {
delay(delayMs)
val settled = pendingValue.getAndSet(null) ?: return@launch
logger.v { "[debounce] Delivering settled value: $settled" }
callback(settled)
}
if (!pendingJob.compareAndSet(oldJob, newJob)) {
newJob.cancel()
pendingValue.get()?.let { submit(it) }
return
}
oldJob?.cancel()
}

override fun cancel() {
pendingJob.getAndSet(null)?.cancel()
pendingValue.set(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ internal class StreamSocketSession<T>(
suspend fun connect(data: ConnectUserData): Result<StreamConnectionState.Connected> =
suspendCancellableCoroutine { continuation ->
var handshakeSubscription: StreamSubscription? = null
val pendingMessages = mutableListOf<String>()

// Ensure we clean up if the caller cancels the connect coroutine
continuation.invokeOnCancellation { cause ->
Expand Down Expand Up @@ -282,8 +283,38 @@ internal class StreamSocketSession<T>(
}

// Success/Failure continuations
val success: (StreamConnectedUser, String) -> Unit = { user, connectionId ->
val success: (StreamConnectedUser, String) -> Unit = success@{ user, connectionId ->
handshakeSubscription?.cancel()

// Subscribe eventListener for steady-state operation
val eventSubscription =
internalSocket.subscribe(eventListener).getOrElse { err ->
logger.e(err) { "[success] Failed to subscribe event listener" }
disconnect(err)
if (continuation.isActive) {
continuation.resume(Result.failure(err))
}
return@success
}
socketSubscription = eventSubscription

// Replay messages buffered during handshake
for (message in pendingMessages) {
if (!batcher.offer(message)) {
val err =
IllegalStateException(
"Failed to replay buffered message during handshake transition"
)
logger.e(err) { "[success] Buffered message replay failed" }
disconnect(err)
if (continuation.isActive) {
continuation.resume(Result.failure(err))
}
return@success
}
}
pendingMessages.clear()

if (continuation.isActive) {
healthMonitor.start()
val connected = StreamConnectionState.Connected(user, connectionId)
Expand All @@ -310,15 +341,9 @@ internal class StreamSocketSession<T>(
// Notify listeners: Opening (block until dispatched)
notifyState(StreamConnectionState.Connecting.Opening(data.userId))

// Subscribe for socket events
val socketSubRes = internalSocket.subscribe(eventListener)
if (socketSubRes.isFailure) {
failure(socketSubRes.exceptionOrNull()!!)
return@suspendCancellableCoroutine
}
socketSubscription = socketSubRes.getOrNull()

// Temporary listener to handle the initial open/auth handshake
// Only the handshake listener is subscribed initially.
// eventListener subscribes after handshake succeeds (in the success callback)
// to avoid dual-listener processing of the same message.
val connectListener =
object : StreamWebSocketListener {
override fun onOpen(response: Response) {
Expand Down Expand Up @@ -368,16 +393,16 @@ internal class StreamSocketSession<T>(

override fun onMessage(text: String) {
logger.d { "[onMessage] Socket message (string): $text" }
healthMonitor.acknowledgeHeartbeat()
eventParser
.deserialize(text)
.map { it.core }
.map { authResponse ->
when (authResponse) {
.onSuccess { event ->
when (val coreEvent = event.core) {
is StreamClientConnectedEvent -> {
logger.v {
"[onMessage] Handling connected event: $authResponse"
"[onMessage] Handling connected event: $coreEvent"
}
val me = authResponse.me
val me = coreEvent.me
val connectedUser =
StreamConnectedUser(
me.createdAt,
Expand All @@ -394,19 +419,26 @@ internal class StreamSocketSession<T>(
me.lastActive,
me.name,
)
streamClientConnectedEvent = authResponse
success(connectedUser, authResponse.connectionId)
streamClientConnectedEvent = coreEvent
success(connectedUser, coreEvent.connectionId)
}

is StreamClientConnectionErrorEvent -> {
logger.e {
"[onMessage] Socket connection recoverable error: $authResponse"
"[onMessage] Socket connection recoverable error: $coreEvent"
}
apiFailure(coreEvent.error)
}

else -> {
logger.v {
"[onMessage] Buffering non-auth message during handshake"
}
apiFailure(authResponse.error)
pendingMessages.add(text)
}
}
}
.recover {
.onFailure {
logger.e(it) {
"[onMessage] Failed to deserialize socket message. ${it.message}"
}
Expand Down Expand Up @@ -452,6 +484,7 @@ internal class StreamSocketSession<T>(
return@suspendCancellableCoroutine
}
handshakeSubscription = hsRes.getOrNull()
socketSubscription = handshakeSubscription

// Open socket
val openRes = internalSocket.open(config)
Expand Down
Loading
Loading