Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion packages/stream_core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
## Unreleased

### 💥 BREAKING CHANGES

- Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState`

### ✨ Features

- Added `keepConnectionAliveInBackground` option to `ConnectionRecoveryHandler`
- Added `unknown` state to `NetworkState` and `LifecycleState` enums

### 🐛 Bug Fixes

- Fixed `onClose()` not being called when disconnecting during connecting state
- Fixed unnecessary reconnection attempts when network is offline
- Fixed existing connections not being closed before opening new ones

## 0.1.0

* Initial release
- Initial release
2 changes: 1 addition & 1 deletion packages/stream_core/lib/src/utils.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export 'utils/app_lifecycle_state_provider.dart';
export 'utils/comparable_extensions.dart';
export 'utils/disposable.dart';
export 'utils/lifecycle_state_provider.dart';
export 'utils/list_extensions.dart';
export 'utils/network_state_provider.dart';
export 'utils/result.dart';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import 'state_emitter.dart';

typedef AppLifecycleStateEmitter = StateEmitter<AppLifecycleState>;
/// A type alias for a state emitter that emits [LifecycleState] values.
typedef LifecycleStateEmitter = StateEmitter<LifecycleState>;

/// A utility class for monitoring application lifecycle state changes.
///
/// This interface defines the contract for an application lifecycle state provider
/// that can provide the current state of the application and a stream of state changes.
abstract interface class AppLifecycleStateProvider {
abstract interface class LifecycleStateProvider {
/// A emitter that provides updates on the application lifecycle state.
AppLifecycleStateEmitter get state;
LifecycleStateEmitter get state;
}

/// Enum representing the lifecycle state of the application.
///
/// This enum defines two possible states for the application:
/// `foreground` and `background`.
enum AppLifecycleState {
enum LifecycleState {
/// The lifecycle state is not known, e.g., the initial state before any checks.
unknown,

/// The application is in the foreground and actively being used.
foreground,

/// The application is in the background and not actively being used.
background,
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'state_emitter.dart';

/// A type alias for a state emitter that emits [NetworkState] values.
typedef NetworkStateEmitter = StateEmitter<NetworkState>;

/// A utility class for monitoring network connectivity changes.
Expand All @@ -16,6 +17,9 @@ abstract interface class NetworkStateProvider {
/// This enum defines two possible values to represent the state of network
/// connectivity: `connected` and `disconnected`.
enum NetworkState {
/// The network state is unknown, e.g., the initial state before any checks.
unknown,

/// Internet is available because at least one of the HEAD requests succeeded.
connected,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,31 @@ class StreamWebSocketEngine<Inc, Out> implements WebSocketEngine<Out> {
@override
Future<Result<void>> open(WebSocketOptions options) {
return runSafely(() async {
if (_ws != null) {
throw StateError('Web socket is already open. Call close() first.');
}
// Close any existing connection first.
if (_ws != null) await close();

// Create a new WebSocket connection.
_ws = _wsProvider.call(options);

await _ws?.ready.then((_) => _listener?.onOpen());

_wsSubscription = _ws?.stream.listen(
_onData,
onError: _listener?.onError,
onDone: () => _listener?.onClose(_ws?.closeCode, _ws?.closeReason),
onDone: _onDone,
cancelOnError: false,
onError: _listener?.onError,
);

return _ws?.ready.then((_) => _listener?.onOpen());
});
}

void _onDone() {
// Capture the close code and reason before closing.
final closeCode = _ws?.closeCode;
final closeReason = _ws?.closeReason;

// Close the connection and notify the listener.
unawaited(close(closeCode, closeReason));
}

void _onData(Object? data) {
// If data is null, we ignore it.
if (data == null) return;
Expand All @@ -92,15 +100,16 @@ class StreamWebSocketEngine<Inc, Out> implements WebSocketEngine<Out> {
String? closeReason = 'Closed by client',
]) {
return runSafely(() async {
if (_ws == null) {
throw StateError('WebSocket is not open. Call open() first.');
}
if (_ws == null) return;

await _ws?.sink.close(closeCode, closeReason);
_ws = null;

await _wsSubscription?.cancel();
_wsSubscription = null;

// Notify the listener about the closure.
_listener?.onClose(closeCode, closeReason);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ class BackgroundStateReconnectionPolicy implements AutomaticReconnectionPolicy {
BackgroundStateReconnectionPolicy({required this.appLifecycleState});

/// The provider that gives the current app lifecycle state.
final AppLifecycleStateEmitter appLifecycleState;
final LifecycleStateEmitter appLifecycleState;

@override
bool canBeReconnected() {
final state = appLifecycleState.value;
return state == AppLifecycleState.foreground;
return state == LifecycleState.foreground;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ class ConnectionRecoveryHandler extends Disposable {
ConnectionRecoveryHandler({
required StreamWebSocketClient client,
NetworkStateProvider? networkStateProvider,
AppLifecycleStateProvider? appLifecycleStateProvider,
LifecycleStateProvider? lifecycleStateProvider,
bool keepConnectionAliveInBackground = false,
List<AutomaticReconnectionPolicy>? policies,
RetryStrategy? retryStrategy,
}) : _client = client,
_reconnectStrategy = retryStrategy ?? RetryStrategy(),
_keepConnectionAliveInBackground = keepConnectionAliveInBackground,
_policies = <AutomaticReconnectionPolicy>[
if (policies != null) ...policies,
WebSocketAutomaticReconnectionPolicy(
Expand All @@ -52,7 +54,7 @@ class ConnectionRecoveryHandler extends Disposable {
InternetAvailabilityReconnectionPolicy(
networkState: provider.state,
),
if (appLifecycleStateProvider case final provider?)
if (lifecycleStateProvider case final provider?)
BackgroundStateReconnectionPolicy(
appLifecycleState: provider.state,
),
Expand All @@ -66,13 +68,14 @@ class ConnectionRecoveryHandler extends Disposable {
}

// Listen to app lifecycle state changes if a provider is given.
if (appLifecycleStateProvider case final provider?) {
if (lifecycleStateProvider case final provider?) {
provider.state.on(_onAppLifecycleStateChanged).addTo(_subscriptions);
}
}

final StreamWebSocketClient _client;
final RetryStrategy _reconnectStrategy;
final bool _keepConnectionAliveInBackground;
final List<AutomaticReconnectionPolicy> _policies;

late final _subscriptions = CompositeSubscription();
Expand Down Expand Up @@ -115,9 +118,7 @@ class ConnectionRecoveryHandler extends Disposable {
_reconnectionTimer = null;
}

bool _canBeReconnected() {
return _policies.every((policy) => policy.canBeReconnected());
}
bool _canBeReconnected() => _policies.every((it) => it.canBeReconnected());

bool _canBeDisconnected() {
return switch (_client.connectionState.value) {
Expand All @@ -128,15 +129,19 @@ class ConnectionRecoveryHandler extends Disposable {

void _onNetworkStateChanged(NetworkState status) {
return switch (status) {
NetworkState.unknown => () {}, // No action needed for unknown state.
NetworkState.connected => reconnectIfNeeded(),
NetworkState.disconnected => disconnectIfNeeded(),
};
}

void _onAppLifecycleStateChanged(AppLifecycleState state) {
void _onAppLifecycleStateChanged(LifecycleState state) {
return switch (state) {
AppLifecycleState.foreground => reconnectIfNeeded(),
AppLifecycleState.background => disconnectIfNeeded(),
LifecycleState.unknown => () {}, // No action needed for unknown state.
// If we want to keep the connection alive in the background, do nothing.
LifecycleState.background when _keepConnectionAliveInBackground => () {},
LifecycleState.background => disconnectIfNeeded(),
LifecycleState.foreground => reconnectIfNeeded(),
};
}

Expand All @@ -146,7 +151,7 @@ class ConnectionRecoveryHandler extends Disposable {
Connected() => _reconnectStrategy.resetConsecutiveFailures(),
Disconnected() => _scheduleReconnectionIfNeeded(),
// These states do not require any action.
Initialized() || Authenticating() || Disconnecting() => null,
Initialized() || Authenticating() || Disconnecting() => () {},
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class StreamWebSocketClient
final result = await _engine.open(options);

// If some failure occurs, disconnect and rethrow the error.
return result.onFailure((_, __) => disconnect()).getOrThrow();
return result.recover((_, __) => onClose()).getOrThrow();
}

/// Closes the WebSocket connection.
Expand Down Expand Up @@ -213,8 +213,7 @@ class StreamWebSocketClient
error: WebSocketEngineException(error: error),
);

// Update the connection state to 'disconnecting'.
_connectionState = WebSocketConnectionState.disconnecting(source: source);
return unawaited(disconnect(source: source));
}

void _handleHealthCheckEvent(WsEvent event, HealthCheckInfo info) {
Expand Down