[BEAM-4546] Multi level combine#5639
Conversation
256a0c4 to
c5763b4
Compare
|
R: @katsiapis |
| Returns: | ||
| A PObject holding the result of the combine operation. | ||
| """ | ||
| def with_hot_key_fanout(self, fanout): |
There was a problem hiding this comment.
Could you add a comment about how this will be used, and what will it do? Especially what is expected from fanout fn? (Would it be a better to rename fanout to fanoutfn?)
There was a problem hiding this comment.
Done. See below about fanoutfn.
| def process(self, element): | ||
| key, value = element | ||
| fanout = fanout_fn(key) | ||
| if not fanout or fanout is 1: |
There was a problem hiding this comment.
Should it be invalid to return None from fanout_fn?
Also a high level question, do we expect users to have an idea about what keys would be hot to a point of how much fanout they will want per key? Would it be simpler to just accept an fanout integer that will apply to all keys?
There was a problem hiding this comment.
I've changed it to require an integer.
There are definitely cases where some keys are much more frequent than others, and can be detected. (E.g. word distributions in natural languages.) However, this also accepts a plain integer, which I expect will be use most of the time, which is why I simply called it fanout.
| # Boolean indicates this is not an accumulator. | ||
| yield pvalue.TaggedOutput('cold', (key, (False, value))) | ||
| else: | ||
| self.counter += 1 # Round-robin should be more even than random. |
There was a problem hiding this comment.
nit; "should be more" more what?
There was a problem hiding this comment.
More even. But this is ambitious with "even than." I'll reword.
| Note that a fanout greater than 1 requires the data to be sent through | ||
| two GroupByKeys, and a high fanout can also result in more shuffle data | ||
| due to less per-bundle combining. Setting the fanout for a key at 1 or less | ||
| places values on the "cold key" path that skip the intermeidate level of |
There was a problem hiding this comment.
nit; intermeidate -> intermediate
This is needed to avoid double-counting for multiply-firing triggers in accumulation mode.
8c6416b to
c51b18b
Compare
|
Jenkins: please run Python PreCommit |
|
R: @xinzha623 Xin, could you and Foo review this? It will be useful for TFMA implementation. |
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username) to look at it.