perf: streamline recursion in mapAsyncPartitioned #32031

Merged
merged 3 commits into from
Aug 15, 2023

leviramsey
Contributor

In mapAsyncPartitioned, whenever a future in a given partition completes, we dequeue the longest completed prefix of that partition's queue before we potentially emit elements downstream and demand more elements from upstream.

When perPartition is low (say 1 or 2), this doesn't delay demanding more elements for long, and if there are unstarted futures we get a head start on them. However, with larger perPartition values this process can materially delay demanding new elements. Additionally, when there are no unstarted futures in a partition, we still eagerly dequeue more elements even though there is nothing left to start earlier.

This change introspects the partition queue: if, after dequeuing, there are no unstarted futures for the partition, the stage moves into emission mode rather than attempting to dequeue more.
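
To make the intended behaviour concrete, here is a minimal sketch of the idea using a toy model of a single partition's queue. It is not the Akka implementation: `PartitionQueue`, `hasUnstarted` and `onHeadCompleted` are hypothetical names, and the model assumes that the first `perPartition` entries are the started futures and that the "any unstarted futures?" check is re-evaluated on every iteration.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of one partition's queue (NOT Akka's internal buffer).
// Each entry is `true` if its future has already completed.
// Assumption: the first `perPartition` entries are started, the rest are unstarted.
final class PartitionQueue(perPartition: Int, entries: Seq[Boolean]) {
  private val queue = mutable.Queue.from(entries)

  def used: Int = queue.size

  // More entries than the parallelism limit means some futures are still unstarted.
  def hasUnstarted: Boolean = queue.size > perPartition

  // Called when the head's future has completed. Always drops the head, then keeps
  // dropping completed entries only while doing so lets an unstarted future begin.
  // Returns the number of entries dropped before switching to emission.
  def onHeadCompleted(): Int = {
    require(queue.nonEmpty, "partition queue must not be empty")
    queue.dequeue()
    var dropped = 1
    while (hasUnstarted && queue.headOption.contains(true)) {
      queue.dequeue()
      dropped += 1
    }
    dropped
  }
}
```

In this model, a partition with `perPartition = 4` and four completed entries drops only its head before emitting, whereas a partition with `perPartition = 2` and four entries keeps dropping while entries beyond the limit are still waiting to start.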

@@ -127,7 +127,13 @@ private[akka] final case class MapAsyncPartitioned[In, Out, Partition](
// dropped from per-partition queue, to be able to execute next, but result is kept in the
// main linear buffer for when out is ready
buffer.dropOnlyPartitionHead(partition)
dropCompletedThenPushIfPossible(partition)

if (perPartition > 1 && buffer.usedInPartition(partition) > perPartition) {
Contributor Author

The special case for perPartition == 1 means we assume that any future started as a consequence of the dropOnlyPartitionHead above has not already completed, so it can wait until its async callback triggers.

Not only do we save the cost of introspecting the partition buffer's usage, we also save the cost of an almost certainly pointless iteration of peeking and checking for completion.
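
As a rough illustration of the saving, the sketch below mirrors the shape of the guard in the diff in a standalone helper. `Guard.shouldKeepDequeuing` is a hypothetical name, not part of Akka, and the usage lookup is passed by name purely so that the short-circuit can be observed: when perPartition == 1 the lookup is never evaluated.

```scala
// Hypothetical helper mirroring the shape of the guard in the diff (not Akka code).
object Guard {
  // `usedInPartition` is a by-name parameter, so it is only evaluated if
  // `perPartition > 1` holds; `&&` short-circuits otherwise.
  def shouldKeepDequeuing(perPartition: Int, usedInPartition: => Int): Boolean =
    perPartition > 1 && usedInPartition > perPartition
}
```

With perPartition == 1 the helper returns false without touching the buffer, which corresponds to skipping both the usage check and the extra peek described above.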

Member

@patriknw left a comment

LGTM, with a small suggestion

akka-stream/src/main/scala/akka/stream/impl/Buffers.scala (outdated, resolved)
Member

@johanandren left a comment

LGTM, but let's include patriknw's suggestion

@patriknw merged commit 1cb93af into akka:main Aug 15, 2023
6 checks passed
@patriknw added this to the 2.8.4 milestone Aug 15, 2023
@leviramsey deleted the mapasyncpartitioned-fix branch August 30, 2023 18:56