-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor #22341
Conversation
c655436
to
c1e8c3c
Compare
39ec75a
to
3760c90
Compare
Hi @WencongLiu, sorry for the late response. I found time to look into you proposal now. The initial intention of FLINK-27204 was to have the async functionality being hidden in
* The async calls in the runtime module usually have the formate The Does that sound reasonable to you? Let me know if you have more questions. |
@XComp Thanks for your reply! 😀 I'll follow the suggestions. |
14a4c03
to
abb0fcd
Compare
abb0fcd
to
3547aaa
Compare
@XComp I have made a round of changes. Please take a look at it when you have time. 😃 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @WencongLiu . I did a pass over the change and added a few comments. PTAL
Just letting you know. I'm on vacation for the next two weeks. Therefore, don't expect any responses in that time. ...just to help you coordinating your work. :-)
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
Outdated
Show resolved
Hide resolved
...time/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
Outdated
Show resolved
Hide resolved
90d7f95
to
318edb4
Compare
@XComp I have made a round of changes. Please take a look at it when you have time. 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for getting back to you that late, @WencongLiu . I haven't found the time till now to go through open PullRequest. Anyway, I went over the changes once more. I added a few comments. PTAL
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Outdated
Show resolved
Hide resolved
...runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreContractTest.java
Outdated
Show resolved
Hide resolved
...va/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Outdated
Show resolved
Hide resolved
318edb4
to
3118ded
Compare
Thanks for your careful review 😄 @XComp. Sorry for the first round of pull request because it's a bit rough. Please take a look when you have time. |
584c7a7
to
6ef6f2c
Compare
6ef6f2c
to
e3a4970
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @WencongLiu for addressing my comments. I went over the code once more. PTAL.
I might do another pass over it to address the issue in JobMasterServiceLeadershipRunner#verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
next week.
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java
Show resolved
Hide resolved
...va/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
Outdated
Show resolved
Hide resolved
...va/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
Show resolved
Hide resolved
...va/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
Outdated
Show resolved
Hide resolved
...time/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java
Outdated
Show resolved
Hide resolved
4eccd9b
to
107b36a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM already. I'm gonna pass over the JobMasterServiceLeadershipRunner
change tomorrow.
...va/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, we're almost there. 👍 I added a few (mostly cosmetic comments). PTAL
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
Thanks for your patient review. @XComp I have added a fixup commit. PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, thanks. We're getting closer. I have a few cosmetic comments. PTAL
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
446249e
to
fdfa611
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @WencongLiu . I only have a few cosmetic proposals. But it looks good now. :-)
One additional thing: Could you fix the TODO in JobMasterServiceLeadershipRunner:171ff in a hotfix commit. I just noticed that we forgot to clean that up 😇
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
fdfa611
to
4ed0b73
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great. Thanks very much for your effort. One last thing is there to be fixed.
When you're done with that, you can re-organize/squash the commits properly and rebase the branch. I will do a final pass over it after this is done. 👍
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
705c6c9
to
249e0e9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reorganizing the commits. In my final pass over the change, I realized that we missed one location where we should switch to the CompletableFuture
handling instead of blocking the call. Sorry for missing that earlier. I marked the code snippet below. PTAL.
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Show resolved
Hide resolved
249e0e9
to
a0e0688
Compare
...ntime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
Outdated
Show resolved
Hide resolved
016b136
to
5a7329d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome. We're done here in my opinion. Good job. 👍 I pushed some minor fixes (see comparison). The changes can be merged to master
as soon as the release-1.18
branch is created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CI is failing due to us not acknowledging the requirement that any state access in the Dispatcher
has to happen in the main thread (to ensure sequential execution of any Dispatcher
logic). Please see my comment below for further details.
return FutureUtils.completedExceptionally( | ||
DuplicateJobSubmissionException.ofGloballyTerminated( | ||
jobID)); | ||
} else if (jobManagerRunnerRegistry.isRegistered(jobID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} else if (jobManagerRunnerRegistry.isRegistered(jobID) | |
} else if (jobManagerRunnerRegistry.isRegistered(jobID) |
Ok, that's a tricky one which I missed: The Dispatcher has one requirement: Any state access needs to happen in the main thread of the Dispatcher. There's a special implementation of JobManagerRunnerRegistry
that ensures this invariant (see OnMainThreadJobmanagerRunnerRegistry). Our change (with the thenCompose
being chained with the future that's returned by isInGloballyTerminalState
) is going against this requirement. Why?
isInGloballyTerminalState
calls jobResultStore.hasJobResultEntryAsync
which executes the logic on the ioExecutor
(i.e. a thread for IO operations which is not the Dispatcher's main thread) internally. The returned future is linked to this executor, i.e. any chained CompletableFuture
calls will run in the same thread. The thenCompose
logic is, therefore, also executed in the ioExecutor
instead of the main thread.
To workaround this, we have to change the executor for the chained execution. This can be achieved by using thenComposeAsync
, instead. Here we would specify the main thread executor by calling getMainThreadExecutor
. One example where it's done like that is Dispatcher:619: The cleanupAsync
method is executed on the ioExecutor
. But the error handling has to happen in the main thread again. That's where we use handleAsync
with getMainThreadExecutor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your detailed explanation! 😄 I've modified the thenCompose
to thenComposeAsync
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You didn't do a pull before adding the changes (I did a force-push to include a few minor changes previous). These changes were reverted with your most-recent push
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😼 I have added the changes in the comparison. Really sorry for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries. Let's see whether CI becomes 🟢 this time. 🤞
ee05e69
to
58575f4
Compare
…ons on the ioExecutor
…viceLeadershipRunner
58575f4
to
f59c00e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CI is green. 👍 Thanks again :)
What is the purpose of the change
At present, FileSystemJobResultStore executes I/O operations through FileSystem directly. We should refract the interface of JobResultStore to make I/O operations be executed asynchronously. This would move the responsibility of I/O operation from the Dispatcher into the JobResultStore.
Brief change log
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation