[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient
Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with different
ClusterClient implementations.
tillrohrmann committed on Jan 1, 2018 (commit 192adb7, 1 parent: 783bd1d)
Showing 4 changed files with 219 additions and 279 deletions.
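
As the commit message explains, the CliFrontend no longer builds JobManager actor messages for savepoint disposal itself; it only retrieves a ClusterClient and delegates to it, so different ClusterClient implementations can sit behind the same CLI code. A minimal sketch of the resulting disposal path, condensed from the CliFrontend diff below (disposeViaClusterClient is a hypothetical helper name, not a method introduced by this commit, and most error handling is trimmed):

private int disposeViaClusterClient(CustomCommandLine<?> customCommandLine, SavepointOptions options) throws Exception {
	// Obtain a client for whatever cluster the command line points at.
	ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
	try {
		// Disposal is now a ClusterClient call returning a future, not a raw actor message.
		CompletableFuture<Acknowledge> disposeFuture =
			clusterClient.disposeSavepoint(options.getSavepointPath(), FutureUtils.toTime(clientTimeout));
		disposeFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
		return 0;
	} finally {
		clusterClient.shutdown();
	}
}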
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java (192 changes: 77 additions & 115 deletions)
@@ -57,9 +57,9 @@
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -91,14 +91,10 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;

-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;

-import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-
 /**
  * Implementation of a simple command line frontend for executing programs.
  */
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
 			return 0;
 		}

-		if (options.isDispose()) {
-			// Discard
-			return disposeSavepoint(options);
-		} else {
-			// Trigger
-			String[] cleanedArgs = options.getArgs();
-			JobID jobId;
+		CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(options.getCommandLine());

-			if (cleanedArgs.length >= 1) {
-				String jobIdString = cleanedArgs[0];
-				try {
-					jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
-				} catch (Exception e) {
-					return handleArgException(new IllegalArgumentException(
-							"Error: The value for the Job ID is not a valid ID."));
-				}
+		ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
+
+		try {
+			if (options.isDispose()) {
+				// Discard
+				return disposeSavepoint(clusterClient, options.getSavepointPath());
 			} else {
-				return handleArgException(new IllegalArgumentException(
+				// Trigger
+				String[] cleanedArgs = options.getArgs();
+				JobID jobId;
+
+				if (cleanedArgs.length >= 1) {
+					String jobIdString = cleanedArgs[0];
+					try {
+						jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+					} catch (Exception e) {
+						return handleArgException(new IllegalArgumentException(
+							"Error: The value for the Job ID is not a valid ID."));
+					}
+				} else {
+					return handleArgException(new IllegalArgumentException(
 						"Error: The value for the Job ID is not a valid ID. " +
-							"Specify a Job ID to trigger a savepoint."));
-			}
+							"Specify a Job ID to trigger a savepoint."));
+				}

-			String savepointDirectory = null;
-			if (cleanedArgs.length >= 2) {
-				savepointDirectory = cleanedArgs[1];
-			}
+				String savepointDirectory = null;
+				if (cleanedArgs.length >= 2) {
+					savepointDirectory = cleanedArgs[1];
+				}

-			// Print superfluous arguments
-			if (cleanedArgs.length >= 3) {
-				logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
-			}
+				// Print superfluous arguments
+				if (cleanedArgs.length >= 3) {
+					logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
+				}

-			return triggerSavepoint(options, jobId, savepointDirectory);
+				return triggerSavepoint(clusterClient, jobId, savepointDirectory);
+			}
+		} catch (Exception e) {
+			return handleError(e);
+		} finally {
+			try {
+				clusterClient.shutdown();
+			} catch (Exception e) {
+				LOG.info("Could not shutdown the cluster client.", e);
+			}
 		}
 	}

 	/**
 	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
 	 * message to the job manager.
 	 */
-	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
-		try {
-			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
-			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
-			try {
-				logAndSysout("Triggering savepoint for job " + jobId + ".");
-				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
-
-				String savepointPath;
-				try {
-					logAndSysout("Waiting for response...");
-					savepointPath = savepointPathFuture.get();
-				}
-				catch (ExecutionException ee) {
-					Throwable cause = ExceptionUtils.stripExecutionException(ee);
-					throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
-				}
-
-				logAndSysout("Savepoint completed. Path: " + savepointPath);
-				logAndSysout("You can resume your program from this savepoint with the run command.");
+	private int triggerSavepoint(ClusterClient clusterClient, JobID jobId, String savepointDirectory) throws Exception {
+		logAndSysout("Triggering savepoint for job " + jobId + ".");
+		CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId, savepointDirectory);

-				return 0;
-			}
-			finally {
-				client.shutdown();
-			}
+		String savepointPath;
+		try {
+			logAndSysout("Waiting for response...");
+			savepointPath = savepointPathFuture.get();
 		}
-		catch (Throwable t) {
-			return handleError(t);
+		catch (ExecutionException ee) {
+			Throwable cause = ExceptionUtils.stripExecutionException(ee);
+			throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
 		}
+
+		logAndSysout("Savepoint completed. Path: " + savepointPath);
+		logAndSysout("You can resume your program from this savepoint with the run command.");
+
+		return 0;
 	}

 	/**
 	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
 	 * message to the job manager.
 	 */
-	private int disposeSavepoint(SavepointOptions options) {
-		try {
-			String savepointPath = options.getSavepointPath();
-			if (savepointPath == null) {
-				throw new IllegalArgumentException("Missing required argument: savepoint path. " +
-						"Usage: bin/flink savepoint -d <savepoint-path>");
-			}
-
-			ActorGateway jobManager = getJobManagerGateway(options);
+	private int disposeSavepoint(ClusterClient clusterClient, String savepointPath) {
+		Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
+			"Usage: bin/flink savepoint -d <savepoint-path>");

-			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
+		logAndSysout("Disposing savepoint '" + savepointPath + "'.");

-			Object msg = new DisposeSavepoint(savepointPath);
-			Future<Object> response = jobManager.ask(msg, clientTimeout);
+		CompletableFuture<Acknowledge> disposeFuture = null;
+		try {
+			disposeFuture = clusterClient.disposeSavepoint(savepointPath, FutureUtils.toTime(clientTimeout));
+		} catch (Exception e) {
+			return handleError(new FlinkException("Could not dispose savepoint", e));
+		}

-			Object result;
-			try {
-				logAndSysout("Waiting for response...");
-				result = Await.result(response, clientTimeout);
-			} catch (Exception e) {
-				throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
-			}
+		logAndSysout("Waiting for response...");

-			if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
-				logAndSysout("Savepoint '" + savepointPath + "' disposed.");
-				return 0;
-			} else if (result instanceof DisposeSavepointFailure) {
-				DisposeSavepointFailure failure = (DisposeSavepointFailure) result;
-
-				if (failure.cause() instanceof ClassNotFoundException) {
-					throw new ClassNotFoundException("Savepoint disposal failed, because of a " +
-						"missing class. This is most likely caused by a custom state " +
-						"instance, which cannot be disposed without the user code class " +
-						"loader. Please provide the program jar with which you have created " +
-						"the savepoint via -j <JAR> for disposal.",
-						failure.cause().getCause());
-				} else {
-					throw failure.cause();
-				}
-			} else {
-				throw new IllegalStateException("Unknown JobManager response of type " +
-					result.getClass());
-			}
-		} catch (Throwable t) {
-			return handleError(t);
+		try {
+			disposeFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			return handleError(e);
 		}
+
+		logAndSysout("Savepoint '" + savepointPath + "' disposed.");
+		return 0;
 	}

 	// --------------------------------------------------------------------------------------------
@@ -882,19 +857,6 @@ protected ClusterClient retrieveClient(CommandLineOptions options) {
 		}
 	}

-	/**
-	 * Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved
-	 * from the provided {@link CommandLineOptions}.
-	 *
-	 * @param options CommandLineOptions specifying the JobManager URL
-	 * @return Gateway to the JobManager
-	 * @throws Exception
-	 */
-	protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
-		logAndSysout("Retrieving JobManager.");
-		return retrieveClient(options).getJobManagerGateway();
-	}
-
 	/**
 	 * Creates a {@link ClusterClient} object from the given command line options and other parameters.
 	 * @param options Command line options
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -52,6 +52,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
@@ -62,6 +63,7 @@
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;

@@ -691,6 +693,39 @@ public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String
 			});
 	}

+	public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Object msg = new JobManagerMessages.DisposeSavepoint(savepointPath);
+		CompletableFuture<Object> responseFuture = FutureUtils.toJava(
+			jobManager.ask(
+				msg,
+				FutureUtils.toFiniteDuration(timeout)));
+
+		return responseFuture.thenApply(
+			(Object response) -> {
+				if (response instanceof JobManagerMessages.DisposeSavepointSuccess$) {
+					return Acknowledge.get();
+				} else if (response instanceof JobManagerMessages.DisposeSavepointFailure) {
+					JobManagerMessages.DisposeSavepointFailure failureResponse = (JobManagerMessages.DisposeSavepointFailure) response;
+
+					if (failureResponse.cause() instanceof ClassNotFoundException) {
+						throw new CompletionException(
+							new ClassNotFoundException("Savepoint disposal failed, because of a " +
								"missing class. This is most likely caused by a custom state " +
								"instance, which cannot be disposed without the user code class " +
								"loader. Please provide the program jar with which you have created " +
								"the savepoint via -j <JAR> for disposal.",
+								failureResponse.cause().getCause()));
+					} else {
+						throw new CompletionException(failureResponse.cause());
+					}
+				} else {
+					throw new CompletionException(new FlinkRuntimeException("Unknown response type " + response.getClass().getSimpleName() + '.'));
+				}
+			});
+	}
+
 	/**
 	 * Lists the currently running and finished jobs on the cluster.
 	 *
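
The new ClusterClient#disposeSavepoint returns a CompletableFuture<Acknowledge> instead of reporting success or failure through untyped JobManager responses, so callers choose how to wait and how to surface errors. A hedged sketch of a blocking caller, mirroring what the CliFrontend diff above does (the helper name, the timeout handling, and the FlinkException wrapping are illustrative, not part of the commit):

// Illustrative only: blocks on the new API; clientTimeout mirrors the CliFrontend field above.
static void disposeSavepointBlocking(ClusterClient clusterClient, String savepointPath, FiniteDuration clientTimeout) throws Exception {
	CompletableFuture<Acknowledge> disposeFuture =
		clusterClient.disposeSavepoint(savepointPath, FutureUtils.toTime(clientTimeout));
	try {
		disposeFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
	} catch (ExecutionException ee) {
		// Failures from the JobManager, e.g. the ClassNotFoundException case above, surface here.
		throw new FlinkException("Disposing savepoint " + savepointPath + " failed.",
			ExceptionUtils.stripExecutionException(ee));
	}
}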
