-
Couldn't load subscription status.
- Fork 13.7k
[FLINK-4273] Modify JobClient to attach to running jobs #2313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| val listeningBehaviour: ListeningBehaviour, | ||
|
|
||
| var client: ActorRef, | ||
| var listeningBehaviour: ListeningBehaviour, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to allow multiple clients here. Otherwise only the least recent registered client will receive updates.
7636aea to
3164cc5
Compare
|
CC @rmetzger @tillrohrmann Could you please take a look? I would like to merge this. Tests are passing: https://travis-ci.org/mxm/flink/builds/151653198 |
| Future<Object> submissionFuture = Patterns.ask( | ||
| jobClientActor, | ||
| new JobClientMessages.SubmitJobAndWait(jobGraph), | ||
| new Timeout(AkkaUtils.INF_TIMEOUT())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason not to use the default "akka.ask.timeout" here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not possible because the JobClientActor will complete this future with the result of the job execution which may be be infinitely delayed. In all other cases (i.e. timeout to register at jobmanager, failure to attach to job, failure to submit job), the JobClientActor will complete the future with a failure message.
|
I did a quick pass over the code. I think this change needs another review by our Actor expert @tillrohrmann ;) |
| // retrieve classloader first before doing anything | ||
| ClassLoader classloader; | ||
| try { | ||
| classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the JobManager has already changed at this point? We would no longer be able to retrieve the ClassLoader, wouldn't we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, this code assumes that the JobManager doesn't change between retrieving the leading jobmanager and retrieving the class loader. There is always some possible gap where the jobmanager could change. We could mitigate this by retrying in case is has changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think that would be good. The user code class loader should always be retrievable if the job is still running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I'm not really sure about this corner case. We don't typically retry client side operations in case the leader has changed after retrieving it. Instead, we just throw an error (see all the methods in ClusterClient). The JobClientActor is exceptional in this regard and it has to be because it operates independently of the user function.
So we could fail if we can't reconstruct the class loader. That of course has the caveat that even if the user doesn't use custom classes for the JobExecutionResult or Exceptions, the job retrieval may fail (e.g. firewall blocking the blobManager port). That's why I didn't want to enforce this step but we could enforce it and fix eventual problems with the BlobManager communication if there are any.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's the question: Shall we fail or try to perform on a best effort basis. If you have user code classes in your result, then the deserialization will fail later on, right? In this case, it would be better imo that the user tries the operation again because the failure might have been caused by a leader change. On the other hand you might only be interested in the cancel, stop job commands and are not interested in the deserialized result.
Would it be possible that we first connect to the JobManager and only if we want to wait for the job result we try to reconstruct the classloader? If that fails, then we throw an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to the result, you'll also need the class loader for getting accumulators of a running job.
I agree that it would be nice to fail when the class loader can't be reconstructed, but only if it is really the only option. So we could start off with the class loader set to None in the JobListeningContext. When the class loader is needed, i.e. accumulator retrieval or job execution result retrieval, it is fetched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that could be a good solution :-)
|
Good work @mxm. I made some minor comments inline. Just for my own clarification: Is it still planned to have a new kind of The test case After addressing the comments +1 for merging. |
|
Thanks for the review @tillrohrmann. Yes, the plan is to have a |
|
@tillrohrmann I've refactored the JobClientActor to include the common code in |
|
@tillrohrmann Pinging the actor now to check if it is still alive. Also added another test case for that. |
|
I've made the last changes concerning the lazy reconstruction of the class loader we discussed. Rebased to master. Should be good to go now. |
0b92621 to
b7c6787
Compare
| Await.result( | ||
| Patterns.ask( | ||
| jobClientActor, | ||
| JobClientMessages.getPing(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can also use Akka's build-in message Identify to do the same. Then we don't have to introduce a new message type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
|
Updated according to our comment discussion. |
|
Merging this if there are no further comments. |
| Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())), | ||
| AkkaUtils.getDefaultTimeout()); | ||
| Await.ready(jobSubmissionFuture, askTimeout); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can narrow down this exception here. Should be good to catch TimeoutException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We throw the exception anyways afterwards. The only difference is that we wrap the exception and throw only if the future has not been completed in the meantime.
We would have to catch InterruptedException, TimeoutException, and IllegalArgumentException. I'm not convinced this is necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But at least an IllegalArgumentException should not trigger the pinging of the job client actor. This should be handled differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I thought you were commenting on the inner catch block. Yes, it makes sense to catch only TimeoutException and InterruptedException here for Await.ready. Await.result actually throws Exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
These changes are required for FLINK-4272 (introduce a JobClient class for job control). Essentially, we want to be able to re-attach to a running job and monitor it. It shouldn't make any difference whether we just submitted the job or we recover it from an existing JobID. This PR modifies the JobClientActor to support two different operation modes: a) submitJob and monitor b) re-attach to job and monitor The JobClient class has been updated with methods to access this functionality. Before the class just had `submitJobAndWait` and `submitJobDetached`. Now, it has the additional methods `submitJob`, `attachToRunningJob`, and `awaitJobResult`. The job submission has been split up in two phases: 1a. submitJob(..) Submit job and return a future which can be completed to get the result with `awaitJobResult` 1b. attachToRunningJob(..) Re-attach to a runnning job, reconstruct its class loader, and return a future which can be completed with `awaitJobResult` 2. awaitJobResult(..) Blocks until the returned future from either `submitJob` or `attachToRunningJob` has been completed - split up JobClientActor into a base class and two implementations - JobClient: on waiting check JobClientActor liveness - lazily reconstruct user class loader - add additional tests for JobClientActor - add test case to test resuming of jobs This closes apache#2313
|
Rebased to the changes on master. Merging after tests pass again. |
|
Thanks for helpful review @tillrohrmann and @rmetzger. |
These changes are required for FLINK-4272 (introduce a JobClient class for job control). Essentially, we want to be able to re-attach to a running job and monitor it. It shouldn't make any difference whether we just submitted the job or we recover it from an existing JobID.
This PR modifies the JobClientActor to support two different operation modes: a) submitJob and monitor b) re-attach to job and monitor
The JobClient class has been updated with methods to access this functionality. Before the class just had
submitJobAndWaitandsubmitJobDetachd. Now, it has the additional methodssubmitJob,attachToRunningJob, andawaitJobResult.awaitJobResultawaitJobResultsubmitJoborattachToRunningJobhas been completed