From d6cce4adba3216827127c233a4b7a6148914f897 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Tue, 30 Sep 2025 13:38:22 +0200 Subject: [PATCH 1/2] refactor(llc)!: Enhance WebSocket connection management and lifecycle handling This commit introduces several improvements to the WebSocket connection management and application lifecycle handling: **WebSocket Engine:** - Ensures that any existing WebSocket connection is closed before opening a new one. - Improved handling of the `onDone` event for WebSocket streams, ensuring proper cleanup and notification. - Refined the `close` method to handle cases where the WebSocket is not open and to notify listeners appropriately. **WebSocket Client:** - Updated error handling during the connection opening process to use `recover` instead of `onFailure` for better resilience. - Simplified error handling in `onError` by directly calling `disconnect`. **Network and Lifecycle State:** - Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState` for clarity and broader applicability. - Added an `unknown` state to both `NetworkState` and `LifecycleState` to represent initial or indeterminate states. - Introduced a `keepConnectionAliveInBackground` option in `ConnectionRecoveryHandler` to allow maintaining the WebSocket connection even when the app is in the background. - `ConnectionRecoveryHandler` now considers the `unknown` state for both network and lifecycle, taking no action in these cases. - `ConnectionRecoveryHandler` logic for `_onAppLifecycleStateChanged` updated to respect `keepConnectionAliveInBackground`. **General Changes:** - Updated imports and type aliases to reflect the renaming of lifecycle state components. --- packages/stream_core/lib/src/utils.dart | 2 +- ...der.dart => lifecycle_state_provider.dart} | 14 ++++++--- .../lib/src/utils/network_state_provider.dart | 4 +++ .../engine/stream_web_socket_engine.dart | 31 ++++++++++++------- .../automatic_reconnection_policy.dart | 4 +-- .../connection_recovery_handler.dart | 25 +++++++++------ .../ws/client/stream_web_socket_client.dart | 5 ++- 7 files changed, 54 insertions(+), 31 deletions(-) rename packages/stream_core/lib/src/utils/{app_lifecycle_state_provider.dart => lifecycle_state_provider.dart} (53%) diff --git a/packages/stream_core/lib/src/utils.dart b/packages/stream_core/lib/src/utils.dart index 961cea5..c8991b5 100644 --- a/packages/stream_core/lib/src/utils.dart +++ b/packages/stream_core/lib/src/utils.dart @@ -1,6 +1,6 @@ -export 'utils/app_lifecycle_state_provider.dart'; export 'utils/comparable_extensions.dart'; export 'utils/disposable.dart'; +export 'utils/lifecycle_state_provider.dart'; export 'utils/list_extensions.dart'; export 'utils/network_state_provider.dart'; export 'utils/result.dart'; diff --git a/packages/stream_core/lib/src/utils/app_lifecycle_state_provider.dart b/packages/stream_core/lib/src/utils/lifecycle_state_provider.dart similarity index 53% rename from packages/stream_core/lib/src/utils/app_lifecycle_state_provider.dart rename to packages/stream_core/lib/src/utils/lifecycle_state_provider.dart index 13ca5d7..7341c6a 100644 --- a/packages/stream_core/lib/src/utils/app_lifecycle_state_provider.dart +++ b/packages/stream_core/lib/src/utils/lifecycle_state_provider.dart @@ -1,22 +1,28 @@ import 'state_emitter.dart'; -typedef AppLifecycleStateEmitter = StateEmitter; +/// A type alias for a state emitter that emits [LifecycleState] values. +typedef LifecycleStateEmitter = StateEmitter; /// A utility class for monitoring application lifecycle state changes. /// /// This interface defines the contract for an application lifecycle state provider /// that can provide the current state of the application and a stream of state changes. -abstract interface class AppLifecycleStateProvider { +abstract interface class LifecycleStateProvider { /// A emitter that provides updates on the application lifecycle state. - AppLifecycleStateEmitter get state; + LifecycleStateEmitter get state; } /// Enum representing the lifecycle state of the application. /// /// This enum defines two possible states for the application: /// `foreground` and `background`. -enum AppLifecycleState { +enum LifecycleState { + /// The lifecycle state is not known, e.g., the initial state before any checks. + unknown, + + /// The application is in the foreground and actively being used. foreground, + /// The application is in the background and not actively being used. background, } diff --git a/packages/stream_core/lib/src/utils/network_state_provider.dart b/packages/stream_core/lib/src/utils/network_state_provider.dart index ed31216..eb28cbd 100644 --- a/packages/stream_core/lib/src/utils/network_state_provider.dart +++ b/packages/stream_core/lib/src/utils/network_state_provider.dart @@ -1,5 +1,6 @@ import 'state_emitter.dart'; +/// A type alias for a state emitter that emits [NetworkState] values. typedef NetworkStateEmitter = StateEmitter; /// A utility class for monitoring network connectivity changes. @@ -16,6 +17,9 @@ abstract interface class NetworkStateProvider { /// This enum defines two possible values to represent the state of network /// connectivity: `connected` and `disconnected`. enum NetworkState { + /// The network state is unknown, e.g., the initial state before any checks. + unknown, + /// Internet is available because at least one of the HEAD requests succeeded. connected, diff --git a/packages/stream_core/lib/src/ws/client/engine/stream_web_socket_engine.dart b/packages/stream_core/lib/src/ws/client/engine/stream_web_socket_engine.dart index fc201ea..5f5194c 100644 --- a/packages/stream_core/lib/src/ws/client/engine/stream_web_socket_engine.dart +++ b/packages/stream_core/lib/src/ws/client/engine/stream_web_socket_engine.dart @@ -55,23 +55,31 @@ class StreamWebSocketEngine implements WebSocketEngine { @override Future> open(WebSocketOptions options) { return runSafely(() async { - if (_ws != null) { - throw StateError('Web socket is already open. Call close() first.'); - } + // Close any existing connection first. + if (_ws != null) await close(); + // Create a new WebSocket connection. _ws = _wsProvider.call(options); - - await _ws?.ready.then((_) => _listener?.onOpen()); - _wsSubscription = _ws?.stream.listen( _onData, - onError: _listener?.onError, - onDone: () => _listener?.onClose(_ws?.closeCode, _ws?.closeReason), + onDone: _onDone, cancelOnError: false, + onError: _listener?.onError, ); + + return _ws?.ready.then((_) => _listener?.onOpen()); }); } + void _onDone() { + // Capture the close code and reason before closing. + final closeCode = _ws?.closeCode; + final closeReason = _ws?.closeReason; + + // Close the connection and notify the listener. + unawaited(close(closeCode, closeReason)); + } + void _onData(Object? data) { // If data is null, we ignore it. if (data == null) return; @@ -92,15 +100,16 @@ class StreamWebSocketEngine implements WebSocketEngine { String? closeReason = 'Closed by client', ]) { return runSafely(() async { - if (_ws == null) { - throw StateError('WebSocket is not open. Call open() first.'); - } + if (_ws == null) return; await _ws?.sink.close(closeCode, closeReason); _ws = null; await _wsSubscription?.cancel(); _wsSubscription = null; + + // Notify the listener about the closure. + _listener?.onClose(closeCode, closeReason); }); } diff --git a/packages/stream_core/lib/src/ws/client/reconnect/automatic_reconnection_policy.dart b/packages/stream_core/lib/src/ws/client/reconnect/automatic_reconnection_policy.dart index 56f9583..6481c3a 100644 --- a/packages/stream_core/lib/src/ws/client/reconnect/automatic_reconnection_policy.dart +++ b/packages/stream_core/lib/src/ws/client/reconnect/automatic_reconnection_policy.dart @@ -52,12 +52,12 @@ class BackgroundStateReconnectionPolicy implements AutomaticReconnectionPolicy { BackgroundStateReconnectionPolicy({required this.appLifecycleState}); /// The provider that gives the current app lifecycle state. - final AppLifecycleStateEmitter appLifecycleState; + final LifecycleStateEmitter appLifecycleState; @override bool canBeReconnected() { final state = appLifecycleState.value; - return state == AppLifecycleState.foreground; + return state == LifecycleState.foreground; } } diff --git a/packages/stream_core/lib/src/ws/client/reconnect/connection_recovery_handler.dart b/packages/stream_core/lib/src/ws/client/reconnect/connection_recovery_handler.dart index 04b3f33..3dc70c1 100644 --- a/packages/stream_core/lib/src/ws/client/reconnect/connection_recovery_handler.dart +++ b/packages/stream_core/lib/src/ws/client/reconnect/connection_recovery_handler.dart @@ -38,11 +38,13 @@ class ConnectionRecoveryHandler extends Disposable { ConnectionRecoveryHandler({ required StreamWebSocketClient client, NetworkStateProvider? networkStateProvider, - AppLifecycleStateProvider? appLifecycleStateProvider, + LifecycleStateProvider? lifecycleStateProvider, + bool keepConnectionAliveInBackground = false, List? policies, RetryStrategy? retryStrategy, }) : _client = client, _reconnectStrategy = retryStrategy ?? RetryStrategy(), + _keepConnectionAliveInBackground = keepConnectionAliveInBackground, _policies = [ if (policies != null) ...policies, WebSocketAutomaticReconnectionPolicy( @@ -52,7 +54,7 @@ class ConnectionRecoveryHandler extends Disposable { InternetAvailabilityReconnectionPolicy( networkState: provider.state, ), - if (appLifecycleStateProvider case final provider?) + if (lifecycleStateProvider case final provider?) BackgroundStateReconnectionPolicy( appLifecycleState: provider.state, ), @@ -66,13 +68,14 @@ class ConnectionRecoveryHandler extends Disposable { } // Listen to app lifecycle state changes if a provider is given. - if (appLifecycleStateProvider case final provider?) { + if (lifecycleStateProvider case final provider?) { provider.state.on(_onAppLifecycleStateChanged).addTo(_subscriptions); } } final StreamWebSocketClient _client; final RetryStrategy _reconnectStrategy; + final bool _keepConnectionAliveInBackground; final List _policies; late final _subscriptions = CompositeSubscription(); @@ -115,9 +118,7 @@ class ConnectionRecoveryHandler extends Disposable { _reconnectionTimer = null; } - bool _canBeReconnected() { - return _policies.every((policy) => policy.canBeReconnected()); - } + bool _canBeReconnected() => _policies.every((it) => it.canBeReconnected()); bool _canBeDisconnected() { return switch (_client.connectionState.value) { @@ -128,15 +129,19 @@ class ConnectionRecoveryHandler extends Disposable { void _onNetworkStateChanged(NetworkState status) { return switch (status) { + NetworkState.unknown => () {}, // No action needed for unknown state. NetworkState.connected => reconnectIfNeeded(), NetworkState.disconnected => disconnectIfNeeded(), }; } - void _onAppLifecycleStateChanged(AppLifecycleState state) { + void _onAppLifecycleStateChanged(LifecycleState state) { return switch (state) { - AppLifecycleState.foreground => reconnectIfNeeded(), - AppLifecycleState.background => disconnectIfNeeded(), + LifecycleState.unknown => () {}, // No action needed for unknown state. + // If we want to keep the connection alive in the background, do nothing. + LifecycleState.background when _keepConnectionAliveInBackground => () {}, + LifecycleState.background => disconnectIfNeeded(), + LifecycleState.foreground => reconnectIfNeeded(), }; } @@ -146,7 +151,7 @@ class ConnectionRecoveryHandler extends Disposable { Connected() => _reconnectStrategy.resetConsecutiveFailures(), Disconnected() => _scheduleReconnectionIfNeeded(), // These states do not require any action. - Initialized() || Authenticating() || Disconnecting() => null, + Initialized() || Authenticating() || Disconnecting() => () {}, }; } diff --git a/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart b/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart index 7721073..7c07cc5 100644 --- a/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart +++ b/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart @@ -122,7 +122,7 @@ class StreamWebSocketClient final result = await _engine.open(options); // If some failure occurs, disconnect and rethrow the error. - return result.onFailure((_, __) => disconnect()).getOrThrow(); + return result.recover((_, __) => onClose()).getOrThrow(); } /// Closes the WebSocket connection. @@ -213,8 +213,7 @@ class StreamWebSocketClient error: WebSocketEngineException(error: error), ); - // Update the connection state to 'disconnecting'. - _connectionState = WebSocketConnectionState.disconnecting(source: source); + return unawaited(disconnect(source: source)); } void _handleHealthCheckEvent(WsEvent event, HealthCheckInfo info) { From 829a028b2140327f3045336c7d5caeab71823c16 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Tue, 30 Sep 2025 13:44:32 +0200 Subject: [PATCH 2/2] chore: update CHANGELOG.md --- packages/stream_core/CHANGELOG.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/packages/stream_core/CHANGELOG.md b/packages/stream_core/CHANGELOG.md index 1318780..6df0e09 100644 --- a/packages/stream_core/CHANGELOG.md +++ b/packages/stream_core/CHANGELOG.md @@ -1,3 +1,20 @@ +## Unreleased + +### 💥 BREAKING CHANGES + +- Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState` + +### ✨ Features + +- Added `keepConnectionAliveInBackground` option to `ConnectionRecoveryHandler` +- Added `unknown` state to `NetworkState` and `LifecycleState` enums + +### 🐛 Bug Fixes + +- Fixed `onClose()` not being called when disconnecting during connecting state +- Fixed unnecessary reconnection attempts when network is offline +- Fixed existing connections not being closed before opening new ones + ## 0.1.0 -* Initial release +- Initial release