[BEAM-2914] Add TimestampPrefixingWindowCoder as beam:coder:custom_window:v1. #15137

Merged 3 commits on Jul 16, 2021

Contributor

@y1chi y1chi commented Jul 7, 2021

Add a new standard coder, beam:coder:custom_window:v1. The coder encodes an arbitrary bounded window by prefixing the window's max timestamp to the bytes produced by the window's registered coder, e.g.:
max_timestamp
encoded_window_bytes

The idea is that any window type extending bounded window must define a max timestamp, and the runner only needs to know that max timestamp when processing trigger timers. The bytes produced by the custom window's registered coder can then be used transparently on the runner side as the identity of the window.
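
A minimal sketch of the layout described above, not the code from this PR. It assumes the timestamp prefix is 8 big-endian bytes of millis-since-epoch shifted by 2^63 (as the proto comment in the review below describes); the function names and the tuple-based window are hypothetical and only for illustration.

```python
import struct
from typing import Callable, Tuple

# Assumption: shifting by 2**63 maps signed 64-bit millis into the unsigned range.
_SHIFT = 1 << 63


def encode_custom_window(
    max_timestamp_millis: int,
    window: object,
    encode_window: Callable[[object], bytes],
) -> bytes:
    """Prefix the window's max timestamp to the bytes of its own coder."""
    prefix = struct.pack(">Q", max_timestamp_millis + _SHIFT)
    return prefix + encode_window(window)


def split_custom_window(data: bytes) -> Tuple[int, bytes]:
    """Return (max_timestamp_millis, opaque_window_bytes).

    A runner only has to understand the first 8 bytes; the rest can be
    treated as an opaque identity for the window.
    """
    (shifted,) = struct.unpack(">Q", data[:8])
    return shifted - _SHIFT, data[8:]


# Hypothetical window type: a (start, end) pair encoded as two signed longs.
def encode_pair(window: object) -> bytes:
    start, end = window
    return struct.pack(">qq", start, end)


payload = encode_custom_window(999, (0, 1000), encode_pair)
max_ts, window_bytes = split_custom_window(payload)
```

Here the runner side recovers `max_ts` without knowing anything about how `(0, 1000)` was encoded, which is the property the description relies on.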


@codecov

codecov bot commented Jul 7, 2021

Codecov Report

Merging #15137 (65a7ce8) into master (e616db2) will increase coverage by 0.06%.
The diff coverage is 85.54%.


@@            Coverage Diff             @@
##           master   #15137      +/-   ##
==========================================
+ Coverage   83.77%   83.83%   +0.06%     
==========================================
  Files         441      441              
  Lines       59800    59692     -108     
==========================================
- Hits        50095    50043      -52     
+ Misses       9705     9649      -56     
Impacted Files Coverage Δ
sdks/python/apache_beam/dataframe/__init__.py 100.00% <ø> (ø)
...dks/python/apache_beam/examples/cookbook/coders.py 62.16% <0.00%> (ø)
...ython/apache_beam/examples/kafkataxi/kafka_taxi.py 0.00% <0.00%> (ø)
...on/apache_beam/runners/portability/spark_runner.py 67.34% <33.33%> (ø)
...m/runners/portability/spark_uber_jar_job_server.py 84.32% <50.00%> (ø)
...n/apache_beam/ml/gcp/recommendations_ai_test_it.py 69.76% <69.76%> (ø)
sdks/python/apache_beam/coders/coders.py 87.80% <80.00%> (-0.23%) ⬇️
...ks/python/apache_beam/ml/gcp/recommendations_ai.py 87.26% <87.26%> (ø)
sdks/python/apache_beam/dataframe/frames.py 94.87% <97.95%> (+0.21%) ⬆️
sdks/python/apache_beam/coders/coder_impl.py 95.28% <100.00%> (+0.08%) ⬆️
... and 40 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@y1chi y1chi force-pushed the custom_window branch 9 times, most recently from a416bd0 to b7e1e7a Compare July 12, 2021 23:39
@y1chi y1chi changed the title [WIP] Add TimestampPrefixingWindowCoder to java sdk. [BEAM-2914] Add TimestampPrefixingWindowCoder as beam:coder:custom_window:v1. Jul 13, 2021
@y1chi
Contributor Author

y1chi commented Jul 13, 2021

Run Java PreCommit

@y1chi
Contributor Author

y1chi commented Jul 13, 2021

Run GoPortable PreCommit

@y1chi y1chi requested review from robertwb and lukecwik July 13, 2021 19:23
@lukecwik
Member

Run Java PreCommit

@y1chi
Contributor Author

y1chi commented Jul 16, 2021

Run Java PreCommit

@y1chi y1chi merged commit 7d98ad2 into apache:master Jul 16, 2021
# }
# System.out.println(example);
coder:
  urn: "beam:coder:custom_window:v1"
Contributor

Should this be timestamp_prefixed (similar to length_prefixed)?

Contributor Author

I think custom_window makes it a bit clearer that this is a coder for a window type. timestamp_prefixed also works, as long as the SDK and runner agree on the URN, I guess.

# System.out.println(example);
coder:
  urn: "beam:coder:custom_window:v1"
  components: [{urn: "beam:coder:interval_window:v1"}]
Contributor

This shouldn't be limited to interval_windows; any windowed_coder should do.

Contributor Author

Yep, using the interval_window coder here makes it easier to verify in the standard coder test.

// maxTimestamp - A big endian 8 byte integer representing millis-since-epoch.
// The encoded representation is shifted so that the byte representation
// of negative values are lexicographically ordered before the byte
// representation of positive values. This is typically done by
Contributor

"typically done" makes it sound like any transformation preserving lexicographic ordering is acceptable. There is no choice here.

Perhaps we could reference other coders here (e.g. the encoding used for TimestampCoder, or in WindowedValueCoder)?

Member

I suggested copying it verbatim as that is the text in those segments already.
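
For readers following this thread, the shift the proto comment refers to can be sketched as below. This is an illustration under the assumption that the shift is an offset of 2^63 applied before writing an unsigned big-endian 64-bit integer; the helper names are hypothetical and not taken from the PR.

```python
import struct

# Assumption: the "shift" is an offset of 2**63, so Long.MIN_VALUE maps to 0.
_SHIFT = 1 << 63


def encode_millis(millis: int) -> bytes:
    """Encode signed millis-since-epoch as 8 shifted big-endian bytes."""
    return struct.pack(">Q", millis + _SHIFT)


def decode_millis(data: bytes) -> int:
    """Invert encode_millis."""
    (shifted,) = struct.unpack(">Q", data)
    return shifted - _SHIFT


samples = [-1000, -1, 0, 1, 1625616000000]
shifted_bytes = [encode_millis(m) for m in samples]

# With the shift, byte-wise (lexicographic) order matches numeric order.
shifted_order_ok = shifted_bytes == sorted(shifted_bytes)

# Without the shift, plain two's complement puts negatives after positives.
plain_bytes = [struct.pack(">q", m) for m in samples]
plain_order_ok = plain_bytes == sorted(plain_bytes)
```

The comparison between `shifted_order_ok` and `plain_order_ok` is the point of the exercise: only the shifted form sorts negative timestamps before positive ones when compared as raw bytes.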


// Encodes an arbitrary user defined window and its max timestamp (inclusive).
// The encoding format is:
// maxTimestamp window
Contributor

How is the window encoded? Should we length prefix it as well? Or do we expect TimestampPrefixed(LengthPrefixed(CustomWindowCoder))?

Contributor Author

I think whether it should be length prefixed can be determined by the runner with a coder overwrite. Since we are not limiting the wrapped window coder type, a known coder can also be used, and that doesn't need a length prefix (unlikely in a real use case, but handy for unit tests).

Member

I would rather have the length prefix be a component, as Yichi describes, since this will allow length prefixing schemes other than the current one we have.
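
To make the composition discussed in this thread concrete, here is a rough sketch of TimestampPrefixed(LengthPrefixed(...)) from the reader's side. It assumes a varint length prefix and the 2^63-shifted timestamp described in the proto comment; all function names are hypothetical, and the window payloads are arbitrary bytes standing in for a custom window coder's output.

```python
import struct
from typing import Tuple

_SHIFT = 1 << 63  # assumed timestamp shift, see the proto comment above


def write_varint(n: int) -> bytes:
    """Encode a non-negative int, 7 bits per byte, low bits first."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)
            return bytes(out)


def read_varint(data: bytes, pos: int) -> Tuple[int, int]:
    """Return (value, next_position)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode(max_ts_millis: int, window_bytes: bytes) -> bytes:
    """maxTimestamp, then length-prefixed opaque window bytes."""
    return (
        struct.pack(">Q", max_ts_millis + _SHIFT)
        + write_varint(len(window_bytes))
        + window_bytes
    )


def read_one(data: bytes, pos: int = 0) -> Tuple[int, bytes, int]:
    """Read one element without knowing the inner window coder."""
    (shifted,) = struct.unpack_from(">Q", data, pos)
    length, pos = read_varint(data, pos + 8)
    return shifted - _SHIFT, data[pos:pos + length], pos + length


stream = encode(5, b"abc") + encode(-7, b"x" * 200)
first_ts, first_window, offset = read_one(stream)
second_ts, second_window, end = read_one(stream, offset)
```

Because the length prefix is a separate layer, a known inner coder (such as the interval window coder used in the standard coder test) could be wrapped directly, while an unknown one gets the extra layer so the reader can find the element boundary.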

def is_deterministic(self) -> bool:
  return self._window_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
Contributor

Is this needed?

@@ -21,6 +21,7 @@
class CloudObjectKinds {
  static final String KIND_GLOBAL_WINDOW = "kind:global_window";
  static final String KIND_INTERVAL_WINDOW = "kind:interval_window";
  static final String KIND_CUSTOM_WINDOW = "kind:custom_window";
Contributor

Why is this needed?

Contributor Author

For translating it as a known coder on runner side.

Member

It isn't needed, since we shouldn't need to pass these in to Dataflow anymore: the Beam proto -> DFE conversion should happen internally. I didn't ask to remove it since it was already done.

private static class CustomWindowCoder extends CustomCoder<CustomWindow> {

  private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
  private static final int REGISTER_BYTE_SIZE = 1234;
Contributor

I'm not following what is meant by the REGISTER_BYTE_SIZE constant. BYTE_SIZE_TO_REGISTER?

3 participants