Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobRetrievalException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down Expand Up @@ -428,6 +430,39 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade
}
}

/**
* Reattaches to a running from from the supplied job id
* @param jobID The job id of the job to attach to
* @return The JobExecutionResult for the jobID
* @throws JobExecutionException if an error occurs during monitoring the job execution
*/
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
} catch (Exception e) {
throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
}

ActorGateway jobManagerGateway;
try {
jobManagerGateway = getJobManagerGateway();
} catch (Exception e) {
throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway");
}

final JobListeningContext listeningContext = JobClient.attachToRunningJob(
jobID,
jobManagerGateway,
flinkConfig,
actorSystemLoader.get(),
leaderRetrievalService,
timeout,
printStatusDuringExecution);

return JobClient.awaitJobResult(listeningContext);
}

/**
* Cancels a job identified by the job id.
* @param jobId the job id
Expand All @@ -446,11 +481,11 @@ public void cancel(JobID jobId) throws Exception {
final Object result = Await.result(response, timeout);

if (result instanceof JobManagerMessages.CancellationSuccess) {
LOG.info("Job cancellation with ID " + jobId + " succeeded.");
logAndSysout("Job cancellation with ID " + jobId + " succeeded.");
} else if (result instanceof JobManagerMessages.CancellationFailure) {
final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
LOG.info("Job cancellation with ID " + jobId + " failed.", t);
throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
logAndSysout("Job cancellation with ID " + jobId + " failed because of " + t.getMessage());
throw new Exception("Failed to cancel the job with id " + jobId, t);
} else {
throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
}
Expand Down
2 changes: 1 addition & 1 deletion flink-clients/src/test/resources/log4j-test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class JobExecutionResult extends JobSubmissionResult {

private long netRuntime;

private Map<String, Object> accumulatorResults = Collections.emptyMap();
private final Map<String, Object> accumulatorResults;

/**
* Creates a new JobExecutionResult.
Expand All @@ -49,6 +49,8 @@ public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accu

if (accumulators != null) {
this.accumulatorResults = accumulators;
} else {
this.accumulatorResults = Collections.emptyMap();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.dispatch.Futures;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
import org.apache.flink.runtime.messages.JobManagerMessages;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.Callable;


/**
* Actor which handles Job attachment process and provides Job updates until completion.
*/
public class JobAttachmentClientActor extends JobClientActor {

/** JobID to attach to when the JobClientActor retrieves a job */
private JobID jobID;
/** true if a JobRegistrationSuccess message has been received */
private boolean successfullyRegisteredForJob = false;

public JobAttachmentClientActor(
LeaderRetrievalService leaderRetrievalService,
FiniteDuration timeout,
boolean sysoutUpdates) {
super(leaderRetrievalService, timeout, sysoutUpdates);
}

@Override
public void connectedToJobManager() {
if (jobID != null && !successfullyRegisteredForJob) {
tryToAttachToJob();
}
}

@Override
protected Class getClientMessageClass() {
return AttachToJobAndWait.class;
}

@Override
public void handleCustomMessage(Object message) {
if (message instanceof AttachToJobAndWait) {
// sanity check that this no job registration was performed through this actor before -
// it is a one-shot actor after all
if (this.client == null) {
jobID = ((AttachToJobAndWait) message).jobID();
if (jobID == null) {
LOG.error("Received null JobID");
sender().tell(
decorateMessage(new Status.Failure(new Exception("JobID is null"))),
getSelf());
} else {
LOG.info("Received JobID {}.", jobID);

this.client = getSender();

// is only successful if we already know the job manager leader
if (jobManager != null) {
tryToAttachToJob();
}
}
} else {
// repeated submission - tell failure to sender and kill self
String msg = "Received repeated 'AttachToJobAndWait'";
LOG.error(msg);
getSender().tell(
decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());

terminate();
}
}
else if (message instanceof JobManagerMessages.RegisterJobClientSuccess) {
// job registration was successful :o)
JobManagerMessages.RegisterJobClientSuccess msg = ((JobManagerMessages.RegisterJobClientSuccess) message);
logAndPrintMessage("Successfully registered at the JobManager for Job " + msg.jobId());
successfullyRegisteredForJob = true;
}
else if (message instanceof JobManagerMessages.JobNotFound) {
LOG.info("Couldn't register JobClient for JobID {}",
((JobManagerMessages.JobNotFound) message).jobID());
client.tell(decorateMessage(message), getSelf());
terminate();
}
else if (JobClientMessages.getRegistrationTimeout().equals(message)) {
// check if our registration for a job was successful in the meantime
if (!successfullyRegisteredForJob) {
if (isClientConnected()) {
client.tell(
decorateMessage(new Status.Failure(
new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " +
"timed out. " + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT +
"' in case the JobManager needs more time to confirm the job client registration."))),
getSelf());
}

// We haven't heard back from the job manager after attempting registration for a job
// therefore terminate
terminate();
}
} else {
LOG.error("{} received unknown message: ", getClass());
}

}

private void tryToAttachToJob() {
LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID);

Futures.future(new Callable<Object>() {
@Override
public Object call() throws Exception {
LOG.info("Attaching to job {} at the job manager {}.", jobID, jobManager.path());

jobManager.tell(
decorateMessage(
new JobManagerMessages.RegisterJobClient(
jobID,
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
getSelf());

// issue a RegistrationTimeout message to check that we submit the job within
// the given timeout
getContext().system().scheduler().scheduleOnce(
timeout,
getSelf(),
decorateMessage(JobClientMessages.getRegistrationTimeout()),
getContext().dispatcher(),
ActorRef.noSender());

return null;
}
}, getContext().dispatcher());
}

public static Props createActorProps(
LeaderRetrievalService leaderRetrievalService,
FiniteDuration timeout,
boolean sysoutUpdates) {
return Props.create(
JobAttachmentClientActor.class,
leaderRetrievalService,
timeout,
sysoutUpdates);
}
}
Loading