[BEAM-1467] Add cross-SDK implementations and tests of WindowedValueCoder #2018
vikkyrk wants to merge 6 commits into apache:master from vikkyrk:common_windowed_value_coder
| # lower negative number. For ex: -3 / 2 = -2, but we expect it to be -1, | ||
| # to be consistent across SDKs. | ||
| self._from_normal_time( | ||
| restore_sign * (abs(wv.timestamp_micros) / 1000))) |
Better ways to fix this?
Much better! Any performance implications dividing by float and converting to int? (I think not)
I do not know, but I do not think so.
Actually float division loses precision for large long numbers, so this won't work.
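To illustrate the precision point, here is a minimal Python 3 sketch of the micros-to-millis conversion under discussion. It uses integer arithmetic only, so it truncates toward zero without going through a float. Both function names are hypothetical and are not taken from the PR; the float variant is included only to show where the results diverge for large values.

```python
def micros_to_millis(micros):
    """Convert microseconds to milliseconds, truncating toward zero.

    Hypothetical helper, not the PR's code. Python's floor division
    rounds toward negative infinity (-3 // 2 == -2), so the sign is
    stripped before dividing and restored afterwards.
    """
    sign = -1 if micros < 0 else 1
    return sign * (abs(micros) // 1000)


def micros_to_millis_via_float(micros):
    """Float-based variant, shown only to demonstrate the precision loss."""
    return int(micros / 1000.0)


if __name__ == "__main__":
    big = (1 << 63) - 1  # largest signed 64-bit value
    print(micros_to_millis(-1500))          # truncates toward zero
    print(micros_to_millis(big))            # exact integer result
    print(micros_to_millis_via_float(big))  # differs from the line above
```

A double has a 53-bit mantissa, so a 64-bit microsecond count cannot always be represented exactly; the integer version avoids that conversion entirely.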
|
Changes Unknown when pulling 61b964d on vikkyrk:common_windowed_value_coder into apache:master. |
|
Refer to this link for build results (access rights to CI server needed): |
Please attach JIRA issues to the TODOs. I will defer to R: @charlesccychen for review. |
| convertValue(element, coderSpec.getComponents().get(0), elementCoder)); | ||
| } | ||
| return convertedElements; | ||
| case "urn:beam:coders:global_window:0.1": |
I must have missed this earlier for IterableCoder, but it can be useful to put the body of each case statement fully inside {} (see the interval_window case above) -- that lets each case use its own set of local variables.
| windowCoder)); | ||
| } | ||
| // Note: Until Python SDK supports PaneInfo, we default to PaneInfo.NO_FIRING. | ||
| return WindowedValue.of(windowValue, timestamp, windows, PaneInfo.NO_FIRING); |
I suggest that we:
- Document the Python issue with a JIRA.
- Restrict the test values to those with PaneInfo.NO_FIRING, and add a TODO in standard_coders.yaml referencing the JIRA.
- Return the correct value in Java.
Seems cleaner -- fewer places to clean up once Python has PaneInfo.
| restore_sign * (abs(wv.timestamp_micros) / 1000))) | ||
| self._windows_coder.encode_to_stream(wv.windows, out, True) | ||
| # Default PaneInfo encoded byte representing NO_FIRING. | ||
| # TODO(vikasrk): Remove the hard coding here once PaneInfo is supported. |
Instead of TODO(vikasrk), make it TODO(BEAM_XXX) pointing to a JIRA. :)
|
|
||
| # TODO: Fn Harness only supports millis. Is this important enough to fix? | ||
| def _to_normal_time(self, value): | ||
| """Convert "lexicographically ordered unsigned" to signed.""" |
Can you document what "normal time" means for coders? What is "lexicographically ordered unsigned"?
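One possible reading of these terms, as a hedged sketch and not the PR's actual code: "normal time" is the ordinary signed value, and the "lexicographically ordered unsigned" form is that value shifted by a fixed offset so that its big-endian bytes sort in the same order as the numbers themselves. The constant `TIME_SHIFT` and all function names below are assumptions made for illustration.

```python
import struct

# Assumption: the shift is 2**63, which maps the signed 64-bit range
# [-2**63, 2**63 - 1] onto the unsigned range [0, 2**64 - 1].
TIME_SHIFT = 1 << 63


def from_normal_time(value):
    """Signed ("normal") time -> order-preserving unsigned value."""
    return value + TIME_SHIFT


def to_normal_time(value):
    """Order-preserving unsigned value -> signed ("normal") time."""
    return value - TIME_SHIFT


def encode_time(millis):
    """Encode signed millis as 8 big-endian bytes that sort like the numbers."""
    return struct.pack(">Q", from_normal_time(millis))


def decode_time(data):
    """Inverse of encode_time: 8 big-endian bytes -> signed millis."""
    return to_normal_time(struct.unpack(">Q", data)[0])
```

With a plain two's-complement encoding, negative timestamps would have the high bit set and sort after positive ones when compared byte by byte; the shift removes that inversion.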
| self._value_coder.encode_to_stream(wv.value, out, True) | ||
| # Avoid creation of Timestamp object. | ||
| out.write_bigendian_int64(wv.timestamp_micros) | ||
| restore_sign = -1 if wv.timestamp_micros < 0 else 1 |
Can you add a comment that this workaround is to maintain compatibility with the Java SDK and point to a jira issue to resolve this in the future?
| # of precision while converting to millis. | ||
| # Note: This is only a best effort here as there is no way to know if these | ||
| # were indeed MIN/MAX timestamps. | ||
| # TODO(vikasrk): Clean this up once we have a BEAM wide consensus on |
Please point to jira.
| # were indeed MIN/MAX timestamps. | ||
| # TODO(vikasrk): Clean this up once we have a BEAM wide consensus on | ||
| # precision of timestamps. | ||
| if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000): |
Do we want to do <= and >= here and below? This is somewhat fragile. Can you also document the Java min and max values?
This should go away once BEAM-1524 is fixed. Added a JIRA.
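For reference, a sketch of what the `<=` / `>=` variant suggested above could look like when converting decoded millis back to micros. This is not the PR's code. It assumes timestamps are signed 64-bit microsecond counts, so the constants are simply the int64 bounds; the Java-side min and max values are not stated in this thread, and every name here is hypothetical.

```python
# Assumption: timestamps are signed 64-bit microsecond counts, so the
# extremes are the int64 bounds. These constants are illustrative only.
MIN_MICROS = -(1 << 63)
MAX_MICROS = (1 << 63) - 1

# Millisecond values that the extremes collapse to after a division
# that truncates toward zero.
MIN_MILLIS = -(abs(MIN_MICROS) // 1000)
MAX_MILLIS = MAX_MICROS // 1000


def millis_to_micros(millis):
    """Convert millis to micros, saturating at the assumed extremes.

    Best effort only: a value at or beyond the boundary is mapped to
    the extreme, since there is no way to tell whether it really was
    a MIN/MAX timestamp before the precision was lost.
    """
    if millis <= MIN_MILLIS:
        return MIN_MICROS
    if millis >= MAX_MILLIS:
        return MAX_MICROS
    return millis * 1000
```

Using range comparisons instead of `==` also keeps the result inside the 64-bit range for any out-of-bounds input.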
|
|
||
| windows = self._windows_coder.decode_from_stream(in_stream, True) | ||
| # Read PaneInfo encoded byte. | ||
| in_stream.read_byte() |
Please make it clearer in the comment that this is ignored for now.
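A small sketch of how the placeholder pane byte could be written and then explicitly discarded on read, with the comment spelling out that the value is ignored. The value 0x0F for PaneInfo.NO_FIRING comes from the PR description; the function names and the use of `io.BytesIO` in place of the SDK's stream classes are assumptions for illustration.

```python
import io

# From the PR description: 0x0F is the encoded byte for PaneInfo.NO_FIRING.
NO_FIRING_PANE_BYTE = 0x0F


def write_pane_info(out):
    """Write the hard-coded NO_FIRING pane byte (hypothetical helper)."""
    out.write(bytes([NO_FIRING_PANE_BYTE]))


def skip_pane_info(in_stream):
    """Consume the PaneInfo byte and deliberately ignore its value.

    The byte must still be read so that the stream position stays
    aligned with whatever field follows it.
    """
    data = in_stream.read(1)
    if len(data) != 1:
        raise EOFError("missing PaneInfo byte")
```

For example, after writing the pane byte followed by some payload to an `io.BytesIO`, calling `skip_pane_info` on the rewound stream leaves only the payload to be read.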
|
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-spark: 1 --none-- |
|
R: @charlesccychen addressed comments, PTAL. |
|
Thanks! LGTM. |
|
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-spark: 1 --none-- |
|
Retest this please. |
|
Refer to this link for build results (access rights to CI server needed): |
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
Tests for GlobalWindowCoder
Cross-SDK implementation / tests for WindowedValueCoder
Note: PaneInfo isn't supported in the Python SDK yet, so we hard-code the value to 0x0F, which represents PaneInfo.NO_FIRING.
Make sure the PR title is formatted like:
[BEAM-<Jira issue #>] Description of pull request
Make sure tests pass via mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
If this contribution is large, please file an Apache Individual Contributor License Agreement.