Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/stream_feeds/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Unreleased
- [BREAKING] Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState`.
- Re-watch websocket events for feeds when the websocket reconnects.

## 0.2.0
- [BREAKING] Update API client code, specifically the FeedOwnCapability enum.
Expand Down
23 changes: 23 additions & 0 deletions packages/stream_feeds/lib/src/client/feeds_client_impl.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:stream_core/stream_core.dart';

import '../cdn/cdn_api.dart';
Expand Down Expand Up @@ -270,6 +271,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
feedsRepository: _feedsRepository,
pollsRepository: _pollsRepository,
eventsEmitter: events,
onReconnectEmitter: onReconnectEmitter,
);
}

Expand Down Expand Up @@ -474,4 +476,25 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
Future<Result<void>> deleteImage({required String url}) {
return _cdnClient.deleteImage(url);
}

Stream<void> get onReconnectEmitter {
return connectionState.stream.scan(
(state, connectionStatus, i) => switch (connectionStatus) {
Initialized() || Connecting() => (
wasDisconnected: state.wasDisconnected,
reconnected: false,
),
Disconnecting() || Disconnected() => (
wasDisconnected: true,
reconnected: false,
),
Connected() => (
wasDisconnected: false,
reconnected: state.wasDisconnected,
),
_ => state,
},
(wasDisconnected: false, reconnected: false),
).mapNotNull((state) => state.reconnected ? () : null);
}
}
23 changes: 20 additions & 3 deletions packages/stream_feeds/lib/src/state/feed.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';

import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:rxdart/rxdart.dart';
import 'package:state_notifier/state_notifier.dart';
import 'package:stream_core/stream_core.dart';

Expand Down Expand Up @@ -48,6 +49,7 @@ class Feed with Disposable {
required this.feedsRepository,
required this.pollsRepository,
required this.eventsEmitter,
required Stream<void> onReconnectEmitter,
}) {
final fid = query.fid;

Expand All @@ -65,7 +67,12 @@ class Feed with Disposable {

// Attach event handlers for the feed events
final handler = FeedEventHandler(fid: fid, state: _stateNotifier);
_eventsSubscription = eventsEmitter.listen(handler.handleEvent);
_feedSubscriptions.add(eventsEmitter.listen(handler.handleEvent));

// Automatically refetch data on reconnection
if (query.watch) {
_subscribeToReconnectionUpdates(onReconnectEmitter: onReconnectEmitter);
}
}

FeedId get fid => query.fid;
Expand All @@ -86,11 +93,11 @@ class Feed with Disposable {
late final FeedStateNotifier _stateNotifier;

final SharedEmitter<WsEvent> eventsEmitter;
StreamSubscription<WsEvent>? _eventsSubscription;
final CompositeSubscription _feedSubscriptions = CompositeSubscription();

@override
void dispose() {
_eventsSubscription?.cancel();
_feedSubscriptions.cancel();
_stateNotifier.dispose();
_memberList.dispose();
super.dispose();
Expand Down Expand Up @@ -628,4 +635,14 @@ class Feed with Disposable {
}

// endregion

void _subscribeToReconnectionUpdates({
required Stream<void> onReconnectEmitter,
}) {
_feedSubscriptions.add(
onReconnectEmitter.listen((_) {
getOrCreate();
}),
);
}
}