diff --git a/app/src/main/java/io/getstream/android/core/sample/SampleActivity.kt b/app/src/main/java/io/getstream/android/core/sample/SampleActivity.kt index ec8a066..299a613 100644 --- a/app/src/main/java/io/getstream/android/core/sample/SampleActivity.kt +++ b/app/src/main/java/io/getstream/android/core/sample/SampleActivity.kt @@ -28,6 +28,7 @@ import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.padding import androidx.compose.foundation.rememberScrollState import androidx.compose.foundation.verticalScroll +import androidx.compose.material3.Button import androidx.compose.material3.Scaffold import androidx.compose.material3.Text import androidx.compose.runtime.Composable @@ -40,24 +41,38 @@ import androidx.lifecycle.lifecycleScope import androidx.lifecycle.repeatOnLifecycle import io.getstream.android.core.api.StreamClient import io.getstream.android.core.api.authentication.StreamTokenProvider -import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.recovery.Recovery import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader import io.getstream.android.core.api.model.value.StreamToken import io.getstream.android.core.api.model.value.StreamUserId import io.getstream.android.core.api.model.value.StreamWsUrl +import io.getstream.android.core.api.socket.listeners.StreamClientListener +import io.getstream.android.core.api.subscribe.StreamSubscription +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager import io.getstream.android.core.sample.client.createStreamClient import io.getstream.android.core.sample.ui.ConnectionStateCard -import io.getstream.android.core.sample.ui.NetworkInfoCard import io.getstream.android.core.sample.ui.theme.StreamandroidcoreTheme import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -class SampleActivity : ComponentActivity() { +class SampleActivity : ComponentActivity(), StreamClientListener { val userId = StreamUserId.fromString("petar") var streamClient: StreamClient? = null + var handle: StreamSubscription? = null + + override fun onRecovery(recovery: Recovery) { + super.onRecovery(recovery) + Log.d("SampleActivity", "Recovery: $recovery") + } + + override fun onError(err: Throwable) { + super.onError(err) + Log.e("SampleActivity", "Error: $err") + } + override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) val streamClient2 = @@ -91,7 +106,21 @@ class SampleActivity : ComponentActivity() { ) streamClient = streamClient2 lifecycleScope.launch { - repeatOnLifecycle(Lifecycle.State.RESUMED) { streamClient?.connect() } + repeatOnLifecycle(Lifecycle.State.CREATED) { streamClient?.connect() } + } + + if (handle == null) { + handle = + streamClient2 + .subscribe( + this, + options = + StreamSubscriptionManager.Options( + retention = + StreamSubscriptionManager.Options.Retention.KEEP_UNTIL_CANCELLED + ), + ) + .getOrThrow() } enableEdgeToEdge() setContent { @@ -108,16 +137,43 @@ class SampleActivity : ComponentActivity() { ) { Greeting(name = "Android") ClientInfo(streamClient = streamClient2) + val state = streamClient?.connectionState?.collectAsStateWithLifecycle() + val buttonState = + when (state?.value) { + is StreamConnectionState.Connected -> { + Triple( + "Disconnect", + true, + { + lifecycleScope.launch { streamClient?.disconnect() } + Unit + }, + ) + } + + is StreamConnectionState.Connecting -> { + Triple("Connecting", false, { Unit }) + } + + else -> { + Triple( + "Connect", + true, + { + lifecycleScope.launch { streamClient?.connect() } + Unit + }, + ) + } + } + Button(onClick = buttonState.third, enabled = buttonState.second) { + Text(text = buttonState.first) + } } } } } } - - override fun onStop() { - runBlocking { streamClient?.disconnect() } - super.onStop() - } } @Composable @@ -134,18 +190,8 @@ fun GreetingPreview() { @Composable fun ClientInfo(streamClient: StreamClient) { val state = streamClient.connectionState.collectAsStateWithLifecycle() - val networkSnapshot = streamClient.networkState.collectAsStateWithLifecycle() Log.d("SampleActivity", "Client state: ${state.value}") - val networkState = networkSnapshot.value Column(verticalArrangement = Arrangement.spacedBy(16.dp)) { ConnectionStateCard(state = state.value) - when (networkState) { - is StreamNetworkState.Available -> { - NetworkInfoCard(snapshot = networkState.snapshot) - } - else -> { - NetworkInfoCard(snapshot = null) - } - } } } diff --git a/app/src/main/java/io/getstream/android/core/sample/client/StreamClient.kt b/app/src/main/java/io/getstream/android/core/sample/client/StreamClient.kt index 2dd474c..bd4941f 100644 --- a/app/src/main/java/io/getstream/android/core/sample/client/StreamClient.kt +++ b/app/src/main/java/io/getstream/android/core/sample/client/StreamClient.kt @@ -28,11 +28,13 @@ import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader import io.getstream.android.core.api.model.value.StreamUserId import io.getstream.android.core.api.model.value.StreamWsUrl +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor import io.getstream.android.core.api.observers.network.StreamNetworkMonitor import io.getstream.android.core.api.processing.StreamBatcher import io.getstream.android.core.api.processing.StreamRetryProcessor import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator import io.getstream.android.core.api.serialization.StreamEventSerialization import io.getstream.android.core.api.socket.StreamConnectionIdHolder import io.getstream.android.core.api.socket.StreamWebSocketFactory @@ -109,6 +111,15 @@ fun createStreamClient( logger = logProvider.taggedLogger("SCNetworkMonitorSubscriptions") ), ) + val lifecycleMonitor = + StreamLifecycleMonitor( + logger = logProvider.taggedLogger("SCLifecycleMonitor"), + subscriptionManager = + StreamSubscriptionManager( + logger = logProvider.taggedLogger("SCLifecycleMonitorSubscriptions") + ), + lifecycle = androidComponentsProvider.lifecycle(), + ) return StreamClient( scope = scope, @@ -136,6 +147,12 @@ fun createStreamClient( override fun deserialize(raw: String): Result = Result.success(Unit) } ), + lifecycleMonitor = lifecycleMonitor, + connectionRecoveryEvaluator = + StreamConnectionRecoveryEvaluator( + logger = logProvider.taggedLogger("SCConnectionRecoveryEvaluator"), + singleFlightProcessor = singleFlight, + ), batcher = batcher, ) } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 9241a7e..60d4706 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,6 +11,7 @@ junitVersion = "1.3.0" espressoCore = "3.7.0" appcompat = "1.7.1" kotlinxCoroutines = "1.10.2" +lifecycleRuntime = "2.9.4" lintApi = "31.12.0" material = "1.12.0" jetbrainsKotlinJvm = "2.2.0" @@ -35,6 +36,8 @@ annotationJvm = "1.9.1" [libraries] androidx-core = { module = "androidx.test:core", version.ref = "core" } androidx-core-ktx = { group = "androidx.core", name = "core-ktx", version.ref = "coreKtx" } +androidx-lifecycle-runtime = { module = "androidx.lifecycle:lifecycle-runtime", version.ref = "lifecycleRuntime" } +androidx-lifecycle-process = { module = "androidx.lifecycle:lifecycle-process", version.ref = "lifecycleRuntime" } detekt-formatting = { module = "io.gitlab.arturbosch.detekt:detekt-formatting", version.ref = "detekt" } junit = { group = "junit", name = "junit", version.ref = "junit" } androidx-junit = { group = "androidx.test.ext", name = "junit", version.ref = "junitVersion" } diff --git a/gradle/scripts/sonar.gradle b/gradle/scripts/sonar.gradle index c5f7ee6..b56ec3d 100644 --- a/gradle/scripts/sonar.gradle +++ b/gradle/scripts/sonar.gradle @@ -22,6 +22,15 @@ ext.sonar.ignoreModules.each { ext.sonar.excludeFilter << "**/${it}/**" } + +def coverageReports = rootProject.subprojects + .findAll { !rootProject.ext.sonar.ignoreModules.contains(it.name) } + .collect { "${it.buildDir}/reports/kover/report.xml" } + +if (coverageReports.isEmpty()) { + coverageReports << "${rootProject.buildDir}/reports/kover/report.xml" +} + sonarqube { properties { property("sonar.host.url", "https://sonarcloud.io") @@ -32,6 +41,13 @@ sonarqube { property "sonar.java.coveragePlugin", "jacoco" property "sonar.sourceEncoding", "UTF-8" property "sonar.java.binaries", "${rootDir}/**/build/tmp/java-classes/debug" + property "sonar.coverage.jacoco.xmlReportPaths", coverageReports.join(",") property "sonar.coverage.exclusions", rootProject.ext.sonar.excludeFilter } } + +tasks.named("sonar").configure { + rootProject.subprojects + .findAll { !rootProject.ext.sonar.ignoreModules.contains(it.name) } + .each { dependsOn("${it.path}:koverXmlReport") } +} diff --git a/stream-android-core/build.gradle.kts b/stream-android-core/build.gradle.kts index 35ad4da..b6efa0f 100644 --- a/stream-android-core/build.gradle.kts +++ b/stream-android-core/build.gradle.kts @@ -65,6 +65,8 @@ dependencies { // Android implementation(libs.androidx.annotation.jvm) + implementation(libs.androidx.lifecycle.runtime) + implementation(libs.androidx.lifecycle.process) // Network implementation(libs.moshi) diff --git a/stream-android-core/src/main/AndroidManifest.xml b/stream-android-core/src/main/AndroidManifest.xml index 95095c5..70a06f7 100644 --- a/stream-android-core/src/main/AndroidManifest.xml +++ b/stream-android-core/src/main/AndroidManifest.xml @@ -18,5 +18,6 @@ + \ No newline at end of file diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt index 5d4b3ea..f67201f 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt @@ -16,6 +16,7 @@ package io.getstream.android.core.api +import android.annotation.SuppressLint import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.api.authentication.StreamTokenManager import io.getstream.android.core.api.authentication.StreamTokenProvider @@ -26,25 +27,29 @@ import io.getstream.android.core.api.model.config.StreamHttpConfig import io.getstream.android.core.api.model.config.StreamSocketConfig import io.getstream.android.core.api.model.connection.StreamConnectedUser import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState import io.getstream.android.core.api.model.connection.network.StreamNetworkState import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader import io.getstream.android.core.api.model.value.StreamUserId import io.getstream.android.core.api.model.value.StreamWsUrl +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor import io.getstream.android.core.api.observers.network.StreamNetworkMonitor import io.getstream.android.core.api.processing.StreamBatcher import io.getstream.android.core.api.processing.StreamRetryProcessor import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator import io.getstream.android.core.api.serialization.StreamEventSerialization import io.getstream.android.core.api.socket.StreamConnectionIdHolder import io.getstream.android.core.api.socket.StreamWebSocket import io.getstream.android.core.api.socket.StreamWebSocketFactory import io.getstream.android.core.api.socket.listeners.StreamClientListener import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor -import io.getstream.android.core.api.subscribe.StreamSubscription +import io.getstream.android.core.api.subscribe.StreamObservable import io.getstream.android.core.api.subscribe.StreamSubscriptionManager import io.getstream.android.core.internal.client.StreamClientImpl +import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor import io.getstream.android.core.internal.serialization.StreamCompositeEventSerializationImpl import io.getstream.android.core.internal.serialization.StreamCompositeMoshiJsonSerialization import io.getstream.android.core.internal.serialization.StreamMoshiJsonSerializationImpl @@ -100,7 +105,7 @@ import kotlinx.coroutines.flow.StateFlow * ``` */ @StreamInternalApi -public interface StreamClient { +public interface StreamClient : StreamObservable { /** * Read-only, hot state holder for this client. * @@ -110,16 +115,6 @@ public interface StreamClient { */ public val connectionState: StateFlow - /** - * Read-only, hot state holder for the current network snapshot. - * - * **Semantics** - * - Emits the latest network snapshot whenever it changes. - * - Hot & conflated: new collectors receive the latest value immediately. - * - `null` if no network is available. - */ - @StreamInternalApi public val networkState: StateFlow - /** * Establishes a connection for the current user. * @@ -143,13 +138,6 @@ public interface StreamClient { * - Throws [kotlinx.coroutines.CancellationException] if the awaiting coroutine is cancelled. */ public suspend fun disconnect(): Result - - /** - * Subscribes to client events and state - * - * @param listener The listener to subscribe. - */ - public fun subscribe(listener: StreamClientListener): Result } /** @@ -197,21 +185,25 @@ public interface StreamClient { * @param apiKey The API key. * @param userId The user ID. * @param wsUrl The WebSocket URL. + * @param products Stream product codes (for feature gates / telemetry) negotiated with the socket. * @param clientInfoHeader The client info header. + * @param clientSubscriptionManager Manages socket-level listeners registered via [StreamClient]. * @param tokenProvider The token provider. - * @param scope The coroutine scope. - * @param logProvider The logger provider. - * @param clientSubscriptionManager The client subscription manager. * @param tokenManager The token manager. * @param singleFlight The single-flight processor. * @param serialQueue The serial processing queue. - * @param httpConfig The HTTP configuration. * @param retryProcessor The retry processor. + * @param scope The coroutine scope powering internal work (usually `SupervisorJob + Dispatcher`). * @param connectionIdHolder The connection ID holder. * @param socketFactory The WebSocket factory. - * @param healthMonitor The health monitor. * @param batcher The WebSocket event batcher. + * @param healthMonitor The health monitor. + * @param networkMonitor Tracks device connectivity and feeds connection recovery. + * @param httpConfig Optional HTTP client customization. + * @param serializationConfig Composite JSON / event serialization configuration. + * @param logProvider The logger provider. */ +@SuppressLint("ExposeAsStateFlow") @StreamInternalApi public fun StreamClient( // Client config @@ -236,6 +228,8 @@ public fun StreamClient( // Monitoring healthMonitor: StreamHealthMonitor, networkMonitor: StreamNetworkMonitor, + lifecycleMonitor: StreamLifecycleMonitor, + connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator, // Http httpConfig: StreamHttpConfig? = null, // Serialization @@ -284,6 +278,20 @@ public fun StreamClient( configuredInterceptors.forEach { httpBuilder.addInterceptor(it) } } + val networkAndLifeCycleMonitor = + StreamNetworkAndLifeCycleMonitor( + logger = logProvider.taggedLogger("SCNetworkAndLifecycleMonitor"), + networkMonitor = networkMonitor, + lifecycleMonitor = lifecycleMonitor, + mutableNetworkState = MutableStateFlow(StreamNetworkState.Unknown), + mutableLifecycleState = MutableStateFlow(StreamLifecycleState.Unknown), + subscriptionManager = + StreamSubscriptionManager( + logger = logProvider.taggedLogger("SCNLMonitorSubscriptions") + ), + ) + + val mutableConnectionState = MutableStateFlow(StreamConnectionState.Idle) return StreamClientImpl( userId = userId, scope = clientScope, @@ -292,10 +300,10 @@ public fun StreamClient( serialQueue = serialQueue, connectionIdHolder = connectionIdHolder, logger = clientLogger, - mutableNetworkState = MutableStateFlow(StreamNetworkState.Unknown), - mutableConnectionState = MutableStateFlow(StreamConnectionState.Idle), + mutableConnectionState = mutableConnectionState, subscriptionManager = clientSubscriptionManager, - networkMonitor = networkMonitor, + networkAndLifeCycleMonitor = networkAndLifeCycleMonitor, + connectionRecoveryEvaluator = connectionRecoveryEvaluator, socketSession = StreamSocketSession( logger = logProvider.taggedLogger("SCSocketSession"), diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/components/StreamAndroidComponentsProvider.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/components/StreamAndroidComponentsProvider.kt index 9b4b569..727efc1 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/components/StreamAndroidComponentsProvider.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/components/StreamAndroidComponentsProvider.kt @@ -20,14 +20,23 @@ import android.content.Context import android.net.ConnectivityManager import android.net.wifi.WifiManager import android.telephony.TelephonyManager +import androidx.lifecycle.Lifecycle import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.internal.components.StreamAndroidComponentsProviderImpl /** - * Provides access to Android system services. + * Facade over the Android system services required by the core SDK. * - * This interface abstracts away the details of accessing Android system services, allowing the SDK - * to work with different versions of Android and different build environments. + * Abstracting the access behind this interface allows Stream components to operate in tests, + * host-apps with custom dependency wiring, or alternative runtime environments. + * + * ### Typical usage + * + * ```kotlin + * val components = StreamAndroidComponentsProvider(context) + * val connectivity = components.connectivityManager().getOrNull() + * val lifecycle = components.lifecycle() + * ``` */ @StreamInternalApi public interface StreamAndroidComponentsProvider { @@ -35,26 +44,62 @@ public interface StreamAndroidComponentsProvider { /** * Retrieves the [ConnectivityManager] system service. * - * @return A [Result] containing the [ConnectivityManager] if successful, or an error if the - * service cannot be retrieved. + * ### Example + * + * ```kotlin + * val connectivity: ConnectivityManager = + * components.connectivityManager().getOrElse { throwable -> + * logger.e(throwable) { "Connectivity unavailable" } + * throw throwable + * } + * ``` + * + * @return [Result.success] with the manager, or [Result.failure] when the service is missing. */ public fun connectivityManager(): Result /** * Retrieves the [WifiManager] system service. * - * @return A [Result] containing the [WifiManager] if successful, or an error if the service - * cannot be retrieved. + * ### Example + * + * ```kotlin + * val isWifiEnabled = components.wifiManager() + * .map { wifi -> wifi.isWifiEnabled } + * .getOrDefault(false) + * ``` + * + * @return [Result.success] with the manager, or [Result.failure] when the service is missing. */ public fun wifiManager(): Result /** * Retrieves the [TelephonyManager] system service. * - * @return A [Result] containing the [TelephonyManager] if successful, or an error if the - * service cannot be retrieved. + * ### Example + * + * ```kotlin + * val networkType = components.telephonyManager() + * .map { telephony -> telephony.dataNetworkType } + * .getOrElse { TelephonyManager.NETWORK_TYPE_UNKNOWN } + * ``` + * + * @return [Result.success] with the manager, or [Result.failure] when the service is missing. */ public fun telephonyManager(): Result + + /** + * Retrieves the [Lifecycle] for the application. + * + * ### Example + * + * ```kotlin + * components.lifecycle().addObserver(lifecycleObserver) + * ``` + * + * @return The process-level [Lifecycle]. + */ + public fun lifecycle(): Lifecycle } /** diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/lifecycle/StreamLifecycleState.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/lifecycle/StreamLifecycleState.kt new file mode 100644 index 0000000..0849680 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/lifecycle/StreamLifecycleState.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.connection.lifecycle + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Process-wide lifecycle snapshot used by the connection recovery layer. + * + * Implementations surface coarse-grained lifecycle boundaries (foreground/background) that + * influence reconnection heuristics. `Unknown` is emitted while the lifecycle source is still being + * resolved. + */ +@StreamInternalApi +public sealed class StreamLifecycleState { + + /** The lifecycle source has not yet reported a definitive state. */ + public object Unknown : StreamLifecycleState() + + /** The app is considered foregrounded (user-visible). */ + public object Foreground : StreamLifecycleState() + + /** The app moved to background and is no longer user-visible. */ + public object Background : StreamLifecycleState() +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/recovery/Recovery.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/recovery/Recovery.kt new file mode 100644 index 0000000..34dfb6b --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/recovery/Recovery.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.connection.recovery + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Represents a connection recovery decision. + * + * @param T The type of the data that is passed to the recovery decision. + */ +@StreamInternalApi +public sealed class Recovery { + + /** + * The connection should be reconnected. + * + * @param why The reason for the reconnect. + * @param T The type of the data that is passed to the recovery decision. + */ + public data class Connect(val why: T) : Recovery() + + /** + * The connection should be disconnected. + * + * @param why The reason for the disconnect. + * @param T The type of the data that is passed to the recovery decision. + */ + public data class Disconnect(val why: T) : Recovery() + + /** + * An error occurred while evaluating the recovery strategy. + * + * @property error The error that occurred. + */ + public data class Error(val error: Throwable) : Recovery() +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/StreamStartableComponent.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/StreamStartableComponent.kt new file mode 100644 index 0000000..8f7ac77 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/StreamStartableComponent.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.observers + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Minimal lifecycle contract for components that can be started and stopped on demand. + * + * Consumers typically call [start] during initialization and [stop] when tearing down resources or + * responding to lifecycle events. + */ +@StreamInternalApi +public interface StreamStartableComponent { + + /** + * Starts the component. + * + * ### Example + * + * ```kotlin + * subscriptionManager.start() + * .onFailure { cause -> logger.e(cause) { "Unable to start monitor" } } + * ``` + * + * @return `Result.success(Unit)` on success; `Result.failure(cause)` when startup fails. + */ + public fun start(): Result + + /** + * Stops the component. + * + * ### Example + * + * ```kotlin + * subscriptionManager.stop() + * .onFailure { cause -> logger.w(cause) { "Unable to stop monitor" } } + * ``` + * + * @return `Result.success(Unit)` on success; `Result.failure(cause)` when shutdown fails. + */ + public fun stop(): Result +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleListener.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleListener.kt new file mode 100644 index 0000000..ac9ac46 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleListener.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.observers.lifecycle + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Callbacks mirroring host lifecycle events emitted by [StreamLifecycleMonitor]. + * + * Implementers typically register via `StreamLifecycleMonitor.subscribe(listener)` and override the + * relevant callbacks to react to lifecycle transitions. + */ +@StreamInternalApi +public interface StreamLifecycleListener { + + /** + * Called when the app moves to the foreground. + * + * ### Example + * + * ```kotlin + * override fun onForeground() { + * logger.i { "App moved to foreground" } + * } + * ``` + */ + public fun onForeground() {} + + /** + * Called when the app moves to the background. + * + * ### Example + * + * ```kotlin + * override fun onBackground() { + * logger.i { "App moved to background" } + * } + * ``` + */ + public fun onBackground() {} +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleMonitor.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleMonitor.kt new file mode 100644 index 0000000..b3608db --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleMonitor.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.observers.lifecycle + +import androidx.lifecycle.Lifecycle +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.observers.StreamStartableComponent +import io.getstream.android.core.api.subscribe.StreamObservable +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.internal.observers.lifecycle.StreamLifecycleMonitorImpl + +/** + * Aggregates lifecycle events from the host environment and re-emits them to registered listeners. + * + * ### Example + * + * ```kotlin + * val subscription = lifecycleMonitor.subscribe(MyLifecycleListener()) + * lifecycleMonitor.start() + * // … later … + * subscription.getOrThrow().cancel() + * lifecycleMonitor.stop() + * ``` + */ +@StreamInternalApi +public interface StreamLifecycleMonitor : + StreamStartableComponent, StreamObservable { + + /** + * Returns the current lifecycle state. + * + * @return The current lifecycle state. + */ + public fun getCurrentState(): StreamLifecycleState +} + +/** + * Creates a [StreamLifecycleMonitor] instance. + * + * @param logger The logger to use for logging. + * @param subscriptionManager The subscription manager to use for managing listeners. + * @param lifecycle The host lifecycle to observe. + * @return A new [StreamLifecycleMonitor] instance. + */ +@StreamInternalApi +public fun StreamLifecycleMonitor( + logger: StreamLogger, + lifecycle: Lifecycle, + subscriptionManager: StreamSubscriptionManager, +): StreamLifecycleMonitor = StreamLifecycleMonitorImpl(logger, subscriptionManager, lifecycle) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitor.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitor.kt index 286d67c..3249b39 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitor.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitor.kt @@ -21,7 +21,8 @@ import android.net.wifi.WifiManager import android.telephony.TelephonyManager import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.api.log.StreamLogger -import io.getstream.android.core.api.subscribe.StreamSubscription +import io.getstream.android.core.api.observers.StreamStartableComponent +import io.getstream.android.core.api.subscribe.StreamObservable import io.getstream.android.core.api.subscribe.StreamSubscriptionManager import io.getstream.android.core.internal.observers.network.StreamNetworkMonitorImpl import io.getstream.android.core.internal.observers.network.StreamNetworkSignalProcessing @@ -31,23 +32,22 @@ import kotlinx.coroutines.CoroutineScope /** * Observes changes to the device's active network and provides snapshots of its capabilities. * - * Implementations are expected to be life-cycle aware and safe to invoke from any thread. + * Implementations are expected to be lifecycle-aware and safe to invoke from any thread. + * + * ### Example + * + * ```kotlin + * val subscription = monitor.subscribe(listener).getOrThrow() + * monitor.start() + * + * // ... later ... + * subscription.cancel() + * monitor.stop() + * ``` */ @StreamInternalApi -public interface StreamNetworkMonitor { - - /** Registers [listener] to receive network updates. */ - public fun subscribe( - listener: StreamNetworkMonitorListener, - options: StreamSubscriptionManager.Options = StreamSubscriptionManager.Options(), - ): Result - - /** Starts monitoring connectivity changes. Safe to call multiple times. */ - public fun start(): Result - - /** Stops monitoring and releases platform callbacks. Safe to call multiple times. */ - public fun stop(): Result -} +public interface StreamNetworkMonitor : + StreamStartableComponent, StreamObservable /** * Creates a [StreamNetworkMonitor] instance. @@ -55,7 +55,10 @@ public interface StreamNetworkMonitor { * @param logger The logger to use for logging. * @param scope The coroutine scope to use for running the monitor. * @param subscriptionManager The subscription manager to use for managing listeners. - * @param componentsProvider Provides access to Android system services used for monitoring. + * @param wifiManager The Wi-Fi manager to use for accessing Wi-Fi information. + * @param telephonyManager The telephony manager to use for accessing cellular information. + * @param connectivityManager The connectivity manager to use for accessing network information. + * @return A new [StreamNetworkMonitor] instance. */ @StreamInternalApi public fun StreamNetworkMonitor( diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitorListener.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitorListener.kt index 4608608..3914538 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitorListener.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/observers/network/StreamNetworkMonitorListener.kt @@ -19,16 +19,20 @@ package io.getstream.android.core.api.observers.network import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo -/** - * Listener interface for network state changes. - * - * Implement this interface to receive updates about network state changes. - */ +/** Receives network availability and capability updates from [StreamNetworkMonitor]. */ @StreamInternalApi public interface StreamNetworkMonitorListener { /** * Called when the network is connected. * + * ### Example + * + * ```kotlin + * override suspend fun onNetworkConnected(snapshot: StreamNetworkInfo.Snapshot?) { + * logger.i { "Network connected: ${snapshot?.type}" } + * } + * ``` + * * @param snapshot A [StreamNetworkInfo.Snapshot] describing the newly connected network. */ public suspend fun onNetworkConnected(snapshot: StreamNetworkInfo.Snapshot?) {} @@ -36,7 +40,16 @@ public interface StreamNetworkMonitorListener { /** * Called when the network is lost. * - * @param permanent True if the network is lost permanently (e.g., due to airplane mode). + * ### Example + * + * ```kotlin + * override suspend fun onNetworkLost(permanent: Boolean) { + * retryScheduler.pause() + * if (permanent) alertUser() + * } + * ``` + * + * @param permanent True if the network is lost permanently (e.g., onUnavailable called). */ public suspend fun onNetworkLost(permanent: Boolean = false) {} @@ -44,6 +57,14 @@ public interface StreamNetworkMonitorListener { * Called when the properties of the currently connected network change while the connection * remains active. * + * ### Example + * + * ```kotlin + * override suspend fun onNetworkPropertiesChanged(snapshot: StreamNetworkInfo.Snapshot) { + * metrics.recordThroughput(snapshot.linkBandwidthDownKbps) + * } + * ``` + * * @param snapshot A [StreamNetworkInfo.Snapshot] containing the updated properties. */ public suspend fun onNetworkPropertiesChanged(snapshot: StreamNetworkInfo.Snapshot) {} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamBatcher.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamBatcher.kt index c0c8376..4ec27cf 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamBatcher.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamBatcher.kt @@ -50,6 +50,13 @@ public interface StreamBatcher { /** * Starts the processor if it's not already running. * + * ### Example + * + * ```kotlin + * batcher.start() + * .onFailure { cause -> logger.e(cause) { "Unable to start batcher" } } + * ``` + * * @return `Result.success(Unit)` if the processor was started successfully; otherwise a * `Result.failure(cause)` describing why the start failed. */ @@ -67,6 +74,14 @@ public interface StreamBatcher { * - **Int**: the number of items emitted in this batch (equals `batch.size`). * * Calling this method replaces any previously registered handler. + * + * ### Example + * + * ```kotlin + * batcher.onBatch { batch, _, _ -> + * batch.forEach { event -> handle(event) } + * } + * ``` */ public fun onBatch(handler: suspend (List, Long, Int) -> Unit) @@ -81,16 +96,29 @@ public interface StreamBatcher { * Implementations may start processing lazily on the first call. * * @param item The item to enqueue. + * + * ### Example + * + * ```kotlin + * batcher.enqueue(event) + * .onFailure { cause -> logger.w(cause) { "Dropped event" } } + * ``` */ public suspend fun enqueue(item: T): Result /** * Enqueues a single item for debounced processing. * - * This function **does not suspend** and returns `false` if the underlying buffer is full - * (bounded-capacity implementations). It returns a [Result]: - * - `true` if the item was accepted, - * - `false` if the processor is closed/stopped or cannot accept the item. + * This non-suspending variant returns `false` when the underlying buffer is full + * (bounded-capacity implementations). + * + * ### Example + * + * ```kotlin + * if (!batcher.offer(event)) { + * metrics.incrementDropped() + * } + * ``` */ public fun offer(item: T): Boolean @@ -101,6 +129,13 @@ public interface StreamBatcher { * calls should fail with `Result.failure`. Multiple calls to [stop] are allowed and should be * **idempotent**. * + * ### Example + * + * ```kotlin + * batcher.stop() + * .onFailure { cause -> logger.w(cause) { "Unable to stop batcher" } } + * ``` + * * @return `Result.success(Unit)` on a successful stop; `Result.failure(cause)` if stopping * failed. */ @@ -128,7 +163,7 @@ public fun StreamBatcher( autoStart: Boolean = true, channelCapacity: Int = Channel.UNLIMITED, ): StreamBatcher = - StreamBatcherImpl( + StreamBatcherImpl( scope = scope, batchSize = batchSize, initialDelayMs = initialDelayMs, diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/recovery/StreamConnectionRecoveryEvaluator.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/recovery/StreamConnectionRecoveryEvaluator.kt new file mode 100644 index 0000000..8ed7d76 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/recovery/StreamConnectionRecoveryEvaluator.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.recovery + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.recovery.Recovery +import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.internal.recovery.StreamConnectionRecoveryEvaluatorImpl + +/** + * Evaluates the current connection state and network availability to determine whether a reconnect + * or disconnect action should be taken. + * + * Implementations are responsible for defining the heuristics that determine when a reconnect or + * disconnect should be triggered based on the current connection state and network availability. + */ +@StreamInternalApi +public interface StreamConnectionRecoveryEvaluator { + /** + * Evaluates the current connection state and network availability to determine whether a + * reconnect or disconnect action should be taken. + * + * @param lifecycleState The current lifecycle state. + * @param networkState The current network state. + * @return A [io.getstream.android.core.api.model.connection.recovery.Recovery] indicating the + * action to take, or `null` if no action is needed. + */ + public suspend fun evaluate( + connectionState: StreamConnectionState, + lifecycleState: StreamLifecycleState, + networkState: StreamNetworkState, + ): Result +} + +/** + * Creates a new [StreamConnectionRecoveryEvaluator] instance. + * + * @param logger The logger to use for logging. + * @param singleFlightProcessor The single-flight processor to use for managing concurrent requests. + * @return A new [StreamConnectionRecoveryEvaluator] instance. + */ +@StreamInternalApi +public fun StreamConnectionRecoveryEvaluator( + logger: StreamLogger, + singleFlightProcessor: StreamSingleFlightProcessor, +): StreamConnectionRecoveryEvaluator = + StreamConnectionRecoveryEvaluatorImpl(logger, singleFlightProcessor) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/socket/listeners/StreamClientListener.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/socket/listeners/StreamClientListener.kt index b4c2074..04d24a0 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/socket/listeners/StreamClientListener.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/socket/listeners/StreamClientListener.kt @@ -18,7 +18,7 @@ package io.getstream.android.core.api.socket.listeners import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.api.model.connection.StreamConnectionState -import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.recovery.Recovery /** * Listener interface for Feeds socket events. @@ -50,9 +50,9 @@ public interface StreamClientListener { public fun onError(err: Throwable) {} /** - * Called when the network connection changes. + * Called when a recovery decision is made. * - * @param state The new network state. + * @param recovery The recovery decision. */ - public fun onNetworkState(state: StreamNetworkState) {} + public fun onRecovery(recovery: Recovery) {} } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/socket/monitor/StreamHealthMonitor.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/socket/monitor/StreamHealthMonitor.kt index 15492f4..739ec2f 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/socket/monitor/StreamHealthMonitor.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/socket/monitor/StreamHealthMonitor.kt @@ -18,6 +18,7 @@ package io.getstream.android.core.api.socket.monitor import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.observers.StreamStartableComponent import io.getstream.android.core.internal.socket.monitor.StreamHealthMonitorImpl import kotlin.time.ExperimentalTime import kotlinx.coroutines.CoroutineScope @@ -33,7 +34,7 @@ import kotlinx.coroutines.CoroutineScope * - Start and stop the monitor as needed. */ @StreamInternalApi -public interface StreamHealthMonitor { +public interface StreamHealthMonitor : StreamStartableComponent { /** * Registers a callback that is invoked at every heartbeat interval. * @@ -61,12 +62,6 @@ public interface StreamHealthMonitor { * is alive and healthy. Resets the liveness timer. */ public fun acknowledgeHeartbeat() - - /** Starts the health monitor, beginning the heartbeat and liveness checks. */ - public fun start(): Result - - /** Stops the health monitor, halting heartbeat and liveness checks. */ - public fun stop(): Result } /** diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamObservable.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamObservable.kt new file mode 100644 index 0000000..6f95554 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamObservable.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.subscribe + +import io.getstream.android.core.annotations.StreamPublishedApi +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager.Options + +/** + * Interface for observables that can be subscribed to. + * + * @param T The type of the listener. + */ +@StreamPublishedApi +public interface StreamObservable { + + /** + * Adds [listener] to the active set and returns a handle that can later be used to unregister + * it. + * + * The returned [StreamSubscription] is idempotent: + * - Calling `cancel()` multiple times is safe. + * - After `cancel()` completes, the listener is guaranteed to be absent from subsequent + * [forEach] iterations. + * + * Retention: + * - When [options.retention] is [Options.Retention.AUTO_REMOVE] (default), you can omit calling + * `cancel()`. Once your code drops all references to the listener, it is removed + * automatically and will no longer receive events. + * - When [options.retention] is [Options.Retention.KEEP_UNTIL_CANCELLED], you must call + * `cancel()` (or invoke [clear]) to stop events. + * + * @param listener The listener to register. + * @param options Retention options; defaults to automatic removal when the listener is no + * longer referenced. + * @return `Result.success(StreamSubscription)` when the listener was added; + * `Result.failure(Throwable)` if the operation cannot be completed (e.g., capacity limits). + */ + public fun subscribe(listener: T, options: Options = Options()): Result +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamSubscriptionManager.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamSubscriptionManager.kt index 1523c29..2f4823f 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamSubscriptionManager.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/subscribe/StreamSubscriptionManager.kt @@ -39,7 +39,7 @@ import io.getstream.android.core.internal.subscribe.StreamSubscriptionManagerImp * @param T the listener type (often a function type, e.g. `(Event) -> Unit`) */ @StreamInternalApi -public interface StreamSubscriptionManager { +public interface StreamSubscriptionManager : StreamObservable { /** * Subscription behavior options. * @@ -63,30 +63,6 @@ public interface StreamSubscriptionManager { } } - /** - * Adds [listener] to the active set and returns a handle that can later be used to unregister - * it. - * - * The returned [StreamSubscription] is idempotent: - * - Calling `cancel()` multiple times is safe. - * - After `cancel()` completes, the listener is guaranteed to be absent from subsequent - * [forEach] iterations. - * - * Retention: - * - When [options.retention] is [Options.Retention.AUTO_REMOVE] (default), you can omit calling - * `cancel()`. Once your code drops all references to the listener, it is removed - * automatically and will no longer receive events. - * - When [options.retention] is [Options.Retention.KEEP_UNTIL_CANCELLED], you must call - * `cancel()` (or invoke [clear]) to stop events. - * - * @param listener The listener to register. - * @param options Retention options; defaults to automatic removal when the listener is no - * longer referenced. - * @return `Result.success(StreamSubscription)` when the listener was added; - * `Result.failure(Throwable)` if the operation cannot be completed (e.g., capacity limits). - */ - public fun subscribe(listener: T, options: Options = Options()): Result - /** * Removes **all** listeners and releases related resources. * diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Flows.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Flows.kt new file mode 100644 index 0000000..4bc8a74 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Flows.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.utils + +import io.getstream.android.core.annotations.StreamInternalApi +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.update + +/** + * Updates the value of the [MutableStateFlow] with the given [state]. Internally calls + * [MutableStateFlow.update] with a lambda that always returns the given [state]. More readable than + * `stateFlow.update { state }`. + * + * @param state The new value to set. + */ +@StreamInternalApi +public fun MutableStateFlow.update(state: T) { + this.update { state } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Result.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Result.kt index 67e4dd9..048bebe 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Result.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/utils/Result.kt @@ -17,6 +17,7 @@ package io.getstream.android.core.api.utils import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.exceptions.StreamEndpointException import kotlin.coroutines.cancellation.CancellationException /** @@ -36,3 +37,26 @@ import kotlin.coroutines.cancellation.CancellationException @StreamInternalApi public inline fun Result.flatMap(transform: (T) -> Result): Result = fold(onSuccess = { transform(it) }, onFailure = { Result.failure(it) }) + +/** + * Invokes the given [function] if this [Result] is a failure and the error is a token error. + * + * @param function A function to invoke if this [Result] is a failure and the error is a token + * error. + * @return This [Result] if it is a success, or the result of [function] if it is a failure and the + * error is a token error. + */ +@StreamInternalApi +public suspend fun Result.onTokenError( + function: suspend (exception: StreamEndpointException, code: Int) -> Result +): Result = + fold( + onSuccess = { this }, + onFailure = { throwable -> + if (throwable is StreamEndpointException && throwable.apiError?.code?.div(10) == 4) { + function(throwable, throwable.apiError.code) + } else { + this + } + }, + ) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt index d629532..d9fc88a 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt @@ -22,25 +22,30 @@ import io.getstream.android.core.api.log.StreamLogger import io.getstream.android.core.api.model.StreamTypedKey.Companion.randomExecutionKey import io.getstream.android.core.api.model.connection.StreamConnectedUser import io.getstream.android.core.api.model.connection.StreamConnectionState -import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.recovery.Recovery +import io.getstream.android.core.api.model.value.StreamToken import io.getstream.android.core.api.model.value.StreamUserId -import io.getstream.android.core.api.observers.network.StreamNetworkMonitor -import io.getstream.android.core.api.observers.network.StreamNetworkMonitorListener import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator import io.getstream.android.core.api.socket.StreamConnectionIdHolder import io.getstream.android.core.api.socket.listeners.StreamClientListener import io.getstream.android.core.api.subscribe.StreamSubscription import io.getstream.android.core.api.subscribe.StreamSubscriptionManager import io.getstream.android.core.api.utils.flatMap +import io.getstream.android.core.api.utils.onTokenError +import io.getstream.android.core.api.utils.update +import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor +import io.getstream.android.core.internal.observers.StreamNetworkAndLifecycleMonitorListener import io.getstream.android.core.internal.socket.StreamSocketSession import io.getstream.android.core.internal.socket.model.ConnectUserData import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.update +import kotlinx.coroutines.launch internal class StreamClientImpl( private val userId: StreamUserId, @@ -49,11 +54,11 @@ internal class StreamClientImpl( private val serialQueue: StreamSerialProcessingQueue, private val connectionIdHolder: StreamConnectionIdHolder, private val socketSession: StreamSocketSession, - private var mutableNetworkState: MutableStateFlow, + private val networkAndLifeCycleMonitor: StreamNetworkAndLifeCycleMonitor, + private val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator, private val mutableConnectionState: MutableStateFlow, private val logger: StreamLogger, private val subscriptionManager: StreamSubscriptionManager, - private val networkMonitor: StreamNetworkMonitor, private val scope: CoroutineScope, ) : StreamClient { companion object { @@ -61,17 +66,11 @@ internal class StreamClientImpl( private val disconnectKey = randomExecutionKey() } - private var handle: StreamSubscription? = null - private var networkMonitorHandle: StreamSubscription? = null + private var socketSessionHandle: StreamSubscription? = null + private var networkAndLifecycleMonitorHandle: StreamSubscription? = null override val connectionState: StateFlow get() = mutableConnectionState.asStateFlow() - override val networkState: StateFlow - get() = mutableNetworkState.asStateFlow() - - override fun subscribe(listener: StreamClientListener): Result = - subscriptionManager.subscribe(listener) - override suspend fun connect(): Result = singleFlight.run(connectKey) { val currentState = connectionState.value @@ -79,104 +78,67 @@ internal class StreamClientImpl( logger.w { "[connect] Already connected!" } return@run currentState.connectedUser } - if (handle == null) { + + val retentionOptions = + StreamSubscriptionManager.Options( + retention = StreamSubscriptionManager.Options.Retention.KEEP_UNTIL_CANCELLED + ) + + if (socketSessionHandle == null) { logger.v { "[connect] Subscribing to socket events]" } - handle = - socketSession - .subscribe( - object : StreamClientListener { - override fun onState(state: StreamConnectionState) { - logger.v { "[client#onState]: $state" } - mutableConnectionState.update(state) - subscriptionManager.forEach { it.onState(state) } - } - - override fun onEvent(event: Any) { - logger.v { "[client#onEvent]: $event" } - subscriptionManager.forEach { it.onEvent(event) } - } - }, - StreamSubscriptionManager.Options( - retention = - StreamSubscriptionManager.Options.Retention.KEEP_UNTIL_CANCELLED - ), - ) + val clientListener = + object : StreamClientListener { + override fun onState(state: StreamConnectionState) { + logger.v { "[client#onState]: $state" } + mutableConnectionState.update(state) + subscriptionManager.forEach { it.onState(state) } + } + + override fun onEvent(event: Any) { + logger.v { "[client#onEvent]: $event" } + subscriptionManager.forEach { it.onEvent(event) } + } + + override fun onError(err: Throwable) { + logger.e(err) { "[client#onError]: $err" } + subscriptionManager.forEach { it.onError(err) } + } + } + socketSessionHandle = + socketSession.subscribe(clientListener, retentionOptions).getOrThrow() + } + + if (networkAndLifecycleMonitorHandle == null) { + logger.v { "[connect] Setup network and lifecycle monitor callback" } + val networkAndLifecycleMonitorListener = + object : StreamNetworkAndLifecycleMonitorListener { + override fun onNetworkAndLifecycleState( + networkState: StreamNetworkState, + lifecycleState: StreamLifecycleState, + ) { + scope.launch { + val connectionState = mutableConnectionState.value + val recovery = + connectionRecoveryEvaluator.evaluate( + connectionState, + lifecycleState, + networkState, + ) + recoveryEffect(recovery) + } + } + } + networkAndLifecycleMonitorHandle = + networkAndLifeCycleMonitor + .subscribe(networkAndLifecycleMonitorListener, retentionOptions) .getOrThrow() } tokenManager .loadIfAbsent() - .flatMap { token -> - socketSession.connect( - ConnectUserData( - userId = userId.rawValue, - token = token.rawValue, - name = null, - image = null, - invisible = false, - language = null, - custom = null, - ) - ) - } + .flatMap { token -> connectSocketSession(token) } .fold( onSuccess = { connected -> logger.d { "Connected to socket: $connected" } - if (networkMonitorHandle == null) { - logger.v { "[connect] Starting network monitor" } - networkMonitorHandle = - networkMonitor - .subscribe( - object : StreamNetworkMonitorListener { - override suspend fun onNetworkConnected( - snapshot: StreamNetworkInfo.Snapshot? - ) { - logger.v { - "[connect] Network connected: $snapshot" - } - val state = StreamNetworkState.Available(snapshot) - mutableNetworkState.update(state) - subscriptionManager.forEach { - it.onNetworkState(state) - } - } - - override suspend fun onNetworkLost(permanent: Boolean) { - logger.v { "[connect] Network lost" } - val state = - if (permanent) { - StreamNetworkState.Unavailable - } else { - StreamNetworkState.Disconnected - } - mutableNetworkState.update(state) - subscriptionManager.forEach { - it.onNetworkState(state) - } - } - - override suspend fun onNetworkPropertiesChanged( - snapshot: StreamNetworkInfo.Snapshot - ) { - logger.v { "[connect] Network changed: $snapshot" } - mutableNetworkState.update( - StreamNetworkState.Available(snapshot) - ) - subscriptionManager.forEach { - it.onNetworkState( - StreamNetworkState.Available(snapshot) - ) - } - } - }, - StreamSubscriptionManager.Options( - retention = - StreamSubscriptionManager.Options.Retention - .KEEP_UNTIL_CANCELLED - ), - ) - .getOrThrow() - } - networkMonitor.start() mutableConnectionState.update(connected) connectionIdHolder.setConnectionId(connected.connectionId).map { connected.connectedUser @@ -188,6 +150,9 @@ internal class StreamClientImpl( Result.failure(error) }, ) + .flatMap { connectedUser -> + networkAndLifeCycleMonitor.start().map { connectedUser } + } .getOrThrow() } @@ -197,17 +162,79 @@ internal class StreamClientImpl( mutableConnectionState.update(StreamConnectionState.Disconnected()) connectionIdHolder.clear() socketSession.disconnect() - handle?.cancel() - networkMonitor.stop() - networkMonitorHandle?.cancel() - networkMonitorHandle = null - handle = null + socketSessionHandle?.cancel() + networkAndLifeCycleMonitor.stop() + networkAndLifecycleMonitorHandle?.cancel() + networkAndLifecycleMonitorHandle = null + socketSessionHandle = null tokenManager.invalidate() serialQueue.stop() singleFlight.clear(true) } - private fun MutableStateFlow.update(state: T) { - this.update { state } + override fun subscribe( + listener: StreamClientListener, + options: StreamSubscriptionManager.Options, + ): Result = subscriptionManager.subscribe(listener, options) + + private suspend fun connectSocketSession( + token: StreamToken + ): Result { + val data = + ConnectUserData( + userId = userId.rawValue, + token = token.rawValue, + name = null, + image = null, + invisible = false, + language = null, + custom = null, + ) + return socketSession.connect(data).onTokenError { error, code -> + logger.e(error) { "Token error: $code" } + tokenManager.invalidate() + tokenManager.refresh().flatMap { newToken -> + // Retry once with new token + socketSession.connect(data.copy(token = newToken.rawValue)) + } + } } + + private suspend fun recoveryEffect(recovery: Result) { + recovery.fold( + onSuccess = { recovery -> + when (recovery) { + is Recovery.Connect<*> -> { + logger.v { "[recovery] Connecting: $recovery" } + connect().notifyFailure(subscriptionManager) + } + + is Recovery.Disconnect<*> -> { + logger.v { "[recovery] Disconnecting: $recovery" } + socketSession.disconnect().notifyFailure(subscriptionManager) + } + + is Recovery.Error -> { + logger.e(recovery.error) { "[recovery] Error: ${recovery.error.message}" } + subscriptionManager.forEach { it.onError(recovery.error) } + } + + null -> { + logger.v { "[recovery] No action" } + } + } + if (recovery != null) { + subscriptionManager.forEach { it.onRecovery(recovery) } + } + }, + onFailure = { error -> + logger.e(error) { "[recovery] Error: ${error.message}" } + subscriptionManager.forEach { it.onError(error) } + }, + ) + } + + private fun Result.notifyFailure( + subscriptionManager: StreamSubscriptionManager + ) = onFailure { error -> subscriptionManager.forEach { it.onError(error) } } } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/components/StreamAndroidComponentsProviderImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/components/StreamAndroidComponentsProviderImpl.kt index 8cf4057..5b76b43 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/components/StreamAndroidComponentsProviderImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/components/StreamAndroidComponentsProviderImpl.kt @@ -22,6 +22,8 @@ import android.net.ConnectivityManager import android.net.wifi.WifiManager import android.os.Build import android.telephony.TelephonyManager +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.ProcessLifecycleOwner import io.getstream.android.core.api.components.StreamAndroidComponentsProvider internal class StreamAndroidComponentsProviderImpl(context: Context) : @@ -53,4 +55,6 @@ internal class StreamAndroidComponentsProviderImpl(context: Context) : applicationContext.getSystemService(Context.TELEPHONY_SERVICE) as TelephonyManager } } + + override fun lifecycle(): Lifecycle = ProcessLifecycleOwner.get().lifecycle } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifeCycleMonitor.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifeCycleMonitor.kt new file mode 100644 index 0000000..ed05e44 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifeCycleMonitor.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.observers + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.observers.StreamStartableComponent +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitor +import io.getstream.android.core.api.subscribe.StreamObservable +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import kotlinx.coroutines.flow.MutableStateFlow + +/** + * Coordinates lifecycle and network signals to make connection recovery decisions. + * + * ### Responsibilities + * - Bridges callbacks from [StreamNetworkMonitor] and [StreamLifecycleMonitor] into internal state + * flows for network and lifecycle state. + * - Notifies registered [StreamNetworkAndLifecycleMonitorListener]s when reconnect/teardown actions + * should occur. + * - Implements [StreamStartableComponent] so callers can hook into their own lifecycle. + * + * ### Usage + * + * ```kotlin + * val subscription = connectionRecoveryManager.subscribe(listener).getOrThrow() + * connectionRecoveryManager.start() + * // … later … + * subscription.cancel() + * connectionRecoveryManager.stop() + * ``` + */ +internal interface StreamNetworkAndLifeCycleMonitor : + StreamStartableComponent, StreamObservable {} + +/** + * Creates a [StreamNetworkAndLifeCycleMonitor] instance. + * + * @param logger The logger to use for logging. + * @param lifecycleMonitor The lifecycle monitor to use for accessing lifecycle information. + * @param networkMonitor The network monitor to use for accessing network information. + * @param mutableNetworkState The mutable network state to use for accessing network information. + * @param mutableLifecycleState The mutable lifecycle state to use for accessing lifecycle + * information. + * @param subscriptionManager The subscription manager to use for managing listeners. + * @return A new [StreamNetworkAndLifeCycleMonitor] instance. + */ +internal fun StreamNetworkAndLifeCycleMonitor( + logger: StreamLogger, + lifecycleMonitor: StreamLifecycleMonitor, + networkMonitor: StreamNetworkMonitor, + mutableNetworkState: MutableStateFlow, + mutableLifecycleState: MutableStateFlow, + subscriptionManager: StreamSubscriptionManager, +): StreamNetworkAndLifeCycleMonitor = + StreamNetworkAndLifecycleMonitorImpl( + logger = logger, + networkMonitor = networkMonitor, + lifecycleMonitor = lifecycleMonitor, + mutableNetworkState = mutableNetworkState, + mutableLifecycleState = mutableLifecycleState, + subscriptionManager = subscriptionManager, + ) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorImpl.kt new file mode 100644 index 0000000..48183d5 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorImpl.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.observers + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleListener +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitorListener +import io.getstream.android.core.api.subscribe.StreamSubscription +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.api.utils.flatMap +import io.getstream.android.core.api.utils.times +import io.getstream.android.core.api.utils.update +import kotlinx.coroutines.flow.MutableStateFlow + +internal class StreamNetworkAndLifecycleMonitorImpl( + private val logger: StreamLogger, + private val networkMonitor: StreamNetworkMonitor, + private val lifecycleMonitor: StreamLifecycleMonitor, + private val mutableNetworkState: MutableStateFlow, + private val mutableLifecycleState: MutableStateFlow, + private val subscriptionManager: + StreamSubscriptionManager, +) : StreamNetworkAndLifeCycleMonitor { + private var networkHandle: StreamSubscription? = null + private var lifecycleHandle: StreamSubscription? = null + private val lifecycleListener = + object : StreamLifecycleListener { + + override fun onForeground() { + logger.v { "Lifecycle foregrounded" } + val lifecycleState = StreamLifecycleState.Foreground + val networkState = mutableNetworkState.value + mutableLifecycleState.update(lifecycleState) + subscriptionManager.forEach { + it.onNetworkAndLifecycleState(networkState, lifecycleState) + } + } + + override fun onBackground() { + logger.v { "Lifecycle backgrounded" } + val lifecycleState = StreamLifecycleState.Background + val networkState = mutableNetworkState.value + mutableLifecycleState.update(lifecycleState) + subscriptionManager.forEach { + it.onNetworkAndLifecycleState(networkState, lifecycleState) + } + } + } + private val networkMonitorListener = + object : StreamNetworkMonitorListener { + override suspend fun onNetworkConnected(snapshot: StreamNetworkInfo.Snapshot?) { + logger.v { "Network connected: $snapshot" } + val state = StreamNetworkState.Available(snapshot) + mutableNetworkState.update(state) + val lifecycleState = mutableLifecycleState.value + subscriptionManager.forEach { it.onNetworkAndLifecycleState(state, lifecycleState) } + } + + override suspend fun onNetworkLost(permanent: Boolean) { + logger.v { "Network lost" } + val state = + if (permanent) { + StreamNetworkState.Unavailable + } else { + StreamNetworkState.Disconnected + } + mutableNetworkState.update(state) + val lifecycleState = mutableLifecycleState.value + subscriptionManager.forEach { it.onNetworkAndLifecycleState(state, lifecycleState) } + } + } + + override fun start(): Result { + val networkStart = + networkMonitor + .start() + .flatMap { networkMonitor.subscribe(networkMonitorListener) } + .also { result -> networkHandle = result.getOrNull() } + val lifecycleStart = + lifecycleMonitor + .start() + .flatMap { lifecycleMonitor.subscribe(lifecycleListener) } + .also { result -> lifecycleHandle = result.getOrNull() } + + mutableLifecycleState.update(lifecycleMonitor.getCurrentState()) + return (networkStart * lifecycleStart).flatMap { Result.success(Unit) } + } + + override fun stop(): Result { + networkHandle?.cancel() + lifecycleHandle?.cancel() + networkHandle = null + lifecycleHandle = null + mutableNetworkState.update(StreamNetworkState.Unknown) + mutableLifecycleState.update(StreamLifecycleState.Unknown) + subscriptionManager.clear() + return (lifecycleMonitor.stop() * networkMonitor.stop()).flatMap { Result.success(Unit) } + } + + override fun subscribe( + listener: StreamNetworkAndLifecycleMonitorListener, + options: StreamSubscriptionManager.Options, + ): Result = subscriptionManager.subscribe(listener, options) +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorListener.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorListener.kt new file mode 100644 index 0000000..d82b41b --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorListener.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.observers + +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkState + +/** Listener interface for receiving network and lifecycle state updates. */ +internal interface StreamNetworkAndLifecycleMonitorListener { + + /** + * Called when the network or lifecycle state changes. + * + * @param networkState The new network state. + * @param lifecycleState The new lifecycle state. + */ + fun onNetworkAndLifecycleState( + networkState: StreamNetworkState, + lifecycleState: StreamLifecycleState, + ) +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/lifecycle/StreamLifecycleMonitorImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/lifecycle/StreamLifecycleMonitorImpl.kt new file mode 100644 index 0000000..37fdc7c --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/lifecycle/StreamLifecycleMonitorImpl.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.observers.lifecycle + +import androidx.lifecycle.DefaultLifecycleObserver +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.LifecycleOwner +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleListener +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor +import io.getstream.android.core.api.subscribe.StreamSubscription +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.ExperimentalCoroutinesApi + +internal class StreamLifecycleMonitorImpl( + private val logger: StreamLogger, + private val subscriptionManager: StreamSubscriptionManager, + private val lifecycle: Lifecycle, +) : StreamLifecycleMonitor, DefaultLifecycleObserver { + + private val started = AtomicBoolean(false) + + override fun subscribe( + listener: StreamLifecycleListener, + options: StreamSubscriptionManager.Options, + ): Result = subscriptionManager.subscribe(listener, options) + + override fun start(): Result = runCatching { + if (!started.compareAndSet(false, true)) { + return@runCatching + } + lifecycle.addObserver(this) + } + + override fun stop(): Result = runCatching { + if (!started.compareAndSet(true, false)) { + return@runCatching + } + lifecycle.removeObserver(this) + } + + override fun onResume(owner: LifecycleOwner) { + notifyListeners { it.onForeground() } + } + + override fun onPause(owner: LifecycleOwner) { + notifyListeners { it.onBackground() } + } + + private fun notifyListeners(block: (StreamLifecycleListener) -> Unit) { + subscriptionManager + .forEach { listener -> + runCatching { block(listener) } + .onFailure { throwable -> + logger.e(throwable) { + "StreamLifecycleListener block() failed when notifying" + } + } + } + .onFailure { throwable -> + logger.e(throwable) { "Failed to iterate lifecycle listeners" } + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + override fun getCurrentState(): StreamLifecycleState = + when (lifecycle.currentState) { + Lifecycle.State.INITIALIZED -> StreamLifecycleState.Unknown + Lifecycle.State.DESTROYED -> StreamLifecycleState.Background + Lifecycle.State.CREATED -> StreamLifecycleState.Background + Lifecycle.State.STARTED -> StreamLifecycleState.Background + Lifecycle.State.RESUMED -> StreamLifecycleState.Foreground + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/network/StreamNetworkMonitorCallback.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/network/StreamNetworkMonitorCallback.kt index 4124032..d29af93 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/network/StreamNetworkMonitorCallback.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/observers/network/StreamNetworkMonitorCallback.kt @@ -127,7 +127,7 @@ internal class StreamNetworkMonitorCallback( val newState = ActiveNetworkState(network, resolvedCapabilities, resolvedLink, snapshot) val previousState = activeState.getAndSet(newState) - val networkChanged = previousState?.network != network + val networkChanged = previousState?.network != network || previousState == null val snapshotChanged = previousState?.snapshot != snapshot when { diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImpl.kt new file mode 100644 index 0000000..578d4f0 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImpl.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.recovery + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.StreamTypedKey.Companion.asStreamTypedKey +import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.recovery.Recovery +import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator + +internal class StreamConnectionRecoveryEvaluatorImpl( + private val logger: StreamLogger, + private val singleFlightProcessor: StreamSingleFlightProcessor, +) : StreamConnectionRecoveryEvaluator { + + companion object { + private val evaluateKey = "ev".asStreamTypedKey() + } + + private var hasConnectedBefore: Boolean = false + private var lastLifecycleState: StreamLifecycleState = StreamLifecycleState.Unknown + private var lastNetworkState: StreamNetworkState = StreamNetworkState.Unknown + private var lastNetworkSnapshot: StreamNetworkInfo.Snapshot? = null + private var lastConnectionState: StreamConnectionState = StreamConnectionState.Idle + + override suspend fun evaluate( + connectionState: StreamConnectionState, + lifecycleState: StreamLifecycleState, + networkState: StreamNetworkState, + ): Result = + singleFlightProcessor.run(evaluateKey) { + logger.v { + "[evaluate] connectionState=$connectionState, lifecycleState=$lifecycleState, networkState=$networkState" + } + + val isConnected = connectionState is StreamConnectionState.Connected + val isConnecting = connectionState is StreamConnectionState.Connecting + val isDisconnected = + connectionState is StreamConnectionState.Disconnected || + connectionState is StreamConnectionState.Idle + val previousLifecycle = lastLifecycleState + val previousNetwork = lastNetworkState + val networkAvailable = networkState is StreamNetworkState.Available + val networkBecameAvailable = + networkAvailable && previousNetwork !is StreamNetworkState.Available + val lifecycleForeground = lifecycleState == StreamLifecycleState.Foreground + val returningToForeground = + previousLifecycle == StreamLifecycleState.Background && lifecycleForeground + + val shouldDisconnect = + (isConnected || isConnecting) && + (!networkAvailable || lifecycleState == StreamLifecycleState.Background) + + val shouldConnect = + hasConnectedBefore && + isDisconnected && + lifecycleForeground && + (networkBecameAvailable || returningToForeground && networkAvailable) + + val connectSnapshot = + when (networkState) { + is StreamNetworkState.Available -> networkState.snapshot + else -> lastNetworkSnapshot + } + + val result = + when { + shouldConnect -> Recovery.Connect(connectSnapshot) + shouldDisconnect -> Recovery.Disconnect(networkState) + else -> null + } + + if (isConnected) { + hasConnectedBefore = true + } + lastConnectionState = connectionState + lastLifecycleState = lifecycleState + lastNetworkState = networkState + lastNetworkSnapshot = connectSnapshot + result + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt index 8207dd7..47002dc 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt @@ -31,10 +31,13 @@ import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader import io.getstream.android.core.api.model.value.StreamUserId import io.getstream.android.core.api.model.value.StreamWsUrl +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitor import io.getstream.android.core.api.processing.StreamBatcher import io.getstream.android.core.api.processing.StreamRetryProcessor import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator import io.getstream.android.core.api.serialization.StreamEventSerialization import io.getstream.android.core.api.socket.StreamConnectionIdHolder import io.getstream.android.core.api.socket.StreamWebSocketFactory @@ -98,6 +101,9 @@ internal class StreamClientFactoryTest { val socketFactory: StreamWebSocketFactory, val healthMonitor: StreamHealthMonitor, val batcher: StreamBatcher, + val lifecycleMonitor: StreamLifecycleMonitor, + val networkMonitor: StreamNetworkMonitor, + val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator, ) private fun createClient( @@ -128,6 +134,9 @@ internal class StreamClientFactoryTest { socketFactory = mockk(relaxed = true), healthMonitor = mockk(relaxed = true), batcher = mockk(relaxed = true), + lifecycleMonitor = mockk(relaxed = true), + networkMonitor = mockk(relaxed = true), + connectionRecoveryEvaluator = mockk(relaxed = true), ) val client = @@ -151,7 +160,9 @@ internal class StreamClientFactoryTest { httpConfig = httpConfig, serializationConfig = serializationConfig, logProvider = logProvider, - networkMonitor = mockk(relaxed = true), + networkMonitor = deps.networkMonitor, + lifecycleMonitor = deps.lifecycleMonitor, + connectionRecoveryEvaluator = deps.connectionRecoveryEvaluator, ) return client to deps diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamClientSerializationConfigTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamClientSerializationConfigTest.kt new file mode 100644 index 0000000..c1aed8f --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamClientSerializationConfigTest.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.config + +import io.getstream.android.core.api.model.event.StreamClientWsEvent +import io.getstream.android.core.api.serialization.StreamEventSerialization +import io.getstream.android.core.api.serialization.StreamJsonSerialization +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertSame + +class StreamClientSerializationConfigTest { + + @Test + fun `json builder injects json implementation`() { + val json = FakeJsonSerialization() + val productEvents = FakeEventSerialization() + val alsoExternal = setOf("custom:event") + + val config = StreamClientSerializationConfig.json(json, productEvents, alsoExternal) + + assertSame(json, config.json) + assertNull(config.eventParser) + assertSame(productEvents, config.productEventSerializers) + assertEquals(alsoExternal, config.alsoExternal) + } + + @Test + fun `event builder injects event parser`() { + val eventParser = FakeEventSerialization() + val productEvents = FakeEventSerialization() + val alsoExternal = setOf("product:event") + + val config = StreamClientSerializationConfig.event(eventParser, productEvents, alsoExternal) + + assertSame(eventParser, config.eventParser) + assertNull(config.json) + assertSame(productEvents, config.productEventSerializers) + assertEquals(alsoExternal, config.alsoExternal) + } + + private class FakeJsonSerialization : StreamJsonSerialization { + override fun toJson(any: Any): Result = Result.success("{}") + + override fun fromJson(raw: String, clazz: Class): Result = + Result.failure(UnsupportedOperationException()) + } + + private class FakeEventSerialization : StreamEventSerialization { + override fun serialize(data: T): Result = Result.success("event") + + override fun deserialize(raw: String): Result = + Result.failure(UnsupportedOperationException()) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt new file mode 100644 index 0000000..33092c6 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.config + +import io.getstream.android.core.api.model.value.StreamApiKey +import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class StreamSocketConfigTest { + + private val apiKey = StreamApiKey.fromString("key") + private val header = StreamHttpClientInfoHeader.create("product", "1.0", "android", 34, "pixel") + + @Test + fun `anonymous config uses anonymous auth type`() { + val config = + StreamSocketConfig.anonymous( + url = "wss://chat.stream.io", + apiKey = apiKey, + clientInfoHeader = header, + ) + + assertEquals("wss://chat.stream.io", config.url) + assertEquals(apiKey, config.apiKey) + assertEquals("anonymous", config.authType) + assertEquals(header, config.clientInfoHeader) + } + + @Test + fun `custom config uses provided auth type and validates input`() { + val config = + StreamSocketConfig.custom( + url = "wss://chat.stream.io/custom", + apiKey = apiKey, + authType = "token", + clientInfoHeader = header, + ) + + assertEquals("token", config.authType) + + assertFailsWith { + StreamSocketConfig.custom("", apiKey, "jwt", header) + } + assertFailsWith { + StreamSocketConfig.custom("wss://chat.stream.io", apiKey, "", header) + } + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleMonitorTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleMonitorTest.kt new file mode 100644 index 0000000..49fdc1a --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/observers/lifecycle/StreamLifecycleMonitorTest.kt @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.observers.lifecycle + +import android.os.Build +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.LifecycleOwner +import androidx.lifecycle.LifecycleRegistry +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager.Options +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager.Options.Retention +import io.getstream.android.core.testing.TestLogger +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue +import org.junit.runner.RunWith +import org.robolectric.RobolectricTestRunner +import org.robolectric.annotation.Config + +@RunWith(RobolectricTestRunner::class) +@Config(sdk = [Build.VERSION_CODES.UPSIDE_DOWN_CAKE]) +class StreamLifecycleMonitorTest { + + @Test + fun `start forwards lifecycle events to listeners`() { + val owner = TestLifecycleOwner() + val monitor = StreamLifecycleMonitor(TestLogger, owner.lifecycle, newSubscriptionManager()) + val received = mutableListOf() + + val listener = + object : StreamLifecycleListener { + override fun onForeground() { + received += "fg" + } + + override fun onBackground() { + received += "bg" + } + } + val noopListener = object : StreamLifecycleListener {} + + val options = Options(retention = Retention.KEEP_UNTIL_CANCELLED) + val subscription = monitor.subscribe(listener, options).getOrThrow() + monitor.subscribe(noopListener, options).getOrThrow() + + monitor.start().getOrThrow() + + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_PAUSE) + + assertEquals(listOf("fg", "bg"), received) + + monitor.stop().getOrThrow() + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME) + assertEquals(2, received.size) + + subscription.cancel() + } + + @Test + fun `getCurrentState reflects lifecycle state`() { + val expectations = + listOf( + Lifecycle.State.INITIALIZED to StreamLifecycleState.Unknown, + Lifecycle.State.CREATED to StreamLifecycleState.Background, + Lifecycle.State.STARTED to StreamLifecycleState.Background, + Lifecycle.State.RESUMED to StreamLifecycleState.Foreground, + Lifecycle.State.DESTROYED to StreamLifecycleState.Background, + ) + + expectations.forEach { (state, expected) -> + val owner = TestLifecycleOwner() + val monitor = + StreamLifecycleMonitor(TestLogger, owner.lifecycle, newSubscriptionManager()) + + when (state) { + Lifecycle.State.INITIALIZED -> Unit + Lifecycle.State.CREATED -> + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_CREATE) + Lifecycle.State.STARTED -> { + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_CREATE) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_START) + } + Lifecycle.State.RESUMED -> { + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_CREATE) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_START) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME) + } + Lifecycle.State.DESTROYED -> { + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_CREATE) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_START) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_PAUSE) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_STOP) + owner.registry.handleLifecycleEvent(Lifecycle.Event.ON_DESTROY) + } + } + + assertEquals(expected, monitor.getCurrentState()) + } + } + + @Test + fun `lifecycle listener default callbacks are no-op`() { + val listener = object : StreamLifecycleListener {} + + listener.onForeground() + listener.onBackground() + + assertTrue(true) + } + + private fun newSubscriptionManager(): StreamSubscriptionManager = + StreamSubscriptionManager(TestLogger) + + private class TestLifecycleOwner : LifecycleOwner { + val registry = LifecycleRegistry(this) + + override val lifecycle: Lifecycle + get() = registry + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/recovery/StreamConnectionRecoveryEvaluatorFactoryTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/recovery/StreamConnectionRecoveryEvaluatorFactoryTest.kt new file mode 100644 index 0000000..3fbd08c --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/recovery/StreamConnectionRecoveryEvaluatorFactoryTest.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.recovery + +import io.getstream.android.core.api.model.StreamTypedKey +import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.testing.TestLogger +import kotlin.test.Test +import kotlin.test.assertIs +import kotlin.test.assertNull +import kotlinx.coroutines.test.runTest + +class StreamConnectionRecoveryEvaluatorFactoryTest { + + @Test + fun `factory wires a working evaluator`() = runTest { + val evaluator = + StreamConnectionRecoveryEvaluator(TestLogger, ImmediateSingleFlightProcessor()) + + val result = + evaluator + .evaluate( + connectionState = StreamConnectionState.Idle, + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Unknown, + ) + .getOrThrow() + + assertNull(result) + assertIs(evaluator) + } + + private class ImmediateSingleFlightProcessor : StreamSingleFlightProcessor { + override suspend fun run(key: StreamTypedKey, block: suspend () -> T): Result = + try { + Result.success(block()) + } catch (t: Throwable) { + Result.failure(t) + } + + override fun has(key: StreamTypedKey): Boolean = false + + override fun cancel(key: StreamTypedKey): Result = Result.success(Unit) + + override fun clear(cancelRunning: Boolean): Result = Result.success(Unit) + + override fun stop(): Result = Result.success(Unit) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/socket/listeners/StreamListenersDefaultImplsTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/socket/listeners/StreamListenersDefaultImplsTest.kt index 2a0dede..aad56ae 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/api/socket/listeners/StreamListenersDefaultImplsTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/socket/listeners/StreamListenersDefaultImplsTest.kt @@ -67,10 +67,6 @@ internal class StreamListenersDefaultImplsTest { override fun onError(err: Throwable) { errorChannel.trySend(err) } - - override fun onNetworkState(state: StreamNetworkState) { - networkChannel.trySend(state) - } } val state = StreamConnectionState.Connecting.Opening("user") diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/utils/AlgebraTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/utils/AlgebraTest.kt index ccc87b0..ff289de 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/api/utils/AlgebraTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/utils/AlgebraTest.kt @@ -118,4 +118,80 @@ class AlgebraTest { val exception = combined.exceptionOrNull() as StreamAggregateException assertEquals(listOf(failure, failure2), exception.causes) } + + @Test + fun `result pair times result value propagates left failure`() { + val failure = IllegalStateException("pair failed") + val left = Result.failure>(failure) + val right = Result.success(3.0) + + val combined = left * right + + assertTrue(combined.isFailure) + assertSame(failure, combined.exceptionOrNull()) + } + + @Test + fun `result pair times result value propagates right failure`() { + val left = Result.success(1 to "two") + val failure = IllegalArgumentException("value failed") + val right = Result.failure(failure) + + val combined = left * right + + assertTrue(combined.isFailure) + assertSame(failure, combined.exceptionOrNull()) + } + + @Test + fun `result pair times result value aggregates failures`() { + val leftFailure = IllegalStateException("pair boom") + val rightFailure = IllegalArgumentException("value boom") + val left = Result.failure>(leftFailure) + val right = Result.failure(rightFailure) + + val combined = left * right + + assertTrue(combined.isFailure) + val aggregate = combined.exceptionOrNull() as StreamAggregateException + assertEquals(listOf(leftFailure, rightFailure), aggregate.causes) + } + + @Test + fun `result value times result pair propagates left failure`() { + val failure = IllegalStateException("value failed") + val left = Result.failure(failure) + val right = Result.success(1 to "two") + + val combined = left * right + + assertTrue(combined.isFailure) + assertSame(failure, combined.exceptionOrNull()) + } + + @Test + fun `result value times result pair propagates right failure`() { + val left = Result.success(true) + val failure = IllegalArgumentException("pair failed") + val right = Result.failure>(failure) + + val combined = left * right + + assertTrue(combined.isFailure) + assertSame(failure, combined.exceptionOrNull()) + } + + @Test + fun `result value times result pair aggregates failures`() { + val leftFailure = IllegalStateException("value boom") + val rightFailure = IllegalArgumentException("pair boom") + val left = Result.failure(leftFailure) + val right = Result.failure>(rightFailure) + + val combined = left * right + + assertTrue(combined.isFailure) + val aggregate = combined.exceptionOrNull() as StreamAggregateException + assertEquals(listOf(leftFailure, rightFailure), aggregate.causes) + } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/utils/FlowsTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/utils/FlowsTest.kt new file mode 100644 index 0000000..2b49af5 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/utils/FlowsTest.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.utils + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.flow.MutableStateFlow + +internal class FlowsTest { + + @Test + fun `update replaces state value`() { + val flow = MutableStateFlow(0) + + flow.update(42) + + assertEquals(42, flow.value) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/client/StreamClientIImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/client/StreamClientIImplTest.kt index b6007c9..68498eb 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/client/StreamClientIImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/client/StreamClientIImplTest.kt @@ -22,19 +22,24 @@ import io.getstream.android.core.api.authentication.StreamTokenManager import io.getstream.android.core.api.log.StreamLogger import io.getstream.android.core.api.model.connection.StreamConnectedUser import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.recovery.Recovery import io.getstream.android.core.api.model.event.StreamClientWsEvent +import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData +import io.getstream.android.core.api.model.exceptions.StreamEndpointException import io.getstream.android.core.api.model.value.StreamToken import io.getstream.android.core.api.model.value.StreamUserId -import io.getstream.android.core.api.observers.network.StreamNetworkMonitor -import io.getstream.android.core.api.observers.network.StreamNetworkMonitorListener import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator import io.getstream.android.core.api.socket.StreamConnectionIdHolder import io.getstream.android.core.api.socket.listeners.StreamClientListener import io.getstream.android.core.api.subscribe.StreamSubscription import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor +import io.getstream.android.core.internal.observers.StreamNetworkAndLifecycleMonitorListener import io.getstream.android.core.internal.socket.StreamSocketSession import io.mockk.* import kotlin.time.ExperimentalTime @@ -75,6 +80,8 @@ class StreamClientIImplTest { socketSession = mockk(relaxed = true) logger = mockk(relaxed = true) subscriptionManager = mockk(relaxed = true) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) // SingleFlight: execute the lambda and wrap into Result singleFlight = mockk(relaxed = true) @@ -82,7 +89,11 @@ class StreamClientIImplTest { coEvery { singleFlight.run(any(), any Any>()) } coAnswers { val block = secondArg Any>() - Result.success(block()) + try { + Result.success(block()) + } catch (t: Throwable) { + Result.failure(t) + } } // Mutable client state: expose real StateFlows that update() mutates @@ -90,11 +101,13 @@ class StreamClientIImplTest { networkFlow = MutableStateFlow(StreamNetworkState.Unknown) every { connectionIdHolder.clear() } returns Result.success(Unit) + every { subscriptionManager.forEach(any()) } returns Result.success(Unit) } private fun createClient( scope: CoroutineScope, - networkMonitor: StreamNetworkMonitor = mockNetworkMonitor(), + networkAndLifeCycleMonitor: StreamNetworkAndLifeCycleMonitor = mockNetworkMonitor(), + connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator = mockk(relaxed = true), ) = StreamClientImpl( userId = userId, @@ -104,26 +117,50 @@ class StreamClientIImplTest { connectionIdHolder = connectionIdHolder, socketSession = socketSession, logger = logger, - mutableNetworkState = networkFlow, mutableConnectionState = connFlow, scope = scope, subscriptionManager = subscriptionManager, - networkMonitor = networkMonitor, + networkAndLifeCycleMonitor = networkAndLifeCycleMonitor, + connectionRecoveryEvaluator = connectionRecoveryEvaluator, ) - private fun mockNetworkMonitor(): StreamNetworkMonitor = + private fun mockNetworkMonitor(): StreamNetworkAndLifeCycleMonitor = + mockk(relaxed = true) { + every { start() } returns Result.success(Unit) + every { stop() } returns Result.success(Unit) + every { subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + } + + private fun capturingNetworkMonitor( + onListener: (StreamNetworkAndLifecycleMonitorListener) -> Unit + ): StreamNetworkAndLifeCycleMonitor = mockk(relaxed = true) { every { start() } returns Result.success(Unit) every { stop() } returns Result.success(Unit) - every { subscribe(any(), any()) } returns Result.success(mockk(relaxed = true)) + every { subscribe(any(), any()) } answers + { + onListener(firstArg()) + Result.success(mockk(relaxed = true)) + } } + private fun stubSubscriptionManager(configure: (StreamClientListener) -> Unit = {}) { + every { subscriptionManager.forEach(any()) } answers + { + val block = firstArg<(StreamClientListener) -> Unit>() + val external = mockk(relaxed = true) + configure(external) + block(external) + Result.success(Unit) + } + } + @Test fun `connect short-circuits when already connected`() = runTest { - backgroundScope val connectedUser = mockk(relaxed = true) connFlow.value = StreamConnectionState.Connected(connectedUser, "cid-123") - val client = createClient(backgroundScope) + val client = createClient(this) val res = client.connect() @@ -131,7 +168,6 @@ class StreamClientIImplTest { assertSame(connectedUser, res.getOrThrow()) // No socket session subscribe/connect or token fetch when already connected - verify(exactly = 0) { socketSession.subscribe(any(), any()) } coVerify(exactly = 0) { socketSession.connect(any()) } coVerify(exactly = 0) { tokenManager.loadIfAbsent() } } @@ -140,7 +176,7 @@ class StreamClientIImplTest { fun `disconnect performs cleanup - updates state, clears ids, cancels handle, stops processors`() = runTest { val networkMonitor = mockNetworkMonitor() - val client = createClient(backgroundScope, networkMonitor) + val client = createClient(this, networkMonitor) // Make singleFlight actually run the provided block and return success coEvery { singleFlight.run(any(), any Any>()) } coAnswers { @@ -153,12 +189,14 @@ class StreamClientIImplTest { // Pretend we already have a live subscription handle inside the client val fakeHandle = mockk(relaxed = true) val handleField = - client.javaClass.getDeclaredField("handle").apply { isAccessible = true } + client.javaClass.getDeclaredField("socketSessionHandle").apply { + isAccessible = true + } handleField.set(client, fakeHandle) val networkHandle = mockk(relaxed = true) val networkHandleField = - client.javaClass.getDeclaredField("networkMonitorHandle").apply { + client.javaClass.getDeclaredField("networkAndLifecycleMonitorHandle").apply { isAccessible = true } networkHandleField.set(client, networkHandle) @@ -189,82 +227,12 @@ class StreamClientIImplTest { assertNull(networkHandleField.get(client)) } - @Test - fun `network monitor updates state and notifies subscribers`() = runTest { - val forwardedStates = mutableListOf() - every { subscriptionManager.forEach(any()) } answers - { - val block = firstArg<(StreamClientListener) -> Unit>() - val external = mockk(relaxed = true) - every { external.onNetworkState(any()) } answers - { - forwardedStates += firstArg() - } - block(external) - Result.success(Unit) - } - - val networkHandle = mockk(relaxed = true) - var capturedListener: StreamNetworkMonitorListener? = null - val networkMonitor = mockk() - every { networkMonitor.start() } returns Result.success(Unit) - every { networkMonitor.stop() } returns Result.success(Unit) - every { networkMonitor.subscribe(any(), any()) } answers - { - capturedListener = firstArg() - Result.success(networkHandle) - } - - val client = createClient(backgroundScope, networkMonitor) - - val socketHandle = mockk(relaxed = true) - every { socketSession.subscribe(any(), any()) } returns - Result.success(socketHandle) - val token = StreamToken.fromString("tok") - coEvery { tokenManager.loadIfAbsent() } returns Result.success(token) - val connectedUser = mockk(relaxed = true) - val connectedState = StreamConnectionState.Connected(connectedUser, "conn-1") - coEvery { socketSession.connect(any()) } returns Result.success(connectedState) - every { connectionIdHolder.setConnectionId("conn-1") } returns Result.success("conn-1") - - val result = client.connect() - - assertTrue(result.isSuccess) - verify(exactly = 1) { networkMonitor.subscribe(any(), any()) } - verify(exactly = 1) { networkMonitor.start() } - val listener = capturedListener ?: error("Expected network monitor listener") - - val connectedSnapshot = StreamNetworkInfo.Snapshot(transports = emptySet()) - listener.onNetworkConnected(connectedSnapshot) - assertEquals(StreamNetworkState.Available(connectedSnapshot), networkFlow.value) - - listener.onNetworkLost(permanent = false) - assertEquals(StreamNetworkState.Disconnected, networkFlow.value) - - listener.onNetworkLost(permanent = true) - assertEquals(StreamNetworkState.Unavailable, networkFlow.value) - - val updatedSnapshot = - connectedSnapshot.copy(priority = StreamNetworkInfo.PriorityHint.LATENCY) - listener.onNetworkPropertiesChanged(updatedSnapshot) - assertEquals(StreamNetworkState.Available(updatedSnapshot), networkFlow.value) - - val expectedStates = - listOf( - StreamNetworkState.Available(connectedSnapshot), - StreamNetworkState.Disconnected, - StreamNetworkState.Unavailable, - StreamNetworkState.Available(updatedSnapshot), - ) - assertTrue(forwardedStates.containsAll(expectedStates)) - } - @Test fun `subscribe delegates to subscriptionManager`() = runTest { val listener = mockk(relaxed = true) val sub = mockk(relaxed = true) every { subscriptionManager.subscribe(listener) } returns Result.success(sub) - val client = createClient(backgroundScope) + val client = createClient(this) val res = client.subscribe(listener) @@ -276,7 +244,7 @@ class StreamClientIImplTest { @Test fun `connect success - subscribes once, calls session connect, updates state and connectionId, returns user`() = runTest { - val client = createClient(backgroundScope) + val client = createClient(this) // single-flight executes block and returns its result coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers { @@ -298,7 +266,7 @@ class StreamClientIImplTest { val connectedState = StreamConnectionState.Connected(connectedUser, "conn-1") coEvery { socketSession.connect(any()) } returns Result.success(connectedState) - every { connectionIdHolder.setConnectionId("conn-1") } returns Result.success("Unit") + every { connectionIdHolder.setConnectionId("conn-1") } returns Result.success("conn-1") val result = client.connect() @@ -319,7 +287,7 @@ class StreamClientIImplTest { @Test fun `connect early-exit when already connected - returns existing user and does not hit session or token`() = runTest { - val client = createClient(backgroundScope) + val client = createClient(this) // Make single-flight run the block coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers { @@ -337,7 +305,6 @@ class StreamClientIImplTest { assertSame(existingUser, result.getOrNull()) // No new subscribe, no token load, no session connect, no connectionId set - verify(exactly = 0) { socketSession.subscribe(any(), any()) } coVerify(exactly = 0) { tokenManager.loadIfAbsent() } coVerify(exactly = 0) { socketSession.connect(any()) } verify(exactly = 0) { connectionIdHolder.setConnectionId(any()) } @@ -346,7 +313,7 @@ class StreamClientIImplTest { @Test fun `connect fails when token manager fails - emits Disconnected state and returns failure`() = runTest { - val client = createClient(backgroundScope) + val client = createClient(this) // single-flight executes block coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers { @@ -382,7 +349,7 @@ class StreamClientIImplTest { @Test fun `connect fails when socket session connect fails - emits Disconnected state and returns failure`() = runTest { - val client = createClient(backgroundScope) + val client = createClient(this) // single-flight executes block coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers { @@ -420,9 +387,190 @@ class StreamClientIImplTest { verify(exactly = 0) { connectionIdHolder.setConnectionId(any()) } } + @Test + fun `recovery connect triggers another connect attempt`() = runTest { + var networkListener: StreamNetworkAndLifecycleMonitorListener? = null + val networkMonitor = capturingNetworkMonitor { networkListener = it } + val recoveryEvaluator = mockk() + val expectedRecovery = Recovery.Connect(StreamNetworkInfo.Snapshot()) + coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns + Result.success(expectedRecovery) + val recoveries = mutableListOf() + stubSubscriptionManager { external -> + every { external.onRecovery(any()) } answers { recoveries += firstArg() } + } + + val error = RuntimeException("no token") + coEvery { tokenManager.loadIfAbsent() } returnsMany + listOf(Result.failure(error), Result.failure(error)) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + + val client = createClient(this, networkMonitor, recoveryEvaluator) + + client.connect().onFailure {} + advanceUntilIdle() + + val listener = networkListener ?: error("Network listener not registered") + val networkState = StreamNetworkState.Available(StreamNetworkInfo.Snapshot()) + listener.onNetworkAndLifecycleState(networkState, StreamLifecycleState.Foreground) + advanceUntilIdle() + + coVerify(exactly = 2) { tokenManager.loadIfAbsent() } + coVerify(exactly = 1) { recoveryEvaluator.evaluate(any(), any(), any()) } + assertTrue(recoveries.contains(expectedRecovery)) + } + + @Test + fun `recovery disconnect closes the socket session`() = runTest { + var networkListener: StreamNetworkAndLifecycleMonitorListener? = null + val networkMonitor = capturingNetworkMonitor { networkListener = it } + val recoveryEvaluator = mockk() + val expectedRecovery = Recovery.Disconnect(StreamNetworkState.Disconnected) + coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns + Result.success(expectedRecovery) + val recoveries = mutableListOf() + stubSubscriptionManager { external -> + every { external.onRecovery(any()) } answers { recoveries += firstArg() } + } + + val error = RuntimeException("no token") + coEvery { tokenManager.loadIfAbsent() } returns Result.failure(error) + every { socketSession.disconnect() } returns Result.success(Unit) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + + val client = createClient(this, networkMonitor, recoveryEvaluator) + + client.connect().onFailure {} + advanceUntilIdle() + + val listener = networkListener ?: error("Network listener not registered") + listener.onNetworkAndLifecycleState( + StreamNetworkState.Disconnected, + StreamLifecycleState.Background, + ) + advanceUntilIdle() + + coVerify(exactly = 1) { recoveryEvaluator.evaluate(any(), any(), any()) } + verify(exactly = 1) { socketSession.disconnect() } + assertTrue(recoveries.contains(expectedRecovery)) + } + + @Test + fun `recovery error notifies subscribers`() = runTest { + var networkListener: StreamNetworkAndLifecycleMonitorListener? = null + val networkMonitor = capturingNetworkMonitor { networkListener = it } + val recoveryEvaluator = mockk() + val boom = RuntimeException("recovery error") + val expectedRecovery = Recovery.Error(boom) + coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns + Result.success(expectedRecovery) + + val reported = mutableListOf() + val recoveries = mutableListOf() + stubSubscriptionManager { external -> + every { external.onError(any()) } answers { reported += firstArg() } + every { external.onRecovery(any()) } answers { recoveries += firstArg() } + } + + val tokenError = RuntimeException("token") + coEvery { tokenManager.loadIfAbsent() } returns Result.failure(tokenError) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + + val client = createClient(this, networkMonitor, recoveryEvaluator) + + client.connect().onFailure {} + advanceUntilIdle() + + val listener = networkListener ?: error("Network listener not registered") + listener.onNetworkAndLifecycleState( + StreamNetworkState.Disconnected, + StreamLifecycleState.Background, + ) + advanceUntilIdle() + + assertTrue(reported.contains(boom)) + assertTrue(recoveries.contains(expectedRecovery)) + every { subscriptionManager.forEach(any()) } returns Result.success(Unit) + } + + @Test + fun `recovery null results in no action`() = runTest { + var networkListener: StreamNetworkAndLifecycleMonitorListener? = null + val networkMonitor = capturingNetworkMonitor { networkListener = it } + val recoveryEvaluator = mockk() + coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns Result.success(null) + val recoveries = mutableListOf() + stubSubscriptionManager { external -> + every { external.onRecovery(any()) } answers { recoveries += firstArg() } + } + + val tokenError = RuntimeException("token") + coEvery { tokenManager.loadIfAbsent() } returns Result.failure(tokenError) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + + val client = createClient(this, networkMonitor, recoveryEvaluator) + + client.connect().onFailure {} + advanceUntilIdle() + + val listener = networkListener ?: error("Network listener not registered") + listener.onNetworkAndLifecycleState( + StreamNetworkState.Disconnected, + StreamLifecycleState.Background, + ) + advanceUntilIdle() + + coVerify(exactly = 1) { tokenManager.loadIfAbsent() } + verify(exactly = 0) { socketSession.disconnect() } + assertTrue(recoveries.isEmpty()) + } + + @Test + fun `recovery failure notifies subscribers`() = runTest { + var networkListener: StreamNetworkAndLifecycleMonitorListener? = null + val networkMonitor = capturingNetworkMonitor { networkListener = it } + val recoveryEvaluator = mockk() + val boom = IllegalStateException("recovery failure") + coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns Result.failure(boom) + + val reported = mutableListOf() + every { subscriptionManager.forEach(any()) } answers + { + val block = firstArg<(StreamClientListener) -> Unit>() + val external = mockk(relaxed = true) + every { external.onError(any()) } answers { reported += firstArg() } + block(external) + Result.success(Unit) + } + + val tokenError = RuntimeException("token") + coEvery { tokenManager.loadIfAbsent() } returns Result.failure(tokenError) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + + val client = createClient(this, networkMonitor, recoveryEvaluator) + + client.connect().onFailure {} + advanceUntilIdle() + + val listener = networkListener ?: error("Network listener not registered") + listener.onNetworkAndLifecycleState( + StreamNetworkState.Disconnected, + StreamLifecycleState.Background, + ) + advanceUntilIdle() + + assertTrue(reported.contains(boom)) + every { subscriptionManager.forEach(any()) } returns Result.success(Unit) + } + @Test fun `subscription onState updates client state and forwards to subscribers`() = runTest { - val client = createClient(backgroundScope) + val client = createClient(this) // Make single-flight execute the block coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers { @@ -477,7 +625,7 @@ class StreamClientIImplTest { @Test fun `subscription onEvent forwards to subscribers`() = runTest { - val client = createClient(backgroundScope) + val client = createClient(this) // Make single-flight execute the block coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers { @@ -527,4 +675,74 @@ class StreamClientIImplTest { assertTrue(forwardedEvents.contains(event)) verify(atLeast = 1) { subscriptionManager.forEach(any()) } } + + @Test + fun `subscription onError forwards to subscribers`() = runTest { + val client = createClient(this) + coEvery { singleFlight.run(any(), any StreamConnectedUser>()) } coAnswers + { + val block = secondArg StreamConnectedUser>() + try { + Result.success(block.invoke()) + } catch (t: Throwable) { + Result.failure(t) + } + } + + var capturedListener: StreamClientListener? = null + every { socketSession.subscribe(any(), any()) } answers + { + capturedListener = firstArg() + Result.success(mockk(relaxed = true)) + } + + val reported = mutableListOf() + every { subscriptionManager.forEach(any()) } answers + { + val block = firstArg<(StreamClientListener) -> Unit>() + val external = mockk(relaxed = true) + every { external.onError(any()) } answers { reported += firstArg() } + block(external) + Result.success(Unit) + } + + coEvery { tokenManager.loadIfAbsent() } returns Result.failure(RuntimeException("stop")) + + client.connect().onFailure {} + advanceUntilIdle() + + val error = RuntimeException("socket failure") + capturedListener!!.onError(error) + + assertTrue(reported.contains(error)) + every { subscriptionManager.forEach(any()) } returns Result.success(Unit) + } + + @Test + fun `connect retries when token error occurs`() = runTest { + val client = createClient(this) + + val token = StreamToken.fromString("tok-1") + val refreshedToken = StreamToken.fromString("tok-2") + coEvery { tokenManager.loadIfAbsent() } returns Result.success(token) + justRun { tokenManager.invalidate() } + coEvery { tokenManager.refresh() } returns Result.success(refreshedToken) + + val endpointError = StreamEndpointException(apiError = StreamEndpointErrorData(code = 40)) + val connectedUser = mockk(relaxed = true) + val connectedState = StreamConnectionState.Connected(connectedUser, "conn-42") + coEvery { socketSession.connect(match { it.token == token.rawValue }) } returns + Result.failure(endpointError) + coEvery { socketSession.connect(match { it.token == refreshedToken.rawValue }) } returns + Result.success(connectedState) + every { socketSession.subscribe(any(), any()) } returns + Result.success(mockk(relaxed = true)) + every { connectionIdHolder.setConnectionId("conn-42") } returns Result.success("conn-42") + + client.connect().onFailure {} + + verify { tokenManager.invalidate() } + coVerify { tokenManager.refresh() } + coVerify(exactly = 2) { socketSession.connect(any()) } + } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorImplTest.kt new file mode 100644 index 0000000..cc53052 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/observers/StreamNetworkAndLifecycleMonitorImplTest.kt @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.observers + +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleListener +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitorListener +import io.getstream.android.core.api.subscribe.StreamSubscription +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager.Options +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager.Options.Retention +import io.getstream.android.core.testing.TestLogger +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlin.time.ExperimentalTime +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.runBlocking + +@OptIn(ExperimentalTime::class) +class StreamNetworkAndLifecycleMonitorImplTest { + + private val networkState = MutableStateFlow(StreamNetworkState.Unknown) + private val lifecycleState = + MutableStateFlow(StreamLifecycleState.Unknown) + private val downstreamSubscriptionManager = + StreamSubscriptionManager(TestLogger) + private val fakeLifecycleMonitor = FakeLifecycleMonitor() + private val fakeNetworkMonitor = FakeNetworkMonitor() + private val monitor: StreamNetworkAndLifeCycleMonitor = + StreamNetworkAndLifeCycleMonitor( + logger = TestLogger, + lifecycleMonitor = fakeLifecycleMonitor, + networkMonitor = fakeNetworkMonitor, + mutableLifecycleState = lifecycleState, + mutableNetworkState = networkState, + subscriptionManager = downstreamSubscriptionManager, + ) + private val options = Options(retention = Retention.KEEP_UNTIL_CANCELLED) + + @Test + fun `network callbacks update state and notify listeners`() { + val events = mutableListOf>() + monitor + .subscribe( + object : StreamNetworkAndLifecycleMonitorListener { + override fun onNetworkAndLifecycleState( + networkState: StreamNetworkState, + lifecycleState: StreamLifecycleState, + ) { + events += networkState to lifecycleState + } + }, + options, + ) + .getOrThrow() + + monitor.start().getOrThrow() + + val snapshot = StreamNetworkInfo.Snapshot() + fakeNetworkMonitor.emitConnected(snapshot) + assertEquals(StreamNetworkState.Available(snapshot), networkState.value) + + fakeNetworkMonitor.emitLost(permanent = false) + assertTrue(networkState.value is StreamNetworkState.Disconnected) + + fakeNetworkMonitor.emitLost(permanent = true) + assertEquals(StreamNetworkState.Unavailable, networkState.value) + + assertEquals( + listOf( + StreamNetworkState.Available(snapshot) to lifecycleState.value, + StreamNetworkState.Disconnected to lifecycleState.value, + StreamNetworkState.Unavailable to lifecycleState.value, + ), + events, + ) + } + + @Test + fun `lifecycle callbacks update state and notify listeners`() { + val events = mutableListOf>() + monitor + .subscribe( + object : StreamNetworkAndLifecycleMonitorListener { + override fun onNetworkAndLifecycleState( + networkState: StreamNetworkState, + lifecycleState: StreamLifecycleState, + ) { + events += networkState to lifecycleState + } + }, + options, + ) + .getOrThrow() + + monitor.start().getOrThrow() + + fakeLifecycleMonitor.emitForeground() + assertEquals(StreamLifecycleState.Foreground, lifecycleState.value) + + fakeLifecycleMonitor.emitBackground() + assertEquals(StreamLifecycleState.Background, lifecycleState.value) + + assertEquals( + listOf( + networkState.value to StreamLifecycleState.Foreground, + networkState.value to StreamLifecycleState.Background, + ), + events, + ) + } + + @Test + fun `stop resets states and detaches listeners`() { + val events = mutableListOf>() + monitor + .subscribe( + object : StreamNetworkAndLifecycleMonitorListener { + override fun onNetworkAndLifecycleState( + networkState: StreamNetworkState, + lifecycleState: StreamLifecycleState, + ) { + events += networkState to lifecycleState + } + }, + options, + ) + .getOrThrow() + + monitor.start().getOrThrow() + fakeLifecycleMonitor.emitForeground() + fakeNetworkMonitor.emitConnected(StreamNetworkInfo.Snapshot()) + + monitor.stop().getOrThrow() + + assertEquals(StreamLifecycleState.Unknown, lifecycleState.value) + assertEquals(StreamNetworkState.Unknown, networkState.value) + assertEquals(1, fakeLifecycleMonitor.stopCalls) + assertEquals(1, fakeNetworkMonitor.stopCalls) + assertFalse(fakeLifecycleMonitor.hasListeners()) + assertFalse(fakeNetworkMonitor.hasListener()) + + val previousEventCount = events.size + fakeLifecycleMonitor.emitForeground() + fakeNetworkMonitor.emitConnected(StreamNetworkInfo.Snapshot()) + assertEquals(previousEventCount, events.size) + } + + private class FakeLifecycleMonitor : StreamLifecycleMonitor { + private val listeners = mutableSetOf() + var stopCalls: Int = 0 + private set + + override fun start(): Result = Result.success(Unit) + + override fun stop(): Result = + Result.success(Unit).also { + stopCalls++ + listeners.clear() + } + + override fun subscribe( + listener: StreamLifecycleListener, + options: Options, + ): Result { + listeners += listener + return Result.success( + object : StreamSubscription { + override fun cancel() { + listeners -= listener + } + } + ) + } + + override fun getCurrentState(): StreamLifecycleState = StreamLifecycleState.Unknown + + fun emitForeground() { + listeners.forEach { it.onForeground() } + } + + fun emitBackground() { + listeners.forEach { it.onBackground() } + } + + fun hasListeners(): Boolean = listeners.isNotEmpty() + } + + private class FakeNetworkMonitor : StreamNetworkMonitor { + private var listener: StreamNetworkMonitorListener? = null + var stopCalls: Int = 0 + private set + + override fun start(): Result = Result.success(Unit) + + override fun stop(): Result = + Result.success(Unit).also { + stopCalls++ + listener = null + } + + override fun subscribe( + listener: StreamNetworkMonitorListener, + options: Options, + ): Result { + this.listener = listener + return Result.success( + object : StreamSubscription { + override fun cancel() { + if (this@FakeNetworkMonitor.listener === listener) { + this@FakeNetworkMonitor.listener = null + } + } + } + ) + } + + fun emitConnected(snapshot: StreamNetworkInfo.Snapshot?) { + runBlocking { listener?.onNetworkConnected(snapshot) } + } + + fun emitLost(permanent: Boolean) { + runBlocking { listener?.onNetworkLost(permanent) } + } + + fun hasListener(): Boolean = listener != null + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImplTest.kt new file mode 100644 index 0000000..ac7d90e --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImplTest.kt @@ -0,0 +1,379 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.recovery + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.model.StreamTypedKey +import io.getstream.android.core.api.model.connection.StreamConnectedUser +import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState +import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo +import io.getstream.android.core.api.model.connection.network.StreamNetworkState +import io.getstream.android.core.api.model.connection.recovery.Recovery +import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import java.util.Date +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNull +import kotlin.time.ExperimentalTime +import kotlinx.coroutines.test.runTest + +@OptIn(ExperimentalTime::class) +class StreamConnectionRecoveryEvaluatorImplTest { + + @Test + fun `reconnects when network returns while foreground`() = runTest { + val evaluator = evaluator() + val snapshot = StreamNetworkInfo.Snapshot() + val newSnapshot = StreamNetworkInfo.Snapshot() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(snapshot), + ) + .getOrThrow() + .also { assertNull(it) } + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + .also { assertIs>(it) } + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(newSnapshot), + ) + .getOrThrow() + + val connect = assertIs>(recovery) + assertEquals(newSnapshot, connect.why) + } + + @Test + fun `reconnects immediately when returning foreground with available network`() = runTest { + val evaluator = evaluator() + val snapshot = StreamNetworkInfo.Snapshot() + val newSnapshot = StreamNetworkInfo.Snapshot() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(snapshot), + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Background, + networkState = available(snapshot), + ) + .getOrThrow() + .also { assertIs>(it) } + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(newSnapshot), + ) + .getOrThrow() + + val connect = assertIs>(recovery) + assertEquals(newSnapshot, connect.why) + } + + @Test + fun `waits for network after foregrounding if offline`() = runTest { + val evaluator = evaluator() + val initialSnapshot = StreamNetworkInfo.Snapshot() + val restoredSnapshot = StreamNetworkInfo.Snapshot() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(initialSnapshot), + ) + .getOrThrow() + .also { + // Do nothing + assertNull(it) + } + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Background, + networkState = available(initialSnapshot), + ) + .getOrThrow() + .also { assertIs>(it) } + + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + .also { assertNull(it) } + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(restoredSnapshot), + ) + .getOrThrow() + + val connect = assertIs>(recovery) + assertEquals(restoredSnapshot, connect.why) + } + + @Test + fun `reconnects when network returns without lifecycle change`() = runTest { + val evaluator = evaluator() + val initialSnapshot = StreamNetworkInfo.Snapshot() + val restoredSnapshot = StreamNetworkInfo.Snapshot() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(initialSnapshot), + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + .also { assertNull(it) } + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(restoredSnapshot), + ) + .getOrThrow() + + val connect = assertIs>(recovery) + assertEquals(restoredSnapshot, connect.why) + } + + @Test + fun `does not connect before the first successful connection`() = runTest { + val evaluator = evaluator() + val snapshot = StreamNetworkInfo.Snapshot() + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(snapshot), + ) + .getOrThrow() + + assertNull(recovery) + } + + @Test + fun `stays idle when returning foreground while already reconnecting`() = runTest { + val evaluator = evaluator() + val snapshot = StreamNetworkInfo.Snapshot() + val networkLostSnapshot = StreamNetworkInfo.Snapshot() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(snapshot), + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Background, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Connecting.Opening(TEST_USER_ID), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(networkLostSnapshot), + ) + .getOrThrow() + + assertNull(recovery) + } + + @Test + fun `does not reconnect while background even if network returns`() = runTest { + val evaluator = evaluator() + val snapshot = StreamNetworkInfo.Snapshot() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(snapshot), + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Background, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Disconnected(), + lifecycleState = StreamLifecycleState.Background, + networkState = available(snapshot), + ) + .getOrThrow() + + assertNull(recovery) + } + + @Test + fun `ignores reconnect signal while already connecting`() = runTest { + val evaluator = evaluator() + val snapshot = StreamNetworkInfo.Snapshot() + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(snapshot), + ) + .getOrThrow() + + evaluator + .evaluate( + connectionState = connectedState(), + lifecycleState = StreamLifecycleState.Foreground, + networkState = StreamNetworkState.Disconnected, + ) + .getOrThrow() + + val recovery = + evaluator + .evaluate( + connectionState = StreamConnectionState.Connecting.Opening(TEST_USER_ID), + lifecycleState = StreamLifecycleState.Foreground, + networkState = available(StreamNetworkInfo.Snapshot()), + ) + .getOrThrow() + + assertNull(recovery) + } + + private fun evaluator(): StreamConnectionRecoveryEvaluatorImpl = + StreamConnectionRecoveryEvaluatorImpl(NoopLogger, ImmediateSingleFlightProcessor()) + + private fun connectedState(): StreamConnectionState.Connected = + StreamConnectionState.Connected(testUser(), TEST_CONNECTION_ID) + + private fun testUser(): StreamConnectedUser = + StreamConnectedUser( + createdAt = Date(0L), + id = TEST_USER_ID, + language = "en", + role = "user", + updatedAt = Date(0L), + teams = emptyList(), + ) + + private fun available(snapshot: StreamNetworkInfo.Snapshot): StreamNetworkState.Available = + StreamNetworkState.Available(snapshot) + + private object NoopLogger : StreamLogger { + override fun log( + level: StreamLogger.LogLevel, + throwable: Throwable?, + message: () -> String, + ) { + // no-op + } + } + + private class ImmediateSingleFlightProcessor : StreamSingleFlightProcessor { + override suspend fun run(key: StreamTypedKey, block: suspend () -> T): Result = + try { + Result.success(block()) + } catch (t: Throwable) { + Result.failure(t) + } + + override fun has(key: StreamTypedKey): Boolean = false + + override fun cancel(key: StreamTypedKey): Result = Result.success(Unit) + + override fun clear(cancelRunning: Boolean): Result = Result.success(Unit) + + override fun stop(): Result = Result.success(Unit) + } + + private companion object { + private const val TEST_USER_ID = "user-id" + private const val TEST_CONNECTION_ID = "connection-id" + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/testing/TestLogger.kt b/stream-android-core/src/test/java/io/getstream/android/core/testing/TestLogger.kt new file mode 100644 index 0000000..f930a02 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/testing/TestLogger.kt @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.testing + +import io.getstream.android.core.api.log.StreamLogger + +internal object TestLogger : StreamLogger { + override fun log(level: StreamLogger.LogLevel, throwable: Throwable?, message: () -> String) { + // Swallow logs during tests + } +}