@huwh huwh commented Dec 13, 2022

What is the purpose of the change

This pull request makes ResourceAllocator declarative in order to reduce the coupling between the SlotManager and the ResourceManager.

Brief change log

  • Introduce declareResourceNeeded into ResourceAllocator. ActiveResourceManager will allocate/release workers to meet the declared resources.
  • SlotManager uses declareResourceNeeded instead of allocate/releaseResource to reduce the coupling between the SlotManager and the ResourceManager.
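
For readers unfamiliar with the declarative style, the idea in the change log can be sketched roughly as follows. This is a minimal standalone illustration, not the code in this PR: `Declaration` and `ToyDeclarativeAllocator` are hypothetical stand-ins, the resource spec is reduced to a plain string, and the "actions" list only records what a real resource manager would request or release.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a declaration: "this many workers of this spec are needed". */
final class Declaration {
    final String spec;   // simplified placeholder for a worker resource spec
    final int numNeeded; // total number of workers wanted for that spec

    Declaration(String spec, int numNeeded) {
        this.spec = spec;
        this.numNeeded = numNeeded;
    }
}

/** Toy allocator that reconciles its worker count against the latest declarations. */
final class ToyDeclarativeAllocator {
    private final Map<String, Integer> workersPerSpec = new HashMap<>();

    /** Records the request/release decisions, in the order they were made. */
    final List<String> actions = new ArrayList<>();

    /** The caller states the desired totals; the allocator works out the difference. */
    void declareResourceNeeded(Collection<Declaration> declarations) {
        for (Declaration d : declarations) {
            int current = workersPerSpec.getOrDefault(d.spec, 0);
            int diff = d.numNeeded - current;
            for (int i = 0; i < diff; i++) {
                actions.add("request:" + d.spec);
            }
            for (int i = 0; i < -diff; i++) {
                actions.add("release:" + d.spec);
            }
            workersPerSpec.put(d.spec, d.numNeeded);
        }
    }

    int workerCount(String spec) {
        return workersPerSpec.getOrDefault(spec, 0);
    }
}
```

The point of the sketch is that the caller never issues individual allocate or release calls. It only declares a target, which is what allows the SlotManager to stay unaware of how workers are actually started or stopped.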

Verifying this change

This change added tests and can be verified as follows:

  • Added tests that validate that ResourceManager will allocate/release workers to meet the declared resources.
  • Updated the tests for SlotManager to use the declarative API.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes, ResourceManager/SlotManager)
  • The S3 file system connector: (no)

Documentation

flinkbot commented Dec 13, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

huwh commented Dec 13, 2022

@xintongsong Could you please help review this PR in your free time?

huwh commented Dec 13, 2022

@flinkbot run azure

@huwh huwh force-pushed the feat/FLINK-29869 branch 2 times, most recently from 1941b91 to 9f068d5 Compare December 13, 2022 09:15

@xintongsong xintongsong left a comment

Thanks for opening this PR, @huwh. I have some questions. Please take a look.

@huwh huwh force-pushed the feat/FLINK-29869 branch 3 times, most recently from 6dfa917 to b9b572c Compare December 20, 2022 03:23

huwh commented Dec 20, 2022

Thanks for your comments, @xintongsong. I made the following changes:

  1. Deleted UnwantedWorker and UnwantedWorkerWithResourceProfile. Unwanted workers are now treated as a hint.
  2. Added totalWorkerCounter to maintain the count of all workers that have a resource spec (including recovered workers).
  3. Removed the delayed check of the resource declarations, since its cost is not very heavy.
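
To illustrate what treating unwanted workers "as a hint" could mean in practice, here is a small sketch under my own assumptions. `ReleasePlanner` and its method are hypothetical names and do not reflect the actual logic in ActiveResourceManager: workers named in the hint are released first, and the hint is ignored for any worker the allocator does not know about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: picks which workers to release once fewer are needed. */
final class ReleasePlanner {

    /**
     * Chooses workers to release so that at most numNeeded remain.
     * Workers listed in unwantedHint are preferred, but the hint is not binding:
     * unknown ids are skipped, and other workers are used if the hint is not enough.
     */
    static List<String> chooseWorkersToRelease(
            Set<String> currentWorkers, int numNeeded, Set<String> unwantedHint) {
        int toRelease = Math.max(0, currentWorkers.size() - numNeeded);
        List<String> result = new ArrayList<>();

        // First pass: honor the hint for workers that actually exist.
        for (String worker : currentWorkers) {
            if (result.size() == toRelease) {
                break;
            }
            if (unwantedHint.contains(worker)) {
                result.add(worker);
            }
        }

        // Second pass: fall back to any remaining workers if more must go.
        for (String worker : currentWorkers) {
            if (result.size() == toRelease) {
                break;
            }
            if (!unwantedHint.contains(worker)) {
                result.add(worker);
            }
        }
        return result;
    }
}
```

Because the hint only influences the order of release, the declared total stays the single source of truth for how many workers should exist.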

huwh commented Dec 22, 2022

@xintongsong Thanks for the review. The problems are addressed. In addition, ARM#requestWorkerIfRequired has been replaced by checkResourceDeclarations in this PR.

huwh commented Dec 22, 2022

I have manually verified the scenarios below, with DeclarativeSlotManager and DeclarativeSlotManager:

  • A job with parallelism 2 on a Flink cluster with 2 TaskManagers, each with 1 slot.
  • A TaskManager is released after it reaches the idle timeout.
  • A TaskManager is released when the number of registered slots exceeds the maximum.
  • After a JobManager failover, the new JobManager/RM reuses the previous TaskManagers.
  • A new TaskManager is requested when a TaskManager terminates:
    • a registered TaskManager terminates
    • an allocated but not yet registered TaskManager terminates
    • a TaskManager is released by the RM after reaching resourcemanager.taskmanager-registration.timeout

@xintongsong xintongsong left a comment

@huwh, thanks for addressing the comments. I have squashed the previous fix-up commits to get a complete view of the changes. I think this PR is very close to a mergeable state. I have only one concern regarding the semantic change of SlotManager#registerTaskManager, plus a few minor comments.

defaultWorkerResourceSpec, totalWorkerNum, new HashSet<>(unWantedWorkers)));
}

private void declareNeededResourcesWithDelay() {

The xxxWithDelay pattern has been repeated 4 times (declarative/fine-grained slot manager, check resource requirement / declare needed resources), and there could be more in future. I wonder if it can be deduplicated as a util. Not necessary for this PR though. This PR is already quite complicated.
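
As a rough idea of what such a util might look like, here is a minimal sketch. `DelayedTrigger` is a hypothetical name, not an existing Flink class, and the `Consumer<Runnable>` stands in for "schedule this on the main thread after the configured delay". It assumes all calls happen on a single thread, so there is no synchronization.

```java
import java.util.function.Consumer;

/**
 * Hypothetical helper that coalesces repeated triggers into one delayed run.
 * Assumes single-threaded use (e.g. everything runs in one main-thread executor).
 */
final class DelayedTrigger {
    // Stand-in for "run this after the configured delay"; supplied by the caller.
    private final Consumer<Runnable> delayedExecutor;
    private final Runnable action;
    private boolean pending = false;

    DelayedTrigger(Consumer<Runnable> delayedExecutor, Runnable action) {
        this.delayedExecutor = delayedExecutor;
        this.action = action;
    }

    /** Schedules the action unless a run is already pending. */
    void trigger() {
        if (pending) {
            return;
        }
        pending = true;
        delayedExecutor.accept(() -> {
            // Reset first so that the action itself may trigger a new delayed run.
            pending = false;
            action.run();
        });
    }

    boolean isPending() {
        return pending;
    }
}
```

Each xxxWithDelay method could then hold one such trigger and call `trigger()`, instead of repeating the pending-check and scheduling code.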

huwh added 4 commits December 26, 2022 14:12
…lareResourceNeeded to allocate/release resources.
…eDeclarations to check if a new worker needs to be requested when worker fails.

huwh commented Dec 26, 2022

Thanks, @xintongsong. The comments are addressed except for the xxxWithDelay one, which we can address in another PR.

I force-pushed the new commits since it is not easy to avoid conflicts while moving TestingResourceAllocator.java#declareResourceNeededConsumer to the previous commit.

@xintongsong xintongsong left a comment

Thanks @huwh. LGTM. Merging this once CI passes.

@huwh huwh deleted the feat/FLINK-29869 branch January 12, 2023 06:08
chucheng92 pushed a commit to chucheng92/flink that referenced this pull request Feb 3, 2023
…eDeclarations to check if a new worker needs to be requested when worker fails.

This closes apache#21496
akkinenivijay pushed a commit to krisnaru/flink that referenced this pull request Feb 11, 2023
…eDeclarations to check if a new worker needs to be requested when worker fails.

This closes apache#21496