[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher #5107
@@ -246,5 +258,11 @@ protected JobManagerRunner createJobManagerRunner(

        return jobManagerRunner;
    }

    @Override
    protected void runAsync(final Runnable runnable) {
I think if I can set Akka's CallingThreadDispatcher, this is not needed.
dispatcher.submitJob(jobGraph, timeout);

// pretend that other Dispatcher has removed job from submittedJobGraphStore
dispatcher.onRemovedJobGraph(TEST_JOB_ID);
Test is not thread-safe.
Interacting with the dispatcher via the self gateway should help.
LGTM. After changing the tests to use the DispatcherGateway we can merge it.
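For readers following the thread-safety point above, here is a minimal sketch of why routing calls through a gateway helps. It is not Flink code: SketchEndpoint, its methods, and the String job IDs are hypothetical stand-ins, and a single-threaded executor is assumed to play the role of the endpoint's main thread. The idea shown is only that state touched exclusively on one thread needs no further synchronization, whereas a test that calls the endpoint's methods directly from the test thread bypasses that guarantee.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for an RPC endpoint whose state is confined to one "main" thread. */
class SketchEndpoint {
    // Assumption: a single-threaded executor plays the role of the endpoint's main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Deliberately not thread-safe: only tasks running on mainThread may touch it.
    private final Set<String> runningJobs = new HashSet<>();

    /** Gateway-style call: the state change is marshalled onto the main thread. */
    CompletableFuture<Boolean> submitJob(String jobId) {
        return CompletableFuture.supplyAsync(() -> runningJobs.add(jobId), mainThread);
    }

    /** Gateway-style call: removal also runs on the main thread. */
    CompletableFuture<Boolean> removeJob(String jobId) {
        return CompletableFuture.supplyAsync(() -> runningJobs.remove(jobId), mainThread);
    }

    /** Reads are routed through the main thread as well, so they see a consistent state. */
    CompletableFuture<Integer> jobCount() {
        return CompletableFuture.supplyAsync(runningJobs::size, mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }
}

class SelfGatewaySketch {
    public static void main(String[] args) {
        SketchEndpoint endpoint = new SketchEndpoint();
        // The caller never touches runningJobs directly; it only waits on futures.
        endpoint.submitJob("job-1").join();
        endpoint.removeJob("job-1").join();
        System.out.println("jobs left: " + endpoint.jobCount().join());
        endpoint.shutdown();
    }
}
```

A test written against such a gateway waits on the returned futures instead of reaching into the endpoint's fields, which is the pattern the review comments ask for.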
try {
    submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
} catch (final Exception e) {
    log.error("Could not submit job {}.", jobId, e);
Maybe "could not recover job".
Changed it.

@Override
void recoverJobs() {
    if (recoverJobsEnabled.get()) {
Without this I do not see how I can verify whether a job was submitted regularly or via recoverJobs.
…bManagerHARecoveryTest
…Dispatcher Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable this, the dispatcher must implement the SubmittedJobGraphListener interface. Add simple unit tests for the new methods. Refactor DispatcherTest to remove redundancy.
…oc end with period
…tingHighAvailabilityServices
Check if jobManagerRunner exists before submitting job. Replace JobManagerRunner mock used in tests with real instance. Do not run job graph recovery in actor main thread when job graph is recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID).
Changes look good to me. Thanks for your contribution @GJL. I only had minor comments. After addressing them, this PR is good to be merged.
@Override
public void onRemovedJobGraph(final JobID jobId) {
    runAsync(() -> {
        if (jobManagerRunners.containsKey(jobId)) {
This check should be superfluous because it will be checked in removeJob as well.
Removed.
    return;
}
runAsync(() -> {
    if (!jobManagerRunners.containsKey(jobId)) {
I think this check is not strictly needed.
Removed.
TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
@Before
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
No longer needed.
Nice catch.
private boolean started;

@Override
public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
I think we should also trigger SubmittedJobGraphListener notifications when a job is added or removed from this store to make it behave similarly to the ZooKeeperSubmittedJobGraphStore.
Good point.
Not sure if it is actually applicable. onRemovedJobGraph explicitly demands that the graph is removed by a different SubmittedJobGraphStore instance:
/**
* Callback for {@link SubmittedJobGraph} instances removed by a different {@link
* SubmittedJobGraphStore} instance.
*
* @param jobId The {@link JobID} of the removed job graph
*/
void onRemovedJobGraph(JobID jobId);
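As background for this part of the discussion, the following is a rough sketch of a store that refuses to operate before start() has been called and guards its state with synchronized methods. It is not Flink's InMemorySubmittedJobGraphStore: the class name SketchJobGraphStore, the String-based job IDs and job graphs, and the exception message are all assumptions made for illustration, and the listener parameter from the diff above is left out on purpose.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified in-memory store; job IDs and job graphs are plain Strings here. */
class SketchJobGraphStore {
    private final Map<String, String> jobGraphs = new HashMap<>();
    private boolean started;

    // All methods are synchronized so that 'started' and 'jobGraphs' are read and
    // written under the same lock, whichever thread calls in.
    synchronized void start() {
        started = true;
    }

    synchronized void stop() {
        started = false;
    }

    synchronized void putJobGraph(String jobId, String jobGraph) {
        verifyIsStarted();
        jobGraphs.put(jobId, jobGraph);
    }

    synchronized String recoverJobGraph(String jobId) {
        verifyIsStarted();
        return jobGraphs.get(jobId);
    }

    synchronized Collection<String> getJobIds() {
        verifyIsStarted();
        // Return a copy so callers cannot observe later modifications.
        return new ArrayList<>(jobGraphs.keySet());
    }

    // Only called from synchronized methods, so the lock is already held.
    private void verifyIsStarted() {
        if (!started) {
            throw new IllegalStateException("Store is not running. Was start() called?");
        }
    }
}
```

Under these assumptions, calling putJobGraph before start() fails with an IllegalStateException, which mirrors the failure mode the pull request description reports for a store that is never started.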
 */
@Test
public void testSubmittedJobGraphListener() throws Exception {
    dispatcher.recoverJobsEnabled.set(false);
Why do we need this to be set to false? Recovering all jobs should only be triggered when leadership is granted to the dispatcher, which only happens at the very beginning of this test.
It is not so predictable when the main logic of recoverJobs() is scheduled because it does not run in the main thread:
/**
* Recovers all jobs persisted via the submitted job graph store.
*/
@VisibleForTesting
void recoverJobs() {
log.info("Recovering all persisted jobs.");
getRpcService().execute(
() -> {
final Collection<JobID> jobIds;
...
When the test adds something to the job graph store, it could potentially be submitted by recoverJobs, which is something I need to avoid.
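The switch described here can be sketched roughly as follows. This is an illustration only: SketchDispatcher, SketchTestingDispatcher, and the recoveries counter are hypothetical names, and the real recoverJobs() schedules its work asynchronously rather than incrementing a counter. The sketch only shows how a testing subclass can gate an overridden method behind an AtomicBoolean so that a test controls whether recovery runs at all.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the production class; recovery is reduced to a counter. */
class SketchDispatcher {
    final AtomicInteger recoveries = new AtomicInteger();

    void recoverJobs() {
        // Placeholder for the real recovery logic.
        recoveries.incrementAndGet();
    }
}

/** Hypothetical testing subclass that lets a test switch recovery off. */
class SketchTestingDispatcher extends SketchDispatcher {
    // AtomicBoolean because the flag may be set by the test thread and read by another thread.
    final AtomicBoolean recoverJobsEnabled = new AtomicBoolean(true);

    @Override
    void recoverJobs() {
        if (recoverJobsEnabled.get()) {
            super.recoverJobs();
        }
    }
}
```

With the flag set to false, anything the test puts into the store can only reach the dispatcher through the listener callback under test, not through a concurrently scheduled recovery.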
…lbacks Always attempt the job submission if onAddedJobGraph or onRemovedJobGraph are called. The checks in submitJob and removeJob are sufficient.
Changes look good. Will merge this.
Implement SubmittedJobGraphListener interface in Dispatcher
Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable this, the dispatcher must implement the SubmittedJobGraphListener interface. Add simple unit tests for the new methods. Refactor DispatcherTest to remove redundancy.
[FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe
[FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService
[FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices
[FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest
Check if jobManagerRunner exists before submitting job. Replace JobManagerRunner mock used in tests with real instance. Do not run job graph recovery in actor main thread when job graph is recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID).
[FLINK-8176][flip6] Rename variables in DispatcherTest
[FLINK-8176][flip6] Remove injectMocks in DispatcherTest
[FLINK-8176][flip6] Update Dispatcher's SubmittedJobGraphListener callbacks
Always attempt the job submission if onAddedJobGraph or onRemovedJobGraph are called. The checks in submitJob and removeJob are sufficient.
This closes apache#5107.
What is the purpose of the change
The FLIP-6 dispatcher never calls start() on its SubmittedJobGraphStore instance. Hence, when a Job is submitted (YARN session mode with HA enabled), an IllegalStateException is thrown. This pull request adds the necessary changes so that jobs can be submitted.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation
CC: @tillrohrmann