From 342e5c9669768353de21bd89a83e537fcf3ea224 Mon Sep 17 00:00:00 2001 From: Rene Floor Date: Tue, 30 Sep 2025 13:42:30 +0200 Subject: [PATCH 1/3] relisten to websocket events after reconnect --- .../lib/src/client/feeds_client_impl.dart | 23 ++++++++++++++++++ packages/stream_feeds/lib/src/state/feed.dart | 24 ++++++++++++++++--- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/packages/stream_feeds/lib/src/client/feeds_client_impl.dart b/packages/stream_feeds/lib/src/client/feeds_client_impl.dart index 25db02e7..e6376741 100644 --- a/packages/stream_feeds/lib/src/client/feeds_client_impl.dart +++ b/packages/stream_feeds/lib/src/client/feeds_client_impl.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:rxdart/rxdart.dart'; import 'package:stream_core/stream_core.dart'; import '../cdn/cdn_api.dart'; @@ -270,6 +271,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient { feedsRepository: _feedsRepository, pollsRepository: _pollsRepository, eventsEmitter: events, + onReconnectEmitter: onReconnectEmitter, ); } @@ -474,4 +476,25 @@ class StreamFeedsClientImpl implements StreamFeedsClient { Future> deleteImage({required String url}) { return _cdnClient.deleteImage(url); } + + Stream get onReconnectEmitter { + return connectionState.stream.scan( + (state, connectionStatus, i) => switch (connectionStatus) { + Initialized() || Connecting() => ( + wasDisconnected: state.wasDisconnected, + reconnected: false, + ), + Disconnecting() || Disconnected() => ( + wasDisconnected: true, + reconnected: false, + ), + Connected() => ( + wasDisconnected: false, + reconnected: state.wasDisconnected, + ), + _ => state, + }, + (wasDisconnected: false, reconnected: false), + ).mapNotNull((state) => state.reconnected ? () : null); + } } diff --git a/packages/stream_feeds/lib/src/state/feed.dart b/packages/stream_feeds/lib/src/state/feed.dart index c37f1a33..45a11f13 100644 --- a/packages/stream_feeds/lib/src/state/feed.dart +++ b/packages/stream_feeds/lib/src/state/feed.dart @@ -1,9 +1,11 @@ import 'dart:async'; import 'package:freezed_annotation/freezed_annotation.dart'; +import 'package:rxdart/rxdart.dart'; import 'package:state_notifier/state_notifier.dart'; import 'package:stream_core/stream_core.dart'; +import '../../stream_feeds.dart'; import '../generated/api/api.dart' as api; import '../models/activity_data.dart'; import '../models/bookmark_data.dart'; @@ -48,6 +50,7 @@ class Feed with Disposable { required this.feedsRepository, required this.pollsRepository, required this.eventsEmitter, + required Stream onReconnectEmitter, }) { final fid = query.fid; @@ -65,7 +68,12 @@ class Feed with Disposable { // Attach event handlers for the feed events final handler = FeedEventHandler(fid: fid, state: _stateNotifier); - _eventsSubscription = eventsEmitter.listen(handler.handleEvent); + _feedSubscriptions.add(eventsEmitter.listen(handler.handleEvent)); + + // Automatically refetch data on reconnection + if (query.watch) { + _subscribeToReconnectionUpdates(onReconnectEmitter: onReconnectEmitter); + } } FeedId get fid => query.fid; @@ -86,11 +94,11 @@ class Feed with Disposable { late final FeedStateNotifier _stateNotifier; final SharedEmitter eventsEmitter; - StreamSubscription? _eventsSubscription; + final CompositeSubscription _feedSubscriptions = CompositeSubscription(); @override void dispose() { - _eventsSubscription?.cancel(); + _feedSubscriptions.cancel(); _stateNotifier.dispose(); _memberList.dispose(); super.dispose(); @@ -628,4 +636,14 @@ class Feed with Disposable { } // endregion + + void _subscribeToReconnectionUpdates({ + required Stream onReconnectEmitter, + }) { + _feedSubscriptions.add( + onReconnectEmitter.listen((_) { + getOrCreate(); + }), + ); + } } From bab122c899b3c33c31b144e2033398abb30abfab Mon Sep 17 00:00:00 2001 From: Rene Floor Date: Tue, 30 Sep 2025 13:52:16 +0200 Subject: [PATCH 2/3] add changelog --- packages/stream_feeds/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/stream_feeds/CHANGELOG.md b/packages/stream_feeds/CHANGELOG.md index 7e2e739a..399795f0 100644 --- a/packages/stream_feeds/CHANGELOG.md +++ b/packages/stream_feeds/CHANGELOG.md @@ -1,3 +1,6 @@ +## NEXT +- Re-watch websocket events for feeds when the websocket reconnects. + ## 0.2.0 - [BREAKING] Update API client code, specifically the FeedOwnCapability enum. - Fix unknown enums for `List` in `GetOrCreateFeedResponse` to be `FeedOwnCapability.unknown`. From 8cef5fd756d91f8e6786c9cd6b78b60ab64bed8f Mon Sep 17 00:00:00 2001 From: Rene Floor Date: Tue, 30 Sep 2025 13:56:01 +0200 Subject: [PATCH 3/3] remove unnecessary import --- packages/stream_feeds/lib/src/state/feed.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/stream_feeds/lib/src/state/feed.dart b/packages/stream_feeds/lib/src/state/feed.dart index 45a11f13..5e11d6bd 100644 --- a/packages/stream_feeds/lib/src/state/feed.dart +++ b/packages/stream_feeds/lib/src/state/feed.dart @@ -5,7 +5,6 @@ import 'package:rxdart/rxdart.dart'; import 'package:state_notifier/state_notifier.dart'; import 'package:stream_core/stream_core.dart'; -import '../../stream_feeds.dart'; import '../generated/api/api.dart' as api; import '../models/activity_data.dart'; import '../models/bookmark_data.dart';