Skip to content

Commit

Permalink
[FLINK-1879] [client] Simplify JobClient. Hide actorRefs behind metho…
Browse files Browse the repository at this point in the history
…d calls where possible.

 - Drop redundant routing actor
 - Consistently set the flag to subscribe to updates or not.
 - Scala style cleanups: Drop default values for some method parameters.
  • Loading branch information
StephanEwen committed Apr 13, 2015
1 parent f81d9f0 commit ad63707
Show file tree
Hide file tree
Showing 28 changed files with 678 additions and 604 deletions.
Expand Up @@ -16,20 +16,17 @@
* limitations under the License.
*/


package org.apache.flink.client;

import java.util.List;

import akka.actor.ActorRef;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -180,12 +177,8 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {

JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);

ActorRef jobClient = flink.getJobClient();

SerializedJobExecutionResult result =
JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient, flink.timeout());


SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, printStatusDuringExecution);
return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
}
finally {
Expand Down
Expand Up @@ -49,10 +49,10 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -316,36 +316,50 @@ public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries,

public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
this.lastJobId = jobGraph.getJobID();
final String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if (hostname == null) {
throw new ProgramInvocationException("Could not find hostname of job manager.");
}

FiniteDuration timeout = AkkaUtils.getTimeout(configuration);

InetSocketAddress jobManagerAddress;
try {
jobManagerAddress = JobClient.getJobManagerAddress(configuration);
}
catch (IOException e) {
throw new ProgramInvocationException(e.getMessage(), e);
}
LOG.info("JobManager actor system address is " + jobManagerAddress);

LOG.info("Starting client actor system");
final ActorSystem actorSystem;
final ActorRef client;

try {
Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration, false);
actorSystem = pair._1();
client = pair._2();
actorSystem = JobClient.startJobClientActorSystem(configuration);
}
catch (Exception e) {
throw new ProgramInvocationException("Could not build up connection to JobManager.", e);
throw new ProgramInvocationException("Could start client actor system.", e);
}

LOG.info("Looking up JobManager");
ActorRef jobManager;
try {
jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
}
catch (IOException e) {
throw new ProgramInvocationException("Failed to resolve JobManager", e);
}
LOG.info("JobManager runs at " + jobManager.path());

FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
LOG.info("Communication between client and JobManager will have a timeout of " + timeout);

LOG.info("Checking and uploading JAR files");
try {
JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
}
catch (IOException e) {
throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
}

try{
if (wait) {
SerializedJobExecutionResult result =
JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem,
jobManager, jobGraph, timeout, printStatusDuringExecution);
try {
return result.toJobExecutionResult(this.userCodeClassLoader);
}
Expand All @@ -355,8 +369,8 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
}
}
else {
JobClient.submitJobDetached(jobGraph, client, timeout);
// return a "Fake" execution result with the JobId
JobClient.submitJobDetached(jobManager, jobGraph, timeout);
// return a dummy execution result with the JobId
return new JobSubmissionResult(jobGraph.getJobID());
}
}
Expand Down
Expand Up @@ -23,7 +23,6 @@
import akka.actor.Status;
import akka.actor.UntypedActor;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand All @@ -38,6 +37,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;

Expand Down Expand Up @@ -224,7 +224,13 @@ public static class SuccessReturningActor extends UntypedActor {

@Override
public void onReceive(Object message) throws Exception {
getSender().tell(new Status.Success(new JobID()), getSelf());
if (message instanceof JobManagerMessages.SubmitJob) {
JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
getSender().tell(new Status.Success(jid), getSelf());
}
else {
getSender().tell(new Status.Failure(new Exception("Unknown message " + message)), getSelf());
}
}
}

Expand Down

0 comments on commit ad63707

Please sign in to comment.