[FLINK-10809][state] Include keyed state that is not from head operat… #7048

Closed

StefanRRichter
Contributor

…ors in state assignment

What is the purpose of the change

This PR includes keyed state that was not from a head operator (head of operator chain) in the state assignment. This fixes problems with restoring keyed state for operators after DataStreamUtils.reinterpretAsKeyedStream.

Brief change log

Remove the check for whether keyed state comes from a head operator in the state assignment algorithm. This check was an optimization from the time when Flink only allowed keyed state in head operators (which is the situation after every keyBy).
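
To illustrate the effect of dropping that check, here is a minimal sketch in plain Java. It is not Flink's state assignment code: ChainedOperator, StateAssignmentSketch, assignHeadOnly, and assignAll are hypothetical names, and keyed state handles are modelled as plain strings. It only shows how a head-only filter loses the keyed state of a chained operator that follows reinterpretAsKeyedStream.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one operator inside an operator chain; not a Flink class. */
final class ChainedOperator {
    final String name;
    final boolean isHead;           // true only for the first operator of the chain
    final List<String> keyedState;  // stand-in for keyed state handles

    ChainedOperator(String name, boolean isHead, List<String> keyedState) {
        this.name = name;
        this.isHead = isHead;
        this.keyedState = keyedState;
    }
}

final class StateAssignmentSketch {
    /** Behaviour with the head-operator check: keyed state of non-head operators is skipped. */
    static List<String> assignHeadOnly(List<ChainedOperator> chain) {
        List<String> assigned = new ArrayList<>();
        for (ChainedOperator op : chain) {
            if (op.isHead) {
                assigned.addAll(op.keyedState);
            }
        }
        return assigned;
    }

    /** Behaviour without the check: keyed state of every operator in the chain is assigned. */
    static List<String> assignAll(List<ChainedOperator> chain) {
        List<String> assigned = new ArrayList<>();
        for (ChainedOperator op : chain) {
            assigned.addAll(op.keyedState);
        }
        return assigned;
    }
}
```

With a chain made of a stateless head source and a chained keyed operator, assignHeadOnly returns nothing to restore, while assignAll returns the keyed operator's handle.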

Verifying this change

Extended ReinterpretDataStreamAsKeyedStreamITCase with a recovery cycle to test proper state restore of non-head operators.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Contributor

tzulitai left a comment

Thanks for the PR @StefanRRichter. I have some questions to understand the change.

@@ -156,15 +165,22 @@ public void invoke(Tuple2<Integer, Integer> value, Context context) throws Excep
}
}

- private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>> {
+ private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>>
+ implements CheckpointedFunction, CheckpointListener {
Contributor

Needs proper indentation

}
} catch (EOFException ignore) {
while (!isRestored) {
Contributor

This while loop is a bit confusing to me.
Isn't isRestored set only once, in initializeState?

Contributor Author

Yes, and that is OK. The loop is only there to prevent the source from terminating before a successful checkpoint. After the failover, when isRestored == true, we allow the source to terminate directly so that the job can finish.

Contributor

Shouldn't the checkFail call alone be enough, then? The isRestored flag is already checked there.

Contributor Author

Yeah, I guess we can just check it once in the beginning as an if.
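
One way to read the outcome of this thread is sketched below in plain Java. It is an assumption about the intended control flow, not the code that was merged: FailOnceSourceSketch, notifyCheckpointComplete, and finish are hypothetical stand-ins, and no Flink interfaces are used. The isRestored flag is checked once, as an if; only the first attempt waits for a completed checkpoint and then fails on purpose.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the test source; it does not implement Flink's source interfaces. */
final class FailOnceSourceSketch {
    private final boolean isRestored;
    private final AtomicBoolean checkpointCompleted = new AtomicBoolean(false);

    FailOnceSourceSketch(boolean isRestored) {
        this.isRestored = isRestored;
    }

    /** Stand-in for a checkpoint-complete notification. */
    void notifyCheckpointComplete() {
        checkpointCompleted.set(true);
    }

    /** Called once the input is exhausted. Returns normally only after a restore. */
    void finish() {
        if (isRestored) {
            return; // second attempt: terminate directly so the job can finish
        }
        // First attempt: do not terminate before a checkpoint has completed.
        while (!checkpointCompleted.get()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a checkpoint", e);
            }
        }
        // Trigger the artificial failure that forces a recovery cycle.
        throw new IllegalStateException("Artificial failure after the first completed checkpoint");
    }
}
```

A restored instance returns from finish() immediately, while a fresh instance throws once notifyCheckpointComplete() has been called.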

sumState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("sumState", Integer.class));

if (context.isRestored()) {
Contributor

Where do we check whether or not we have actually restored something?

Contributor Author

Through the restore flag and a validating sink? If state was dropped, the validation would fail.
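
The validation idea can be sketched in plain Java as follows. This is an illustration under assumptions, not the sink from ReinterpretDataStreamAsKeyedStreamITCase: ValidatingSinkSketch and its members are hypothetical, and the restored state is reduced to a single integer sum.

```java
/** Hypothetical validating sink; plain Java, no Flink interfaces. */
final class ValidatingSinkSketch {
    private final int expectedSum;
    private int runningSum;

    /** restoredSum stands in for the state handed back on restore (0 if state was dropped). */
    ValidatingSinkSketch(int expectedSum, int restoredSum) {
        this.expectedSum = expectedSum;
        this.runningSum = restoredSum;
    }

    /** Accumulates one incoming value. */
    void invoke(int value) {
        runningSum += value;
    }

    /** Fails if the accumulated sum does not match the expectation. */
    void close() {
        if (runningSum != expectedSum) {
            throw new IllegalStateException(
                "Expected sum " + expectedSum + " but saw " + runningSum);
        }
    }
}
```

If the first attempt accumulated 6 out of an expected 10 before failing, a sink restored with 6 that then receives 4 passes close(), whereas a sink whose state was dropped (restored with 0) fails.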

@tzulitai
Contributor

@StefanRRichter I only have one final comment about the while loop.
Other than that, this LGTM 👍

@StefanRRichter
Contributor Author

Thanks for the review @tzulitai! Merging.

asfgit pushed a commit that referenced this pull request Nov 12, 2018
…ors in state assignment

This closes #7048.

Signed-off-by: Stefan Richter <s.richter@data-artisans.com>
@asfgit asfgit closed this in bf760f9 Nov 12, 2018
JTaky pushed a commit to JTaky/flink that referenced this pull request Dec 26, 2018
…ors in state assignment

This closes apache#7048.

Signed-off-by: Stefan Richter <s.richter@data-artisans.com>
ashangit pushed a commit to criteo-forks/flink that referenced this pull request Dec 31, 2018
…ors in state assignment

This closes apache#7048.

Signed-off-by: Stefan Richter <s.richter@data-artisans.com>
tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019
…ors in state assignment

This closes apache#7048.

Signed-off-by: Stefan Richter <s.richter@data-artisans.com>