[FLINK-4272] Create a JobClient for job control and monitoring #2732
Rebased to the latest changes on the master.
Also includes: [FLINK-4274] Expose new JobClient in the DataSet/DataStream API
- rename JobClient class to JobClientActorUtils
- introduce JobClient interface with two implementations
- JobClientEager: starts an actor system right away and monitors the job
- Move ClusterClient#cancel, ClusterClient#stop, ClusterClient#getAccumulators to JobClient
- JobClientLazy: starts an actor system when requests are made by encapsulating the eager job client
- Java and Scala API
- JobClient integration
- introduce ExecutionEnvironment#executeWithControl()
- introduce StreamExecutionEnvironment#executeWithControl()
- report errors during job execution as JobExecutionException instead of ProgramInvocationException and adapt test cases
- provide finalizers to run code upon shutdown of client
- use ActorGateway in JobListeningContext
- add test case for JobClient implementations
Thanks for your contribution @mxm. It's good to refactor how the client interacts with a running job.
I think that we should revisit the public interface of JobClient in order to decide which functionality we actually want to expose. I made some comments inline.
I think it would also be good to decouple the JobClient, and then also the ClusterClient, from the underlying RPC implementation. At the moment it is tightly coupled with Akka and the ActorGateways. Having FLIP-6 in mind, it would be good to have an abstraction which hides these details. Otherwise, the newly introduced code has to be rewritten for FLIP-6 again.
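One way to picture the decoupling suggested above is a small gateway interface that the client talks to instead of an ActorGateway. This is only a sketch under assumptions: JobControlGateway and GatewayBackedJobClient are hypothetical names that do not appear in the PR, a plain String stands in for Flink's JobID, and CompletableFuture is used for illustration rather than whatever future type the runtime would actually expose.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical abstraction (not part of this PR): the operations a job client
// needs, expressed without any Akka or ActorGateway types.
interface JobControlGateway {
    CompletableFuture<Void> cancelJob(String jobId);

    CompletableFuture<Void> stopJob(String jobId);
}

// The client depends only on the gateway interface, so an Akka-backed or a
// FLIP-6-backed implementation could be swapped in behind it.
final class GatewayBackedJobClient {
    private final JobControlGateway gateway;
    private final String jobId; // a plain String stands in for Flink's JobID

    GatewayBackedJobClient(JobControlGateway gateway, String jobId) {
        this.gateway = gateway;
        this.jobId = jobId;
    }

    CompletableFuture<Void> cancel() {
        return gateway.cancelJob(jobId);
    }

    CompletableFuture<Void> stop() {
        return gateway.stopJob(jobId);
    }
}
```

With this shape, tests can pass an in-memory gateway, and only the gateway implementation would need to change when the RPC layer does.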
stop();
boolean sysoutPrint = isPrintingStatusDuringExecution();
Two consecutive blank lines here.
Thank you.
try {
    stop();
} catch (Exception e) {
    throw new RuntimeException("Failed to run cleanup", e);
This will crash the JobClientEager when calling JobClientEager.shutdown. Is this intended?
Thanks, catching exceptions per finalizer would make sense.
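A minimal sketch of what catching exceptions per finalizer could look like. FinalizerRunner is a hypothetical helper, not a class from this PR; it assumes finalizers are plain Runnables run in registration order, and that failures should be collected rather than abort the remaining finalizers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of this PR): runs shutdown finalizers in
// registration order and isolates failures per finalizer.
final class FinalizerRunner {
    private final List<Runnable> finalizers = new ArrayList<>();

    void addFinalizer(Runnable finalizer) {
        finalizers.add(finalizer);
    }

    // Runs every finalizer, even if an earlier one throws, and returns the
    // collected failures instead of rethrowing the first one.
    List<RuntimeException> runAll() {
        List<RuntimeException> failures = new ArrayList<>();
        for (Runnable finalizer : finalizers) {
            try {
                finalizer.run();
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return failures;
    }
}
```

The caller can then decide whether to log the collected failures or rethrow one of them after all finalizers have had a chance to run.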
Runnable cleanup = new Runnable() {
    @Override
    public void run() {
        if (shutDownAtEnd) {
Can't we move this if condition out of the runnable and only add the cleanup runnable if shutDownAtEnd == true?
We could, but it wouldn't make any semantic difference since the enclosed variable must be final. OK, granted, it would spare us one object allocation, but that's a minor benefit.
try {
    stop();
} catch (Exception e) {
    throw new RuntimeException("Failed to clean up.", e);
Same here with the exception. I think it is not good practice to masquerade checked exceptions as unchecked exceptions, because it violates the contract defined by the Runnable interface.
Fine, then we need something like Runnable with a checked exception signature.
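For illustration, a Runnable-like type with a checked exception signature could look roughly like this. ThrowingRunnable and CleanupExample are hypothetical names chosen for this sketch; nothing here is taken from the PR's code.

```java
// Hypothetical interface (not in this PR): like Runnable, but run() is
// allowed to throw a checked exception.
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws Exception;
}

final class CleanupExample {
    // Runs the cleanup action and lets checked exceptions propagate to the
    // caller instead of wrapping them in a RuntimeException.
    static void runCleanup(ThrowingRunnable cleanup) throws Exception {
        cleanup.run();
    }
}
```

A finalizer that calls stop() could then be registered as a ThrowingRunnable, and the code that runs the finalizers decides how to handle the checked exception.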
new Runnable() {
    @Override
    public void run() {
        if (shutDownAtEnd) {
Maybe move this out of the runnable.
This closure should be fine since Java demands the variable to be final.
The closure is fine, but by moving this out of the runnable, we can avoid registering Runnables when we don't need them.
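The suggestion amounts to checking the flag at registration time rather than inside run(). A rough sketch, where ConditionalCleanup is a hypothetical stand-in for the client code in the PR and only the finalizer registration is modeled:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the client in this PR; only finalizer
// registration is modeled here.
final class ConditionalCleanup {
    final List<Runnable> finalizers = new ArrayList<>();

    void register(boolean shutDownAtEnd, Runnable stopCluster) {
        // Check the flag once, at registration time, so that no no-op
        // Runnable is registered when shutdown is not requested.
        if (shutDownAtEnd) {
            finalizers.add(stopCluster);
        }
    }
}
```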
import java.util.Map;

/*
No JavaDoc comment
Missing *.
/*
 * An Flink job client interface to interact with running Flink jobs.
 */
public interface JobClient {
PublicEvolving?
Would it make sense to be able to retrieve the ClusterClient from the JobClient?
+1
I could pass the ClusterClient to the JobClient. I thought I would avoid that because it would expose the ClusterClient also from the regular Java API, which is generally agnostic of job submission and cluster management.
 * when the client is shut down. Runnables are called
 * in the order they are added.
 */
void addFinalizer(Runnable finalizer) throws Exception;
Is this a method we want to expose to the user? Seems to me like something with which he shouldn't fiddle around.
Yes, that's an issue with sharing the interface across modules. Let me try to get rid of it for the interface.
 */
@Override
public JobExecutionResult waitForResult() throws JobExecutionException {
    LOG.info("Waiting for results of Job {}", jobListeningContext.getJobID());
Typo: "results of job {}"
/**
 * A detached job client which lazily initiates the cluster connection.
 */
public class JobClientLazy implements JobClient {
The distinction between JobClientEager and JobClientLazy feels a little bit clumsy. Can't we get rid of one of them and simply have a JobClientImpl? The only place where JobClientLazy is returned is when calling submitJobDetached. I think in this case, you don't expect to get a JobClient back because it is submitted in detached mode.
I found it rather clever to have a lazy implementation of the client which can be retrieved when needed. For the sake of keeping things simple, I would opt to remove it in favor of one implementation.
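For context, the lazy variant being discussed can be pictured as a thin wrapper that creates the eager client only on first use. The sketch below is simplified and hypothetical: SimpleJobClient and LazyJobClient are illustrative names, the interface is reduced to a single method, and the factory stands in for whatever starts the actor system in the real code.

```java
import java.util.function.Supplier;

// Hypothetical, heavily reduced stand-in for the JobClient interface.
interface SimpleJobClient {
    void cancel();
}

// Creates the (expensive) eager delegate on first use only.
final class LazyJobClient implements SimpleJobClient {
    private final Supplier<SimpleJobClient> factory;
    private SimpleJobClient delegate;

    LazyJobClient(Supplier<SimpleJobClient> factory) {
        this.factory = factory;
    }

    // synchronized so that concurrent callers create at most one delegate
    private synchronized SimpleJobClient delegate() {
        if (delegate == null) {
            delegate = factory.get();
        }
        return delegate;
    }

    @Override
    public void cancel() {
        delegate().cancel();
    }
}
```

Folding this behavior into a single implementation, as proposed in the thread, would mean the one client class defers its own connection setup instead of wrapping a second class.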
 * Runs finalization code to shutdown the client
 * and its dependencies.
 */
void shutdown();
Shutdown seems to be an internal implementation detail for the new finalizers. It should therefore not be in the public API; it even seems problematic to allow users to call it because it would prematurely call finalizers.
Shutdown should not be internal. The idea was that it is used by the user to shutdown the client and any code associated with it (e.g. mini cluster).
But now there's cancel(), stop() and shutdown(), which might be quite confusing for users. And nothing is done in shutdown() except calling the finalizers right now, correct?
Correct, let's see if we can solely dedicate the execution of shutdown to finalizers and shutdown hooks then.
I'm also not sure whether a JobClient should be allowed to shut down a cluster. Imagine that you have multiple jobs running on the same cluster. Then you don't want to have this kind of behaviour.
Thank you for your comments @tillrohrmann and @aljoscha. I'll make changes and get back to you.
This probably needs an overhaul by now. Has there been any effort to introduce a job client?
No work yet, but we still need it. 😅
This is still based on old runtime parts (JobManager), though the interface allows it to be ported to the new runtime (JobMaster). As the new one is about to supersede the old one, it might be sensible to port this to the new one first.
I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit. Please re-open this if you feel otherwise and work should continue.
I agree that this is obsolete now. Thanks!