Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: filter out elements without demand #28467

Merged
merged 1 commit into from Jan 23, 2020
Merged

Conversation

@jrudolph
Copy link
Member

jrudolph commented Jan 9, 2020

This will also mean that completion will not be blocked by elements that
will later be filtered out.

One particular use case of that would be a kind of partitioning use case,
where you put several streams behind a broadcast and each consumer will filter
out elements not handled there. In that case, the broadcast can get head-of-line
blocked when one of the consumers currently has no demand but also wouldn't
have to handle any elements because they would all be filtered out.

The question would be if we can just change the behavior like this or if needs a
flag. From an observer the behavior change is quite subtle (filter pulling elements without downstream demand). In general, I'd say we should strive for the
specification to be general enough that such a change would be possible as an implementation detail but there might be concerns in this concrete case.

Refs #18170 (a long-standing beef I had with filter)

@akka-ci

This comment has been minimized.

Copy link

akka-ci commented Jan 9, 2020

Test FAILed.

Copy link
Member

patriknw left a comment

I think it would be an acceptable semantical change, also without providing the alternative. Wouldn't a usage that depended on that be relying on implementation details?

We document it as: backpressures when the given predicate returns true for the element and downstream backpressures

} else {
pull(in)
}
if (p(elem)) emit(out, elem, () => if (!isClosed(in)) pull(in))

This comment has been minimized.

Copy link
@patriknw

patriknw Jan 13, 2020

Member

performance of filter can be important and I'm thinking if we should avoid emit and instead have a manual OptionVal buffer in this stage?

This comment has been minimized.

Copy link
@johanandren

johanandren Jan 14, 2020

Member

That was my gut feeling looking at this as well, emit will be the lambda + one EmittingSingle per element worst case?

This comment has been minimized.

Copy link
@jrudolph

jrudolph Jan 14, 2020

Author Member

Made that change even if it's 20 lines more

@jrudolph jrudolph force-pushed the jrudolph:eager-filter branch from e477009 to c99afbf Jan 14, 2020
@akka-ci akka-ci added validating and removed needs-attention labels Jan 14, 2020
@akka-ci akka-ci added tested and removed validating labels Jan 14, 2020
@akka-ci

This comment has been minimized.

Copy link

akka-ci commented Jan 14, 2020

Test PASSed.

@jrudolph jrudolph marked this pull request as ready for review Jan 15, 2020
@jrudolph

This comment has been minimized.

Copy link
Member Author

jrudolph commented Jan 15, 2020

I figured there's nothing to change with the docs because as Patrik mentioned, "We document it as: backpressures when the given predicate returns true for the element and downstream backpressures" is already as accurate as necessary.

Copy link
Member

patriknw left a comment

LGTM, with a style suggestion. Maybe also run the cinnamon job on this to make sure it doesn't break anything. https://jenkins.akka.io:8498/job/cinnamon-pr-validator/

This will also mean that completion will not be blocked by elements that
will later be filtered out.

One particular use case of that would be a kind of partitioning use case,
where you put several streams behind a broadcast and each consumer will filter
out elements not handled there. In that case, the broadcast can get head-of-line
blocked when one of the consumers currently has no demand but also wouldn't
have to handle any elements because they would all be filtered out.
@jrudolph jrudolph force-pushed the jrudolph:eager-filter branch from c99afbf to c39dd65 Jan 22, 2020
@akka-ci

This comment has been minimized.

Copy link

akka-ci commented Jan 22, 2020

Test PASSed.

@patriknw patriknw merged commit 35259f1 into akka:master Jan 23, 2020
3 checks passed
3 checks passed
Jenkins PR Validation Test PASSed. 5199 tests run, 458 skipped, 0 failed.
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
typesafe-cla-validator All users have signed the CLA
Details
@patriknw patriknw added this to the 2.6.2 milestone Jan 23, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

4 participants
You can’t perform that action at this time.