Skip to content

Commit

Permalink
Revert "Add groupBy to Stream."
Browse files Browse the repository at this point in the history
This reverts commit 3e8bfb1 and 3f90b06.

R=lrn@google.com

Review-Url: https://codereview.chromium.org/2921663002 .
  • Loading branch information
floitschG committed Jun 1, 2017
1 parent 5b94cfe commit 33f360c
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 425 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ entirely to allow inference to fill in the type.
* JSON maps are now typed as `Map<String, dynamic>` instead of
`Map<dynamic, dynamic>`. A JSON-map is not a `HashMap` or `LinkedHashMap`
anymore (but just a `Map`).
* `dart:async`
* Add `groupBy` to `Stream`. Allows splitting a string into separate streams
depending on "key" property computed from the individual events.
* `dart:async`, `dart:io`, `dart:core`
* Adding to a closed sink, including `IOSink`, is not allowed anymore. In
1.24, violations are only reported (on stdout or stderr), but a future
Expand Down
103 changes: 0 additions & 103 deletions sdk/lib/async/stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -395,76 +395,6 @@ abstract class Stream<T> {
return new _MapStream<T, S>(this, convert);
}

/// Groups events by a computed key.
///
/// A key is extracted from incoming events.
/// The first time a key is seen, a stream is created for it, and emitted
/// on the returned stream, along with the key, as a [GroupedEvents] object.
/// Then the event is emitted on the stream ([GroupedEvents.values])
/// corresponding to the key.
///
/// An error on the source stream, or when calling the `key` functions,
/// will emit the error on the returned stream.
///
/// Canceling the subscription on the returned stream will stop processing
/// and close the streams for all groups.
///
/// Pausing the subscription on the returned stream will pause processing
/// and no further events are added to streams for the individual groups.
///
/// Pausing or canceling an individual group stream has no effect other than
/// on that stream. Events will be queued while the group stream
/// is paused and until it is first listened to.
/// If the [GroupedEvents.values] stream is never listened to,
/// it will enqueue all the events unnecessarily.
Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) {
var controller;
controller = new StreamController<GroupedEvents<K, T>>(
sync: true,
onListen: () {
var groupControllers = new HashMap<K, StreamController<T>>();

void closeAll() {
for (var groupController in groupControllers.values) {
groupController.close();
}
}

var subscription = this.listen(
(data) {
K theKey;
try {
theKey = key(data);
} catch (error, stackTrace) {
controller.addError(error, stackTrace);
return;
}
var groupController = groupControllers[theKey];
if (groupController == null) {
groupController =
new StreamController<T>.broadcast(sync: true);
groupControllers[theKey] = groupController;
controller.add(
new GroupedEvents<K, T>(theKey, groupController.stream));
}
groupController.add(data);
},
onError: controller.addError,
onDone: () {
controller.close();
closeAll();
});
controller.onPause = subscription.pause;
controller.onResume = subscription.resume;
controller.onCancel = () {
subscription.cancel();
// Don't fire sync events in response to a callback.
scheduleMicrotask(closeAll);
};
});
return controller.stream;
}

/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
Expand Down Expand Up @@ -2067,36 +1997,3 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
_sink.close();
}
}

/// A group created by [Stream.groupBy].
///
/// The stream created by `groupBy` emits a `GroupedEvents`
/// for each distinct key it encounters.
/// This group contains the [key] itself, along with a stream of the [values]
/// associated with that key.
class GroupedEvents<K, V> {
/// The key that identifiers the values emitted by [values].
final K key;

/// The [values] that [GroupBy] have grouped by the common [key].
final Stream<V> values;

factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;

// Don't expose a generative constructor.
// This class is not intended for subclassing, so we don't want to promise
// it. We can change that in the future.
GroupedEvents._(this.key, this.values);

/// Tells [values] to discard values instead of retaining them.
///
/// Must only be used instead of listening to the [values] stream.
/// If the stream has been listened to, this call fails.
/// After calling this method, listening on the [values] stream fails.
Future cancel() {
// If values has been listened to,
// this throws a StateError saying that stream has already been listened to,
// which is a correct error message for this call too.
return values.listen(null).cancel();
}
}

0 comments on commit 33f360c

Please sign in to comment.