[BEAM-8575] Added two unit tests to CombineTest class to test that Co… #10190
Its Java parity is the
The mean combine fn already covers this test case completely.
This test (test_ConcatIntCombineFn_combine) is very similar to #10173 (test_MeanCombineFn_combine), except that it is meant to be used for testing counters. We can either continue to add this test (test_ConcatIntCombineFn_combine) and then write the counter test using this pipeline, (An email with more details has been sent to the reviewer.)
As discussed in the email, I re-implemented the tests to set three different kinds of metrics (counter, distribution, and gauge) in a CombineFn and then extract and verify their values from the result returned by pipeline.run().
R: @robertwb
Run PythonLint PreCommit
Thanks. Just some minor comments on non-determinism.
Run Python PreCommit
f2a93a6 to 04d8a44
Retest this please
…mbinePerKey works with a simple customized CombineFn.
4a1cf4b to d37021a
Retest this please
Retest this please
Run Python PreCommit
Retest this please
Retest this please
Retest this please
Run Python PreCommit
Retest this please
Retest this please
Just some minor changes.
result.wait_until_finish()

# Verify the concatenated strings are correct.
expected_concat_per_key = [('c', 'ginoru'), ('d', 'abeemsstt')]
Nit: it's really hard to tell from reading this whether it is correct. Maybe make the input something like
('key1', 'a'), ('key1', 'ab'), ..., ('key2', 'xyz'), ...
Done.
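To illustrate the reviewer's point, here is a minimal plain-Python sketch of what such readable test data could look like. The pairs and the expected values are made up for illustration (they are not the data used in the PR), and the grouping is done with a dictionary rather than a Beam pipeline.

```python
from collections import defaultdict

# Hypothetical input in the style the reviewer suggested: short values
# whose sorted concatenation is easy to check by eye.
pairs = [('key1', 'a'), ('key1', 'ab'), ('key2', 'xyz'), ('key2', 'x')]

# Collect the characters seen for each key.
chars_per_key = defaultdict(list)
for key, word in pairs:
    chars_per_key[key].extend(word)

# Sort the characters per key and join them back into a string.
actual = sorted(
    (key, ''.join(sorted(chars))) for key, chars in chars_per_key.items())

expected = [('key1', 'aab'), ('key2', 'xxyz')]
```

With input like this, a reader can confirm the expected list without re-deriving it from longer words.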
@@ -50,6 +52,39 @@
from apache_beam.utils.timestamp import Timestamp


class CounterIncrememtingCombineFn(beam.CombineFn):
Maybe call this SortedConcatWithCounters?
Done.
self.last_word_len.set(len(element))

# ''.join() converts the list to a string.
return ''.join(sorted(acc + element))
No need to sort here, just return acc + element.
Done.
return ''.join(sorted(acc + element))

def merge_accumulators(self, accs):
return ''.join(sorted(''.join(accs)))
Likewise, return sum(accs, '')
sum(accs, '') doesn't work, because sum() refuses to concatenate strings and raises a TypeError. I kept ''.join(accs) instead.
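The behavior mentioned here can be checked in plain Python. This is a small sketch with made-up accumulator values: the built-in sum() rejects a string start value, while ''.join() concatenates the same strings.

```python
# Hypothetical accumulators, standing in for the strings a CombineFn
# would receive in merge_accumulators.
accs = ['ab', 'cd', 'ef']

# sum() rejects a string start value and raises TypeError.
try:
    sum(accs, '')
    sum_error = None
except TypeError as exc:
    sum_error = exc

# ''.join() is the supported way to concatenate a sequence of strings.
merged = ''.join(accs)
```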
return ''.join(sorted(''.join(accs)))

def extract_output(self, acc):
return acc
I'd do the sorting here.
Done.
sorted(acc) returns a list, so I use ''.join() to convert the result back to a string.
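The shape the review converges on (plain concatenation while accumulating, a single sort in extract_output) can be sketched without Beam. The class below is a plain-Python stand-in that only mirrors the four CombineFn method names; it does not subclass beam.CombineFn, leaves out the metrics, and the combine() helper and input values are hypothetical.

```python
class SortedConcat:
    """Plain-Python stand-in mirroring the four CombineFn methods."""

    def create_accumulator(self):
        return ''

    def add_input(self, acc, element):
        # No sorting here: just append the new element.
        return acc + element

    def merge_accumulators(self, accs):
        # No sorting here either: concatenate the partial results.
        return ''.join(accs)

    def extract_output(self, acc):
        # sorted() returns a list of characters, so join it back
        # into a string.
        return ''.join(sorted(acc))


def combine(fn, bundles):
    """Hypothetical helper: accumulate each bundle, then merge."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


fn = SortedConcat()
one_bundle = combine(fn, [['b', 'ab', 'a']])
two_bundles = combine(fn, [['a'], ['ab', 'b']])
```

Because the only sort happens at the end, the output does not depend on how the elements were split into bundles or in which order they arrived, which is the non-determinism concern raised earlier in the review.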
global_concat = (input
    | beam.Values()
    | beam.CombineGlobally(CounterIncrememtingCombineFn())
    | "sort global result" >> _SortLists)
There's no need for _SortLists anywhere here.
Deleted.
Retest this please
… changes accordingly.
Retest this please
Retest this please
Retest this please
…mbinePerKey works with a simple customized CombineFn.
Added two unit tests to CombineTest class to test that CombinePerKey works with a simple customized CombineFn, namely ConcatIntCombineFn.