From 9b07445fa7f1159f065dd3c93a904053957b2a1e Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 18 Aug 2016 16:04:35 +0200 Subject: [PATCH] [FLINK-4273] Modify JobClient to attach to running jobs These changes are required for FLINK-4272 (introduce a JobClient class for job control). Essentially, we want to be able to re-attach to a running job and monitor it. It shouldn't make any difference whether we just submitted the job or we recover it from an existing JobID. This PR modifies the JobClientActor to support two different operation modes: a) submitJob and monitor b) re-attach to job and monitor The JobClient class has been updated with methods to access this functionality. Before the class just had `submitJobAndWait` and `submitJobDetached`. Now, it has the additional methods `submitJob`, `attachToRunningJob`, and `awaitJobResult`. The job submission has been split up in two phases: 1a. submitJob(..) Submit job and return a future which can be completed to get the result with `awaitJobResult` 1b. attachToRunningJob(..) Re-attach to a runnning job, reconstruct its class loader, and return a future which can be completed with `awaitJobResult` 2. awaitJobResult(..) Blocks until the returned future from either `submitJob` or `attachToRunningJob` has been completed - split up JobClientActor into a base class and two implementations - JobClient: on waiting check JobClientActor liveness - lazily reconstruct user class loader - add additional tests for JobClientActor - add test case to test resuming of jobs This closes #2313 --- .../flink/client/program/ClusterClient.java | 41 ++- .../src/test/resources/log4j-test.properties | 2 +- .../flink/api/common/JobExecutionResult.java | 4 +- .../client/JobAttachmentClientActor.java | 171 ++++++++++ .../flink/runtime/client/JobClient.java | 292 +++++++++++++++--- .../flink/runtime/client/JobClientActor.java | 281 +++++------------ ...ientActorRegistrationTimeoutException.java | 35 +++ .../runtime/client/JobListeningContext.java | 145 +++++++++ .../runtime/client/JobRetrievalException.java | 42 +++ .../client/JobSubmissionClientActor.java | 192 ++++++++++++ .../executiongraph/ExecutionGraph.java | 1 + .../flink/runtime/jobmanager/JobInfo.scala | 62 +++- .../flink/runtime/jobmanager/JobManager.scala | 161 ++++++---- .../runtime/messages/JobClientMessages.scala | 23 +- .../runtime/messages/JobManagerMessages.scala | 48 ++- .../testingUtils/TestingJobManagerLike.scala | 12 +- .../TestingJobManagerMessages.scala | 6 + .../runtime/client/JobClientActorTest.java | 190 +++++++++++- ...ooKeeperSubmittedJobGraphsStoreITCase.java | 3 +- .../clients/examples/JobRetrievalITCase.java | 138 +++++++++ 20 files changed, 1499 insertions(+), 350 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index c3c666b1686e1..292da70d8c82b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -47,6 +47,8 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.client.JobRetrievalException; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -428,6 +430,39 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade } } + /** + * Reattaches to a running from from the supplied job id + * @param jobID The job id of the job to attach to + * @return The JobExecutionResult for the jobID + * @throws JobExecutionException if an error occurs during monitoring the job execution + */ + public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException { + final LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e); + } + + ActorGateway jobManagerGateway; + try { + jobManagerGateway = getJobManagerGateway(); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway"); + } + + final JobListeningContext listeningContext = JobClient.attachToRunningJob( + jobID, + jobManagerGateway, + flinkConfig, + actorSystemLoader.get(), + leaderRetrievalService, + timeout, + printStatusDuringExecution); + + return JobClient.awaitJobResult(listeningContext); + } + /** * Cancels a job identified by the job id. * @param jobId the job id @@ -446,11 +481,11 @@ public void cancel(JobID jobId) throws Exception { final Object result = Await.result(response, timeout); if (result instanceof JobManagerMessages.CancellationSuccess) { - LOG.info("Job cancellation with ID " + jobId + " succeeded."); + logAndSysout("Job cancellation with ID " + jobId + " succeeded."); } else if (result instanceof JobManagerMessages.CancellationFailure) { final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); - LOG.info("Job cancellation with ID " + jobId + " failed.", t); - throw new Exception("Failed to cancel the job because of \n" + t.getMessage()); + logAndSysout("Job cancellation with ID " + jobId + " failed because of " + t.getMessage()); + throw new Exception("Failed to cancel the job with id " + jobId, t); } else { throw new Exception("Unknown message received while cancelling: " + result.getClass().getName()); } diff --git a/flink-clients/src/test/resources/log4j-test.properties b/flink-clients/src/test/resources/log4j-test.properties index 85897b37b4999..5100c1ff526d3 100644 --- a/flink-clients/src/test/resources/log4j-test.properties +++ b/flink-clients/src/test/resources/log4j-test.properties @@ -27,4 +27,4 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java index bc5ae09a332cc..cb4ecc548a852 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java @@ -34,7 +34,7 @@ public class JobExecutionResult extends JobSubmissionResult { private long netRuntime; - private Map accumulatorResults = Collections.emptyMap(); + private final Map accumulatorResults; /** * Creates a new JobExecutionResult. @@ -49,6 +49,8 @@ public JobExecutionResult(JobID jobID, long netRuntime, Map accu if (accumulators != null) { this.accumulatorResults = accumulators; + } else { + this.accumulatorResults = Collections.emptyMap(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java new file mode 100644 index 0000000000000..5446002acf06d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; +import akka.dispatch.Futures; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobClientMessages; +import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait; +import org.apache.flink.runtime.messages.JobManagerMessages; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.Callable; + + +/** + * Actor which handles Job attachment process and provides Job updates until completion. + */ +public class JobAttachmentClientActor extends JobClientActor { + + /** JobID to attach to when the JobClientActor retrieves a job */ + private JobID jobID; + /** true if a JobRegistrationSuccess message has been received */ + private boolean successfullyRegisteredForJob = false; + + public JobAttachmentClientActor( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + super(leaderRetrievalService, timeout, sysoutUpdates); + } + + @Override + public void connectedToJobManager() { + if (jobID != null && !successfullyRegisteredForJob) { + tryToAttachToJob(); + } + } + + @Override + protected Class getClientMessageClass() { + return AttachToJobAndWait.class; + } + + @Override + public void handleCustomMessage(Object message) { + if (message instanceof AttachToJobAndWait) { + // sanity check that this no job registration was performed through this actor before - + // it is a one-shot actor after all + if (this.client == null) { + jobID = ((AttachToJobAndWait) message).jobID(); + if (jobID == null) { + LOG.error("Received null JobID"); + sender().tell( + decorateMessage(new Status.Failure(new Exception("JobID is null"))), + getSelf()); + } else { + LOG.info("Received JobID {}.", jobID); + + this.client = getSender(); + + // is only successful if we already know the job manager leader + if (jobManager != null) { + tryToAttachToJob(); + } + } + } else { + // repeated submission - tell failure to sender and kill self + String msg = "Received repeated 'AttachToJobAndWait'"; + LOG.error(msg); + getSender().tell( + decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); + + terminate(); + } + } + else if (message instanceof JobManagerMessages.RegisterJobClientSuccess) { + // job registration was successful :o) + JobManagerMessages.RegisterJobClientSuccess msg = ((JobManagerMessages.RegisterJobClientSuccess) message); + logAndPrintMessage("Successfully registered at the JobManager for Job " + msg.jobId()); + successfullyRegisteredForJob = true; + } + else if (message instanceof JobManagerMessages.JobNotFound) { + LOG.info("Couldn't register JobClient for JobID {}", + ((JobManagerMessages.JobNotFound) message).jobID()); + client.tell(decorateMessage(message), getSelf()); + terminate(); + } + else if (JobClientMessages.getRegistrationTimeout().equals(message)) { + // check if our registration for a job was successful in the meantime + if (!successfullyRegisteredForJob) { + if (isClientConnected()) { + client.tell( + decorateMessage(new Status.Failure( + new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " + + "timed out. " + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + + "' in case the JobManager needs more time to confirm the job client registration."))), + getSelf()); + } + + // We haven't heard back from the job manager after attempting registration for a job + // therefore terminate + terminate(); + } + } else { + LOG.error("{} received unknown message: ", getClass()); + } + + } + + private void tryToAttachToJob() { + LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID); + + Futures.future(new Callable() { + @Override + public Object call() throws Exception { + LOG.info("Attaching to job {} at the job manager {}.", jobID, jobManager.path()); + + jobManager.tell( + decorateMessage( + new JobManagerMessages.RegisterJobClient( + jobID, + ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), + getSelf()); + + // issue a RegistrationTimeout message to check that we submit the job within + // the given timeout + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getRegistrationTimeout()), + getContext().dispatcher(), + ActorRef.noSender()); + + return null; + } + }, getContext().dispatcher()); + } + + public static Props createActorProps( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + return Props.create( + JobAttachmentClientActor.class, + leaderRetrievalService, + timeout, + sysoutUpdates); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index c0e0d08415327..4e916eb24818b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -21,6 +21,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; +import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; import akka.pattern.Patterns; @@ -30,6 +31,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -44,10 +47,15 @@ import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -80,28 +88,18 @@ public static ActorSystem startJobClientActorSystem(Configuration config) } /** - * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to - * the JobManager. The method blocks until the job has finished or the JobManager is no longer - * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter - * case a [[JobExecutionException]] is thrown. - * - * @param actorSystem The actor system that performs the communication. - * @param leaderRetrievalService Leader retrieval service which used to find the current leading - * JobManager - * @param jobGraph JobGraph describing the Flink job - * @param timeout Timeout for futures - * @param sysoutLogUpdates prints log updates to system out if true - * @return The job execution result - * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job - * execution fails. + * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be + * passed to {@code awaitJobResult} to get the result of the submission. + * @return JobListeningContext which may be used to retrieve the JobExecutionResult via + * {@code awaitJobResult(JobListeningContext context)}. */ - public static JobExecutionResult submitJobAndWait( + public static JobListeningContext submitJob( ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, - ClassLoader classLoader) throws JobExecutionException { + ClassLoader classLoader) { checkNotNull(actorSystem, "The actorSystem must not be null."); checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); @@ -112,29 +110,187 @@ public static JobExecutionResult submitJobAndWait( // the JobManager. It forwards the job submission, checks the success/failure responses, logs // update messages, watches for disconnect between client and JobManager, ... - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( leaderRetrievalService, timeout, sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - - // first block handles errors while waiting for the result - Object answer; + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + timeout, + classLoader); + } + + + /** + * Attaches to a running Job using the JobID. + * Reconstructs the user class loader by downloading the jars from the JobManager. + */ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job attachments, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + timeout, + actorSystem, + configuration); + } + + /** + * Reconstructs the class loader by first requesting information about it at the JobManager + * and then downloading missing jar files. + * @param jobID id of job + * @param jobManager gateway to the JobManager + * @param config the flink configuration + * @return A classloader that should behave like the original classloader + * @throws JobRetrievalException if anything goes wrong + */ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config) + throws JobRetrievalException { + + final Object jmAnswer; try { - Future future = Patterns.ask(jobClientActor, - new JobClientMessages.SubmitJobAndWait(jobGraph), - new Timeout(AkkaUtils.INF_TIMEOUT())); - - answer = Await.result(future, AkkaUtils.INF_TIMEOUT()); + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.RequestClassloadingProps(jobID), + AkkaUtils.getDefaultTimeout()), + AkkaUtils.getDefaultTimeout()); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e); } - catch (TimeoutException e) { - throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " + - "Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e); + + if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) { + JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer); + + Option jmHost = jobManager.actor().path().address().host(); + String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost"; + InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); + final BlobCache blobClient = new BlobCache(serverAddress, config); + + final List requiredJarFiles = props.requiredJarFiles(); + final List requiredClasspaths = props.requiredClasspaths(); + + final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()]; + + int pos = 0; + for (BlobKey blobKey : props.requiredJarFiles()) { + try { + allURLs[pos++] = blobClient.getURL(blobKey); + } catch (Exception e) { + blobClient.shutdown(); + throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey); + } + } + + for (URL url : requiredClasspaths) { + allURLs[pos++] = url; + } + + return new URLClassLoader(allURLs, JobClient.class.getClassLoader()); + } else if (jmAnswer instanceof JobManagerMessages.JobNotFound) { + throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found"); + } else { + throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer); + } + } + + /** + * Given a JobListeningContext, awaits the result of the job execution that this context is bound to + * @param listeningContext The listening context of the job execution + * @return The result of the execution + * @throws JobExecutionException if anything goes wrong while monitoring the job + */ + public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException { + + final JobID jobID = listeningContext.getJobID(); + final ActorRef jobClientActor = listeningContext.getJobClientActor(); + final Future jobSubmissionFuture = listeningContext.getJobResultFuture(); + final FiniteDuration askTimeout = listeningContext.getTimeout(); + // retrieves class loader if necessary + final ClassLoader classLoader = listeningContext.getClassLoader(); + + // wait for the future which holds the result to be ready + // ping the JobClientActor from time to time to check if it is still running + while (!jobSubmissionFuture.isCompleted()) { + try { + Await.ready(jobSubmissionFuture, askTimeout); + } catch (InterruptedException e) { + throw new JobExecutionException( + jobID, + "Interrupted while waiting for job completion."); + } catch (TimeoutException e) { + try { + Await.result( + Patterns.ask( + jobClientActor, + // Ping the Actor to see if it is alive + new Identify(true), + Timeout.durationToTimeout(askTimeout)), + askTimeout); + // we got a reply, continue waiting for the job result + } catch (Exception eInner) { + // we could have a result but the JobClientActor might have been killed and + // thus the health check failed + if (!jobSubmissionFuture.isCompleted()) { + throw new JobExecutionException( + jobID, + "JobClientActor seems to have died before the JobExecutionResult could be retrieved.", + eInner); + } + } + } + } + + final Object answer; + try { + // we have already awaited the result, zero time to wait here + answer = Await.result(jobSubmissionFuture, Duration.Zero()); } catch (Throwable throwable) { - throw new JobExecutionException(jobGraph.getJobID(), - "Communication with JobManager failed: " + throwable.getMessage(), throwable); + throw new JobExecutionException(jobID, + "Couldn't retrieve the JobExecutionResult from the JobManager.", throwable); } finally { // failsafe shutdown of the client actor @@ -149,18 +305,16 @@ public static JobExecutionResult submitJobAndWait( if (result != null) { try { return result.toJobExecutionResult(classLoader); + } catch (Throwable t) { + throw new JobExecutionException(jobID, + "Job was successfully executed but JobExecutionResult could not be deserialized."); } - catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), - "Job was successfully executed but JobExecutionResult could not be deserialized."); - } - } - else { - throw new JobExecutionException(jobGraph.getJobID(), - "Job was successfully executed but result contained a null JobExecutionResult."); + } else { + throw new JobExecutionException(jobID, + "Job was successfully executed but result contained a null JobExecutionResult."); } } - if (answer instanceof JobManagerMessages.JobResultFailure) { + else if (answer instanceof JobManagerMessages.JobResultFailure) { LOG.info("Job execution failed"); SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause(); @@ -168,22 +322,61 @@ public static JobExecutionResult submitJobAndWait( Throwable cause = serThrowable.deserializeError(classLoader); if (cause instanceof JobExecutionException) { throw (JobExecutionException) cause; + } else { + throw new JobExecutionException(jobID, "Job execution failed", cause); } - else { - throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause); - } - } - else { - throw new JobExecutionException(jobGraph.getJobID(), - "Job execution failed with null as failure cause."); + } else { + throw new JobExecutionException(jobID, + "Job execution failed with null as failure cause."); } } + else if (answer instanceof JobManagerMessages.JobNotFound) { + throw new JobRetrievalException( + ((JobManagerMessages.JobNotFound) answer).jobID(), + "Couldn't retrieve Job " + jobID + " because it was not running."); + } else { - throw new JobExecutionException(jobGraph.getJobID(), - "Unknown answer from JobManager after submitting the job: " + answer); + throw new JobExecutionException(jobID, + "Unknown answer from JobManager after submitting the job: " + answer); } } + /** + * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to + * the JobManager. The method blocks until the job has finished or the JobManager is no longer + * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter + * case a [[JobExecutionException]] is thrown. + * + * @param actorSystem The actor system that performs the communication. + * @param leaderRetrievalService Leader retrieval service which used to find the current leading + * JobManager + * @param jobGraph JobGraph describing the Flink job + * @param timeout Timeout for futures + * @param sysoutLogUpdates prints log updates to system out if true + * @param classLoader The class loader for deserializing the results + * @return The job execution result + * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job + * execution fails. + */ + public static JobExecutionResult submitJobAndWait( + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + JobGraph jobGraph, + FiniteDuration timeout, + boolean sysoutLogUpdates, + ClassLoader classLoader) throws JobExecutionException { + + JobListeningContext jobListeningContext = submitJob( + actorSystem, + leaderRetrievalService, + jobGraph, + timeout, + sysoutLogUpdates, + classLoader); + + return awaitJobResult(jobListeningContext); + } + /** * Submits a job in detached mode. The method sends the JobGraph to the * JobManager and waits for the answer whether the job could be started or not. @@ -227,7 +420,7 @@ public static void submitJobDetached( "JobManager did not respond within " + timeout.toString(), e); } catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), + throw new JobSubmissionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t.getCause()); } @@ -258,4 +451,5 @@ else if (result instanceof JobManagerMessages.JobResultFailure) { throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result); } } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java index 9379c30343925..1380e76d41fe7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java @@ -20,18 +20,11 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; -import akka.actor.Props; import akka.actor.Status; import akka.actor.Terminated; -import akka.dispatch.Futures; import akka.dispatch.OnSuccess; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.akka.ListeningBehaviour; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -39,47 +32,39 @@ import org.apache.flink.runtime.messages.JobClientMessages; import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef; import org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress; -import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.Preconditions; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; import java.util.UUID; -import java.util.concurrent.Callable; + /** - * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient - * is used to submit jobs to the JobManager and to request the port of the BlobManager. + * Actor which constitutes the bridge between the non-actor code and the JobManager. + * This base class handles the connection to the JobManager and notifies in case of timeouts. It also + * receives and prints job updates until job completion. */ -public class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener { +public abstract class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener { private final LeaderRetrievalService leaderRetrievalService; /** timeout for futures */ - private final FiniteDuration timeout; + protected final FiniteDuration timeout; /** true if status messages shall be printed to sysout */ private final boolean sysoutUpdates; - /** true if a SubmitJobSuccess message has been received */ - private boolean jobSuccessfullySubmitted = false; - - /** true if a PoisonPill was taken */ - private boolean terminated = false; + /** true if a PoisonPill about to be taken */ + private boolean toBeTerminated = false; /** ActorRef to the current leader */ - private ActorRef jobManager; + protected ActorRef jobManager; /** leader session ID of the JobManager when this actor was created */ - private UUID leaderSessionID; - - /** Actor which submits a job to the JobManager via this actor */ - private ActorRef submitter; + protected UUID leaderSessionID; - /** JobGraph which shall be submitted to the JobManager */ - private JobGraph jobGraph; + /** The client which the actor is responsible for */ + protected ActorRef client; public JobClientActor( LeaderRetrievalService leaderRetrievalService, @@ -109,9 +94,27 @@ public void postStop() { } } + /** + * Hook to be called once a connection has been established with the JobManager. + */ + protected abstract void connectedToJobManager(); + + /** + * Hook to handle custom client message which are not handled by the base class. + * @param message The message to be handled + */ + protected abstract void handleCustomMessage(Object message); + + /** + * Hook to let the client know about messages that should start a timer for a timeout + * @return The message class after which a timeout should be started + */ + protected abstract Class getClientMessageClass(); + + @Override protected void handleMessage(Object message) { - + // =========== State Change Messages =============== if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) { @@ -149,79 +152,31 @@ public void onSuccess(ActorRef result) throws Throwable { JobManagerActorRef msg = (JobManagerActorRef) message; connectToJobManager(msg.jobManager()); - logAndPrintMessage("Connected to JobManager at " + msg.jobManager()); + logAndPrintMessage("Connected to JobManager at " + msg.jobManager()); - if (jobGraph != null && !jobSuccessfullySubmitted) { - // if we haven't yet submitted the job successfully - tryToSubmitJob(jobGraph); - } + connectedToJobManager(); } // =========== Job Life Cycle Messages =============== - - // submit a job to the JobManager - else if (message instanceof SubmitJobAndWait) { - // only accept SubmitJobWait messages if we're not about to terminate - if (!terminated) { - // sanity check that this no job was submitted through this actor before - - // it is a one-shot actor after all - if (this.submitter == null) { - jobGraph = ((SubmitJobAndWait) message).jobGraph(); - if (jobGraph == null) { - LOG.error("Received null JobGraph"); - sender().tell( - decorateMessage(new Status.Failure(new Exception("JobGraph is null"))), - getSelf()); - } else { - LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); - - this.submitter = getSender(); - - // is only successful if we already know the job manager leader - tryToSubmitJob(jobGraph); - } - } else { - // repeated submission - tell failure to sender and kill self - String msg = "Received repeated 'SubmitJobAndWait'"; - LOG.error(msg); - getSender().tell( - decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); - - terminate(); - } - } else { - // we're about to receive a PoisonPill because terminated == true - String msg = getClass().getName() + " is about to be terminated. Therefore, the " + - "job submission cannot be executed."; - LOG.error(msg); - getSender().tell( - decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); - } - } + // acknowledgement to submit job is only logged, our original - // submitter is only interested in the final job result - else if (message instanceof JobManagerMessages.JobResultSuccess || - message instanceof JobManagerMessages.JobResultFailure) { - + // client is only interested in the final job result + else if (message instanceof JobManagerMessages.JobResultMessage) { + if (LOG.isDebugEnabled()) { LOG.debug("Received {} message from JobManager", message.getClass().getSimpleName()); } - // forward the success to the original job submitter - if (hasJobBeenSubmitted()) { - this.submitter.tell(decorateMessage(message), getSelf()); + // forward the success to the original client + if (isClientConnected()) { + this.client.tell(decorateMessage(message), getSelf()); } terminate(); } - else if (message instanceof JobManagerMessages.JobSubmitSuccess) { - // job was successfully submitted :-) - LOG.info("Job was successfully submitted to the JobManager {}.", getSender().path()); - jobSuccessfullySubmitted = true; - } // =========== Actor / Communication Failure / Timeouts =============== - + else if (message instanceof Terminated) { ActorRef target = ((Terminated) message).getActor(); if (jobManager.equals(target)) { @@ -234,7 +189,7 @@ else if (message instanceof Terminated) { // Important: The ConnectionTimeout message is filtered out in case that we are // notified about a new leader by setting the new leader session ID, because // ConnectionTimeout extends RequiresLeaderSessionID - if (hasJobBeenSubmitted()) { + if (isClientConnected()) { getContext().system().scheduler().scheduleOnce( timeout, getSelf(), @@ -245,49 +200,61 @@ else if (message instanceof Terminated) { } else { LOG.warn("Received 'Terminated' for unknown actor " + target); } - } else if (JobClientMessages.getConnectionTimeout().equals(message)) { + } + else if (JobClientMessages.getConnectionTimeout().equals(message)) { // check if we haven't found a job manager yet - if (!isConnected()) { - if (hasJobBeenSubmitted()) { - submitter.tell( - decorateMessage(new Status.Failure( - new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."))), + if (!isJobManagerConnected()) { + final JobClientActorConnectionTimeoutException errorMessage = + new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."); + final Object replyMessage = decorateMessage(new Status.Failure(errorMessage)); + if (isClientConnected()) { + client.tell( + replyMessage, getSelf()); } // Connection timeout reached, let's terminate terminate(); } - } else if (JobClientMessages.getSubmissionTimeout().equals(message)) { - // check if our job submission was successful in the meantime - if (!jobSuccessfullySubmitted) { - if (hasJobBeenSubmitted()) { - submitter.tell( - decorateMessage(new Status.Failure( - new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " + - "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " + - "needs more time to configure and confirm the job submission."))), - getSelf()); - } - - // We haven't heard back from the job manager after sending the job graph to him, - // therefore terminate - terminate(); - } } - // =========== Unknown Messages =============== - + // =========== Message Delegation =============== + + else if (!isJobManagerConnected() && getClientMessageClass().equals(message.getClass())) { + LOG.info( + "Received {} but there is no connection to a JobManager yet.", + message); + // We want to submit/attach to a job, but we haven't found a job manager yet. + // Let's give him another chance to find a job manager within the given timeout. + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getConnectionTimeout()), + getContext().dispatcher(), + ActorRef.noSender() + ); + handleCustomMessage(message); + } else { - LOG.error("JobClient received unknown message: " + message); + if (!toBeTerminated) { + handleCustomMessage(message); + } else { + // we're about to receive a PoisonPill because toBeTerminated == true + String msg = getClass().getName() + " is about to be terminated. Therefore, the " + + "job submission cannot be executed."; + LOG.error(msg); + getSender().tell( + decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); + } } } + @Override protected UUID getLeaderSessionID() { return leaderSessionID; } - private void logAndPrintMessage(String message) { + protected void logAndPrintMessage(String message) { LOG.info(message); if (sysoutUpdates) { System.out.println(message); @@ -351,97 +318,19 @@ private void connectToJobManager(ActorRef jobManager) { getContext().watch(jobManager); } - private void tryToSubmitJob(final JobGraph jobGraph) { - this.jobGraph = jobGraph; - - if (isConnected()) { - LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", - jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID()); - - Futures.future(new Callable() { - @Override - public Object call() throws Exception { - ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID); - - LOG.info("Upload jar files to job manager {}.", jobManager.path()); - - try { - jobGraph.uploadUserJars(jobManagerGateway, timeout); - } catch (IOException exception) { - getSelf().tell( - decorateMessage(new JobManagerMessages.JobResultFailure( - new SerializedThrowable( - new JobSubmissionException( - jobGraph.getJobID(), - "Could not upload the jar files to the job manager.", - exception) - ) - )), - ActorRef.noSender() - ); - } - - LOG.info("Submit job to the job manager {}.", jobManager.path()); - - jobManager.tell( - decorateMessage( - new JobManagerMessages.SubmitJob( - jobGraph, - ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), - getSelf()); - - // issue a SubmissionTimeout message to check that we submit the job within - // the given timeout - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getSubmissionTimeout()), - getContext().dispatcher(), - ActorRef.noSender()); - - return null; - } - }, getContext().dispatcher()); - } else { - LOG.info("Could not submit job {} ({}), because there is no connection to a " + - "JobManager.", - jobGraph.getName(), jobGraph.getJobID()); - - // We want to submit a job, but we haven't found a job manager yet. - // Let's give him another chance to find a job manager within the given timeout. - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getConnectionTimeout()), - getContext().dispatcher(), - ActorRef.noSender() - ); - } - } - - private void terminate() { + protected void terminate() { LOG.info("Terminate JobClientActor."); - terminated = true; + toBeTerminated = true; disconnectFromJobManager(); getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender()); } - private boolean isConnected() { + private boolean isJobManagerConnected() { return jobManager != ActorRef.noSender(); } - private boolean hasJobBeenSubmitted() { - return submitter != ActorRef.noSender(); + protected boolean isClientConnected() { + return client != ActorRef.noSender(); } - public static Props createJobClientActorProps( - LeaderRetrievalService leaderRetrievalService, - FiniteDuration timeout, - boolean sysoutUpdates) { - return Props.create( - JobClientActor.class, - leaderRetrievalService, - timeout, - sysoutUpdates); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java new file mode 100644 index 0000000000000..e57d1b4f40e19 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +/** + * Exception which is thrown by the {@link JobClientActor} if it has not heard back from the job + * manager after it has attempted to register for a job within a given timeout interval. + */ +public class JobClientActorRegistrationTimeoutException extends Exception { + private static final long serialVersionUID = 8762463142030454853L; + + public JobClientActorRegistrationTimeoutException(String msg) { + super(msg); + } + + public JobClientActorRegistrationTimeoutException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java new file mode 100644 index 0000000000000..b5d7cb77cebf0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.client; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The JobListeningContext holds the state necessary to monitor a running job and receive its results. + */ +public final class JobListeningContext { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** The Job id of the Job */ + private final JobID jobID; + /** The Future which is completed upon job completion */ + private final Future jobResultFuture; + /** The JobClientActor which handles communication and monitoring of the job */ + private final ActorRef jobClientActor; + /** Timeout used Asks */ + private final FiniteDuration timeout; + + /** ActorSystem for leader retrieval */ + private ActorSystem actorSystem; + /** Flink configuration for initializing the BlobService */ + private Configuration configuration; + + /** The class loader (either provided at job submission or reconstructed when it is needed */ + private ClassLoader classLoader; + + /** + * Constructor to use when the class loader is available. + */ + public JobListeningContext( + JobID jobID, + Future jobResultFuture, + ActorRef jobClientActor, + FiniteDuration timeout, + ClassLoader classLoader) { + this.jobID = checkNotNull(jobID); + this.jobResultFuture = checkNotNull(jobResultFuture); + this.jobClientActor = checkNotNull(jobClientActor); + this.timeout = checkNotNull(timeout); + this.classLoader = checkNotNull(classLoader); + } + + /** + * Constructor to use when the class loader is not available. + */ + public JobListeningContext( + JobID jobID, + Future jobResultFuture, + ActorRef jobClientActor, + FiniteDuration timeout, + ActorSystem actorSystem, + Configuration configuration) { + this.jobID = checkNotNull(jobID); + this.jobResultFuture = checkNotNull(jobResultFuture); + this.jobClientActor = checkNotNull(jobClientActor); + this.timeout = checkNotNull(timeout); + this.actorSystem = checkNotNull(actorSystem); + this.configuration = checkNotNull(configuration); + } + + /** + * @return The Job ID that this context is bound to. + */ + public JobID getJobID() { + return jobID; + } + + /** + * @return The Future that eventually holds the result of the execution. + */ + public Future getJobResultFuture() { + return jobResultFuture; + } + + /** + * @return The Job Client actor which communicats with the JobManager. + */ + public ActorRef getJobClientActor() { + return jobClientActor; + } + + /** + * @return The default timeout of Akka asks + */ + public FiniteDuration getTimeout() { + return timeout; + } + + /** + * The class loader necessary to deserialize the result of a job execution, + * i.e. JobExecutionResult or Exceptions + * @return The class loader for the job id + * @throws JobRetrievalException if anything goes wrong + */ + public ClassLoader getClassLoader() throws JobRetrievalException { + if (classLoader == null) { + // lazily initializes the class loader when it is needed + classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration); + LOG.info("Reconstructed class loader for Job {}", jobID); + } + return classLoader; + } + + private ActorGateway getJobManager() throws JobRetrievalException { + try { + return LeaderRetrievalUtils.retrieveLeaderGateway( + LeaderRetrievalUtils.createLeaderRetrievalService(configuration), + actorSystem, + AkkaUtils.getLookupTimeout(configuration)); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java new file mode 100644 index 0000000000000..a92bddc74f6d2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.client; + +import org.apache.flink.api.common.JobID; + +/** + * Exception used to indicate that a job couldn't be retrieved from the JobManager + */ +public class JobRetrievalException extends JobExecutionException { + + private static final long serialVersionUID = -42L; + + public JobRetrievalException(JobID jobID, String msg, Throwable cause) { + super(jobID, msg, cause); + } + + public JobRetrievalException(JobID jobID, String msg) { + super(jobID, msg); + } + + public JobRetrievalException(JobID jobID, Throwable cause) { + super(jobID, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java new file mode 100644 index 0000000000000..2cc4a507ca05a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; +import akka.dispatch.Futures; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobClientMessages; +import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.util.SerializedThrowable; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.concurrent.Callable; + + +/** + * Actor which handles Job submission process and provides Job updates until completion. + */ +public class JobSubmissionClientActor extends JobClientActor { + + /** JobGraph which shall be submitted to the JobManager */ + private JobGraph jobGraph; + /** true if a SubmitJobSuccess message has been received */ + private boolean jobSuccessfullySubmitted = false; + + public JobSubmissionClientActor( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + super(leaderRetrievalService, timeout, sysoutUpdates); + } + + + @Override + public void connectedToJobManager() { + if (jobGraph != null && !jobSuccessfullySubmitted) { + // if we haven't yet submitted the job successfully + tryToSubmitJob(); + } + } + + @Override + protected Class getClientMessageClass() { + return SubmitJobAndWait.class; + } + + @Override + public void handleCustomMessage(Object message) { + // submit a job to the JobManager + if (message instanceof SubmitJobAndWait) { + // sanity check that this no job was submitted through this actor before - + // it is a one-shot actor after all + if (this.client == null) { + jobGraph = ((SubmitJobAndWait) message).jobGraph(); + if (jobGraph == null) { + LOG.error("Received null JobGraph"); + sender().tell( + decorateMessage(new Status.Failure(new Exception("JobGraph is null"))), + getSelf()); + } else { + LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + + this.client = getSender(); + + // is only successful if we already know the job manager leader + if (jobManager != null) { + tryToSubmitJob(); + } + } + } else { + // repeated submission - tell failure to sender and kill self + String msg = "Received repeated 'SubmitJobAndWait'"; + LOG.error(msg); + getSender().tell( + decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); + + terminate(); + } + } else if (message instanceof JobManagerMessages.JobSubmitSuccess) { + // job was successfully submitted :-) + LOG.info("Job {} was successfully submitted to the JobManager {}.", + ((JobManagerMessages.JobSubmitSuccess) message).jobId(), + getSender().path()); + jobSuccessfullySubmitted = true; + } else if (JobClientMessages.getSubmissionTimeout().equals(message)) { + // check if our job submission was successful in the meantime + if (!jobSuccessfullySubmitted) { + if (isClientConnected()) { + client.tell( + decorateMessage(new Status.Failure( + new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " + + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " + + "needs more time to configure and confirm the job submission."))), + getSelf()); + } + + // We haven't heard back from the job manager after sending the job graph to him, + // therefore terminate + terminate(); + } + } else { + LOG.error("{} received unknown message: ", getClass()); + } + } + + private void tryToSubmitJob() { + LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", + jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID()); + + Futures.future(new Callable() { + @Override + public Object call() throws Exception { + ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID); + + LOG.info("Upload jar files to job manager {}.", jobManager.path()); + + try { + jobGraph.uploadUserJars(jobManagerGateway, timeout); + } catch (IOException exception) { + getSelf().tell( + decorateMessage(new JobManagerMessages.JobResultFailure( + new SerializedThrowable( + new JobSubmissionException( + jobGraph.getJobID(), + "Could not upload the jar files to the job manager.", + exception) + ) + )), + ActorRef.noSender() + ); + } + + LOG.info("Submit job to the job manager {}.", jobManager.path()); + + jobManager.tell( + decorateMessage( + new JobManagerMessages.SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), + getSelf()); + + // issue a SubmissionTimeout message to check that we submit the job within + // the given timeout + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getSubmissionTimeout()), + getContext().dispatcher(), + ActorRef.noSender()); + + return null; + } + }, getContext().dispatcher()); + } + + + public static Props createActorProps( + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutUpdates) { + return Props.create( + JobSubmissionClientActor.class, + leaderRetrievalService, + timeout, + sysoutUpdates); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 7a94c0ff18ca2..d7e40a3e05e53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -931,6 +931,7 @@ public void prepareForArchiving() { intermediateResults.clear(); currentExecutions.clear(); requiredJarFiles.clear(); + requiredClasspaths.clear(); jobStatusListeners.clear(); executionListeners.clear(); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala index 67d7a067ce73e..a84650c42175b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorRef import org.apache.flink.runtime.akka.ListeningBehaviour + /** * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor * submitted the job, when the start time and, if already terminated, the end time was. @@ -37,11 +38,14 @@ import org.apache.flink.runtime.akka.ListeningBehaviour * @param start Starting time */ class JobInfo( - val client: ActorRef, - val listeningBehaviour: ListeningBehaviour, + client: ActorRef, + listeningBehaviour: ListeningBehaviour, val start: Long, val sessionTimeout: Long) extends Serializable { + val clients = scala.collection.mutable.HashSet[(ActorRef, ListeningBehaviour)]() + clients += ((client, listeningBehaviour)) + var sessionAlive = sessionTimeout > 0 var lastActive = 0L @@ -58,10 +62,62 @@ class JobInfo( } } - override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)" + + /** + * Notifies all clients by sending a message + * @param message the message to send + */ + def notifyClients(message: Any) = { + clients foreach { + case (clientActor, _) => + clientActor ! message + } + } + + /** + * Notifies all clients which are not of type detached + * @param message the message to sent to non-detached clients + */ + def notifyNonDetachedClients(message: Any) = { + clients foreach { + case (clientActor, ListeningBehaviour.DETACHED) => + // do nothing + case (clientActor, _) => + clientActor ! message + } + } + + /** + * Sends a message to job clients that match the listening behavior + * @param message the message to send to all clients + * @param listeningBehaviour the desired listening behaviour + */ + def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = { + clients foreach { + case (clientActor, `listeningBehaviour`) => + clientActor ! message + case _ => + } + } def setLastActive() = lastActive = System.currentTimeMillis() + + + override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)" + + override def equals(other: Any): Boolean = other match { + case that: JobInfo => + clients == that.clients && + start == that.start && + sessionTimeout == that.sessionTimeout + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(clients, start, sessionTimeout) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } } object JobInfo{ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0587987294729..d35fb0aed25c0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,18 +19,16 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} -import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress} +import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} import java.lang.management.ManagementFactory import java.util.UUID import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} import javax.management.ObjectName -import akka.actor.Status.{Success, Failure} +import akka.actor.Status.{Failure, Success} import akka.actor._ import akka.pattern.ask - import grizzled.slf4j.Logger - import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.fs.FileSystem @@ -41,8 +39,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ -import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStoreFactory, SavepointStore} -import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker} +import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore, SavepointStoreFactory} +import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker} import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -58,24 +56,22 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService} - import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge} +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState} import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified} -import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage, AcknowledgeCheckpoint} - +import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.InfoMessage import org.apache.flink.runtime.messages.webmonitor._ import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.process.ProcessReaper -import org.apache.flink.runtime.query.{UnknownKvStateLocation, KvStateMessage} -import org.apache.flink.runtime.query.KvStateMessage.{NotifyKvStateUnregistered, LookupKvStateLocation, NotifyKvStateRegistered} +import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation} +import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered} import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager @@ -83,7 +79,6 @@ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} - import org.jboss.netty.channel.ChannelException import scala.annotation.tailrec @@ -479,6 +474,22 @@ class JobManager( submitJob(jobGraph, jobInfo) + case RegisterJobClient(jobID, listeningBehaviour) => + val client = sender() + currentJobs.get(jobID) match { + case Some((executionGraph, jobInfo)) => + log.info("Registering client for job $jobID") + jobInfo.clients += ((client, listeningBehaviour)) + val listener = new StatusListenerMessenger(client, leaderSessionID.orNull) + executionGraph.registerJobStatusListener(listener) + if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) { + executionGraph.registerExecutionListener(listener) + } + client ! decorateMessage(RegisterJobClientSuccess(jobID)) + case None => + client ! decorateMessage(JobNotFound(jobID)) + } + case RecoverSubmittedJob(submittedJobGraph) => if (!currentJobs.contains(submittedJobGraph.getJobId)) { submitJob( @@ -788,50 +799,53 @@ class JobManager( } // is the client waiting for the job result? - if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { - newJobStatus match { - case JobStatus.FINISHED => - try { - val accumulatorResults = executionGraph.getAccumulatorsSerialized() - val result = new SerializedJobExecutionResult( - jobID, - jobInfo.duration, - accumulatorResults) - - jobInfo.client ! decorateMessage(JobResultSuccess(result)) - } catch { - case e: Exception => - log.error(s"Cannot fetch final accumulators for job $jobID", e) - val exception = new JobExecutionException(jobID, - "Failed to retrieve accumulator results.", e) + newJobStatus match { + case JobStatus.FINISHED => + try { + val accumulatorResults = executionGraph.getAccumulatorsSerialized() + val result = new SerializedJobExecutionResult( + jobID, + jobInfo.duration, + accumulatorResults) + + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultSuccess(result))) + } catch { + case e: Exception => + log.error(s"Cannot fetch final accumulators for job $jobID", e) + val exception = new JobExecutionException(jobID, + "Failed to retrieve accumulator results.", e) - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable(exception))) - } + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( + new SerializedThrowable(exception)))) + } - case JobStatus.CANCELED => - // the error may be packed as a serialized throwable - val unpackedError = SerializedThrowable.get( - error, executionGraph.getUserClassLoader()) + case JobStatus.CANCELED => + // the error may be packed as a serialized throwable + val unpackedError = SerializedThrowable.get( + error, executionGraph.getUserClassLoader()) - jobInfo.client ! decorateMessage(JobResultFailure( + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( new SerializedThrowable( - new JobCancellationException(jobID, "Job was cancelled.", unpackedError)))) + new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))) - case JobStatus.FAILED => - val unpackedError = SerializedThrowable.get( - error, executionGraph.getUserClassLoader()) + case JobStatus.FAILED => + val unpackedError = SerializedThrowable.get( + error, executionGraph.getUserClassLoader()) - jobInfo.client ! decorateMessage(JobResultFailure( + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( new SerializedThrowable( - new JobExecutionException(jobID, "Job execution failed.", unpackedError)))) - - case x => - val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable(exception))) - throw exception - } + new JobExecutionException(jobID, "Job execution failed.", unpackedError))))) + + case x => + val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") + jobInfo.notifyNonDetachedClients( + decorateMessage(JobResultFailure( + new SerializedThrowable(exception)))) + throw exception } }(context.dispatcher) } @@ -919,6 +933,18 @@ class JobManager( archive forward decorateMessage(RequestJob(jobID)) } + case RequestClassloadingProps(jobID) => + currentJobs.get(jobID) match { + case Some((graph, jobInfo)) => + sender() ! decorateMessage( + ClassloadingProps( + libraryCacheManager.getBlobServerPort, + graph.getRequiredJarFiles, + graph.getRequiredClasspaths)) + case None => + sender() ! decorateMessage(JobNotFound(jobID)) + } + case RequestBlobManagerPort => sender ! decorateMessage(libraryCacheManager.getBlobServerPort) @@ -1052,11 +1078,10 @@ class JobManager( */ private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = { if (jobGraph == null) { - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable( - new JobSubmissionException(null, "JobGraph must not be null.") - ) - )) + jobInfo.notifyClients( + decorateMessage(JobResultFailure( + new SerializedThrowable( + new JobSubmissionException(null, "JobGraph must not be null."))))) } else { val jobId = jobGraph.getJobID @@ -1259,13 +1284,15 @@ class JobManager( executionGraph.registerJobStatusListener( new StatusListenerMessenger(self, leaderSessionID.orNull)) - if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) { + jobInfo.clients foreach { // the sender wants to be notified about state changes - val listener = new StatusListenerMessenger(jobInfo.client, leaderSessionID.orNull) - - executionGraph.registerExecutionListener(listener) - executionGraph.registerJobStatusListener(listener) + case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) => + val listener = new StatusListenerMessenger(client, leaderSessionID.orNull) + executionGraph.registerExecutionListener(listener) + executionGraph.registerJobStatusListener(listener) + case _ => // do nothing } + } catch { case t: Throwable => log.error(s"Failed to submit job $jobId ($jobName)", t) @@ -1283,7 +1310,8 @@ class JobManager( new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t) } - jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt))) + jobInfo.notifyClients( + decorateMessage(JobResultFailure(new SerializedThrowable(rt)))) return } @@ -1338,7 +1366,8 @@ class JobManager( } } - jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) + jobInfo.notifyClients( + decorateMessage(JobSubmitSuccess(jobGraph.getJobID))) if (leaderElectionService.hasLeadership) { // There is a small chance that multiple job managers schedule the same job after if @@ -1740,10 +1769,10 @@ class JobManager( future { eg.suspend(cause) - if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { - jobInfo.client ! decorateMessage( - Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))) - } + jobInfo.notifyNonDetachedClients( + decorateMessage( + Failure( + new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))) }(context.dispatcher) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala index a60fa7aa093ce..1f29e322ff450 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages import java.util.UUID import akka.actor.ActorRef +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.jobgraph.JobGraph /** @@ -29,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph object JobClientMessages { /** - * This message is sent to the JobClient (via ask) to submit a job and + * This message is sent to the JobClientActor (via ask) to submit a job and * get a response when the job execution has finished. * * The response to this message is a @@ -40,15 +41,11 @@ object JobClientMessages { case class SubmitJobAndWait(jobGraph: JobGraph) /** - * This message is sent to the JobClient (via ask) to submit a job and - * return as soon as the result of the submit operation is known. - * - * The response to this message is a - * [[org.apache.flink.api.common.JobSubmissionResult]] - * - * @param jobGraph The job to be executed. - */ - case class SubmitJobDetached(jobGraph: JobGraph) + * This message is sent to the JobClientActor to ask it to register at the JobManager + * and then return once the job execution is complete. + * @param jobID The job id + */ + case class AttachToJobAndWait(jobID: JobID) /** Notifies the JobClientActor about a new leader address and a leader session ID. * @@ -66,9 +63,13 @@ object JobClientMessages { /** Message which is triggered when the submission timeout has been reached. */ case object SubmissionTimeout extends RequiresLeaderSessionID - /** Messaeg which is triggered when the connection timeout has been reached. */ + /** Message which is triggered when the JobClient registration at the JobManager times out */ + case object RegistrationTimeout extends RequiresLeaderSessionID + + /** Message which is triggered when the connection timeout has been reached. */ case object ConnectionTimeout extends RequiresLeaderSessionID def getSubmissionTimeout(): AnyRef = SubmissionTimeout + def getRegistrationTimeout(): AnyRef = RegistrationTimeout def getConnectionTimeout(): AnyRef = ConnectionTimeout } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 14f72b0b3dd8d..40c4dcf75f599 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -18,6 +18,7 @@ package org.apache.flink.runtime.messages +import java.net.URL import java.util.UUID import akka.actor.ActorRef @@ -68,6 +69,19 @@ object JobManagerMessages { listeningBehaviour: ListeningBehaviour) extends RequiresLeaderSessionID + /** + * Registers the sender of the message as the client for the provided job identifier. + * This message is acknowledged by the JobManager with [[RegisterJobClientSuccess]] + * or [[JobNotFound]] if the job was not running. + * @param jobID The job id of the job + * @param listeningBehaviour The types of updates which will be sent to the sender + * after registration + */ + case class RegisterJobClient( + jobID: JobID, + listeningBehaviour: ListeningBehaviour) + extends RequiresLeaderSessionID + /** * Triggers the recovery of the job with the given ID. * @@ -194,6 +208,23 @@ object JobManagerMessages { */ case object RequestTotalNumberOfSlots + /** + * Requests all entities necessary for reconstructing a job class loader + * May respond with [[ClassloadingProps]] or [[JobNotFound]] + * @param jobId The job id of the registered job + */ + case class RequestClassloadingProps(jobId: JobID) + + /** + * Response to [[RequestClassloadingProps]] + * @param blobManagerPort The port of the blobManager + * @param requiredJarFiles The blob keys of the required jar files + * @param requiredClasspaths The urls of the required classpaths + */ + case class ClassloadingProps(blobManagerPort: Integer, + requiredJarFiles: java.util.List[BlobKey], + requiredClasspaths: java.util.List[URL]) + /** * Requests the port of the blob manager from the job manager. The result is sent back to the * sender as an [[Int]]. @@ -217,17 +248,28 @@ object JobManagerMessages { */ case class JobSubmitSuccess(jobId: JobID) + /** + * Denotes a successful registration of a JobClientActor for a running job + * @param jobId The job id of the registered job + */ + case class RegisterJobClientSuccess(jobId: JobID) + + /** + * Denotes messages which contain the result of a completed job execution + */ + sealed trait JobResultMessage + /** * Denotes a successful job execution. * @param result The result of the job execution, in serialized form. */ - case class JobResultSuccess(result: SerializedJobExecutionResult) + case class JobResultSuccess(result: SerializedJobExecutionResult) extends JobResultMessage /** * Denotes an unsuccessful job execution. * @param cause The exception that caused the job to fail, in serialized form. */ - case class JobResultFailure(cause: SerializedThrowable) + case class JobResultFailure(cause: SerializedThrowable) extends JobResultMessage sealed trait CancellationResponse{ @@ -316,7 +358,7 @@ object JobManagerMessages { /** * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of - * [[RequestJob]] or [[RequestJobStatus]]. + * [[RequestJob]], [[RequestJobStatus]] or [[RegisterJobClient]]. * * @param jobID */ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 9640fcdda05a1..df4f95a8316eb 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership +import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient} import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat @@ -67,6 +67,8 @@ trait TestingJobManagerLike extends FlinkActor { override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1 }) + val waitForClient = scala.collection.mutable.HashSet[ActorRef]() + val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() var disconnectDisabled = false @@ -328,6 +330,14 @@ trait TestingJobManagerLike extends FlinkActor { waitForLeader.clear() + case NotifyWhenClientConnects => + waitForClient += sender() + sender() ! true + + case msg: RegisterJobClient => + super.handleMessage(msg) + waitForClient.foreach(_ ! true) + case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) => if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) { // there are already at least numRegisteredTaskManager registered --> send Acknowledge diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index a411c8b7ebdc4..a88ed4386c246 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -82,6 +82,11 @@ object TestingJobManagerMessages { */ case object NotifyWhenLeader + /** + * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs + */ + case object NotifyWhenClientConnects + /** * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] * message when at least numRegisteredTaskManager have registered at the JobManager. @@ -111,6 +116,7 @@ object TestingJobManagerMessages { case class ResponseSavepoint(savepoint: Savepoint) def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader + def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects def getDisablePostStop(): AnyRef = DisablePostStop } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 073164c030a41..2adf7eb807f0b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -25,17 +25,17 @@ import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobClientMessages; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.Await; @@ -45,6 +45,8 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.flink.runtime.messages.JobManagerMessages.*; + public class JobClientActorTest extends TestLogger { private static ActorSystem system; @@ -62,8 +64,8 @@ public static void teardown() { } /** Tests that a {@link JobClientActorSubmissionTimeoutException} is thrown when the job cannot - * be submitted by the JobClientActor. This is here the case, because the started JobManager - * never replies to a SubmitJob message. + * be submitted by the JobSubmissionClientActor. This is here the case, because the started JobManager + * never replies to a {@link SubmitJob} message. * * @throws Exception */ @@ -84,7 +86,7 @@ public void testSubmissionTimeout() throws Exception { leaderSessionID ); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -100,19 +102,56 @@ public void testSubmissionTimeout() throws Exception { Await.result(jobExecutionResult, timeout); } + + /** Tests that a {@link JobClientActorRegistrationTimeoutException} is thrown when the registration + * cannot be performed at the JobManager by the JobAttachmentClientActor. This is here the case, because the + * started JobManager never replies to a {@link RegisterJobClient} message. + */ + @Test(expected=JobClientActorRegistrationTimeoutException.class) + public void testRegistrationTimeout() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + PlainActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future jobExecutionResult = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Await.result(jobExecutionResult, timeout); + } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} - * is thrown when the JobClientActor wants to submit a job but has not connected to a JobManager. + * is thrown when the JobSubmissionClientActor wants to submit a job but has not connected to a JobManager. * * @throws Exception */ @Test(expected=JobClientActorConnectionTimeoutException.class) - public void testConnectionTimeoutWithoutJobManager() throws Exception { + public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception { FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -127,6 +166,32 @@ public void testConnectionTimeoutWithoutJobManager() throws Exception { Await.result(jobExecutionResult, timeout); } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} + * is thrown when the JobAttachmentClientActor attach to a job at the JobManager + * but has not connected to a JobManager. + */ + @Test(expected=JobClientActorConnectionTimeoutException.class) + public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future jobExecutionResult = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Await.result(jobExecutionResult, timeout); + } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} * is thrown after a successful job submission if the JobManager dies. * @@ -149,7 +214,7 @@ public void testConnectionTimeoutAfterJobSubmission() throws Exception { leaderSessionID ); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -170,6 +235,91 @@ public void testConnectionTimeoutAfterJobSubmission() throws Exception { Await.result(jobExecutionResult, timeout); } + /** Tests that a {@link JobClientActorConnectionTimeoutException} + * is thrown after a successful registration of the client at the JobManager. + */ + @Test(expected=JobClientActorConnectionTimeoutException.class) + public void testConnectionTimeoutAfterJobRegistration() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + JobAcceptingActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future jobExecutionResult = Patterns.ask( + jobClientActor, + new AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Future waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout)); + + Await.result(waitFuture, timeout); + + jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + Await.result(jobExecutionResult, timeout); + } + + + /** Tests that JobClient throws an Exception if the JobClientActor dies and can't answer to + * {@link akka.actor.Identify} message anymore. + */ + @Test + public void testGuaranteedAnswerIfJobClientDies() throws Exception { + FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + JobAcceptingActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + JobListeningContext jobListeningContext = + JobClient.submitJob( + system, + testingLeaderRetrievalService, + testJobGraph, + timeout, + false, + getClass().getClassLoader()); + + Future waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout)); + Await.result(waitFuture, timeout); + + // kill the job client actor which has been registered at the JobManager + jobListeningContext.getJobClientActor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + try { + // should not block but return an error + JobClient.awaitJobResult(jobListeningContext); + Assert.fail(); + } catch (JobExecutionException e) { + // this is what we want + } + } + public static class PlainActor extends FlinkUntypedActor { private final UUID leaderSessionID; @@ -180,7 +330,6 @@ public PlainActor(UUID leaderSessionID) { @Override protected void handleMessage(Object message) throws Exception { - } @Override @@ -200,17 +349,29 @@ public JobAcceptingActor(UUID leaderSessionID) { @Override protected void handleMessage(Object message) throws Exception { - if (message instanceof JobManagerMessages.SubmitJob) { + if (message instanceof SubmitJob) { getSender().tell( - new JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob) message).jobGraph().getJobID()), + new JobSubmitSuccess(((SubmitJob) message).jobGraph().getJobID()), getSelf()); jobAccepted = true; - if(testFuture != ActorRef.noSender()) { + if (testFuture != ActorRef.noSender()) { testFuture.tell(Messages.getAcknowledge(), getSelf()); } - } else if (message instanceof RegisterTest) { + } + else if (message instanceof RegisterJobClient) { + getSender().tell( + new RegisterJobClientSuccess(((RegisterJobClient) message).jobID()), + getSelf()); + + jobAccepted = true; + + if (testFuture != ActorRef.noSender()) { + testFuture.tell(Messages.getAcknowledge(), getSelf()); + } + } + else if (message instanceof RegisterTest) { testFuture = getSender(); if (jobAccepted) { @@ -226,4 +387,5 @@ protected UUID getLeaderSessionID() { } public static class RegisterTest{} + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index c71bd355d01f8..426dfbafb574c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -286,7 +286,6 @@ protected void verifyJobGraphs(SubmittedJobGraph expected, SubmittedJobGraph act JobInfo expectedJobInfo = expected.getJobInfo(); JobInfo actualJobInfo = actual.getJobInfo(); - assertEquals(expectedJobInfo.listeningBehaviour(), actualJobInfo.listeningBehaviour()); - assertEquals(expectedJobInfo.start(), actualJobInfo.start()); + assertEquals(expectedJobInfo, actualJobInfo); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java new file mode 100644 index 0000000000000..db17ee870e207 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.clients.examples; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.collection.Seq; + +import java.util.concurrent.Semaphore; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + + +/** + * Tests retrieval of a job from a running Flink cluster + */ +public class JobRetrievalITCase extends TestLogger { + + private static final Semaphore lock = new Semaphore(1); + + private static FlinkMiniCluster cluster; + + @BeforeClass + public static void before() { + cluster = new ForkableFlinkMiniCluster(new Configuration(), false); + cluster.start(); + } + + @AfterClass + public static void after() { + cluster.stop(); + cluster = null; + } + + @Test + public void testJobRetrieval() throws Exception { + final JobID jobID = new JobID(); + + final JobVertex imalock = new JobVertex("imalock"); + imalock.setInvokableClass(SemaphoreInvokable.class); + + final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock); + + final ClusterClient client = new StandaloneClusterClient(cluster.configuration()); + + // acquire the lock to make sure that the job cannot complete until the job client + // has been attached in resumingThread + lock.acquire(); + client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader()); + + final Thread resumingThread = new Thread(new Runnable() { + @Override + public void run() { + try { + assertNotNull(client.retrieveJob(jobID)); + } catch (JobExecutionException e) { + fail(e.getMessage()); + } + } + }); + + final Seq actorSystemSeq = cluster.jobManagerActorSystems().get(); + final ActorSystem actorSystem = actorSystemSeq.last(); + JavaTestKit testkit = new JavaTestKit(actorSystem); + + final ActorRef jm = cluster.getJobManagersAsJava().get(0); + // wait until client connects + jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef()); + // confirm registration + testkit.expectMsgEquals(true); + + // kick off resuming + resumingThread.start(); + + // wait for client to connect + testkit.expectMsgEquals(true); + // client has connected, we can release the lock + lock.release(); + + resumingThread.join(); + } + + @Test + public void testNonExistingJobRetrieval() throws Exception { + final JobID jobID = new JobID(); + ClusterClient client = new StandaloneClusterClient(cluster.configuration()); + + try { + client.retrieveJob(jobID); + fail(); + } catch (JobRetrievalException e) { + // this is what we want + } + } + + + public static class SemaphoreInvokable extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + lock.acquire(); + } + } + +}