From fa506df064676b69d4554a1a4bb120aec84bc94d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 4 Jun 2026 11:05:14 +0000 Subject: [PATCH] fix(firestore): dedupe listenForChanges subscriptions and guard disposed event bus Repeated invokeAPI calls with listenForChanges: true accumulated Firestore snapshot listeners because subscribeToApi always appended to a list. Replace the prior subscription for the same apiName (matching SSE provider behavior) and skip callbacks after provider dispose. Also ignore binding dispatches once the page event bus is closed, which avoids crashes when a snapshot arrives after Page dispose but before Screen cancels API providers. Co-authored-by: Sharjeel Yunus --- .../firestore/firestore_api_provider.dart | 48 +++++++++++----- modules/ensemble/lib/framework/scope.dart | 2 + .../test/firestore_listen_dedup_test.dart | 57 +++++++++++++++++++ 3 files changed, 94 insertions(+), 13 deletions(-) create mode 100644 modules/ensemble/test/firestore_listen_dedup_test.dart diff --git a/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart b/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart index 2c6dbf988..a1a905022 100644 --- a/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart +++ b/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart @@ -12,10 +12,23 @@ import 'package:yaml/yaml.dart'; import 'package:firebase_core/firebase_core.dart'; import 'package:flutter/foundation.dart'; +/// Replaces an existing Firestore listener for [apiName], cancelling any prior +/// subscription so repeated `listenForChanges` invocations do not accumulate. +@visibleForTesting +void replaceFirestoreApiSubscription( + Map subscriptions, + String apiName, + StreamSubscription subscription, +) { + subscriptions[apiName]?.cancel(); + subscriptions[apiName] = subscription; +} + class FirestoreAPIProvider extends APIProvider with LiveAPIProvider { FirebaseApp? _app; FirebaseFirestore get firestore => FirebaseFirestore.instanceFor(app: _app!); - List _subscriptions = []; + Map _subscriptions = {}; + bool _disposed = false; late FirestoreApp firestoreApp; @override @@ -259,16 +272,26 @@ class FirestoreAPIProvider extends APIProvider with LiveAPIProvider { List pathSegments = path.split('/'); bool isDocumentPath = pathSegments.length % 2 == 0; + void deliverSnapshot(dynamic snapshot) { + if (_disposed) return; + listener(getOKResponse(apiName, snapshot)); + } + if (isDocumentPath) { DocumentReference docRef = firestore.doc(path); - _subscriptions.add(docRef.snapshots().listen((DocumentSnapshot snapshot) { - listener.call(getOKResponse(apiName, snapshot)); - })); + replaceFirestoreApiSubscription( + _subscriptions, + apiName, + docRef.snapshots().listen(deliverSnapshot), + ); } else { - Query query = firestoreApp.getQuery(api, isCollectionGroup: api['isCollectionGroup'] ?? false); - _subscriptions.add(query.snapshots().listen((QuerySnapshot snapshot) { - listener.call(getOKResponse(apiName, snapshot)); - })); + Query query = firestoreApp.getQuery( + api, isCollectionGroup: api['isCollectionGroup'] ?? false); + replaceFirestoreApiSubscription( + _subscriptions, + apiName, + query.snapshots().listen(deliverSnapshot), + ); } Map body = { 'message': 'Subscribed to API', @@ -284,13 +307,12 @@ class FirestoreAPIProvider extends APIProvider with LiveAPIProvider { @override dispose() { + _disposed = true; try { - if (_subscriptions.isNotEmpty) { - for (var subscription in _subscriptions) { - subscription.cancel(); - } + for (final subscription in _subscriptions.values) { + subscription.cancel(); } - _subscriptions = []; + _subscriptions.clear(); } catch (e) { print('Error disposing FirestoreAPIProvider: $e'); } diff --git a/modules/ensemble/lib/framework/scope.dart b/modules/ensemble/lib/framework/scope.dart index 2d51ef545..b6ef34db3 100644 --- a/modules/ensemble/lib/framework/scope.dart +++ b/modules/ensemble/lib/framework/scope.dart @@ -664,6 +664,8 @@ mixin PageBindingManager on IsScopeManager { void dispatch(ModelChangeEvent event) { //log("EventBus ${eventBus.hashCode} firing $event"); + // Page dispose closes the event bus before Screen cancels live API listeners. + if (eventBus.streamController.isClosed) return; eventBus.fire(event); } diff --git a/modules/ensemble/test/firestore_listen_dedup_test.dart b/modules/ensemble/test/firestore_listen_dedup_test.dart new file mode 100644 index 000000000..69c954c75 --- /dev/null +++ b/modules/ensemble/test/firestore_listen_dedup_test.dart @@ -0,0 +1,57 @@ +import 'dart:async'; + +import 'package:ensemble/framework/apiproviders/firestore/firestore_api_provider.dart'; +import 'package:event_bus/event_bus.dart'; +import 'package:flutter_test/flutter_test.dart'; + +void main() { + group('replaceFirestoreApiSubscription', () { + test('cancels prior subscription for the same apiName', () async { + final firstController = StreamController(); + final secondController = StreamController(); + var firstEventsAfterReplace = 0; + + final first = firstController.stream.listen((_) { + firstEventsAfterReplace++; + }); + final second = secondController.stream.listen((_) {}); + + final subscriptions = {'users': first}; + replaceFirestoreApiSubscription(subscriptions, 'users', second); + + firstController.add(1); + await Future.delayed(Duration.zero); + + expect(firstEventsAfterReplace, 0); + expect(subscriptions['users'], same(second)); + expect(first.isPaused, isFalse); + + await firstController.close(); + await secondController.close(); + }); + + test('keeps subscriptions for different api names', () async { + final usersController = StreamController(); + final ordersController = StreamController(); + final users = usersController.stream.listen((_) {}); + final orders = ordersController.stream.listen((_) {}); + + final subscriptions = {'users': users}; + replaceFirestoreApiSubscription(subscriptions, 'orders', orders); + + expect(subscriptions.keys, containsAll(['users', 'orders'])); + + await usersController.close(); + await ordersController.close(); + }); + }); + + group('event bus dispose race', () { + test('closed event bus rejects new events', () { + final bus = EventBus(); + bus.destroy(); + expect(bus.streamController.isClosed, isTrue); + expect(() => bus.fire('event'), throwsStateError); + }); + }); +}