Add DeduplicationByUniqueId transform #10972
Perhaps call this "DeduplicateByKey"?
I would call it ChoosePerKey or something like that, since what it actually does is choose an arbitrary element for each key. Test cases should include different values for the same key (which will require flexible result matching).
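To make the "arbitrary element per key" semantics and the flexible matching concrete, here is a minimal plain-Python sketch. It is not Beam code; `choose_per_key` and `is_valid_choice` are hypothetical names introduced only for illustration.

```python
def choose_per_key(pairs):
    """Keep one value per key; here the first one seen, which is one valid arbitrary choice."""
    chosen = {}
    for key, value in pairs:
        chosen.setdefault(key, value)
    return chosen


def is_valid_choice(pairs, result):
    """Return True if `result` has exactly one value per input key, taken from that key's values."""
    allowed = {}
    for key, value in pairs:
        allowed.setdefault(key, set()).add(value)
    return result.keys() == allowed.keys() and all(
        result[key] in allowed[key] for key in allowed)


# Key "a" has two different values, so either may legitimately be chosen.
pairs = [("a", 1), ("a", 2), ("b", 3)]
result = choose_per_key(pairs)
```

A test written this way accepts both `{"a": 1, "b": 3}` and `{"a": 2, "b": 3}`, which is the kind of flexible matching the comment asks for.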
I think this transform rests on an assumption: each unique ID maps to exactly one value, so any two elements with the same unique ID also have the same value. That is what lets us deduplicate values by ID. A typical use case is reading from a message queue in which every message is paired with a unique ID.
I understand. It does allow inconsistency, but the risk is small: anyone using it will see immediately that a key might have two values and that they need to be careful. FWIW, in Java it is arranged slightly differently, as Distinct.withRepresentativeValues, but that approach is not as cross-language-friendly as precomputing the keys.
I think if the user wants to specify things in terms of windows and triggers, it would be more natural to manually do windowing before this operation. Instead, perhaps higher-level semantic information could be provided (namely, over what interval should the deduplication occur), and windowing/triggering should be used to accomplish this.
lukecwik left a comment
As discussed in person, mostly left comments for myself.
    | core.WindowInto(
        Sessions(self._session_size),
        trigger=trigger.AfterCount(1),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
We need to be using an accumulating trigger.
        Sessions(self._session_size),
        trigger=trigger.AfterCount(1),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | core.CombinePerKey(self._DeduplicationCombineFn())
This should be a GBK followed by a ParDo that only emits elements if it is the first pane.
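A rough plain-Python model of this suggestion, assuming an accumulating trigger that fires once per arriving element: each firing produces a pane carrying everything seen so far for the key, and the downstream step emits only from pane index 0. This is a simulation, not Beam code, and the function names are hypothetical.

```python
from collections import defaultdict


def fire_accumulating_panes(elements):
    """Model a per-element accumulating trigger: yield (key, pane_index, values_so_far)."""
    seen = defaultdict(list)
    for key, value in elements:
        seen[key].append(value)
        pane_index = len(seen[key]) - 1  # 0 for the first firing of each key
        yield key, pane_index, list(seen[key])


def emit_first_pane_only(panes):
    """Model the ParDo after the grouping step: emit only when the pane is the first one."""
    for key, pane_index, values in panes:
        if pane_index == 0:
            yield key, values[0]


elements = [("id1", "x"), ("id2", "y"), ("id1", "x"), ("id1", "x")]
deduplicated = list(emit_first_pane_only(fire_accumulating_panes(elements)))
```

Later panes for `id1` still fire in this model, but they are dropped because they are not the first pane, so each ID is emitted once.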
        window=core.DoFn.WindowParam,
        paneinfo=core.DoFn.PaneInfoParam):
    id, value = kv
    yield (id, WindowedValue(value, ts, [window], paneinfo))
The deduplication key should also contain the window since we don't want to remove output that may have been assigned to multiple windows. Most SDFs will produce output in the global window.
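The effect of putting the window into the deduplication key can be sketched in plain Python as follows. This is an illustrative model only: windows are represented by strings, and `deduplicate_per_window` is a hypothetical name.

```python
def deduplicate_per_window(elements):
    """Deduplicate on (unique_id, window) so the same ID survives once in every window it belongs to."""
    seen = set()
    for unique_id, value, windows in elements:
        for window in windows:
            dedup_key = (unique_id, window)
            if dedup_key not in seen:
                seen.add(dedup_key)
                yield unique_id, value, window


elements = [
    ("id1", "x", ["w1", "w2"]),  # assigned to two windows: kept in both
    ("id1", "x", ["w1"]),        # duplicate within w1: dropped
    ("id2", "y", ["w1"]),
]
output = list(deduplicate_per_window(elements))
```

Keying on the ID alone would have dropped the `w2` copy of `id1`, which is the loss the comment warns about.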
Discussed offline. We decided to pursue the timer/state approach. Closing this PR for now.
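The thread does not spell out the timer/state design, so the following is only one plausible reading, modeled in plain Python rather than with Beam's state and timer APIs: each key holds a "seen" state, and a timer clears that state after a deduplication interval. The names `deduplicate_with_state` and `duration` are assumptions made for this sketch.

```python
def deduplicate_with_state(timestamped, duration):
    """Emit the first element per key, then suppress repeats until the key's timer expires.

    `timestamped` is an iterable of (timestamp, key, value) in non-decreasing timestamp order.
    """
    expiry = {}  # per-key "seen" state, stored as the time at which its timer fires
    for ts, key, value in timestamped:
        # Model the timer firing: clear the state once the interval has passed.
        if key in expiry and ts >= expiry[key]:
            del expiry[key]
        if key not in expiry:
            expiry[key] = ts + duration
            yield key, value


events = [(0, "id1", "x"), (3, "id1", "x"), (5, "id2", "y"), (12, "id1", "x")]
emitted = list(deduplicate_with_state(events, duration=10))
```

In this model the repeat of `id1` at time 3 is suppressed, while the one at time 12 is emitted again because the interval that started at time 0 has ended.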
R: @lukecwik @robertwb