Skip to content

Commit

Permalink
Revert "chore: refactor unnecessary StreamController"
Browse files Browse the repository at this point in the history
This reverts commit 41addf4.
  • Loading branch information
rrousselGit committed Feb 18, 2022
1 parent 41addf4 commit 8470153
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,36 +98,43 @@ class MethodChannelDocumentReference extends DocumentReferencePlatform {
Stream<DocumentSnapshotPlatform> snapshots({
bool includeMetadataChanges = false,
}) {
final observerId = Stream.fromFuture(
MethodChannelFirebaseFirestore.channel
.invokeMethod<String>('DocumentReference#snapshots'),
);
// It's fine to let the StreamController be garbage collected once all the
// subscribers have cancelled; this analyzer warning is safe to ignore.
late StreamController<DocumentSnapshotPlatform>
controller; // ignore: close_sinks

return observerId
.asyncExpand((observerId) {
final channel =
MethodChannelFirebaseFirestore.documentSnapshotChannel(
observerId!,
StreamSubscription<dynamic>? snapshotStream;
controller = StreamController<DocumentSnapshotPlatform>.broadcast(
onListen: () async {
final observerId = await MethodChannelFirebaseFirestore.channel
.invokeMethod<String>('DocumentReference#snapshots');
snapshotStream =
MethodChannelFirebaseFirestore.documentSnapshotChannel(observerId!)
.receiveBroadcastStream(
<String, dynamic>{
'reference': this,
'includeMetadataChanges': includeMetadataChanges,
},
).listen((snapshot) {
controller.add(
DocumentSnapshotPlatform(
firestore,
snapshot['path'],
<String, dynamic>{
'data': snapshot['data'],
'metadata': snapshot['metadata'],
},
),
);
}, onError: (error, stack) {
controller.addError(convertPlatformException(error), stack);
});
},
onCancel: () {
snapshotStream?.cancel();
},
);

return channel.receiveBroadcastStream(
<String, dynamic>{
'reference': this,
'includeMetadataChanges': includeMetadataChanges,
},
);
})
.map(
(snapshot) => DocumentSnapshotPlatform(
firestore,
snapshot['path'],
<String, dynamic>{
'data': snapshot['data'],
'metadata': snapshot['metadata'],
},
),
)
.handleError(convertPlatformException)
.asBroadcastStream();
return controller.stream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,21 +184,32 @@ class MethodChannelFirebaseFirestore extends FirebaseFirestorePlatform {

@override
Stream<void> snapshotsInSync() {
final observerId = Stream.fromFuture(
MethodChannelFirebaseFirestore.channel
.invokeMethod<String>('SnapshotsInSync#setup'),
StreamSubscription<dynamic>? snapshotStream;
late StreamController<void> controller; // ignore: close_sinks

controller = StreamController<void>.broadcast(
onListen: () async {
final observerId = await MethodChannelFirebaseFirestore.channel
.invokeMethod<String>('SnapshotsInSync#setup');

snapshotStream =
MethodChannelFirebaseFirestore.snapshotsInSyncChannel(observerId!)
.receiveBroadcastStream(
<String, dynamic>{
'firestore': this,
},
).listen((event) {
controller.add(null);
}, onError: (error, stack) {
controller.addError(convertPlatformException(error), stack);
});
},
onCancel: () {
snapshotStream?.cancel();
},
);

return observerId
.asyncExpand((observerId) {
final channel = MethodChannelFirebaseFirestore.snapshotsInSyncChannel(
observerId!);

return channel
.receiveBroadcastStream(<String, dynamic>{'firestore': this});
})
.handleError(convertPlatformException)
.asBroadcastStream();
return controller.stream;
}

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,36 @@ class MethodChannelQuery extends QueryPlatform {
Stream<QuerySnapshotPlatform> snapshots({
bool includeMetadataChanges = false,
}) {
final observerId = Stream.fromFuture(
MethodChannelFirebaseFirestore.channel
.invokeMethod<String>('Query#snapshots'),
// It's fine to let the StreamController be garbage collected once all the
// subscribers have cancelled; this analyzer warning is safe to ignore.
late StreamController<QuerySnapshotPlatform>
controller; // ignore: close_sinks

StreamSubscription<dynamic>? snapshotStream;
controller = StreamController<QuerySnapshotPlatform>.broadcast(
onListen: () async {
final observerId = await MethodChannelFirebaseFirestore.channel
.invokeMethod<String>('Query#snapshots');

snapshotStream =
MethodChannelFirebaseFirestore.querySnapshotChannel(observerId!)
.receiveBroadcastStream(
<String, dynamic>{
'query': this,
'includeMetadataChanges': includeMetadataChanges,
},
).listen((snapshot) {
controller.add(MethodChannelQuerySnapshot(firestore, snapshot));
}, onError: (error, stack) {
controller.addError(convertPlatformException(error), stack);
});
},
onCancel: () {
snapshotStream?.cancel();
},
);

return observerId
.asyncExpand((observerId) {
final channel =
MethodChannelFirebaseFirestore.querySnapshotChannel(observerId!);

return channel
.receiveBroadcastStream(<String, dynamic>{'firestore': this});
})
.map((snapshot) => MethodChannelQuerySnapshot(firestore, snapshot))
.handleError(convertPlatformException)
.asBroadcastStream();
return controller.stream;
}

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,39 @@ class MethodChannelFirebaseAppInstallations
'plugins.flutter.io/firebase_app_installations',
);

static final Map<String, Stream<String>> _idTokenChangesListeners =
<String, Stream<String>>{};
static final Map<String, StreamController<String>> _idTokenChangesListeners =
<String, StreamController<String>>{};

/// Creates a new [MethodChannelFirebaseAppInstallations] instance with an [app].
MethodChannelFirebaseAppInstallations({required FirebaseApp app})
: super(app) {
_idTokenChangesListeners[app.name] = Stream.fromFuture(
channel.invokeMethod<String>(
'FirebaseInstallations#registerIdChangeListener',
{'appName': app.name},
),
)
.asyncExpand<Object?>((channelName) {
final events = EventChannel(channelName!, channel.codec);
return events.receiveBroadcastStream();
})
.map((arguments) => (arguments as Map)['token'] as String)
.handleError(convertPlatformException)
.asBroadcastStream();
_idTokenChangesListeners[app.name] = StreamController<String>.broadcast();

channel.invokeMethod<String>(
'FirebaseInstallations#registerIdChangeListener', {
'appName': app.name,
}).then((channelName) {
final events = EventChannel(channelName!, channel.codec);
events.receiveBroadcastStream().listen((arguments) {
_handleIdChangedListener(app.name, arguments);
}, onError: (error, stackTrace) {
_handleIdChangedError(app.name, error, stackTrace);
});
});
}
void _handleIdChangedError(String appName, Object error,
[StackTrace? stackTrace]) {
final StreamController<String> controller =
_idTokenChangesListeners[appName]!;
controller.addError(convertPlatformException(error), stackTrace);
}

/// Handle any incoming events from Event Channel and forward on to the user.
void _handleIdChangedListener(
String appName, Map<dynamic, dynamic> arguments) {
final StreamController<String> controller =
_idTokenChangesListeners[appName]!;
controller.add(arguments['token']);
}

/// Internal stub class initializer.
Expand Down Expand Up @@ -94,5 +108,7 @@ class MethodChannelFirebaseAppInstallations
}

@override
Stream<String> get onIdChange => _idTokenChangesListeners[app!.name]!;
Stream<String> get onIdChange {
return _idTokenChangesListeners[app!.name]!.stream;
}
}

0 comments on commit 8470153

Please sign in to comment.