Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Reshuffle implementation in Java SDK #28853

Merged
merged 1 commit into from
Jan 3, 2024

Conversation

kennknowles
Copy link
Member

@kennknowles kennknowles commented Oct 5, 2023

This fixes the Reshuffle default implementation to preserve all metadata and be a true no-op, whereas before it did not preserve the pane info.

Regarding update compatibility, my understanding of runners that have long-lived streaming jobs and perform updates, as of this writing:

  • Flink: overrides Reshuffle
  • Samza: overrides Reshuffle
  • Dataflow v1: overrides Reshuffle
  • Dataflow v2: executes default implementation

I went ahead and used the update compatibility flag anyhow just in case, so it can be reverted to exactly the old implementation on demand.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@kennknowles
Copy link
Member Author

R: @robertwb

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@kennknowles
Copy link
Member Author

run java precommit

@kennknowles
Copy link
Member Author

The failure in the AWS2 tests was the same Spark resume from checkpoint test. It shouldn't even have been run in that suite. It looks like our invocations are a bit messed up (@damccorm FYI but I'm assuming this is just the same as it was in Jenkins).

@kennknowles
Copy link
Member Author

run java precommit

@kennknowles
Copy link
Member Author

I get the impression this somehow actually slows things down. Everything looks like a timeout. Or perhaps it just invalidates too much of the Gradle cache?

@kennknowles
Copy link
Member Author

run java precommit

@kennknowles
Copy link
Member Author

@robertwb finally green - I don't think this accomplishes everything from the dev thread, just the SDK implementation. It does not impact runners that override or specially translate reshuffle.

@kennknowles
Copy link
Member Author

run dataflow validatesrunner

@kennknowles
Copy link
Member Author

run flink validatesrunner

@kennknowles
Copy link
Member Author

run spark validatesrunner

@kennknowles
Copy link
Member Author

run samza validatesrunner

new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runner doesn't read/act on this value anywhere, does it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that this is not a change but a move. But no the runner does not act on it. In fact it is unsafe for this reason - the runner may well drop data that you have allowed to be output.

.apply(
"RestoreOriginalWindows",
Window.into(new RestoreWindowsFn<>(originalStrategy.getWindowFn())))
.apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the PaneInfo information?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha of course. The whole point.

I got caught up in the fact that I could not restore windows and timestamps in a single ParDo and had to do a rewindowing. But in fact I don't think there is a way to restore the paneinfo without changing the model after all to at least allow direct manipulation of it.

PCollection<KV<K, ValueInSingleWindow<V>>> reified =
input
.apply("SetIdentityWindow", rewindow)
.apply("ReifyOriginalMetadata", Reify.windowsInValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this break backwards compatibility for everything using a shuffle (though I concede it's fixing a bug)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that same concern and was coming down on the side of "we should change this anyhow". BUT looking into it, every single runner directly implements reshuffle. So this is essentially just a fix for the reference implementation and the default for runners that don't implement it yet (like Dataflow v2).

So I actually need a ValidatesRunner test which I presume most runner will fail. But we don't have to worry about update incompatibility unless/until those runners come into compliance. I still think this change should happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we test that this does not break compatibility (even if the test is a one-off manual test)?

@kennknowles
Copy link
Member Author

OK, I added the capability to outputWindowedValue to the SDK (somewhat naively IDE-guided refactor) and used it in Create in order to write a basic test of what I think we want.

@kennknowles
Copy link
Member Author

Gotta go through and poke a few more files

@github-actions github-actions bot added the flink label Nov 30, 2023
@kennknowles kennknowles force-pushed the reshuffle branch 2 times, most recently from 2dfa6a0 to 657b7e4 Compare December 4, 2023 20:25
@robertwb
Copy link
Contributor

robertwb commented Dec 5, 2023

What's the status on this?

@kennknowles
Copy link
Member Author

java precommit timed out - perhaps due to cache invalidation due to modifying core SDK

@@ -669,6 +670,15 @@ public void output(RestrictionT part) {
public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
throw new UnsupportedOperationException();
}

@Override
public void outputWindowedValue(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be easier to make this outputWindowedValue a default implementation for OutputReceiver to simplify this change for all the test implementations of OutputReceiver.

@kennknowles
Copy link
Member Author

The outputWindowedValue change has been merged independently, shrinking this one. I'll check it over to make sure the residual change makes sense and is adequately complete.

@kennknowles kennknowles force-pushed the reshuffle branch 2 times, most recently from 7817665 to 7a214a0 Compare December 18, 2023 16:34
@kennknowles
Copy link
Member Author

Failure is the error on master fixed by #29798 (using \n instead of %n in a format string)

@kennknowles
Copy link
Member Author

Noting that this was green but I went ahead and added the old expansion as an option using the new update compat flag. I am going to use the flag to test compatibility on Dataflow v1. We expect the override makes this change irrelevant, so we don't want to actually bug users. And on v2 we want the incompatibility.

@kennknowles
Copy link
Member Author

Added internal Dataflow test that reloads the pipeline with changing the compatibility flag.

@kennknowles kennknowles merged commit 8bb6610 into apache:master Jan 3, 2024
33 checks passed
@kennknowles kennknowles deleted the reshuffle branch January 3, 2024 15:41
@Abacn
Copy link
Contributor

Abacn commented Jan 10, 2024

It seems the newly added test testReshufflePreservesMetadata not working for all runners:

beam_PostCommit_Java_PVR_Samza: https://github.com/apache/beam/actions/runs/7399747764/job/20131866953

beam_PostCommit_Java_PVR_Spark3_Streaming: https://github.com/apache/beam/actions/runs/7399737943/job/20131836389

beam_PostCommit_Java_PVR_Spark3_Batch: https://github.com/apache/beam/runs/20340841429

beam_PostCommit_Java_ValidatesRunner_ULR also

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants