[FLINK-18685] [runtime] Make MiniClusterJobClient#getAccumulators non-blocking in Streaming mode #14558

Closed

Conversation

echauchot
Contributor

What is the purpose of the change

Make MiniClusterJobClient#getAccumulators() non-blocking in Streaming mode

Brief change log

Get the serialized accumulators from the execution graph rather than getting the accumulators from the JobExecutionResult.

Verifying this change

Changed AccumulatorLiveITCase so that the accumulators are verified by calling getAccumulators() through both ClusterClient (as in the previous version of the test) and MiniClusterJobClient.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? Not applicable: this was a bug where the behavior did not comply with the Javadoc. It now does, so the Javadoc is untouched by this PR.

@flinkbot
Collaborator

flinkbot commented Jan 5, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit aa00e2a (Fri May 28 08:14:28 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Jan 5, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@echauchot
Contributor Author

R:@rmetzger. There are some test failures that I missed during my local tests. I'll ping you when this PR is ready for review.

@echauchot echauchot force-pushed the FLINK-18685-blockingGetAccumulators branch from 4480110 to 5543d0d Compare January 8, 2021 15:15
@echauchot
Contributor Author

echauchot commented Jan 8, 2021

@rmetzger I fixed one of the failing tests. But for UnalignedCheckpointCompatibilityITCase savepoint tests, I know we need to get the accumulators from JobExecutionResult for the test to work. But I struggle to find the proper condition when to get the accumulators from JobExecutionResult rather than getting them from the ExecutionGraph as usual. I don't know Flink enough yet, can you give me a clue on this condition?

//TODO: this is not the only case when we need to get the accumulators from JobExecutionResult.
// It is needed also for UnalignedCheckpointCompatibilityITCase savepoints tests to pass.
// What is the complete condition ?
if (!miniCluster.isRunning()) {
Contributor

@gaoyunhaii gaoyunhaii Jan 12, 2021

I wonder whether miniCluster.isRunning() is the proper condition. A mini-cluster can be viewed as a cluster that can run multiple jobs, so it could logically keep running after the job is done. Perhaps we could use jobResultFuture.isDone()?

Contributor Author

Agreed: miniCluster.isRunning() is more of a side effect of PerJobMiniClusterFactoryTest.testJobExecution stopping the mini-cluster when the job is done. jobResultFuture.isDone() is definitely a better condition.
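
To illustrate the distinction discussed in this thread, here is a minimal, self-contained sketch. It does not use Flink classes: `FakeCluster` and `jobFinished` are hypothetical stand-ins, and only `CompletableFuture.isDone()` is a real JDK call. It shows that a cluster able to host several jobs can still report itself as running after one job's result future has completed, so cluster liveness says little about a single job.

```java
import java.util.concurrent.CompletableFuture;

final class JobDoneConditionSketch {

    /** Hypothetical cluster model: it keeps running independently of any single job. */
    static final class FakeCluster {
        private volatile boolean running = true;

        boolean isRunning() {
            return running;
        }

        void close() {
            running = false;
        }
    }

    /** True once this particular job has a result, whether it succeeded or failed. */
    static boolean jobFinished(CompletableFuture<String> jobResultFuture) {
        return jobResultFuture.isDone();
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        CompletableFuture<String> jobResultFuture = new CompletableFuture<>();

        // While the job runs, both the cluster and the job are "alive".
        System.out.println("cluster running: " + cluster.isRunning()
                + ", job finished: " + jobFinished(jobResultFuture));

        // The job ends, but nobody has shut the cluster down.
        jobResultFuture.complete("FINISHED");
        System.out.println("cluster running: " + cluster.isRunning()
                + ", job finished: " + jobFinished(jobResultFuture));
    }
}
```

In this model, only a test that happens to close the cluster right after the job ends would see the two conditions agree.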

@gaoyunhaii
Contributor

> @rmetzger I fixed one of the failing tests. But for UnalignedCheckpointCompatibilityITCase savepoint tests, I know we need to get the accumulators from JobExecutionResult for the test to work. But I struggle to find the proper condition when to get the accumulators from JobExecutionResult rather than getting them from the ExecutionGraph as usual. I don't know Flink enough yet, can you give me a clue on this condition?

Hi @echauchot, from my point of view, there would be two possible semantics when users want to acquire accumulators:

  1. Users want to retrieve the current value of accumulators immediately.
  2. Users know the job is bounded and want to acquire the final accumulator values after the job is finished.

Previously getAccumulators() followed the second semantics, and now we want to move it to the first to make it consistent with the other client implementation. Therefore, users who still want the second semantics would need to call getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults) explicitly. So, in my view, fixing the failed tests means modifying the tests, since we cannot distinguish the first and second semantics automatically. What do you think?
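
A minimal sketch of the two semantics, for illustration only. It is self-contained and does not depend on Flink: `FakeJobClient` and `FakeJobExecutionResult` are hypothetical stand-ins that merely mirror the method names mentioned above, and the accumulator name `count` is made up. The first call returns an already-completed future with the current values; the second completes only when the job ends.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class AccumulatorSemanticsSketch {

    /** Hypothetical result type; it only mirrors the method name used in the discussion. */
    static final class FakeJobExecutionResult {
        private final Map<String, Object> accumulators;

        FakeJobExecutionResult(Map<String, Object> accumulators) {
            this.accumulators = accumulators;
        }

        Map<String, Object> getAllAccumulatorResults() {
            return accumulators;
        }
    }

    /** Hypothetical job client; this is not Flink's JobClient. */
    static final class FakeJobClient {
        private final Map<String, Object> live = new ConcurrentHashMap<>();
        private final CompletableFuture<FakeJobExecutionResult> result =
                new CompletableFuture<>();

        void updateAccumulator(String name, Object value) {
            live.put(name, value);
        }

        void finishJob() {
            result.complete(new FakeJobExecutionResult(new HashMap<>(live)));
        }

        /** Semantics 1: an already-completed future holding a snapshot of current values. */
        CompletableFuture<Map<String, Object>> getAccumulators() {
            return CompletableFuture.completedFuture(new HashMap<>(live));
        }

        /** Completes only once the job has finished. */
        CompletableFuture<FakeJobExecutionResult> getJobExecutionResult() {
            return result;
        }
    }

    public static void main(String[] args) {
        FakeJobClient client = new FakeJobClient();
        client.updateAccumulator("count", 3);

        // Semantics 1: available immediately while the job is still running.
        Map<String, Object> current = client.getAccumulators().join();

        // Semantics 2: the caller explicitly opts into waiting for the job to end.
        CompletableFuture<Map<String, Object>> finalValues =
                client.getJobExecutionResult()
                        .thenApply(FakeJobExecutionResult::getAllAccumulatorResults);

        client.updateAccumulator("count", 10);
        client.finishJob();

        System.out.println("current snapshot: " + current);
        System.out.println("final values: " + finalValues.join());
    }
}
```

Under this model, a test that needs final values has to chain on the job result itself, because a snapshot taken mid-run may be stale by the time the job ends.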

@echauchot echauchot force-pushed the FLINK-18685-blockingGetAccumulators branch from 5543d0d to 792763a Compare January 14, 2021 11:07
@echauchot
Contributor Author

> > @rmetzger I fixed one of the failing tests. But for UnalignedCheckpointCompatibilityITCase savepoint tests, I know we need to get the accumulators from JobExecutionResult for the test to work. But I struggle to find the proper condition when to get the accumulators from JobExecutionResult rather than getting them from the ExecutionGraph as usual. I don't know Flink enough yet, can you give me a clue on this condition?
>
> Hi @echauchot, from my point of view, there would be two possible semantics when users want to acquire accumulators:
>
>   1. Users want to retrieve the current value of accumulators immediately.
>   2. Users know the job is bounded and want to acquire the final accumulator values after the job is finished.
>
> Previously getAccumulators() followed the second semantics, and now we want to move it to the first to make it consistent with the other client implementation. Therefore, users who still want the second semantics would need to call getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults) explicitly. So, in my view, fixing the failed tests means modifying the tests, since we cannot distinguish the first and second semantics automatically. What do you think?

@gaoyunhaii thanks for your comments. I totally agree with them. I had also concluded that the only way was to change UnalignedCheckpointCompatibilityITCase to wait for the job to end. Thanks for the confirmation!

@echauchot echauchot force-pushed the FLINK-18685-blockingGetAccumulators branch from 792763a to c875ca9 Compare January 14, 2021 11:15
@echauchot
Contributor Author

@flinkbot run azure

@echauchot
Contributor Author

@flinkbot run travis

@echauchot
Contributor Author

@rmetzger this PR is ready for review PTAL.

@echauchot echauchot force-pushed the FLINK-18685-blockingGetAccumulators branch from c875ca9 to aa00e2a Compare January 14, 2021 12:07
@echauchot
Contributor Author

Friendly ping: @rmetzger, do you have time to review/merge this PR? If not, could you ping someone else to take a look? Thanks.

@echauchot
Contributor Author

@tillrohrmann could you please take a look at this PR? I saw you in the git history for this part of the code. Thanks.

@rmetzger
Contributor

@echauchot I'm really sorry for the delay. I'll take a look at this PR today!

@echauchot
Contributor Author

@rmetzger don't worry, I know what it is :)

Contributor

@rmetzger rmetzger left a comment

Thanks a lot for this fix!

I also manually verified the fix according to the Jira description.

I'll merge this now!

@rmetzger rmetzger closed this in beae8e7 Jan 27, 2021
@echauchot echauchot deleted the FLINK-18685-blockingGetAccumulators branch February 3, 2021 11:58