[FLINK-35165][runtime/coordination] AdaptiveBatch Scheduler should not restrict the default source parall… #24736

Open
wants to merge 4 commits into
base: master

venkata91
Contributor

@venkata91 venkata91 commented Apr 28, 2024

…elism to the max parallelism set

What is the purpose of the change

With AdaptiveBatchScheduler, if both execution.batch.adaptive.auto-parallelism.default-source-parallelism and execution.batch.adaptive.auto-parallelism.max-parallelism are configured, and execution.batch.adaptive.auto-parallelism.default-source-parallelism is greater than execution.batch.adaptive.auto-parallelism.max-parallelism, the source parallelism is currently capped at execution.batch.adaptive.auto-parallelism.max-parallelism. This cap does not suit source vertices:

  • The source vertex is unique in that it has no upstream vertices. Downstream vertices read shuffled data partitioned by key; the source vertex does not.
  • Limiting source parallelism by the downstream vertices' max parallelism is therefore incorrect.

For example, in the case of high filter selectivity with huge amounts of data to read, the current behavior causes the following issues:

  • Setting a high execution.batch.adaptive.auto-parallelism.max-parallelism only so that the source parallelism can be raised can lead to small blocks and sub-optimal performance.
  • Setting a high execution.batch.adaptive.auto-parallelism.max-parallelism requires careful tuning of the network buffer configuration, which is unnecessary work when the only goal is a higher source parallelism.

The proposed solution is to decouple execution.batch.adaptive.auto-parallelism.default-source-parallelism from execution.batch.adaptive.auto-parallelism.max-parallelism, so that the source parallelism is no longer bounded by execution.batch.adaptive.auto-parallelism.max-parallelism.
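As a rough illustration of the difference, the following is a minimal sketch and not the actual AdaptiveBatchScheduler code. The class name, method names, and the values 2000 and 500 are hypothetical; only the clamping behavior and its proposed removal come from the description above.

```java
// Hypothetical sketch: contrasts the current and the proposed bounding of the
// default source parallelism. None of these names exist in Flink.
final class SourceParallelismBoundSketch {

    /** Current behavior as described: the default source parallelism is clamped to the global max. */
    public static int currentBehavior(int defaultSourceParallelism, int globalMaxParallelism) {
        return Math.min(defaultSourceParallelism, globalMaxParallelism);
    }

    /**
     * Proposed behavior: the global max no longer bounds the source parallelism.
     * The second parameter is kept only to make the comparison explicit.
     */
    public static int proposedBehavior(int defaultSourceParallelism, int globalMaxParallelism) {
        return defaultSourceParallelism;
    }

    public static void main(String[] args) {
        int defaultSourceParallelism = 2000; // assumed value of ...default-source-parallelism
        int globalMaxParallelism = 500;      // assumed value of ...max-parallelism

        System.out.println("current:  " + currentBehavior(defaultSourceParallelism, globalMaxParallelism));
        System.out.println("proposed: " + proposedBehavior(defaultSourceParallelism, globalMaxParallelism));
    }
}
```

With these assumed values, the current behavior yields a source parallelism of 500, while the proposed behavior keeps the configured 2000.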

Verifying this change

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Apr 28, 2024

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@venkata91 venkata91 changed the title from "[FLINK-35165] AdaptiveBatch Scheduler should not restrict the default source parall…" to "[FLINK-35165][runtime/coordination] AdaptiveBatch Scheduler should not restrict the default source parall…" Apr 28, 2024
@venkata91 venkata91 marked this pull request as ready for review April 30, 2024 03:27
@venkata91
Contributor Author

cc @SinBex and @JunRuiLee for reviews.

Contributor

@SinBex SinBex left a comment

Thanks to Venkata for creating this PR. I have some comments, please take a look.

We might need to verify the following case: the source vertex does not have maxParallelism set, and defaultSourceParallelism is greater than globalMaxParallelism.

I feel that changing only this code might not be adequate. We also need to consider the logic of the method AdaptiveBatchScheduler#computeVertexParallelismStoreForDynamicGraph. If the source vertex uses globalMaxParallelism as its maxParallelism and the inferred source parallelism is greater than that maxParallelism, the method DefaultVertexParallelismInfo#setParallelism will throw an IllegalArgumentException.
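The failure mode raised here can be sketched as follows. This is a simplified, hypothetical stand-in written for illustration, not Flink's DefaultVertexParallelismInfo; the only behavior taken from the comment is that setting a parallelism above the vertex's maxParallelism throws an IllegalArgumentException. The values 500 and 2000 are assumptions.

```java
// Hypothetical stand-in for a per-vertex parallelism holder. It is NOT the
// Flink class; it only mimics the invariant described in the review comment.
final class VertexParallelismInfoSketch {

    private final int maxParallelism;
    private int parallelism = -1; // -1 means "not decided yet" in this sketch

    VertexParallelismInfoSketch(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /** Rejects any parallelism outside the range [1, maxParallelism]. */
    void setParallelism(int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "parallelism " + parallelism + " is outside [1, " + maxParallelism + "]");
        }
        this.parallelism = parallelism;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        int globalMaxParallelism = 500;       // assumed: used as the source vertex's maxParallelism
        int inferredSourceParallelism = 2000; // assumed: no longer clamped by the global max

        VertexParallelismInfoSketch info = new VertexParallelismInfoSketch(globalMaxParallelism);
        try {
            info.setParallelism(inferredSourceParallelism);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, lifting the clamp in one place without also adjusting the maxParallelism assigned to the source vertex would move the problem from a silent cap to a runtime exception.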

Contributor

@JunRuiLee JunRuiLee left a comment

Thanks, @venkata91, for your contribution! After reviewing this PR, I'm concerned that it entirely removes the limit that the source parallelism should not exceed the source jobVertex's max parallelism. I think the goal of this PR is to ensure that the source parallelism isn't limited by the config option execution.batch.adaptive.auto-parallelism.max-parallelism, while still respecting the max parallelism of the source jobVertex.

WDYT?

@venkata91
Contributor Author

I think that makes sense. Basically, you're saying that if the source's max parallelism is determined by the source itself and is lower than the default-source-parallelism config, we should cap the source parallelism at the source's own computed max parallelism, correct? If so, I agree with that.

@JunRuiLee
Contributor

Yes, that's correct.
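The rule agreed on in this exchange could look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's implementation: the class and method names are hypothetical, and the branch for a source vertex without its own max parallelism simply leaves the value uncapped, which the thread leaves open as a case still to be verified.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the agreed rule: the source parallelism ignores the
// global max-parallelism config option but still respects the max parallelism
// of the source job vertex when one is set.
final class SourceParallelismCapSketch {

    static int decide(int defaultSourceParallelism, OptionalInt sourceVertexMaxParallelism) {
        if (sourceVertexMaxParallelism.isPresent()) {
            // The source's own max parallelism remains a hard upper bound.
            return Math.min(defaultSourceParallelism, sourceVertexMaxParallelism.getAsInt());
        }
        // Assumption of this sketch: with no vertex-level max, nothing caps the value.
        return defaultSourceParallelism;
    }

    public static void main(String[] args) {
        // Assumed values: default-source-parallelism = 2000, source vertex max = 128.
        System.out.println(decide(2000, OptionalInt.of(128)));
        System.out.println(decide(2000, OptionalInt.empty()));
    }
}
```

The global execution.batch.adaptive.auto-parallelism.max-parallelism value is deliberately not a parameter of decide, since under the agreed rule it plays no part in the source decision.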
