Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,23 @@ import 'package:yaml/yaml.dart';
import 'package:firebase_core/firebase_core.dart';
import 'package:flutter/foundation.dart';

/// Replaces an existing Firestore listener for [apiName], cancelling any prior
/// subscription so repeated `listenForChanges` invocations do not accumulate.
@visibleForTesting
void replaceFirestoreApiSubscription(
Map<String, StreamSubscription> subscriptions,
String apiName,
StreamSubscription subscription,
) {
subscriptions[apiName]?.cancel();
subscriptions[apiName] = subscription;
}

class FirestoreAPIProvider extends APIProvider with LiveAPIProvider {
FirebaseApp? _app;
FirebaseFirestore get firestore => FirebaseFirestore.instanceFor(app: _app!);
List<StreamSubscription> _subscriptions = [];
Map<String, StreamSubscription> _subscriptions = {};
bool _disposed = false;
late FirestoreApp firestoreApp;

@override
Expand Down Expand Up @@ -259,16 +272,26 @@ class FirestoreAPIProvider extends APIProvider with LiveAPIProvider {
List<String> pathSegments = path.split('/');
bool isDocumentPath = pathSegments.length % 2 == 0;

void deliverSnapshot(dynamic snapshot) {
if (_disposed) return;
listener(getOKResponse(apiName, snapshot));
}

if (isDocumentPath) {
DocumentReference docRef = firestore.doc(path);
_subscriptions.add(docRef.snapshots().listen((DocumentSnapshot snapshot) {
listener.call(getOKResponse(apiName, snapshot));
}));
replaceFirestoreApiSubscription(
_subscriptions,
apiName,
docRef.snapshots().listen(deliverSnapshot),
);
} else {
Query query = firestoreApp.getQuery(api, isCollectionGroup: api['isCollectionGroup'] ?? false);
_subscriptions.add(query.snapshots().listen((QuerySnapshot snapshot) {
listener.call(getOKResponse(apiName, snapshot));
}));
Query query = firestoreApp.getQuery(
api, isCollectionGroup: api['isCollectionGroup'] ?? false);
replaceFirestoreApiSubscription(
_subscriptions,
apiName,
query.snapshots().listen(deliverSnapshot),
);
}
Map<String, dynamic> body = {
'message': 'Subscribed to API',
Expand All @@ -284,13 +307,12 @@ class FirestoreAPIProvider extends APIProvider with LiveAPIProvider {

@override
dispose() {
_disposed = true;
try {
if (_subscriptions.isNotEmpty) {
for (var subscription in _subscriptions) {
subscription.cancel();
}
for (final subscription in _subscriptions.values) {
subscription.cancel();
}
_subscriptions = [];
_subscriptions.clear();
} catch (e) {
print('Error disposing FirestoreAPIProvider: $e');
}
Expand Down
2 changes: 2 additions & 0 deletions modules/ensemble/lib/framework/scope.dart
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ mixin PageBindingManager on IsScopeManager {

void dispatch(ModelChangeEvent event) {
//log("EventBus ${eventBus.hashCode} firing $event");
// Page dispose closes the event bus before Screen cancels live API listeners.
if (eventBus.streamController.isClosed) return;
eventBus.fire(event);
}

Expand Down
57 changes: 57 additions & 0 deletions modules/ensemble/test/firestore_listen_dedup_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import 'dart:async';

import 'package:ensemble/framework/apiproviders/firestore/firestore_api_provider.dart';
import 'package:event_bus/event_bus.dart';
import 'package:flutter_test/flutter_test.dart';

void main() {
group('replaceFirestoreApiSubscription', () {
test('cancels prior subscription for the same apiName', () async {
final firstController = StreamController<int>();
final secondController = StreamController<int>();
var firstEventsAfterReplace = 0;

final first = firstController.stream.listen((_) {
firstEventsAfterReplace++;
});
final second = secondController.stream.listen((_) {});

final subscriptions = <String, StreamSubscription>{'users': first};
replaceFirestoreApiSubscription(subscriptions, 'users', second);

firstController.add(1);
await Future<void>.delayed(Duration.zero);

expect(firstEventsAfterReplace, 0);
expect(subscriptions['users'], same(second));
expect(first.isPaused, isFalse);

await firstController.close();
await secondController.close();
});

test('keeps subscriptions for different api names', () async {
final usersController = StreamController<int>();
final ordersController = StreamController<int>();
final users = usersController.stream.listen((_) {});
final orders = ordersController.stream.listen((_) {});

final subscriptions = <String, StreamSubscription>{'users': users};
replaceFirestoreApiSubscription(subscriptions, 'orders', orders);

expect(subscriptions.keys, containsAll(['users', 'orders']));

await usersController.close();
await ordersController.close();
});
});

group('event bus dispose race', () {
test('closed event bus rejects new events', () {
final bus = EventBus();
bus.destroy();
expect(bus.streamController.isClosed, isTrue);
expect(() => bus.fire('event'), throwsStateError);
});
});
}
Loading