Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamExtensions.buffer() #235

Merged
merged 7 commits into from
Mar 2, 2023
Merged

Conversation

nex3
Copy link
Member

@nex3 nex3 commented Feb 8, 2023

The immediate motivation for this is to buffer stderr from a subprocess until we learn whether the subprocess completed successfully or not. It's important to consume the output eagerly so the process doesn't deadlock after saturating the OS and/or dart:io buffers, but we don't want to print it at all if the process completes successfully.

I think it's generally useful to have an explicit way of doing this, since otherwise users may naively try to write Stream.pipe(StreamController) which doesn't actually buffer output if the controller doesn't have a listener.

Copy link
Member

@natebosch natebosch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lrhn - do you think this would fit better in stream_transform while it's still separate from async?


/// Returns a view of this stream that eagerly buffers all events until
/// `listen()` is called.
Stream<T> buffer() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name conflicts with buffer from stream_transform.

https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/buffer.html

WDYT about asSingleSubscriptionStream to contrast with asBroadcastStream?

Would it be sensible to also have

if (!isBroadcast) return this;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be sensible to also have

I guess if the goal is specifically to have a listen on streams that might not otherwise ever have one it wouldn't make sense to return this.

We could also consider the name eagerListen instead of asSingleSubscriptionStream to reinforce the intent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think eagerListen() or bufferUntilListen() would make sense as names.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or eagerlyBuffer().
The bufferUntilListen seems most descriptive, eagerListen sounds like it's more eager than just doing listen now, which isn't possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like bufferUntilListen even if it's a bit verbose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The listenAndBuffer is technically correct. It's a low-level (low abstraction) name because it describes the implementation, not a higher-level concept.
Maybe because there is no higher level concept, this really is a shorthand for listening-and-buffering, which will only be used as a low-level primitive of other things. Users shouldn't need to use it directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lrhn - is that a vote for listenAndBuffer or do you still prefer bufferUntilListen?

I do think listenAndBuffer is the least likely to be misread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say it's a vote for listenAndBuffer. It's an ugly name, but it's precise and understandable.

I like that listen is early, because listening to a stream is a significant action to take. You'd usually not expect a function returning a stream to do anything before you listen to that stream.

If the name is missing something, it is what it's behavior is after listening to the returned stream.
But adding more would be too long, so putting that in the documentation is fine.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nex3 - can you rename to listenAndBuffer? Also update the name in the CHANGELOG.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, done!

onPause: () => subscription.pause(),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
subscription = listen(controller.add,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is safe to pass cancelOnError: false here.

If the result is listened with cancelOnError: false, we'd need it here too or that parameter won't work.

If the result is listened with cancelOnError: true, it should be OK. We'll add multiple errors here, but close the subscription once the result's listener closes on the first error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree—but isn't cancelOnError: false the default?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 - it is the default.

@natebosch
Copy link
Member

It's important to consume the output eagerly so the process doesn't deadlock after saturating the OS and/or dart:io buffers

We don't get unlimited extra buffering here right? There is still backpressure through pausing the subscription, so is it still possible to fill up an OS buffer?

@nex3
Copy link
Member Author

nex3 commented Feb 17, 2023

We don't get unlimited extra buffering here right? There is still backpressure through pausing the subscription, so is it still possible to fill up an OS buffer?

That's right, it buffers until the point listen() is called and then behaves like a normal stream with normal backpressure. We could potentially add a bufferWhilePaused parameter as well, either now or in the future.

@natebosch natebosch requested a review from lrhn February 17, 2023 02:34
Copy link
Member

@lrhn lrhn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable, implementation can be simplified. Feature can potentially be made more feature-full, but that can always be added later if there is an actual need.

lib/src/stream_extensions.dart Outdated Show resolved Hide resolved
lib/src/stream_extensions.dart Outdated Show resolved Hide resolved
lib/src/stream_extensions.dart Outdated Show resolved Hide resolved
lib/src/stream_extensions.dart Outdated Show resolved Hide resolved
lib/src/stream_extensions.dart Show resolved Hide resolved

/// Returns a view of this stream that eagerly buffers all events until
/// `listen()` is called.
Stream<T> buffer() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or eagerlyBuffer().
The bufferUntilListen seems most descriptive, eagerListen sounds like it's more eager than just doing listen now, which isn't possible.

@lrhn
Copy link
Member

lrhn commented Feb 17, 2023

I wouldn't put anything new into stream_transformers.

Either we like the features in there, or at least some of them, and we should put those into async, or we don't, and we should drop them all.
Time to graduate from being an experiment :)

lib/src/stream_extensions.dart Outdated Show resolved Hide resolved
lib/src/stream_extensions.dart Outdated Show resolved Hide resolved
lib/src/stream_extensions.dart Show resolved Hide resolved
@nex3 nex3 requested a review from natebosch March 2, 2023 00:09
@natebosch natebosch merged commit f454380 into dart-lang:master Mar 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants