
Heap-based global top. #5551

Closed
robertwb wants to merge 1 commit into apache:master from robertwb:fast-global-top

@robertwb robertwb commented Jun 4, 2018

This adds a specialized implementation of global top that greatly reduces the number of comparisons required in the (single) reducer. It also uses heapq rather than repeatedly buffering, sorting, and truncating.
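To make the difference between the two strategies concrete, here is a minimal sketch in plain Python. It is not the code in this PR: the function names `top_n_heap` and `top_n_buffered`, and the buffer size of `2 * n`, are assumptions chosen for illustration.

```python
import heapq
import random


def top_n_heap(values, n):
    """Keep the n largest values in a min-heap of size at most n."""
    if n <= 0:
        return []
    heap = []
    for value in values:
        if len(heap) < n:
            heapq.heappush(heap, value)
        elif value > heap[0]:
            # heap[0] is the smallest retained value; most elements fail this
            # single comparison and are dropped without touching the heap.
            heapq.heapreplace(heap, value)
    return sorted(heap, reverse=True)


def top_n_buffered(values, n):
    """Buffer values, then sort and truncate whenever the buffer fills up."""
    buffer_size = max(2 * n, 1)  # illustrative choice, not taken from the PR
    buffer = []
    for value in values:
        buffer.append(value)
        if len(buffer) >= buffer_size:
            buffer.sort(reverse=True)
            del buffer[n:]
    buffer.sort(reverse=True)
    return buffer[:n]


random.seed(0)
data = [random.random() for _ in range(10_000)]
expected = sorted(data, reverse=True)[:5]
assert top_n_heap(data, 5) == expected
assert top_n_buffered(data, 5) == expected
```

Both functions return the same result; the heap version avoids the repeated full sorts of the buffer.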




The new implementation doesn't accept side arguments or keyword arguments, but these
(if any) are passed much more efficiently through a closure over the comparison
operator than by re-passing them while processing every single element.
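As a rough illustration of the closure approach, the sketch below binds an extra argument into the comparison once, so the top-N routine only ever sees a two-argument function. It assumes a less-than style comparison; `make_less_than`, `top_n`, and the `weights` mapping are hypothetical names, not part of this PR or of Beam.

```python
import functools
import heapq


def make_less_than(weights):
    """Bind the extra argument once; return a two-argument less-than function."""
    def less_than(a, b):
        return weights[a] < weights[b]
    return less_than


def top_n(values, n, less_than):
    """Return the n largest values under less_than, largest first."""
    def cmp(a, b):
        if less_than(a, b):
            return -1
        if less_than(b, a):
            return 1
        return 0
    # cmp_to_key turns the comparison into a key usable by heapq.nlargest.
    return heapq.nlargest(n, values, key=functools.cmp_to_key(cmp))


weights = {"a": 3, "b": 10, "c": 7, "d": 1}
print(top_n(["a", "b", "c", "d"], 2, make_less_than(weights)))
```

The caller builds the closure once, and no extra arguments need to travel with each element.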

@with_input_types(T)
@with_output_types(KV[None, List[T]])
class _TopPerShard(core.DoFn):

Perhaps _TopPerBundle (and MergeTopPerBundle respectively)?

Contributor Author


Good call.

original_compare = compare
compare = lambda a, b: original_compare(b, a)
# This is a more efficient global algorithm.
return (
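A note on why the two lines above keep the old function under a separate name: Python resolves names inside a lambda when it is called, so `compare = lambda a, b: compare(b, a)` would call itself forever. A minimal standalone sketch, where `reverse_compare` and `less_than` are illustrative names rather than code from this PR:

```python
def reverse_compare(compare):
    """Return a comparison with its arguments swapped."""
    # Bind the original under its own name so the lambda does not refer to itself.
    original_compare = compare
    return lambda a, b: original_compare(b, a)


def less_than(a, b):
    return a < b


greater_than = reverse_compare(less_than)
assert greater_than(2, 1)
assert not greater_than(1, 2)
```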

This implementation seems to prevent the equivalent of "combiner lifting" for a future where one might take advantage of Multi Shard Combining. Could that be problematic?

Contributor Author


That is correct. There are two issues here that lead to this structure:

  1. I want to accumulate into a heap, but then sort the accumulator before emitting it to shift work from the (single) reducer to the (many) mappers. It's unclear how to express this as a CombineFn. Perhaps https://issues.apache.org/jira/browse/BEAM-4030 could help (if we ensure that it is called on all runners).
  2. I want to avoid encoding _ComparableValue objects, which will be done via pickling. This could possibly be worked around if we could set and enforce custom coders.
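The two points above can be sketched in plain Python, outside of Beam. This is an illustration under stated assumptions, not the PR's implementation: `ComparableValue` is a hypothetical stand-in for `_ComparableValue`, and `top_per_bundle` and `merge_top_per_bundle` are invented names. Each bundle accumulates into a heap, sorts once, and emits plain values; the reducer lazily merges the already-sorted lists and stops after n results.

```python
import heapq
import itertools


class ComparableValue:
    """Hypothetical wrapper that orders values via a less_than function."""
    __slots__ = ("value", "less_than")

    def __init__(self, value, less_than):
        self.value = value
        self.less_than = less_than

    def __lt__(self, other):
        return self.less_than(self.value, other.value)


def top_per_bundle(bundle, n, less_than):
    """Mapper side: accumulate into a heap, then sort and unwrap before emitting."""
    if n <= 0:
        return []
    heap = []
    for value in bundle:
        wrapped = ComparableValue(value, less_than)
        if len(heap) < n:
            heapq.heappush(heap, wrapped)
        elif heap[0] < wrapped:
            heapq.heapreplace(heap, wrapped)
    heap.sort(reverse=True)
    # Emit plain values, largest first, so no wrapper object is ever encoded.
    return [wrapped.value for wrapped in heap]


def merge_top_per_bundle(sorted_lists, n, less_than):
    """Reducer side: lazily merge descending lists and stop after n values."""
    merged = heapq.merge(
        *sorted_lists,
        key=lambda v: ComparableValue(v, less_than),
        reverse=True,
    )
    return list(itertools.islice(merged, n))


def less_than(a, b):
    return a < b


bundles = [[5, 1, 9], [7, 3], [8, 2, 6]]
per_bundle = [top_per_bundle(b, 2, less_than) for b in bundles]
assert per_bundle == [[9, 5], [7, 3], [8, 6]]
assert merge_top_per_bundle(per_bundle, 2, less_than) == [9, 8]
```

Because the inputs arrive sorted, the reducer only advances each list as far as needed to produce n results, which is where the saving in comparisons comes from.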

Perhaps a single accumulator object with a custom reduce would suffice.
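One possible reading of "custom reduce" is Python's `__reduce__` pickling hook. Under that assumption, a single accumulator could hold a heap of wrapped values in memory but pickle itself as a plain sorted list. The sketch below is hypothetical throughout (`TopAccumulator` and `_Wrapped` are invented names) and has not been checked against Beam's coders.

```python
import heapq
import pickle


def less_than(a, b):
    # Module-level so that pickle can reference it by name.
    return a < b


class _Wrapped:
    """Hypothetical in-memory wrapper; never pickled."""
    __slots__ = ("value", "less_than")

    def __init__(self, value, less_than):
        self.value = value
        self.less_than = less_than

    def __lt__(self, other):
        return self.less_than(self.value, other.value)


class TopAccumulator:
    """A heap in memory; a sorted list of plain values when pickled."""

    def __init__(self, n, less_than, values=()):
        self.n = n
        self.less_than = less_than
        self.heap = []
        for value in values:
            self.add(value)

    def add(self, value):
        wrapped = _Wrapped(value, self.less_than)
        if len(self.heap) < self.n:
            heapq.heappush(self.heap, wrapped)
        elif self.heap and self.heap[0] < wrapped:
            heapq.heapreplace(self.heap, wrapped)

    def sorted_values(self):
        return [w.value for w in sorted(self.heap, reverse=True)]

    def __reduce__(self):
        # Serialize n, the comparison, and plain sorted values; no wrappers.
        return (TopAccumulator, (self.n, self.less_than, self.sorted_values()))


if __name__ == "__main__":
    acc = TopAccumulator(3, less_than, [4, 1, 7, 3, 9, 2])
    clone = pickle.loads(pickle.dumps(acc))
    print(clone.sorted_values())
```

The sort happens on whichever worker pickles the accumulator, which matches the goal of shifting work away from the single reducer.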


stale bot commented Sep 1, 2018

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Sep 1, 2018

stale bot commented Sep 8, 2018

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Sep 8, 2018
Contributor Author

@robertwb robertwb left a comment


I was unable to re-open this pull request after rebasing, see #6997
