diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index c3c666b1686e1..292da70d8c82b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -47,6 +47,8 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.client.JobRetrievalException; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -428,6 +430,39 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade } } + /** + * Reattaches to a running from from the supplied job id + * @param jobID The job id of the job to attach to + * @return The JobExecutionResult for the jobID + * @throws JobExecutionException if an error occurs during monitoring the job execution + */ + public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException { + final LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e); + } + + ActorGateway jobManagerGateway; + try { + jobManagerGateway = getJobManagerGateway(); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway"); + } + + final JobListeningContext listeningContext = JobClient.attachToRunningJob( + jobID, + jobManagerGateway, + flinkConfig, + actorSystemLoader.get(), + leaderRetrievalService, + timeout, + printStatusDuringExecution); + + return JobClient.awaitJobResult(listeningContext); + } + /** * Cancels a job identified by the job id. * @param jobId the job id @@ -446,11 +481,11 @@ public void cancel(JobID jobId) throws Exception { final Object result = Await.result(response, timeout); if (result instanceof JobManagerMessages.CancellationSuccess) { - LOG.info("Job cancellation with ID " + jobId + " succeeded."); + logAndSysout("Job cancellation with ID " + jobId + " succeeded."); } else if (result instanceof JobManagerMessages.CancellationFailure) { final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); - LOG.info("Job cancellation with ID " + jobId + " failed.", t); - throw new Exception("Failed to cancel the job because of \n" + t.getMessage()); + logAndSysout("Job cancellation with ID " + jobId + " failed because of " + t.getMessage()); + throw new Exception("Failed to cancel the job with id " + jobId, t); } else { throw new Exception("Unknown message received while cancelling: " + result.getClass().getName()); } diff --git a/flink-clients/src/test/resources/log4j-test.properties b/flink-clients/src/test/resources/log4j-test.properties index 85897b37b4999..5100c1ff526d3 100644 --- a/flink-clients/src/test/resources/log4j-test.properties +++ b/flink-clients/src/test/resources/log4j-test.properties @@ -27,4 +27,4 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java index bc5ae09a332cc..cb4ecc548a852 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java @@ -34,7 +34,7 @@ public class JobExecutionResult extends JobSubmissionResult { private long netRuntime; - private Map accumulatorResults = Collections.emptyMap(); + private final Map accumulatorResults; /** * Creates a new JobExecutionResult. @@ -49,6 +49,8 @@ public JobExecutionResult(JobID jobID, long netRuntime, Map accu if (accumulators != null) { this.accumulatorResults = accumulators; + } else { + this.accumulatorResults = Collections.emptyMap(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java new file mode 100644 index 0000000000000..5446002acf06d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; +import akka.dispatch.Futures; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobClientMessages; +import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait; +import org.apache.flink.runtime.messages.JobManagerMessages; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.Callable; + + +/** + * Actor which handles Job attachment process and provides Job updates until completion. + */ +public class JobAttachmentClientActor extends JobClientActor { + + /** JobID to attach to when the JobClientActor retrieves a job */ + private JobID jobID; + /** true if a JobRegistrationSuccess message has been received */ + private boolean successfullyRegisteredForJob = false; + + public JobAttachmentClientActor( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + super(leaderRetrievalService, timeout, sysoutUpdates); + } + + @Override + public void connectedToJobManager() { + if (jobID != null && !successfullyRegisteredForJob) { + tryToAttachToJob(); + } + } + + @Override + protected Class getClientMessageClass() { + return AttachToJobAndWait.class; + } + + @Override + public void handleCustomMessage(Object message) { + if (message instanceof AttachToJobAndWait) { + // sanity check that this no job registration was performed through this actor before - + // it is a one-shot actor after all + if (this.client == null) { + jobID = ((AttachToJobAndWait) message).jobID(); + if (jobID == null) { + LOG.error("Received null JobID"); + sender().tell( + decorateMessage(new Status.Failure(new Exception("JobID is null"))), + getSelf()); + } else { + LOG.info("Received JobID {}.", jobID); + + this.client = getSender(); + + // is only successful if we already know the job manager leader + if (jobManager != null) { + tryToAttachToJob(); + } + } + } else { + // repeated submission - tell failure to sender and kill self + String msg = "Received repeated 'AttachToJobAndWait'"; + LOG.error(msg); + getSender().tell( + decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); + + terminate(); + } + } + else if (message instanceof JobManagerMessages.RegisterJobClientSuccess) { + // job registration was successful :o) + JobManagerMessages.RegisterJobClientSuccess msg = ((JobManagerMessages.RegisterJobClientSuccess) message); + logAndPrintMessage("Successfully registered at the JobManager for Job " + msg.jobId()); + successfullyRegisteredForJob = true; + } + else if (message instanceof JobManagerMessages.JobNotFound) { + LOG.info("Couldn't register JobClient for JobID {}", + ((JobManagerMessages.JobNotFound) message).jobID()); + client.tell(decorateMessage(message), getSelf()); + terminate(); + } + else if (JobClientMessages.getRegistrationTimeout().equals(message)) { + // check if our registration for a job was successful in the meantime + if (!successfullyRegisteredForJob) { + if (isClientConnected()) { + client.tell( + decorateMessage(new Status.Failure( + new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " + + "timed out. " + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + + "' in case the JobManager needs more time to confirm the job client registration."))), + getSelf()); + } + + // We haven't heard back from the job manager after attempting registration for a job + // therefore terminate + terminate(); + } + } else { + LOG.error("{} received unknown message: ", getClass()); + } + + } + + private void tryToAttachToJob() { + LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID); + + Futures.future(new Callable() { + @Override + public Object call() throws Exception { + LOG.info("Attaching to job {} at the job manager {}.", jobID, jobManager.path()); + + jobManager.tell( + decorateMessage( + new JobManagerMessages.RegisterJobClient( + jobID, + ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), + getSelf()); + + // issue a RegistrationTimeout message to check that we submit the job within + // the given timeout + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getRegistrationTimeout()), + getContext().dispatcher(), + ActorRef.noSender()); + + return null; + } + }, getContext().dispatcher()); + } + + public static Props createActorProps( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + return Props.create( + JobAttachmentClientActor.class, + leaderRetrievalService, + timeout, + sysoutUpdates); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index c0e0d08415327..4e916eb24818b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -21,6 +21,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; +import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; import akka.pattern.Patterns; @@ -30,6 +31,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -44,10 +47,15 @@ import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -80,28 +88,18 @@ public static ActorSystem startJobClientActorSystem(Configuration config) } /** - * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to - * the JobManager. The method blocks until the job has finished or the JobManager is no longer - * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter - * case a [[JobExecutionException]] is thrown. - * - * @param actorSystem The actor system that performs the communication. - * @param leaderRetrievalService Leader retrieval service which used to find the current leading - * JobManager - * @param jobGraph JobGraph describing the Flink job - * @param timeout Timeout for futures - * @param sysoutLogUpdates prints log updates to system out if true - * @return The job execution result - * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job - * execution fails. + * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be + * passed to {@code awaitJobResult} to get the result of the submission. + * @return JobListeningContext which may be used to retrieve the JobExecutionResult via + * {@code awaitJobResult(JobListeningContext context)}. */ - public static JobExecutionResult submitJobAndWait( + public static JobListeningContext submitJob( ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, - ClassLoader classLoader) throws JobExecutionException { + ClassLoader classLoader) { checkNotNull(actorSystem, "The actorSystem must not be null."); checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); @@ -112,29 +110,187 @@ public static JobExecutionResult submitJobAndWait( // the JobManager. It forwards the job submission, checks the success/failure responses, logs // update messages, watches for disconnect between client and JobManager, ... - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( leaderRetrievalService, timeout, sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - - // first block handles errors while waiting for the result - Object answer; + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + timeout, + classLoader); + } + + + /** + * Attaches to a running Job using the JobID. + * Reconstructs the user class loader by downloading the jars from the JobManager. + */ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job attachments, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + timeout, + actorSystem, + configuration); + } + + /** + * Reconstructs the class loader by first requesting information about it at the JobManager + * and then downloading missing jar files. + * @param jobID id of job + * @param jobManager gateway to the JobManager + * @param config the flink configuration + * @return A classloader that should behave like the original classloader + * @throws JobRetrievalException if anything goes wrong + */ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config) + throws JobRetrievalException { + + final Object jmAnswer; try { - Future future = Patterns.ask(jobClientActor, - new JobClientMessages.SubmitJobAndWait(jobGraph), - new Timeout(AkkaUtils.INF_TIMEOUT())); - - answer = Await.result(future, AkkaUtils.INF_TIMEOUT()); + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.RequestClassloadingProps(jobID), + AkkaUtils.getDefaultTimeout()), + AkkaUtils.getDefaultTimeout()); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e); } - catch (TimeoutException e) { - throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " + - "Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e); + + if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) { + JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer); + + Option jmHost = jobManager.actor().path().address().host(); + String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost"; + InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); + final BlobCache blobClient = new BlobCache(serverAddress, config); + + final List requiredJarFiles = props.requiredJarFiles(); + final List requiredClasspaths = props.requiredClasspaths(); + + final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()]; + + int pos = 0; + for (BlobKey blobKey : props.requiredJarFiles()) { + try { + allURLs[pos++] = blobClient.getURL(blobKey); + } catch (Exception e) { + blobClient.shutdown(); + throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey); + } + } + + for (URL url : requiredClasspaths) { + allURLs[pos++] = url; + } + + return new URLClassLoader(allURLs, JobClient.class.getClassLoader()); + } else if (jmAnswer instanceof JobManagerMessages.JobNotFound) { + throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found"); + } else { + throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer); + } + } + + /** + * Given a JobListeningContext, awaits the result of the job execution that this context is bound to + * @param listeningContext The listening context of the job execution + * @return The result of the execution + * @throws JobExecutionException if anything goes wrong while monitoring the job + */ + public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException { + + final JobID jobID = listeningContext.getJobID(); + final ActorRef jobClientActor = listeningContext.getJobClientActor(); + final Future jobSubmissionFuture = listeningContext.getJobResultFuture(); + final FiniteDuration askTimeout = listeningContext.getTimeout(); + // retrieves class loader if necessary + final ClassLoader classLoader = listeningContext.getClassLoader(); + + // wait for the future which holds the result to be ready + // ping the JobClientActor from time to time to check if it is still running + while (!jobSubmissionFuture.isCompleted()) { + try { + Await.ready(jobSubmissionFuture, askTimeout); + } catch (InterruptedException e) { + throw new JobExecutionException( + jobID, + "Interrupted while waiting for job completion."); + } catch (TimeoutException e) { + try { + Await.result( + Patterns.ask( + jobClientActor, + // Ping the Actor to see if it is alive + new Identify(true), + Timeout.durationToTimeout(askTimeout)), + askTimeout); + // we got a reply, continue waiting for the job result + } catch (Exception eInner) { + // we could have a result but the JobClientActor might have been killed and + // thus the health check failed + if (!jobSubmissionFuture.isCompleted()) { + throw new JobExecutionException( + jobID, + "JobClientActor seems to have died before the JobExecutionResult could be retrieved.", + eInner); + } + } + } + } + + final Object answer; + try { + // we have already awaited the result, zero time to wait here + answer = Await.result(jobSubmissionFuture, Duration.Zero()); } catch (Throwable throwable) { - throw new JobExecutionException(jobGraph.getJobID(), - "Communication with JobManager failed: " + throwable.getMessage(), throwable); + throw new JobExecutionException(jobID, + "Couldn't retrieve the JobExecutionResult from the JobManager.", throwable); } finally { // failsafe shutdown of the client actor @@ -149,18 +305,16 @@ public static JobExecutionResult submitJobAndWait( if (result != null) { try { return result.toJobExecutionResult(classLoader); + } catch (Throwable t) { + throw new JobExecutionException(jobID, + "Job was successfully executed but JobExecutionResult could not be deserialized."); } - catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), - "Job was successfully executed but JobExecutionResult could not be deserialized."); - } - } - else { - throw new JobExecutionException(jobGraph.getJobID(), - "Job was successfully executed but result contained a null JobExecutionResult."); + } else { + throw new JobExecutionException(jobID, + "Job was successfully executed but result contained a null JobExecutionResult."); } } - if (answer instanceof JobManagerMessages.JobResultFailure) { + else if (answer instanceof JobManagerMessages.JobResultFailure) { LOG.info("Job execution failed"); SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause(); @@ -168,22 +322,61 @@ public static JobExecutionResult submitJobAndWait( Throwable cause = serThrowable.deserializeError(classLoader); if (cause instanceof JobExecutionException) { throw (JobExecutionException) cause; + } else { + throw new JobExecutionException(jobID, "Job execution failed", cause); } - else { - throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause); - } - } - else { - throw new JobExecutionException(jobGraph.getJobID(), - "Job execution failed with null as failure cause."); + } else { + throw new JobExecutionException(jobID, + "Job execution failed with null as failure cause."); } } + else if (answer instanceof JobManagerMessages.JobNotFound) { + throw new JobRetrievalException( + ((JobManagerMessages.JobNotFound) answer).jobID(), + "Couldn't retrieve Job " + jobID + " because it was not running."); + } else { - throw new JobExecutionException(jobGraph.getJobID(), - "Unknown answer from JobManager after submitting the job: " + answer); + throw new JobExecutionException(jobID, + "Unknown answer from JobManager after submitting the job: " + answer); } } + /** + * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to + * the JobManager. The method blocks until the job has finished or the JobManager is no longer + * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter + * case a [[JobExecutionException]] is thrown. + * + * @param actorSystem The actor system that performs the communication. + * @param leaderRetrievalService Leader retrieval service which used to find the current leading + * JobManager + * @param jobGraph JobGraph describing the Flink job + * @param timeout Timeout for futures + * @param sysoutLogUpdates prints log updates to system out if true + * @param classLoader The class loader for deserializing the results + * @return The job execution result + * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job + * execution fails. + */ + public static JobExecutionResult submitJobAndWait( + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + JobGraph jobGraph, + FiniteDuration timeout, + boolean sysoutLogUpdates, + ClassLoader classLoader) throws JobExecutionException { + + JobListeningContext jobListeningContext = submitJob( + actorSystem, + leaderRetrievalService, + jobGraph, + timeout, + sysoutLogUpdates, + classLoader); + + return awaitJobResult(jobListeningContext); + } + /** * Submits a job in detached mode. The method sends the JobGraph to the * JobManager and waits for the answer whether the job could be started or not. @@ -227,7 +420,7 @@ public static void submitJobDetached( "JobManager did not respond within " + timeout.toString(), e); } catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), + throw new JobSubmissionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t.getCause()); } @@ -258,4 +451,5 @@ else if (result instanceof JobManagerMessages.JobResultFailure) { throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result); } } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java index 9379c30343925..1380e76d41fe7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java @@ -20,18 +20,11 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; -import akka.actor.Props; import akka.actor.Status; import akka.actor.Terminated; -import akka.dispatch.Futures; import akka.dispatch.OnSuccess; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.akka.ListeningBehaviour; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -39,47 +32,39 @@ import org.apache.flink.runtime.messages.JobClientMessages; import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef; import org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress; -import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.Preconditions; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; import java.util.UUID; -import java.util.concurrent.Callable; + /** - * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient - * is used to submit jobs to the JobManager and to request the port of the BlobManager. + * Actor which constitutes the bridge between the non-actor code and the JobManager. + * This base class handles the connection to the JobManager and notifies in case of timeouts. It also + * receives and prints job updates until job completion. */ -public class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener { +public abstract class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener { private final LeaderRetrievalService leaderRetrievalService; /** timeout for futures */ - private final FiniteDuration timeout; + protected final FiniteDuration timeout; /** true if status messages shall be printed to sysout */ private final boolean sysoutUpdates; - /** true if a SubmitJobSuccess message has been received */ - private boolean jobSuccessfullySubmitted = false; - - /** true if a PoisonPill was taken */ - private boolean terminated = false; + /** true if a PoisonPill about to be taken */ + private boolean toBeTerminated = false; /** ActorRef to the current leader */ - private ActorRef jobManager; + protected ActorRef jobManager; /** leader session ID of the JobManager when this actor was created */ - private UUID leaderSessionID; - - /** Actor which submits a job to the JobManager via this actor */ - private ActorRef submitter; + protected UUID leaderSessionID; - /** JobGraph which shall be submitted to the JobManager */ - private JobGraph jobGraph; + /** The client which the actor is responsible for */ + protected ActorRef client; public JobClientActor( LeaderRetrievalService leaderRetrievalService, @@ -109,9 +94,27 @@ public void postStop() { } } + /** + * Hook to be called once a connection has been established with the JobManager. + */ + protected abstract void connectedToJobManager(); + + /** + * Hook to handle custom client message which are not handled by the base class. + * @param message The message to be handled + */ + protected abstract void handleCustomMessage(Object message); + + /** + * Hook to let the client know about messages that should start a timer for a timeout + * @return The message class after which a timeout should be started + */ + protected abstract Class getClientMessageClass(); + + @Override protected void handleMessage(Object message) { - + // =========== State Change Messages =============== if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) { @@ -149,79 +152,31 @@ public void onSuccess(ActorRef result) throws Throwable { JobManagerActorRef msg = (JobManagerActorRef) message; connectToJobManager(msg.jobManager()); - logAndPrintMessage("Connected to JobManager at " + msg.jobManager()); + logAndPrintMessage("Connected to JobManager at " + msg.jobManager()); - if (jobGraph != null && !jobSuccessfullySubmitted) { - // if we haven't yet submitted the job successfully - tryToSubmitJob(jobGraph); - } + connectedToJobManager(); } // =========== Job Life Cycle Messages =============== - - // submit a job to the JobManager - else if (message instanceof SubmitJobAndWait) { - // only accept SubmitJobWait messages if we're not about to terminate - if (!terminated) { - // sanity check that this no job was submitted through this actor before - - // it is a one-shot actor after all - if (this.submitter == null) { - jobGraph = ((SubmitJobAndWait) message).jobGraph(); - if (jobGraph == null) { - LOG.error("Received null JobGraph"); - sender().tell( - decorateMessage(new Status.Failure(new Exception("JobGraph is null"))), - getSelf()); - } else { - LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); - - this.submitter = getSender(); - - // is only successful if we already know the job manager leader - tryToSubmitJob(jobGraph); - } - } else { - // repeated submission - tell failure to sender and kill self - String msg = "Received repeated 'SubmitJobAndWait'"; - LOG.error(msg); - getSender().tell( - decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); - - terminate(); - } - } else { - // we're about to receive a PoisonPill because terminated == true - String msg = getClass().getName() + " is about to be terminated. Therefore, the " + - "job submission cannot be executed."; - LOG.error(msg); - getSender().tell( - decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); - } - } + // acknowledgement to submit job is only logged, our original - // submitter is only interested in the final job result - else if (message instanceof JobManagerMessages.JobResultSuccess || - message instanceof JobManagerMessages.JobResultFailure) { - + // client is only interested in the final job result + else if (message instanceof JobManagerMessages.JobResultMessage) { + if (LOG.isDebugEnabled()) { LOG.debug("Received {} message from JobManager", message.getClass().getSimpleName()); } - // forward the success to the original job submitter - if (hasJobBeenSubmitted()) { - this.submitter.tell(decorateMessage(message), getSelf()); + // forward the success to the original client + if (isClientConnected()) { + this.client.tell(decorateMessage(message), getSelf()); } terminate(); } - else if (message instanceof JobManagerMessages.JobSubmitSuccess) { - // job was successfully submitted :-) - LOG.info("Job was successfully submitted to the JobManager {}.", getSender().path()); - jobSuccessfullySubmitted = true; - } // =========== Actor / Communication Failure / Timeouts =============== - + else if (message instanceof Terminated) { ActorRef target = ((Terminated) message).getActor(); if (jobManager.equals(target)) { @@ -234,7 +189,7 @@ else if (message instanceof Terminated) { // Important: The ConnectionTimeout message is filtered out in case that we are // notified about a new leader by setting the new leader session ID, because // ConnectionTimeout extends RequiresLeaderSessionID - if (hasJobBeenSubmitted()) { + if (isClientConnected()) { getContext().system().scheduler().scheduleOnce( timeout, getSelf(), @@ -245,49 +200,61 @@ else if (message instanceof Terminated) { } else { LOG.warn("Received 'Terminated' for unknown actor " + target); } - } else if (JobClientMessages.getConnectionTimeout().equals(message)) { + } + else if (JobClientMessages.getConnectionTimeout().equals(message)) { // check if we haven't found a job manager yet - if (!isConnected()) { - if (hasJobBeenSubmitted()) { - submitter.tell( - decorateMessage(new Status.Failure( - new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."))), + if (!isJobManagerConnected()) { + final JobClientActorConnectionTimeoutException errorMessage = + new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."); + final Object replyMessage = decorateMessage(new Status.Failure(errorMessage)); + if (isClientConnected()) { + client.tell( + replyMessage, getSelf()); } // Connection timeout reached, let's terminate terminate(); } - } else if (JobClientMessages.getSubmissionTimeout().equals(message)) { - // check if our job submission was successful in the meantime - if (!jobSuccessfullySubmitted) { - if (hasJobBeenSubmitted()) { - submitter.tell( - decorateMessage(new Status.Failure( - new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " + - "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " + - "needs more time to configure and confirm the job submission."))), - getSelf()); - } - - // We haven't heard back from the job manager after sending the job graph to him, - // therefore terminate - terminate(); - } } - // =========== Unknown Messages =============== - + // =========== Message Delegation =============== + + else if (!isJobManagerConnected() && getClientMessageClass().equals(message.getClass())) { + LOG.info( + "Received {} but there is no connection to a JobManager yet.", + message); + // We want to submit/attach to a job, but we haven't found a job manager yet. + // Let's give him another chance to find a job manager within the given timeout. + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getConnectionTimeout()), + getContext().dispatcher(), + ActorRef.noSender() + ); + handleCustomMessage(message); + } else { - LOG.error("JobClient received unknown message: " + message); + if (!toBeTerminated) { + handleCustomMessage(message); + } else { + // we're about to receive a PoisonPill because toBeTerminated == true + String msg = getClass().getName() + " is about to be terminated. Therefore, the " + + "job submission cannot be executed."; + LOG.error(msg); + getSender().tell( + decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); + } } } + @Override protected UUID getLeaderSessionID() { return leaderSessionID; } - private void logAndPrintMessage(String message) { + protected void logAndPrintMessage(String message) { LOG.info(message); if (sysoutUpdates) { System.out.println(message); @@ -351,97 +318,19 @@ private void connectToJobManager(ActorRef jobManager) { getContext().watch(jobManager); } - private void tryToSubmitJob(final JobGraph jobGraph) { - this.jobGraph = jobGraph; - - if (isConnected()) { - LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", - jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID()); - - Futures.future(new Callable() { - @Override - public Object call() throws Exception { - ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID); - - LOG.info("Upload jar files to job manager {}.", jobManager.path()); - - try { - jobGraph.uploadUserJars(jobManagerGateway, timeout); - } catch (IOException exception) { - getSelf().tell( - decorateMessage(new JobManagerMessages.JobResultFailure( - new SerializedThrowable( - new JobSubmissionException( - jobGraph.getJobID(), - "Could not upload the jar files to the job manager.", - exception) - ) - )), - ActorRef.noSender() - ); - } - - LOG.info("Submit job to the job manager {}.", jobManager.path()); - - jobManager.tell( - decorateMessage( - new JobManagerMessages.SubmitJob( - jobGraph, - ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), - getSelf()); - - // issue a SubmissionTimeout message to check that we submit the job within - // the given timeout - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getSubmissionTimeout()), - getContext().dispatcher(), - ActorRef.noSender()); - - return null; - } - }, getContext().dispatcher()); - } else { - LOG.info("Could not submit job {} ({}), because there is no connection to a " + - "JobManager.", - jobGraph.getName(), jobGraph.getJobID()); - - // We want to submit a job, but we haven't found a job manager yet. - // Let's give him another chance to find a job manager within the given timeout. - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getConnectionTimeout()), - getContext().dispatcher(), - ActorRef.noSender() - ); - } - } - - private void terminate() { + protected void terminate() { LOG.info("Terminate JobClientActor."); - terminated = true; + toBeTerminated = true; disconnectFromJobManager(); getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender()); } - private boolean isConnected() { + private boolean isJobManagerConnected() { return jobManager != ActorRef.noSender(); } - private boolean hasJobBeenSubmitted() { - return submitter != ActorRef.noSender(); + protected boolean isClientConnected() { + return client != ActorRef.noSender(); } - public static Props createJobClientActorProps( - LeaderRetrievalService leaderRetrievalService, - FiniteDuration timeout, - boolean sysoutUpdates) { - return Props.create( - JobClientActor.class, - leaderRetrievalService, - timeout, - sysoutUpdates); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java new file mode 100644 index 0000000000000..e57d1b4f40e19 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +/** + * Exception which is thrown by the {@link JobClientActor} if it has not heard back from the job + * manager after it has attempted to register for a job within a given timeout interval. + */ +public class JobClientActorRegistrationTimeoutException extends Exception { + private static final long serialVersionUID = 8762463142030454853L; + + public JobClientActorRegistrationTimeoutException(String msg) { + super(msg); + } + + public JobClientActorRegistrationTimeoutException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java new file mode 100644 index 0000000000000..b5d7cb77cebf0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.client; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The JobListeningContext holds the state necessary to monitor a running job and receive its results. + */ +public final class JobListeningContext { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** The Job id of the Job */ + private final JobID jobID; + /** The Future which is completed upon job completion */ + private final Future jobResultFuture; + /** The JobClientActor which handles communication and monitoring of the job */ + private final ActorRef jobClientActor; + /** Timeout used Asks */ + private final FiniteDuration timeout; + + /** ActorSystem for leader retrieval */ + private ActorSystem actorSystem; + /** Flink configuration for initializing the BlobService */ + private Configuration configuration; + + /** The class loader (either provided at job submission or reconstructed when it is needed */ + private ClassLoader classLoader; + + /** + * Constructor to use when the class loader is available. + */ + public JobListeningContext( + JobID jobID, + Future jobResultFuture, + ActorRef jobClientActor, + FiniteDuration timeout, + ClassLoader classLoader) { + this.jobID = checkNotNull(jobID); + this.jobResultFuture = checkNotNull(jobResultFuture); + this.jobClientActor = checkNotNull(jobClientActor); + this.timeout = checkNotNull(timeout); + this.classLoader = checkNotNull(classLoader); + } + + /** + * Constructor to use when the class loader is not available. + */ + public JobListeningContext( + JobID jobID, + Future jobResultFuture, + ActorRef jobClientActor, + FiniteDuration timeout, + ActorSystem actorSystem, + Configuration configuration) { + this.jobID = checkNotNull(jobID); + this.jobResultFuture = checkNotNull(jobResultFuture); + this.jobClientActor = checkNotNull(jobClientActor); + this.timeout = checkNotNull(timeout); + this.actorSystem = checkNotNull(actorSystem); + this.configuration = checkNotNull(configuration); + } + + /** + * @return The Job ID that this context is bound to. + */ + public JobID getJobID() { + return jobID; + } + + /** + * @return The Future that eventually holds the result of the execution. + */ + public Future getJobResultFuture() { + return jobResultFuture; + } + + /** + * @return The Job Client actor which communicats with the JobManager. + */ + public ActorRef getJobClientActor() { + return jobClientActor; + } + + /** + * @return The default timeout of Akka asks + */ + public FiniteDuration getTimeout() { + return timeout; + } + + /** + * The class loader necessary to deserialize the result of a job execution, + * i.e. JobExecutionResult or Exceptions + * @return The class loader for the job id + * @throws JobRetrievalException if anything goes wrong + */ + public ClassLoader getClassLoader() throws JobRetrievalException { + if (classLoader == null) { + // lazily initializes the class loader when it is needed + classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration); + LOG.info("Reconstructed class loader for Job {}", jobID); + } + return classLoader; + } + + private ActorGateway getJobManager() throws JobRetrievalException { + try { + return LeaderRetrievalUtils.retrieveLeaderGateway( + LeaderRetrievalUtils.createLeaderRetrievalService(configuration), + actorSystem, + AkkaUtils.getLookupTimeout(configuration)); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java new file mode 100644 index 0000000000000..a92bddc74f6d2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.client; + +import org.apache.flink.api.common.JobID; + +/** + * Exception used to indicate that a job couldn't be retrieved from the JobManager + */ +public class JobRetrievalException extends JobExecutionException { + + private static final long serialVersionUID = -42L; + + public JobRetrievalException(JobID jobID, String msg, Throwable cause) { + super(jobID, msg, cause); + } + + public JobRetrievalException(JobID jobID, String msg) { + super(jobID, msg); + } + + public JobRetrievalException(JobID jobID, Throwable cause) { + super(jobID, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java new file mode 100644 index 0000000000000..2cc4a507ca05a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; +import akka.dispatch.Futures; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobClientMessages; +import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.util.SerializedThrowable; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.concurrent.Callable; + + +/** + * Actor which handles Job submission process and provides Job updates until completion. + */ +public class JobSubmissionClientActor extends JobClientActor { + + /** JobGraph which shall be submitted to the JobManager */ + private JobGraph jobGraph; + /** true if a SubmitJobSuccess message has been received */ + private boolean jobSuccessfullySubmitted = false; + + public JobSubmissionClientActor( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + super(leaderRetrievalService, timeout, sysoutUpdates); + } + + + @Override + public void connectedToJobManager() { + if (jobGraph != null && !jobSuccessfullySubmitted) { + // if we haven't yet submitted the job successfully + tryToSubmitJob(); + } + } + + @Override + protected Class getClientMessageClass() { + return SubmitJobAndWait.class; + } + + @Override + public void handleCustomMessage(Object message) { + // submit a job to the JobManager + if (message instanceof SubmitJobAndWait) { + // sanity check that this no job was submitted through this actor before - + // it is a one-shot actor after all + if (this.client == null) { + jobGraph = ((SubmitJobAndWait) message).jobGraph(); + if (jobGraph == null) { + LOG.error("Received null JobGraph"); + sender().tell( + decorateMessage(new Status.Failure(new Exception("JobGraph is null"))), + getSelf()); + } else { + LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + + this.client = getSender(); + + // is only successful if we already know the job manager leader + if (jobManager != null) { + tryToSubmitJob(); + } + } + } else { + // repeated submission - tell failure to sender and kill self + String msg = "Received repeated 'SubmitJobAndWait'"; + LOG.error(msg); + getSender().tell( + decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); + + terminate(); + } + } else if (message instanceof JobManagerMessages.JobSubmitSuccess) { + // job was successfully submitted :-) + LOG.info("Job {} was successfully submitted to the JobManager {}.", + ((JobManagerMessages.JobSubmitSuccess) message).jobId(), + getSender().path()); + jobSuccessfullySubmitted = true; + } else if (JobClientMessages.getSubmissionTimeout().equals(message)) { + // check if our job submission was successful in the meantime + if (!jobSuccessfullySubmitted) { + if (isClientConnected()) { + client.tell( + decorateMessage(new Status.Failure( + new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " + + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " + + "needs more time to configure and confirm the job submission."))), + getSelf()); + } + + // We haven't heard back from the job manager after sending the job graph to him, + // therefore terminate + terminate(); + } + } else { + LOG.error("{} received unknown message: ", getClass()); + } + } + + private void tryToSubmitJob() { + LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", + jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID()); + + Futures.future(new Callable() { + @Override + public Object call() throws Exception { + ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID); + + LOG.info("Upload jar files to job manager {}.", jobManager.path()); + + try { + jobGraph.uploadUserJars(jobManagerGateway, timeout); + } catch (IOException exception) { + getSelf().tell( + decorateMessage(new JobManagerMessages.JobResultFailure( + new SerializedThrowable( + new JobSubmissionException( + jobGraph.getJobID(), + "Could not upload the jar files to the job manager.", + exception) + ) + )), + ActorRef.noSender() + ); + } + + LOG.info("Submit job to the job manager {}.", jobManager.path()); + + jobManager.tell( + decorateMessage( + new JobManagerMessages.SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), + getSelf()); + + // issue a SubmissionTimeout message to check that we submit the job within + // the given timeout + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getSubmissionTimeout()), + getContext().dispatcher(), + ActorRef.noSender()); + + return null; + } + }, getContext().dispatcher()); + } + + + public static Props createActorProps( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + return Props.create( + JobSubmissionClientActor.class, + leaderRetrievalService, + timeout, + sysoutUpdates); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 7a94c0ff18ca2..d7e40a3e05e53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -931,6 +931,7 @@ public void prepareForArchiving() { intermediateResults.clear(); currentExecutions.clear(); requiredJarFiles.clear(); + requiredClasspaths.clear(); jobStatusListeners.clear(); executionListeners.clear(); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala index 67d7a067ce73e..a84650c42175b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorRef import org.apache.flink.runtime.akka.ListeningBehaviour + /** * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor * submitted the job, when the start time and, if already terminated, the end time was. @@ -37,11 +38,14 @@ import org.apache.flink.runtime.akka.ListeningBehaviour * @param start Starting time */ class JobInfo( - val client: ActorRef, - val listeningBehaviour: ListeningBehaviour, + client: ActorRef, + listeningBehaviour: ListeningBehaviour, val start: Long, val sessionTimeout: Long) extends Serializable { + val clients = scala.collection.mutable.HashSet[(ActorRef, ListeningBehaviour)]() + clients += ((client, listeningBehaviour)) + var sessionAlive = sessionTimeout > 0 var lastActive = 0L @@ -58,10 +62,62 @@ class JobInfo( } } - override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)" + + /** + * Notifies all clients by sending a message + * @param message the message to send + */ + def notifyClients(message: Any) = { + clients foreach { + case (clientActor, _) => + clientActor ! message + } + } + + /** + * Notifies all clients which are not of type detached + * @param message the message to sent to non-detached clients + */ + def notifyNonDetachedClients(message: Any) = { + clients foreach { + case (clientActor, ListeningBehaviour.DETACHED) => + // do nothing + case (clientActor, _) => + clientActor ! message + } + } + + /** + * Sends a message to job clients that match the listening behavior + * @param message the message to send to all clients + * @param listeningBehaviour the desired listening behaviour + */ + def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = { + clients foreach { + case (clientActor, `listeningBehaviour`) => + clientActor ! message + case _ => + } + } def setLastActive() = lastActive = System.currentTimeMillis() + + + override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)" + + override def equals(other: Any): Boolean = other match { + case that: JobInfo => + clients == that.clients && + start == that.start && + sessionTimeout == that.sessionTimeout + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(clients, start, sessionTimeout) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } } object JobInfo{ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0587987294729..d35fb0aed25c0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,18 +19,16 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} -import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress} +import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} import java.lang.management.ManagementFactory import java.util.UUID import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} import javax.management.ObjectName -import akka.actor.Status.{Success, Failure} +import akka.actor.Status.{Failure, Success} import akka.actor._ import akka.pattern.ask - import grizzled.slf4j.Logger - import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.fs.FileSystem @@ -41,8 +39,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ -import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStoreFactory, SavepointStore} -import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker} +import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore, SavepointStoreFactory} +import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker} import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -58,24 +56,22 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService} - import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge} +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState} import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified} -import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage, AcknowledgeCheckpoint} - +import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.InfoMessage import org.apache.flink.runtime.messages.webmonitor._ import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.process.ProcessReaper -import org.apache.flink.runtime.query.{UnknownKvStateLocation, KvStateMessage} -import org.apache.flink.runtime.query.KvStateMessage.{NotifyKvStateUnregistered, LookupKvStateLocation, NotifyKvStateRegistered} +import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation} +import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered} import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager @@ -83,7 +79,6 @@ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} - import org.jboss.netty.channel.ChannelException import scala.annotation.tailrec @@ -479,6 +474,22 @@ class JobManager( submitJob(jobGraph, jobInfo) + case RegisterJobClient(jobID, listeningBehaviour) => + val client = sender() + currentJobs.get(jobID) match { + case Some((executionGraph, jobInfo)) => + log.info("Registering client for job $jobID") + jobInfo.clients += ((client, listeningBehaviour)) + val listener = new StatusListenerMessenger(client, leaderSessionID.orNull) + executionGraph.registerJobStatusListener(listener) + if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) { + executionGraph.registerExecutionListener(listener) + } + client ! decorateMessage(RegisterJobClientSuccess(jobID)) + case None => + client ! decorateMessage(JobNotFound(jobID)) + } + case RecoverSubmittedJob(submittedJobGraph) => if (!currentJobs.contains(submittedJobGraph.getJobId)) { submitJob( @@ -788,50 +799,53 @@ class JobManager( } // is the client waiting for the job result? - if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { - newJobStatus match { - case JobStatus.FINISHED => - try { - val accumulatorResults = executionGraph.getAccumulatorsSerialized() - val result = new SerializedJobExecutionResult( - jobID, - jobInfo.duration, - accumulatorResults) - - jobInfo.client ! decorateMessage(JobResultSuccess(result)) - } catch { - case e: Exception => - log.error(s"Cannot fetch final accumulators for job $jobID", e) - val exception = new JobExecutionException(jobID, - "Failed to retrieve accumulator results.", e) + newJobStatus match { + case JobStatus.FINISHED => + try { + val accumulatorResults = executionGraph.getAccumulatorsSerialized() + val result = new SerializedJobExecutionResult( + jobID, + jobInfo.duration, + accumulatorResults) + + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultSuccess(result))) + } catch { + case e: Exception => + log.error(s"Cannot fetch final accumulators for job $jobID", e) + val exception = new JobExecutionException(jobID, + "Failed to retrieve accumulator results.", e) - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable(exception))) - } + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( + new SerializedThrowable(exception)))) + } - case JobStatus.CANCELED => - // the error may be packed as a serialized throwable - val unpackedError = SerializedThrowable.get( - error, executionGraph.getUserClassLoader()) + case JobStatus.CANCELED => + // the error may be packed as a serialized throwable + val unpackedError = SerializedThrowable.get( + error, executionGraph.getUserClassLoader()) - jobInfo.client ! decorateMessage(JobResultFailure( + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( new SerializedThrowable( - new JobCancellationException(jobID, "Job was cancelled.", unpackedError)))) + new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))) - case JobStatus.FAILED => - val unpackedError = SerializedThrowable.get( - error, executionGraph.getUserClassLoader()) + case JobStatus.FAILED => + val unpackedError = SerializedThrowable.get( + error, executionGraph.getUserClassLoader()) - jobInfo.client ! decorateMessage(JobResultFailure( + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( new SerializedThrowable( - new JobExecutionException(jobID, "Job execution failed.", unpackedError)))) - - case x => - val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable(exception))) - throw exception - } + new JobExecutionException(jobID, "Job execution failed.", unpackedError))))) + + case x => + val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( + new SerializedThrowable(exception)))) + throw exception } }(context.dispatcher) } @@ -919,6 +933,18 @@ class JobManager( archive forward decorateMessage(RequestJob(jobID)) } + case RequestClassloadingProps(jobID) => + currentJobs.get(jobID) match { + case Some((graph, jobInfo)) => + sender() ! decorateMessage( + ClassloadingProps( + libraryCacheManager.getBlobServerPort, + graph.getRequiredJarFiles, + graph.getRequiredClasspaths)) + case None => + sender() ! decorateMessage(JobNotFound(jobID)) + } + case RequestBlobManagerPort => sender ! decorateMessage(libraryCacheManager.getBlobServerPort) @@ -1052,11 +1078,10 @@ class JobManager( */ private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = { if (jobGraph == null) { - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable( - new JobSubmissionException(null, "JobGraph must not be null.") - ) - )) + jobInfo.notifyClients( + decorateMessage(JobResultFailure( + new SerializedThrowable( + new JobSubmissionException(null, "JobGraph must not be null."))))) } else { val jobId = jobGraph.getJobID @@ -1259,13 +1284,15 @@ class JobManager( executionGraph.registerJobStatusListener( new StatusListenerMessenger(self, leaderSessionID.orNull)) - if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) { + jobInfo.clients foreach { // the sender wants to be notified about state changes - val listener = new StatusListenerMessenger(jobInfo.client, leaderSessionID.orNull) - - executionGraph.registerExecutionListener(listener) - executionGraph.registerJobStatusListener(listener) + case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) => + val listener = new StatusListenerMessenger(client, leaderSessionID.orNull) + executionGraph.registerExecutionListener(listener) + executionGraph.registerJobStatusListener(listener) + case _ => // do nothing } + } catch { case t: Throwable => log.error(s"Failed to submit job $jobId ($jobName)", t) @@ -1283,7 +1310,8 @@ class JobManager( new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t) } - jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt))) + jobInfo.notifyClients( + decorateMessage(JobResultFailure(new SerializedThrowable(rt)))) return } @@ -1338,7 +1366,8 @@ class JobManager( } } - jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) + jobInfo.notifyClients( + decorateMessage(JobSubmitSuccess(jobGraph.getJobID))) if (leaderElectionService.hasLeadership) { // There is a small chance that multiple job managers schedule the same job after if @@ -1740,10 +1769,10 @@ class JobManager( future { eg.suspend(cause) - if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { - jobInfo.client ! decorateMessage( - Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))) - } + jobInfo.notifyNonDetachedClients( + decorateMessage( + Failure( + new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))) }(context.dispatcher) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala index a60fa7aa093ce..1f29e322ff450 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages import java.util.UUID import akka.actor.ActorRef +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.jobgraph.JobGraph /** @@ -29,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph object JobClientMessages { /** - * This message is sent to the JobClient (via ask) to submit a job and + * This message is sent to the JobClientActor (via ask) to submit a job and * get a response when the job execution has finished. * * The response to this message is a @@ -40,15 +41,11 @@ object JobClientMessages { case class SubmitJobAndWait(jobGraph: JobGraph) /** - * This message is sent to the JobClient (via ask) to submit a job and - * return as soon as the result of the submit operation is known. - * - * The response to this message is a - * [[org.apache.flink.api.common.JobSubmissionResult]] - * - * @param jobGraph The job to be executed. - */ - case class SubmitJobDetached(jobGraph: JobGraph) + * This message is sent to the JobClientActor to ask it to register at the JobManager + * and then return once the job execution is complete. + * @param jobID The job id + */ + case class AttachToJobAndWait(jobID: JobID) /** Notifies the JobClientActor about a new leader address and a leader session ID. * @@ -66,9 +63,13 @@ object JobClientMessages { /** Message which is triggered when the submission timeout has been reached. */ case object SubmissionTimeout extends RequiresLeaderSessionID - /** Messaeg which is triggered when the connection timeout has been reached. */ + /** Message which is triggered when the JobClient registration at the JobManager times out */ + case object RegistrationTimeout extends RequiresLeaderSessionID + + /** Message which is triggered when the connection timeout has been reached. */ case object ConnectionTimeout extends RequiresLeaderSessionID def getSubmissionTimeout(): AnyRef = SubmissionTimeout + def getRegistrationTimeout(): AnyRef = RegistrationTimeout def getConnectionTimeout(): AnyRef = ConnectionTimeout } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 14f72b0b3dd8d..40c4dcf75f599 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -18,6 +18,7 @@ package org.apache.flink.runtime.messages +import java.net.URL import java.util.UUID import akka.actor.ActorRef @@ -68,6 +69,19 @@ object JobManagerMessages { listeningBehaviour: ListeningBehaviour) extends RequiresLeaderSessionID + /** + * Registers the sender of the message as the client for the provided job identifier. + * This message is acknowledged by the JobManager with [[RegisterJobClientSuccess]] + * or [[JobNotFound]] if the job was not running. + * @param jobID The job id of the job + * @param listeningBehaviour The types of updates which will be sent to the sender + * after registration + */ + case class RegisterJobClient( + jobID: JobID, + listeningBehaviour: ListeningBehaviour) + extends RequiresLeaderSessionID + /** * Triggers the recovery of the job with the given ID. * @@ -194,6 +208,23 @@ object JobManagerMessages { */ case object RequestTotalNumberOfSlots + /** + * Requests all entities necessary for reconstructing a job class loader + * May respond with [[ClassloadingProps]] or [[JobNotFound]] + * @param jobId The job id of the registered job + */ + case class RequestClassloadingProps(jobId: JobID) + + /** + * Response to [[RequestClassloadingProps]] + * @param blobManagerPort The port of the blobManager + * @param requiredJarFiles The blob keys of the required jar files + * @param requiredClasspaths The urls of the required classpaths + */ + case class ClassloadingProps(blobManagerPort: Integer, + requiredJarFiles: java.util.List[BlobKey], + requiredClasspaths: java.util.List[URL]) + /** * Requests the port of the blob manager from the job manager. The result is sent back to the * sender as an [[Int]]. @@ -217,17 +248,28 @@ object JobManagerMessages { */ case class JobSubmitSuccess(jobId: JobID) + /** + * Denotes a successful registration of a JobClientActor for a running job + * @param jobId The job id of the registered job + */ + case class RegisterJobClientSuccess(jobId: JobID) + + /** + * Denotes messages which contain the result of a completed job execution + */ + sealed trait JobResultMessage + /** * Denotes a successful job execution. * @param result The result of the job execution, in serialized form. */ - case class JobResultSuccess(result: SerializedJobExecutionResult) + case class JobResultSuccess(result: SerializedJobExecutionResult) extends JobResultMessage /** * Denotes an unsuccessful job execution. * @param cause The exception that caused the job to fail, in serialized form. */ - case class JobResultFailure(cause: SerializedThrowable) + case class JobResultFailure(cause: SerializedThrowable) extends JobResultMessage sealed trait CancellationResponse{ @@ -316,7 +358,7 @@ object JobManagerMessages { /** * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of - * [[RequestJob]] or [[RequestJobStatus]]. + * [[RequestJob]], [[RequestJobStatus]] or [[RegisterJobClient]]. * * @param jobID */ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 9640fcdda05a1..df4f95a8316eb 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership +import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient} import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat @@ -67,6 +67,8 @@ trait TestingJobManagerLike extends FlinkActor { override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1 }) + val waitForClient = scala.collection.mutable.HashSet[ActorRef]() + val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() var disconnectDisabled = false @@ -328,6 +330,14 @@ trait TestingJobManagerLike extends FlinkActor { waitForLeader.clear() + case NotifyWhenClientConnects => + waitForClient += sender() + sender() ! true + + case msg: RegisterJobClient => + super.handleMessage(msg) + waitForClient.foreach(_ ! true) + case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) => if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) { // there are already at least numRegisteredTaskManager registered --> send Acknowledge diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index a411c8b7ebdc4..a88ed4386c246 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -82,6 +82,11 @@ object TestingJobManagerMessages { */ case object NotifyWhenLeader + /** + * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs + */ + case object NotifyWhenClientConnects + /** * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] * message when at least numRegisteredTaskManager have registered at the JobManager. @@ -111,6 +116,7 @@ object TestingJobManagerMessages { case class ResponseSavepoint(savepoint: Savepoint) def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader + def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects def getDisablePostStop(): AnyRef = DisablePostStop } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 073164c030a41..2adf7eb807f0b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -25,17 +25,17 @@ import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobClientMessages; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.Await; @@ -45,6 +45,8 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.flink.runtime.messages.JobManagerMessages.*; + public class JobClientActorTest extends TestLogger { private static ActorSystem system; @@ -62,8 +64,8 @@ public static void teardown() { } /** Tests that a {@link JobClientActorSubmissionTimeoutException} is thrown when the job cannot - * be submitted by the JobClientActor. This is here the case, because the started JobManager - * never replies to a SubmitJob message. + * be submitted by the JobSubmissionClientActor. This is here the case, because the started JobManager + * never replies to a {@link SubmitJob} message. * * @throws Exception */ @@ -84,7 +86,7 @@ public void testSubmissionTimeout() throws Exception { leaderSessionID ); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -100,19 +102,56 @@ public void testSubmissionTimeout() throws Exception { Await.result(jobExecutionResult, timeout); } + + /** Tests that a {@link JobClientActorRegistrationTimeoutException} is thrown when the registration + * cannot be performed at the JobManager by the JobAttachmentClientActor. This is here the case, because the + * started JobManager never replies to a {@link RegisterJobClient} message. + */ + @Test(expected=JobClientActorRegistrationTimeoutException.class) + public void testRegistrationTimeout() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + PlainActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future jobExecutionResult = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Await.result(jobExecutionResult, timeout); + } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} - * is thrown when the JobClientActor wants to submit a job but has not connected to a JobManager. + * is thrown when the JobSubmissionClientActor wants to submit a job but has not connected to a JobManager. * * @throws Exception */ @Test(expected=JobClientActorConnectionTimeoutException.class) - public void testConnectionTimeoutWithoutJobManager() throws Exception { + public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception { FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -127,6 +166,32 @@ public void testConnectionTimeoutWithoutJobManager() throws Exception { Await.result(jobExecutionResult, timeout); } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} + * is thrown when the JobAttachmentClientActor attach to a job at the JobManager + * but has not connected to a JobManager. + */ + @Test(expected=JobClientActorConnectionTimeoutException.class) + public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future jobExecutionResult = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Await.result(jobExecutionResult, timeout); + } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} * is thrown after a successful job submission if the JobManager dies. * @@ -149,7 +214,7 @@ public void testConnectionTimeoutAfterJobSubmission() throws Exception { leaderSessionID ); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -170,6 +235,91 @@ public void testConnectionTimeoutAfterJobSubmission() throws Exception { Await.result(jobExecutionResult, timeout); } + /** Tests that a {@link JobClientActorConnectionTimeoutException} + * is thrown after a successful registration of the client at the JobManager. + */ + @Test(expected=JobClientActorConnectionTimeoutException.class) + public void testConnectionTimeoutAfterJobRegistration() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + JobAcceptingActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future jobExecutionResult = Patterns.ask( + jobClientActor, + new AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Future waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout)); + + Await.result(waitFuture, timeout); + + jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + Await.result(jobExecutionResult, timeout); + } + + + /** Tests that JobClient throws an Exception if the JobClientActor dies and can't answer to + * {@link akka.actor.Identify} message anymore. + */ + @Test + public void testGuaranteedAnswerIfJobClientDies() throws Exception { + FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + JobAcceptingActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + JobListeningContext jobListeningContext = + JobClient.submitJob( + system, + testingLeaderRetrievalService, + testJobGraph, + timeout, + false, + getClass().getClassLoader()); + + Future waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout)); + Await.result(waitFuture, timeout); + + // kill the job client actor which has been registered at the JobManager + jobListeningContext.getJobClientActor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + try { + // should not block but return an error + JobClient.awaitJobResult(jobListeningContext); + Assert.fail(); + } catch (JobExecutionException e) { + // this is what we want + } + } + public static class PlainActor extends FlinkUntypedActor { private final UUID leaderSessionID; @@ -180,7 +330,6 @@ public PlainActor(UUID leaderSessionID) { @Override protected void handleMessage(Object message) throws Exception { - } @Override @@ -200,17 +349,29 @@ public JobAcceptingActor(UUID leaderSessionID) { @Override protected void handleMessage(Object message) throws Exception { - if (message instanceof JobManagerMessages.SubmitJob) { + if (message instanceof SubmitJob) { getSender().tell( - new JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob) message).jobGraph().getJobID()), + new JobSubmitSuccess(((SubmitJob) message).jobGraph().getJobID()), getSelf()); jobAccepted = true; - if(testFuture != ActorRef.noSender()) { + if (testFuture != ActorRef.noSender()) { testFuture.tell(Messages.getAcknowledge(), getSelf()); } - } else if (message instanceof RegisterTest) { + } + else if (message instanceof RegisterJobClient) { + getSender().tell( + new RegisterJobClientSuccess(((RegisterJobClient) message).jobID()), + getSelf()); + + jobAccepted = true; + + if (testFuture != ActorRef.noSender()) { + testFuture.tell(Messages.getAcknowledge(), getSelf()); + } + } + else if (message instanceof RegisterTest) { testFuture = getSender(); if (jobAccepted) { @@ -226,4 +387,5 @@ protected UUID getLeaderSessionID() { } public static class RegisterTest{} + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index c71bd355d01f8..426dfbafb574c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -286,7 +286,6 @@ protected void verifyJobGraphs(SubmittedJobGraph expected, SubmittedJobGraph act JobInfo expectedJobInfo = expected.getJobInfo(); JobInfo actualJobInfo = actual.getJobInfo(); - assertEquals(expectedJobInfo.listeningBehaviour(), actualJobInfo.listeningBehaviour()); - assertEquals(expectedJobInfo.start(), actualJobInfo.start()); + assertEquals(expectedJobInfo, actualJobInfo); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java new file mode 100644 index 0000000000000..db17ee870e207 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.clients.examples; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.collection.Seq; + +import java.util.concurrent.Semaphore; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + + +/** + * Tests retrieval of a job from a running Flink cluster + */ +public class JobRetrievalITCase extends TestLogger { + + private static final Semaphore lock = new Semaphore(1); + + private static FlinkMiniCluster cluster; + + @BeforeClass + public static void before() { + cluster = new ForkableFlinkMiniCluster(new Configuration(), false); + cluster.start(); + } + + @AfterClass + public static void after() { + cluster.stop(); + cluster = null; + } + + @Test + public void testJobRetrieval() throws Exception { + final JobID jobID = new JobID(); + + final JobVertex imalock = new JobVertex("imalock"); + imalock.setInvokableClass(SemaphoreInvokable.class); + + final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock); + + final ClusterClient client = new StandaloneClusterClient(cluster.configuration()); + + // acquire the lock to make sure that the job cannot complete until the job client + // has been attached in resumingThread + lock.acquire(); + client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader()); + + final Thread resumingThread = new Thread(new Runnable() { + @Override + public void run() { + try { + assertNotNull(client.retrieveJob(jobID)); + } catch (JobExecutionException e) { + fail(e.getMessage()); + } + } + }); + + final Seq actorSystemSeq = cluster.jobManagerActorSystems().get(); + final ActorSystem actorSystem = actorSystemSeq.last(); + JavaTestKit testkit = new JavaTestKit(actorSystem); + + final ActorRef jm = cluster.getJobManagersAsJava().get(0); + // wait until client connects + jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef()); + // confirm registration + testkit.expectMsgEquals(true); + + // kick off resuming + resumingThread.start(); + + // wait for client to connect + testkit.expectMsgEquals(true); + // client has connected, we can release the lock + lock.release(); + + resumingThread.join(); + } + + @Test + public void testNonExistingJobRetrieval() throws Exception { + final JobID jobID = new JobID(); + ClusterClient client = new StandaloneClusterClient(cluster.configuration()); + + try { + client.retrieveJob(jobID); + fail(); + } catch (JobRetrievalException e) { + // this is what we want + } + } + + + public static class SemaphoreInvokable extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + lock.acquire(); + } + } + +}