diff --git a/packages/stream_feeds/CHANGELOG.md b/packages/stream_feeds/CHANGELOG.md index a97abb15..a24563a7 100644 --- a/packages/stream_feeds/CHANGELOG.md +++ b/packages/stream_feeds/CHANGELOG.md @@ -1,5 +1,6 @@ ## Unreleased - [BREAKING] Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState`. +- Re-watch websocket events for feeds when the websocket reconnects. ## 0.2.0 - [BREAKING] Update API client code, specifically the FeedOwnCapability enum. diff --git a/packages/stream_feeds/lib/src/client/feeds_client_impl.dart b/packages/stream_feeds/lib/src/client/feeds_client_impl.dart index 1f511195..e4297a04 100644 --- a/packages/stream_feeds/lib/src/client/feeds_client_impl.dart +++ b/packages/stream_feeds/lib/src/client/feeds_client_impl.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:rxdart/rxdart.dart'; import 'package:stream_core/stream_core.dart'; import '../cdn/cdn_api.dart'; @@ -270,6 +271,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient { feedsRepository: _feedsRepository, pollsRepository: _pollsRepository, eventsEmitter: events, + onReconnectEmitter: onReconnectEmitter, ); } @@ -474,4 +476,25 @@ class StreamFeedsClientImpl implements StreamFeedsClient { Future> deleteImage({required String url}) { return _cdnClient.deleteImage(url); } + + Stream get onReconnectEmitter { + return connectionState.stream.scan( + (state, connectionStatus, i) => switch (connectionStatus) { + Initialized() || Connecting() => ( + wasDisconnected: state.wasDisconnected, + reconnected: false, + ), + Disconnecting() || Disconnected() => ( + wasDisconnected: true, + reconnected: false, + ), + Connected() => ( + wasDisconnected: false, + reconnected: state.wasDisconnected, + ), + _ => state, + }, + (wasDisconnected: false, reconnected: false), + ).mapNotNull((state) => state.reconnected ? () : null); + } } diff --git a/packages/stream_feeds/lib/src/state/feed.dart b/packages/stream_feeds/lib/src/state/feed.dart index c37f1a33..5e11d6bd 100644 --- a/packages/stream_feeds/lib/src/state/feed.dart +++ b/packages/stream_feeds/lib/src/state/feed.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:freezed_annotation/freezed_annotation.dart'; +import 'package:rxdart/rxdart.dart'; import 'package:state_notifier/state_notifier.dart'; import 'package:stream_core/stream_core.dart'; @@ -48,6 +49,7 @@ class Feed with Disposable { required this.feedsRepository, required this.pollsRepository, required this.eventsEmitter, + required Stream onReconnectEmitter, }) { final fid = query.fid; @@ -65,7 +67,12 @@ class Feed with Disposable { // Attach event handlers for the feed events final handler = FeedEventHandler(fid: fid, state: _stateNotifier); - _eventsSubscription = eventsEmitter.listen(handler.handleEvent); + _feedSubscriptions.add(eventsEmitter.listen(handler.handleEvent)); + + // Automatically refetch data on reconnection + if (query.watch) { + _subscribeToReconnectionUpdates(onReconnectEmitter: onReconnectEmitter); + } } FeedId get fid => query.fid; @@ -86,11 +93,11 @@ class Feed with Disposable { late final FeedStateNotifier _stateNotifier; final SharedEmitter eventsEmitter; - StreamSubscription? _eventsSubscription; + final CompositeSubscription _feedSubscriptions = CompositeSubscription(); @override void dispose() { - _eventsSubscription?.cancel(); + _feedSubscriptions.cancel(); _stateNotifier.dispose(); _memberList.dispose(); super.dispose(); @@ -628,4 +635,14 @@ class Feed with Disposable { } // endregion + + void _subscribeToReconnectionUpdates({ + required Stream onReconnectEmitter, + }) { + _feedSubscriptions.add( + onReconnectEmitter.listen((_) { + getOrCreate(); + }), + ); + } }