Skip to content

Commit

Permalink
Refactor to implement Stream.fromIterator more directly.
Browse files Browse the repository at this point in the history
The current implementation uses a specialized version of the
pending-event queue used by stream subscriptions to remember
pending events.
That makes the queue polymorphic, and slightly more complicated than
necessary, and that again makes further refactorings of the
Stream implementation harder.

This change moves the logic from the specialized pending-queue
into a simple function instead, so you only pay for it if you
actually use `Stream.fromIterable`.

Also allows `Stream.fromIterable` to be listened to more than once.
(It uses `Stream.multi` for the general async+sync controller API,
and it would cost extra code to make it only work once.)

Change-Id: I44b2010225cd3d32c2bcdb8a315c94881331bdae
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/248146
Reviewed-by: Nate Bosch <nbosch@google.com>
Commit-Queue: Lasse Nielsen <lrn@google.com>
  • Loading branch information
lrhn authored and Commit Bot committed Jun 16, 2022
1 parent e4e3e09 commit 330759e
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 134 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
This feature has not been supported in most compilation targets for some
time but is now completely removed.

### Libraries

#### `dart:async`

- The `Stream.fromIterable` stream can now be listened to more than once.

## 2.18.0

### Language
Expand Down
11 changes: 6 additions & 5 deletions sdk/lib/async/broadcast_stream_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
/// on another stream, and it is fine to forward them synchronously.
class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
implements _EventDispatch<T> {
_StreamImplEvents<T>? _pending;
_PendingEvents<T>? _pending;

_AsBroadcastStreamController(void onListen()?, void onCancel()?)
: super(onListen, onCancel);
Expand All @@ -463,7 +463,7 @@ class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
}

void _addPendingEvent(_DelayedEvent event) {
(_pending ??= new _StreamImplEvents<T>()).add(event);
(_pending ??= new _PendingEvents<T>()).add(event);
}

void add(T data) {
Expand All @@ -489,9 +489,10 @@ class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>

void _flushPending() {
var pending = _pending;
while (pending != null && !pending.isEmpty) {
pending.handleNext(this);
pending = _pending;
if (pending != null) {
while (!pending.isEmpty) {
pending.handleNext(this);
}
}
}

Expand Down
61 changes: 56 additions & 5 deletions sdk/lib/async/stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ abstract class Stream<T> {
return controller.stream;
}

/// Creates a single-subscription stream that gets its data from [elements].
/// Creates a stream that gets its data from [elements].
///
/// The iterable is iterated when the stream receives a listener, and stops
/// iterating if the listener cancels the subscription, or if the
Expand All @@ -333,15 +333,66 @@ abstract class Stream<T> {
/// If reading [Iterator.current] on `elements.iterator` throws,
/// the stream emits that error, but keeps iterating.
///
/// Can be listened to more than once. Each listener iterates [elements]
/// independently.
///
/// Example:
/// ```dart
/// final numbers = [1, 2, 3, 5, 6, 7];
/// final stream = Stream.fromIterable(numbers);
/// ```
factory Stream.fromIterable(Iterable<T> elements) {
return new _GeneratedStreamImpl<T>(
() => new _IterablePendingEvents<T>(elements));
}
factory Stream.fromIterable(Iterable<T> elements) =>
Stream<T>.multi((controller) {
Iterator<T> iterator;
try {
iterator = elements.iterator;
} catch (e, s) {
controller.addError(e, s);
controller.close();
return;
}
var zone = Zone.current;
var isScheduled = true;

void next() {
if (!controller.hasListener || controller.isPaused) {
// Cancelled or paused since scheduled.
isScheduled = false;
return;
}
bool hasNext;
try {
hasNext = iterator.moveNext();
} catch (e, s) {
controller.addErrorSync(e, s);
controller.closeSync();
return;
}
if (hasNext) {
try {
controller.addSync(iterator.current);
} catch (e, s) {
controller.addErrorSync(e, s);
}
if (controller.hasListener && !controller.isPaused) {
zone.scheduleMicrotask(next);
} else {
isScheduled = false;
}
} else {
controller.closeSync();
}
}

controller.onResume = () {
if (!isScheduled) {
isScheduled = true;
zone.scheduleMicrotask(next);
}
};

zone.scheduleMicrotask(next);
});

/// Creates a multi-subscription stream.
///
Expand Down
6 changes: 3 additions & 3 deletions sdk/lib/async/stream_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -527,19 +527,19 @@ abstract class _StreamController<T> implements _StreamControllerBase<T> {
}

// Returns the pending events, and creates the object if necessary.
_StreamImplEvents<T> _ensurePendingEvents() {
_PendingEvents<T> _ensurePendingEvents() {
assert(_isInitialState);
if (!_isAddingStream) {
Object? events = _varData;
if (events == null) {
_varData = events = _StreamImplEvents<T>();
_varData = events = _PendingEvents<T>();
}
return events as dynamic;
}
_StreamControllerAddStreamState<T> state = _varData as dynamic;
Object? events = state.varData;
if (events == null) {
state.varData = events = _StreamImplEvents<T>();
state.varData = events = _PendingEvents<T>();
}
return events as dynamic;
}
Expand Down
140 changes: 27 additions & 113 deletions sdk/lib/async/stream_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class _BufferingStreamSubscription<T>

/* Event handlers provided in constructor. */
@pragma("vm:entry-point")
_DataHandler<T> _onData;
void Function(T) _onData;
Function _onError;
_DoneHandler _onDone;
void Function() _onDone;

final Zone _zone;

Expand Down Expand Up @@ -318,9 +318,7 @@ class _BufferingStreamSubscription<T>
/// If the subscription is not paused, this also schedules a firing
/// of pending events later (if necessary).
void _addPending(_DelayedEvent event) {
_StreamImplEvents<T>? pending = _pending as dynamic;
pending ??= _StreamImplEvents<T>();
_pending = pending;
var pending = _pending ??= _PendingEvents<T>();
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
Expand Down Expand Up @@ -487,82 +485,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _onListen(StreamSubscription subscription) {}
}

typedef _PendingEvents<T> _EventGenerator<T>();

/// Stream that generates its own events.
class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
final _EventGenerator<T> _pending;
bool _isUsed = false;

/// Initializes the stream to have only the events provided by a
/// [_PendingEvents].
///
/// A new [_PendingEvents] must be generated for each listen.
_GeneratedStreamImpl(this._pending);

StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
if (_isUsed) throw new StateError("Stream has already been listened to.");
_isUsed = true;
return new _BufferingStreamSubscription<T>(
onData, onError, onDone, cancelOnError)
.._setPendingEvents(_pending());
}
}

/// Pending events object that gets its events from an [Iterable].
class _IterablePendingEvents<T> extends _PendingEvents<T> {
// The iterator providing data for data events.
// Set to null when iteration has completed.
Iterator<T>? _iterator;

_IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;

bool get isEmpty => _iterator == null;

void handleNext(_EventDispatch<T> dispatch) {
var iterator = _iterator;
if (iterator == null) {
throw new StateError("No events pending.");
}
// Send one event per call to moveNext.
// If moveNext returns true, send the current element as data.
// If current throws, send that error, but keep iterating.
// If moveNext returns false, send a done event and clear the _iterator.
// If moveNext throws an error, send an error and prepare to send a done
// event afterwards.
bool movedNext = false;
try {
if (iterator.moveNext()) {
movedNext = true;
dispatch._sendData(iterator.current);
} else {
_iterator = null;
dispatch._sendDone();
}
} catch (e, s) {
if (!movedNext) {
// Threw in .moveNext().
// Ensure that we send a done afterwards.
_iterator = const EmptyIterator<Never>();
}
// Else threw in .current.
dispatch._sendError(e, s);
}
}

void clear() {
if (isScheduled) cancelSchedule();
_iterator = null;
}
}

// Internal helpers.

// Types of the different handlers on a stream. Types used to type fields.
typedef void _DataHandler<T>(T value);
typedef void _DoneHandler();

/// Default data handler, does nothing.
void _nullDataHandler(dynamic value) {}

Expand Down Expand Up @@ -617,32 +541,36 @@ class _DelayedDone implements _DelayedEvent {
}
}

/// Superclass for provider of pending events.
abstract class _PendingEvents<T> {
/// Container and manager of pending events for a stream subscription.
class _PendingEvents<T> {
// No async event has been scheduled.
static const int _STATE_UNSCHEDULED = 0;
static const int stateUnscheduled = 0;
// An async event has been scheduled to run a function.
static const int _STATE_SCHEDULED = 1;
static const int stateScheduled = 1;
// An async event has been scheduled, but it will do nothing when it runs.
// Async events can't be preempted.
static const int _STATE_CANCELED = 3;
static const int stateCanceled = 3;

/// State of being scheduled.
///
/// Set to [_STATE_SCHEDULED] when pending events are scheduled for
/// Set to [stateScheduled] when pending events are scheduled for
/// async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
/// scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
/// scheduling is "canceled", the _state is simply set to [stateCanceled]
/// which will make the async code do nothing except resetting [_state].
///
/// If events are scheduled while the state is [_STATE_CANCELED], it is
/// merely switched back to [_STATE_SCHEDULED], but no new call to
/// If events are scheduled while the state is [stateCanceled], it is
/// merely switched back to [stateScheduled], but no new call to
/// [scheduleMicrotask] is performed.
int _state = _STATE_UNSCHEDULED;
int _state = stateUnscheduled;

bool get isEmpty;
/// First element in the list of pending events, if any.
_DelayedEvent? firstPendingEvent;

bool get isScheduled => _state == _STATE_SCHEDULED;
bool get _eventScheduled => _state >= _STATE_SCHEDULED;
/// Last element in the list of pending events. New events are added after it.
_DelayedEvent? lastPendingEvent;

bool get isScheduled => _state == stateScheduled;
bool get _eventScheduled => _state >= stateScheduled;

/// Schedule an event to run later.
///
Expand All @@ -652,37 +580,23 @@ abstract class _PendingEvents<T> {
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
assert(_state == stateCanceled);
_state = stateScheduled;
return;
}
scheduleMicrotask(() {
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return;
_state = stateUnscheduled;
if (oldState == stateCanceled) return;
handleNext(dispatch);
});
_state = _STATE_SCHEDULED;
_state = stateScheduled;
}

void cancelSchedule() {
if (isScheduled) _state = _STATE_CANCELED;
if (isScheduled) _state = stateCanceled;
}

void handleNext(_EventDispatch<T> dispatch);

/// Throw away any pending events and cancel scheduled events.
void clear();
}

/// Class holding pending events for a [_StreamImpl].
class _StreamImplEvents<T> extends _PendingEvents<T> {
/// Single linked list of [_DelayedEvent] objects.
_DelayedEvent? firstPendingEvent;

/// Last element in the list of pending events. New events are added after it.
_DelayedEvent? lastPendingEvent;

bool get isEmpty => lastPendingEvent == null;

void add(_DelayedEvent event) {
Expand Down Expand Up @@ -722,7 +636,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {

final Zone _zone;
int _state = 0;
_DoneHandler? _onDone;
void Function()? _onDone;

_DoneStreamSubscription(this._onDone) : _zone = Zone.current {
_schedule();
Expand Down
18 changes: 14 additions & 4 deletions tests/lib/async/stream_from_iterable_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,16 @@ main() {
}

{
// Test that you can't listen twice..
asyncStart();
// Test that you can listen twice.
Stream stream = new Stream.fromIterable(iter);
stream.listen((x) {}).cancel();
Expect.throws<StateError>(() => stream.listen((x) {}));
stream.listen((x) {}).cancel(); // Doesn't throw.
Future.wait([stream.toList(), stream.toList()]).then((result) {
Expect.listEquals(iter.toList(), result[0]);
Expect.listEquals(iter.toList(), result[1]);
asyncEnd();
});
}

{
Expand Down Expand Up @@ -216,9 +222,13 @@ Future<List<Object>> collectEvents(Stream<Object> stream) {
var c = new Completer<List<Object>>();
var events = <Object>[];
stream.listen((value) {
events..add("value")..add(value);
events
..add("value")
..add(value);
}, onError: (error) {
events..add("error")..add(error);
events
..add("error")
..add(error);
}, onDone: () {
c.complete(events);
});
Expand Down

0 comments on commit 330759e

Please sign in to comment.