[FLINK-33341][state] Add support for rescaling from local keyed state #23591

Contributor

@StefanRRichter StefanRRichter commented Oct 25, 2023

What is the purpose of the change

This PR extends the use of local (keyed) state from recovery to rescaling. Previously, Flink could not make use of relevant local state in rescaling scenarios, even when such state was available. For example, when scaling out, the previously existing slots will often already hold a superset of the state that a subtask needs after the scale-out. When scaling in, an existing slot should hold a subset of the required state. Using this existing state can reduce the amount of data downloaded from remote storage when rescaling.
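
To make the superset/subset observation concrete, here is a minimal sketch of how contiguous key-group ranges shift when parallelism changes. It is an illustration only: `KeyGroupOverlapSketch`, `Range`, and `rangeFor` are hypothetical names rather than classes from this PR, the range arithmetic is a simplified stand-in for Flink's key-group assignment, and it assumes the new subtask 0 is scheduled into the slot that previously ran subtask 0.

```java
final class KeyGroupOverlapSketch {

    /** Inclusive range of key groups; a simplified, hypothetical stand-in. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /** True if every key group of {@code other} also lies in this range. */
        boolean contains(Range other) {
            return start <= other.start && other.end <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Contiguous key-group range owned by one subtask at a given parallelism. */
    static Range rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new Range(start, end);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        Range atTwo = rangeFor(maxParallelism, 2, 0);  // subtask 0 at parallelism 2
        Range atFour = rangeFor(maxParallelism, 4, 0); // subtask 0 at parallelism 4

        // Scale out 2 -> 4: the old local state covers everything the new subtask needs.
        System.out.println(atTwo + " contains " + atFour + ": " + atTwo.contains(atFour));
        // Scale in 4 -> 2: the old local state covers only part of the new range.
        System.out.println(atFour + " contains " + atTwo + ": " + atFour.contains(atTwo));
    }
}
```

With a maximum parallelism of 128, subtask 0 owns key groups 0 to 63 at parallelism 2 and 0 to 31 at parallelism 4. In the scale-out direction the local state is a superset of what is needed; in the scale-in direction it is a subset and the remainder still has to come from remote storage.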

Brief change log

  • Introduced AutoRescalingITCase as a variant of RescalingITCase that uses a checkpoint and changing resource requirements to trigger a rescale that preserves local state (like an autoscaler would), instead of using a savepoint and restart (which loses local state).
  • Preparatory refactoring, introducing AbstractIncrementalStateHandle as common superclass to IncrementalRemoteKeyedStateHandle and IncrementalLocalKeyedStateHandle. See next change for reason.
  • Improving RocksDBIncrementalRestoreOperation in such a way that we can transparently rescale from IncrementalRemoteKeyedStateHandle and IncrementalLocalKeyedStateHandle (or a mix of both).
  • Changing PrioritizedOperatorSubtaskState to support creating an alternative as a mix of local and remote (=assigned from job manager) state, instead of only alternatives that are either purely local or purely remote.
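
The idea of a mixed alternative in the last item can be sketched roughly as follows. This is not the logic in PrioritizedOperatorSubtaskState: `MixedAlternativeSketch`, `Handle`, and `mix` are hypothetical names, and matching local to remote handles by a string id is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MixedAlternativeSketch {

    /** Hypothetical state handle: an id plus a flag saying where the data lives. */
    static final class Handle {
        final String id;
        final boolean local;

        Handle(String id, boolean local) {
            this.id = id;
            this.local = local;
        }
    }

    /**
     * For every handle assigned by the job manager, prefer a local handle with the
     * same id if one exists; otherwise keep the remote handle.
     */
    static List<Handle> mix(List<Handle> jobManagerState, List<Handle> localState) {
        Map<String, Handle> localById = new HashMap<>();
        for (Handle handle : localState) {
            localById.put(handle.id, handle);
        }
        List<Handle> mixed = new ArrayList<>();
        for (Handle remote : jobManagerState) {
            Handle local = localById.get(remote.id);
            mixed.add(local != null ? local : remote);
        }
        return mixed;
    }
}
```

The job manager's assignment stays the source of truth for which state is required; local handles only replace entries that it already lists, so local state that is no longer assigned is ignored.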

Verifying this change

This change added tests:

  • AutoRescalingITCase
  • Modified PrioritizedOperatorSubtaskStateTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@StefanRRichter StefanRRichter marked this pull request as ready for review October 25, 2023 12:19
@flinkbot
Collaborator

flinkbot commented Oct 25, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@StefanRRichter StefanRRichter force-pushed the srichter-local-rescaling-FLINK-33341 branch from 27090a0 to b8b8c48 Compare October 26, 2023 10:40
@StefanRRichter StefanRRichter changed the title from "[FLINK-33341][state] Add support for rescaling from local state" to "[FLINK-33341][state] Add support for rescaling from local keyed state" Oct 26, 2023
@StefanRRichter StefanRRichter force-pushed the srichter-local-rescaling-FLINK-33341 branch from b8b8c48 to aa59dc1 Compare October 26, 2023 10:42
Contributor

@rkhachatryan rkhachatryan left a comment

Nice changes @StefanRRichter!
I've only spotted one small issue with equals, other things are mostly NITs.
Other than that LGTM.

Comment on lines 115 to 118
  public int hashCode() {
-     int result = super.hashCode();
-     result = 31 * result + getMetaDataState().hashCode();
+     int result = directoryStateHandle.hashCode();
+     result = 31 * result + getKeyGroupRange().hashCode();
+     result = 31 * result + getMetaDataStateHandle().hashCode();
Contributor

  1. Doesn't equals need to be updated as well? I think it currently doesn't match hashCode.
  2. nit: move closer to equals?

Contributor Author

Good catch, I think the hashCode/equals/toString in those classes should be consolidated. I'll create a separate commit to streamline them.
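
For readers following the equals/hashCode point, a minimal sketch of what keeping the two methods consistent looks like is shown below. It is not the code from this PR: `LocalHandleSketch` is a hypothetical class, and its string and int fields merely stand in for the directory handle, key-group range, and metadata handle hashed in the snippet under review.

```java
import java.util.Objects;

final class LocalHandleSketch {

    private final String directory;  // stand-in for the directory state handle
    private final int keyGroupStart; // stand-in for the key-group range start
    private final int keyGroupEnd;   // stand-in for the key-group range end
    private final String metaData;   // stand-in for the metadata state handle

    LocalHandleSketch(String directory, int keyGroupStart, int keyGroupEnd, String metaData) {
        this.directory = Objects.requireNonNull(directory);
        this.keyGroupStart = keyGroupStart;
        this.keyGroupEnd = keyGroupEnd;
        this.metaData = Objects.requireNonNull(metaData);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LocalHandleSketch)) {
            return false;
        }
        LocalHandleSketch that = (LocalHandleSketch) o;
        // Compare exactly the fields that hashCode() mixes in below.
        return keyGroupStart == that.keyGroupStart
                && keyGroupEnd == that.keyGroupEnd
                && directory.equals(that.directory)
                && metaData.equals(that.metaData);
    }

    @Override
    public int hashCode() {
        int result = directory.hashCode();
        result = 31 * result + Objects.hash(keyGroupStart, keyGroupEnd);
        result = 31 * result + metaData.hashCode();
        return result;
    }
}
```

The contract is that objects which are equal must have equal hash codes, so basing both methods on the same set of fields and keeping them next to each other makes a mismatch easier to spot in review.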

*
* @param jobManagerState the state that the task got assigned from the job manager (this
* state lives in remote storage).
* @param alternativesByPriority local alternatives to the job manager starte, ordered by
Contributor

typo "starte"

@StefanRRichter StefanRRichter force-pushed the srichter-local-rescaling-FLINK-33341 branch 2 times, most recently from ca38a5e to 31b79ef Compare October 27, 2023 09:45
@StefanRRichter StefanRRichter force-pushed the srichter-local-rescaling-FLINK-33341 branch from 31b79ef to 7013a86 Compare October 27, 2023 11:34
@StefanRRichter StefanRRichter merged commit a4ad86f into apache:master Oct 27, 2023