Skip to content

Commit

Permalink
[FLINK-7780] [Client] Move savepoint logic into ClusterClient
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 11, 2017
1 parent 90eb902 commit e8e1e33
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 123 deletions.
Expand Up @@ -61,11 +61,11 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

Expand All @@ -89,16 +89,15 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;

/**
* Implementation of a simple command line frontend for executing programs.
Expand Down Expand Up @@ -726,35 +725,29 @@ protected int savepoint(String[] args) {
*/
private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
try {
ActorGateway jobManager = getJobManagerGateway(options);

logAndSysout("Triggering savepoint for job " + jobId + ".");
Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
new FiniteDuration(1, TimeUnit.HOURS));

Object result;
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
try {
logAndSysout("Waiting for response...");
result = Await.result(response, FiniteDuration.Inf());
}
catch (Exception e) {
throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
}
logAndSysout("Triggering savepoint for job " + jobId + ".");
CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);

if (result instanceof TriggerSavepointSuccess) {
TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
logAndSysout("Savepoint completed. Path: " + success.savepointPath());
String savepointPath;
try {
logAndSysout("Waiting for response...");
savepointPath = savepointPathFuture.get();
}
catch (ExecutionException ee) {
Throwable cause = ExceptionUtils.stripExecutionException(ee);
throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
}

logAndSysout("Savepoint completed. Path: " + savepointPath);
logAndSysout("You can resume your program from this savepoint with the run command.");

return 0;
}
else if (result instanceof TriggerSavepointFailure) {
TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
throw failure.cause();
}
else {
throw new IllegalStateException("Unknown JobManager response of type " +
result.getClass());
finally {
client.shutdown();
}
}
catch (Throwable t) {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
Expand Down Expand Up @@ -72,6 +73,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

import scala.Option;
import scala.Tuple2;
Expand Down Expand Up @@ -649,6 +653,36 @@ public void stop(final JobID jobId) throws Exception {
}
}

/**
* Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
* directory, or {@link org.apache.flink.configuration.CoreOptions#SAVEPOINT_DIRECTORY} if it is null.
*
* @param jobId job id
* @param savepointDirectory directory the savepoint should be written to
* @return path future where the savepoint is located
* @throws Exception if no connection to the cluster could be established
*/
public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
final ActorGateway jobManager = getJobManagerGateway();

Future<Object> response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
new FiniteDuration(1, TimeUnit.HOURS));
CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);

return responseFuture.thenApply((responseMessage) -> {
if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) {
JobManagerMessages.TriggerSavepointSuccess success = (JobManagerMessages.TriggerSavepointSuccess) responseMessage;
return success.savepointPath();
} else if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) {
JobManagerMessages.TriggerSavepointFailure failure = (JobManagerMessages.TriggerSavepointFailure) responseMessage;
throw new CompletionException(failure.cause());
} else {
throw new CompletionException(
new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
}
});
}

/**
* Requests and returns the accumulators for the given job identifier. Accumulators can be
* requested while a is running or after it has finished. The default class loader is used
Expand Down
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.util.MockedCliFrontend;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;

Expand All @@ -33,22 +35,23 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.concurrent.CompletableFuture;
import java.util.zip.ZipOutputStream;

import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -76,31 +79,19 @@ public void testTriggerSavepointSuccess() throws Exception {

try {
JobID jobId = new JobID();
ActorGateway jobManager = mock(ActorGateway.class);

Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();

when(jobManager.ask(
Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
any(FiniteDuration.class)))
.thenReturn(triggerResponse.future());

String savepointPath = "expectedSavepointPath";

triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));

CliFrontend frontend = new MockCliFrontend(
CliFrontendTestUtils.getConfigDir(), jobManager);
MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath);

String[] parameters = { jobId.toString() };
int returnCode = frontend.savepoint(parameters);

assertEquals(0, returnCode);
verify(jobManager, times(1)).ask(
Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
any(FiniteDuration.class));
verify(frontend.client, times(1))
.triggerSavepoint(eq(jobId), isNull(String.class));

assertTrue(buffer.toString().contains("expectedSavepointPath"));
assertTrue(buffer.toString().contains(savepointPath));
}
finally {
restoreStdOutAndStdErr();
Expand All @@ -113,29 +104,17 @@ public void testTriggerSavepointFailure() throws Exception {

try {
JobID jobId = new JobID();
ActorGateway jobManager = mock(ActorGateway.class);

Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();

when(jobManager.ask(
Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
any(FiniteDuration.class)))
.thenReturn(triggerResponse.future());

Exception testException = new Exception("expectedTestException");

triggerResponse.success(new TriggerSavepointFailure(jobId, testException));

CliFrontend frontend = new MockCliFrontend(
CliFrontendTestUtils.getConfigDir(), jobManager);
MockedCliFrontend frontend = new SavepointTestCliFrontend(testException);

String[] parameters = { jobId.toString() };
int returnCode = frontend.savepoint(parameters);

assertTrue(returnCode != 0);
verify(jobManager, times(1)).ask(
Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
any(FiniteDuration.class));
assertNotEquals(0, returnCode);
verify(frontend.client, times(1))
.triggerSavepoint(eq(jobId), isNull(String.class));

assertTrue(buffer.toString().contains("expectedTestException"));
}
Expand All @@ -162,77 +141,29 @@ public void testTriggerSavepointFailureIllegalJobID() throws Exception {
}
}

@Test
public void testTriggerSavepointFailureUnknownResponse() throws Exception {
replaceStdOutAndStdErr();

try {
JobID jobId = new JobID();
ActorGateway jobManager = mock(ActorGateway.class);

Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();

when(jobManager.ask(
Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
any(FiniteDuration.class)))
.thenReturn(triggerResponse.future());

triggerResponse.success("UNKNOWN RESPONSE");

CliFrontend frontend = new MockCliFrontend(
CliFrontendTestUtils.getConfigDir(), jobManager);

String[] parameters = { jobId.toString() };
int returnCode = frontend.savepoint(parameters);

assertTrue(returnCode != 0);
verify(jobManager, times(1)).ask(
Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
any(FiniteDuration.class));

String errMsg = buffer.toString();
assertTrue(errMsg.contains("IllegalStateException"));
assertTrue(errMsg.contains("Unknown JobManager response"));
}
finally {
restoreStdOutAndStdErr();
}
}

/**
* Tests that a CLI call with a custom savepoint directory target is
* forwarded correctly to the JM.
* forwarded correctly to the cluster client.
*/
@Test
public void testTriggerSavepointCustomTarget() throws Exception {
replaceStdOutAndStdErr();

try {
JobID jobId = new JobID();
Option<String> customTarget = Option.apply("customTargetDirectory");
ActorGateway jobManager = mock(ActorGateway.class);

Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
String savepointDirectory = "customTargetDirectory";

when(jobManager.ask(
Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
any(FiniteDuration.class)))
.thenReturn(triggerResponse.future());
String savepointPath = "expectedSavepointPath";
triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory);

CliFrontend frontend = new MockCliFrontend(
CliFrontendTestUtils.getConfigDir(), jobManager);

String[] parameters = { jobId.toString(), customTarget.get() };
String[] parameters = { jobId.toString(), savepointDirectory };
int returnCode = frontend.savepoint(parameters);

assertEquals(0, returnCode);
verify(jobManager, times(1)).ask(
Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
any(FiniteDuration.class));
verify(frontend.client, times(1))
.triggerSavepoint(eq(jobId), eq(savepointDirectory));

assertTrue(buffer.toString().contains("expectedSavepointPath"));
assertTrue(buffer.toString().contains(savepointDirectory));
}
finally {
restoreStdOutAndStdErr();
Expand Down Expand Up @@ -444,4 +375,17 @@ private static void restoreStdOutAndStdErr() {
System.setOut(stdOut);
System.setErr(stdErr);
}

private static final class SavepointTestCliFrontend extends MockedCliFrontend {

SavepointTestCliFrontend(String expectedResponse) throws Exception {
when(client.triggerSavepoint(any(JobID.class), anyString()))
.thenReturn(CompletableFuture.completedFuture(expectedResponse));
}

SavepointTestCliFrontend(Exception expectedException) throws Exception {
when(client.triggerSavepoint(any(JobID.class), anyString()))
.thenReturn(FutureUtils.completedExceptionally(expectedException));
}
}
}

0 comments on commit e8e1e33

Please sign in to comment.