[BEAM-6733] Use Flink's prepareSnapshotPreBarrier to replace BufferedOutputManager #9652
Conversation
…OutputManager For Flink version <= 1.5 the Flink Runner had to buffer any elements which are emitted during a snapshot because the barrier has already been emitted. Flink version >= 1.6 provides a hook to execute an action before the snapshot barrier is emitted by the operator. We can remove the buffering in favor of finishing the current bundle in the DoFnOperator's prepareSnapshotPreBarrier. This had previously been deferred (apache#7940) until removal of Flink 1.5 (apache#9632).
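The approach described above can be sketched with plain-Java stand-ins. The hook name `prepareSnapshotPreBarrier` is the real Flink (>= 1.6) operator method, but the class below is a hypothetical simplification, not the actual `DoFnOperator`:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a DoFnOperator; real Beam/Flink classes are not used. */
public class ToyDoFnOperator {
    private final List<String> currentBundle = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    public void processElement(String element) {
        currentBundle.add(element);
    }

    /** Flink >= 1.6 calls this before the checkpoint barrier is emitted. */
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // Finish the bundle now, so nothing is emitted after the barrier
        // and no output needs to be buffered in checkpointed state.
        finishBundle();
    }

    public void snapshotState(long checkpointId) {
        // With prepareSnapshotPreBarrier the bundle is already finished,
        // so there is no pending output left to buffer here.
    }

    private void finishBundle() {
        emitted.addAll(currentBundle);
        currentBundle.clear();
    }

    public List<String> emitted() {
        return emitted;
    }
}
```

This illustrates why the `BufferedOutputManager` becomes unnecessary: all pending output is flushed before the barrier, instead of being captured in state and replayed on restore.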
Unrelated test failure in Java PreCommit.
I added a couple of questions to the ticket.
The current buffer logic for items emitted during checkpointing is faulty in the sense that the buffer is partitioned on the output keys of the operator. The key may be changed or even dropped. Thus, the original key partitioning will not be maintained, which will cause checkpointing to fail. An alternative solution would be BEAM-6733 / apache#9652, but this change keeps the current buffering logic in place. The output buffer may now always be redistributed round-robin upon restoring from a checkpoint. Note that this is fine because no assumption can be made about the distribution of output elements of a DoFn operation.
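The round-robin redistribution of a restored buffer can be sketched as follows. The method and surrounding type are hypothetical; only the redistribution idea comes from the change description above:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: redistribute a restored output buffer round-robin across subtasks. */
public class RoundRobinRedistributor {

    /**
     * Splits the buffered elements evenly across {@code parallelism} subtasks.
     * This is safe precisely because no assumption can be made about the key
     * distribution of a DoFn's output elements.
     */
    public static <T> List<List<T>> redistribute(List<T> buffer, int parallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < buffer.size(); i++) {
            // Element i goes to subtask i mod parallelism.
            perSubtask.get(i % parallelism).add(buffer.get(i));
        }
        return perSubtask;
    }
}
```

Contrast this with keyed state: a keyed buffer would pin each element to the subtask owning its key, which breaks when the operator changes or drops keys.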
* [BEAM-8549] Do not use keyed operator state for checkpoint buffering

  The current buffer logic for items emitted during checkpointing is faulty in the sense that the buffer is partitioned on the output keys of the operator. The key may be changed or even dropped. Thus, the original key partitioning will not be maintained, which will cause checkpointing to fail. An alternative solution would be BEAM-6733 / apache#9652, but this change keeps the current buffering logic in place. The output buffer may now always be redistributed round-robin upon restoring from a checkpoint. Note that this is fine because no assumption can be made about the distribution of output elements of a DoFn operation.

* [BEAM-8566] Fix checkpoint buffering when another bundle is started during checkpointing

  As part of a checkpoint, the current bundle is finalized. When the bundle is finalized, the watermark, currently held back, may also be progressed, which can cause the start of another bundle. When a new bundle is started, any to-be-buffered items from the previous bundle for the pending checkpoint may be emitted. This should not happen. This only affects portable pipelines, where we have to hold back the watermark due to the asynchronous processing of elements.

* [BEAM-8566] Do not swallow execution errors during checkpointing

  If a bundle fails to finalize before creating a checkpoint, the error may be swallowed and just considered a checkpointing error. This breaks the execution flow and exactly-once guarantees.
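The BEAM-8566 bundle-boundary bug above can be illustrated with a toy emitter. All names below are hypothetical simplifications; the point is only that, while a checkpoint is in progress, output from a newly started bundle must keep going into the buffer rather than being emitted past the barrier:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy sketch of the fix: while a checkpoint is buffering output, elements
 * from a bundle started mid-checkpoint must stay buffered too.
 */
public class CheckpointAwareEmitter {
    private boolean checkpointInProgress = false;
    private final List<String> buffered = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    public void startCheckpoint() {
        checkpointInProgress = true;
    }

    public void finishCheckpoint() {
        checkpointInProgress = false;
        // Flush only after the barrier has been taken.
        emitted.addAll(buffered);
        buffered.clear();
    }

    public void emit(String element) {
        if (checkpointInProgress) {
            // Buggy behavior would emit directly here once a new bundle
            // started; the fix keeps buffering regardless of the bundle.
            buffered.add(element);
        } else {
            emitted.add(element);
        }
    }

    public List<String> emitted() {
        return emitted;
    }
}
```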
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
…ting We had a couple of PRs in which we wanted to remove the buffering of bundle output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of those because we weren't sure how the change would affect the checkpoint performance. As a better migration path, this changes the default from buffering during checkpointing to flushing before checkpointing while still retaining the option to use the previous buffering behavior via a pipeline option.
…ting We had a couple of PRs in which we wanted to remove the buffering of bundle output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of those because we weren't sure how the change would affect the checkpoint performance. As a better migration path, this introduces a pipeline option to change the default, buffering bundle output during checkpointing, to finishing the bundle and flushing all data before checkpointing.
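The migration path described above can be sketched with a toy strategy switch. The flag name and classes are hypothetical stand-ins for the real Beam pipeline option, not its actual API:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: checkpoint behavior selected by a (hypothetical) pipeline option. */
public class SnapshotStrategy {
    private final boolean flushBeforeCheckpointing;  // assumed new default: true
    private final List<String> bundle = new ArrayList<>();
    private final List<String> checkpointBuffer = new ArrayList<>();
    private final List<String> downstream = new ArrayList<>();

    public SnapshotStrategy(boolean flushBeforeCheckpointing) {
        this.flushBeforeCheckpointing = flushBeforeCheckpointing;
    }

    public void processElement(String e) {
        bundle.add(e);
    }

    public void snapshotState() {
        if (flushBeforeCheckpointing) {
            // New default: finish the bundle and flush all data first.
            downstream.addAll(bundle);
        } else {
            // Legacy behavior: buffer bundle output in checkpointed state.
            checkpointBuffer.addAll(bundle);
        }
        bundle.clear();
    }

    public List<String> downstream() {
        return downstream;
    }

    public List<String> checkpointBuffer() {
        return checkpointBuffer;
    }
}
```

Keeping both paths behind an option lets users fall back to buffering if flushing before checkpoints hurts their checkpoint latency.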
* Support ZetaSQL DATE type as a Beam LogicalType
* [BEAM-6733] Add pipeline option to flush bundle data before checkpointing

  We had a couple of PRs in which we wanted to remove the buffering of bundle output during checkpointing: #7940 #9652. Ultimately, we didn't merge any of those because we weren't sure how the change would affect checkpoint performance. As a better migration path, this introduces a pipeline option to change the default, buffering bundle output during checkpointing, to finishing the bundle and flushing all data before checkpointing.

* Remove all answer placeholder checks as they can be confusing at times for some learners
* Update course in Stepik
* [BEAM-10018] Fix timestamps in windowing kata

  In this kata, the timestamp was calculated from time objects and converted to a timestamp in the local timezone. Thus, the results of the test depended on the local timezone configured on the running system. The tests were hardcoded with a timezone different from mine, and thus I always failed to pass this kata. This commit changes the type in Event to be a datetime, sets the timestamps in UTC, and hardcodes the test output in UTC too. This should ensure that the kata works regardless of the timezone configured on the system running it.

* [BEAM-10018] Kata failing due to failed parsing

  Parsing the timestamps as strings using fromisoformat was failing, and the kata failed silently regardless of the code written in the boxes. This change sets the same timestamps, with UTC timezone, without parsing strings.
* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"
* Remove unused import
* Add missing dependency
* Fix member variable name in Kata documentation
* Fix placeholder location
* Convert html task description to md for "Core Transforms" remaining lessons
* Convert html task description to md for "Common Transforms" lessons
* Convert html task description to md for remaining Python Katas lessons
* Convert html task description to md for most of Java Katas lessons
* Convert html task description to md for Java Katas "Common Transforms" lessons
* Convert html task description to md for Java Katas "Core Transforms" lessons
* [BEAM-2530] Implement Zeta SQL precommit compile tests and run on java 11 (#11692)
* Python3 fix - convert dict.keys() to list before indexing (#11733)
* Updates google-apitools and httplib2 (#11726)
* [BEAM-9964] Update CHANGES.md (#11743) Co-authored-by: Omar Ismail <omarismail@omarismail-macbookpro.roam.corp.google.com>
* [BEAM-9577] Artifact v2 support for uber jars. (#11708)
  * Adds a "filesystem" for artifacts placed on the classpath (e.g. within the uberjar).
  * Updates the flink and spark uberjars to use artifact staging v2, leveraging the above filesystem.
* Populate all SpannerIO batching parameters in display data. Add all the grouping/batching parameters in SpannerIO populateDisplayData().
* Fix capitalization, clarify descriptions
* Fix capitalization, clarify description Grouped
* Refactor to extract single method for populating displayData
* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"
* Convert html task description to md for "Core Transforms" remaining lessons
* Convert html task description to md for "Common Transforms" lessons
* Convert html task description to md for remaining Python Katas lessons
* Convert html task description to md for most of Java Katas lessons
* Convert html task description to md for Java Katas "Common Transforms" lessons
* Convert html task description to md for Java Katas "Core Transforms" lessons
* Resolve merge conflict
* Update Python Katas on Stepik
* Update Beam Katas Java on Stepik

Co-authored-by: Yueyang Qiu <robinyqiu@gmail.com>
Co-authored-by: Maximilian Michels <mxm@apache.org>
Co-authored-by: Israel Herraiz <ihr@google.com>
Co-authored-by: pawelpasterz <32893017+pawelpasterz@users.noreply.github.com>
Co-authored-by: Chamikara Jayalath <chamikara@apache.org>
Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
Co-authored-by: Pablo <pabloem@users.noreply.github.com>
Co-authored-by: omarismail94 <44980219+omarismail94@users.noreply.github.com>
Co-authored-by: Omar Ismail <omarismail@omarismail-macbookpro.roam.corp.google.com>
Co-authored-by: Andrew Pilloud <apilloud@users.noreply.github.com>
Co-authored-by: Robert Bradshaw <robertwb@google.com>
Co-authored-by: nielm <nielm@google.com>
Co-authored-by: Brian Hulette <hulettbh@gmail.com>
Co-authored-by: Brian Hulette <bhulette@google.com>