Skip to content

Commit

Permalink
[FLINK-7322] [futures] Replace Flink's futures with Java 8's Completa…
Browse files Browse the repository at this point in the history
…bleFuture in CheckpointCoordinator

Fix failing JobManagerITCase

This closes #4436.
  • Loading branch information
tillrohrmann committed Aug 1, 2017
1 parent 4a9f19b commit 4378ac3
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 47 deletions.
Expand Up @@ -26,9 +26,6 @@
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -58,6 +55,7 @@
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -362,7 +360,7 @@ public boolean isShutdown() {
* configured * configured
* @throws Exception Failures during triggering are forwarded * @throws Exception Failures during triggering are forwarded
*/ */
public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception { public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
checkNotNull(targetDirectory, "Savepoint target directory"); checkNotNull(targetDirectory, "Savepoint target directory");


CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
Expand All @@ -377,29 +375,30 @@ public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targe
savepointDirectory, savepointDirectory,
false); false);


Future<CompletedCheckpoint> result; CompletableFuture<CompletedCheckpoint> result;


if (triggerResult.isSuccess()) { if (triggerResult.isSuccess()) {
result = triggerResult.getPendingCheckpoint().getCompletionFuture(); result = triggerResult.getPendingCheckpoint().getCompletionFuture();
} else { } else {
Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message()); Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
result = FlinkCompletableFuture.completedExceptionally(cause); result = new CompletableFuture<>();
result.completeExceptionally(cause);
return result;
} }


// Make sure to remove the created base directory on Exceptions // Make sure to remove the created base directory on Exceptions
result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { result.whenCompleteAsync(
@Override (CompletedCheckpoint checkpoint, Throwable throwable) -> {
public Void apply(Throwable value) { if (throwable != null) {
try { try {
SavepointStore.deleteSavepointDirectory(savepointDirectory); SavepointStore.deleteSavepointDirectory(savepointDirectory);
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Failed to delete savepoint directory " + savepointDirectory LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+ " after failed savepoint.", t); + " after failed savepoint.", t);
}
} }

},
return null; executor);
}
}, executor);


return result; return result;
} }
Expand Down Expand Up @@ -427,7 +426,7 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
*/ */
@VisibleForTesting @VisibleForTesting
@Internal @Internal
public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception { public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
switch (options.getCheckpointType()) { switch (options.getCheckpointType()) {
case SAVEPOINT: case SAVEPOINT:
return triggerSavepoint(timestamp, options.getTargetLocation()); return triggerSavepoint(timestamp, options.getTargetLocation());
Expand All @@ -440,7 +439,9 @@ public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointO
return triggerResult.getPendingCheckpoint().getCompletionFuture(); return triggerResult.getPendingCheckpoint().getCompletionFuture();
} else { } else {
Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message()); Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
return FlinkCompletableFuture.completedExceptionally(cause); CompletableFuture<CompletedCheckpoint> failedResult = new CompletableFuture<>();
failedResult.completeExceptionally(cause);
return failedResult;
} }


default: default:
Expand Down
Expand Up @@ -22,8 +22,6 @@
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorID;
Expand All @@ -47,6 +45,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;


Expand Down Expand Up @@ -104,7 +103,7 @@ public enum TaskAcknowledgeResult {
private final String targetDirectory; private final String targetDirectory;


/** The promise to fulfill once the checkpoint has been completed. */ /** The promise to fulfill once the checkpoint has been completed. */
private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise; private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;


/** The executor for potentially blocking I/O operations, like state disposal */ /** The executor for potentially blocking I/O operations, like state disposal */
private final Executor executor; private final Executor executor;
Expand Down Expand Up @@ -149,7 +148,7 @@ public PendingCheckpoint(
this.operatorStates = new HashMap<>(); this.operatorStates = new HashMap<>();
this.masterState = new ArrayList<>(); this.masterState = new ArrayList<>();
this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size()); this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
this.onCompletionPromise = new FlinkCompletableFuture<>(); this.onCompletionPromise = new CompletableFuture<>();
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -249,7 +248,7 @@ public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
* *
* @return A future to the completed checkpoint * @return A future to the completed checkpoint
*/ */
public Future<CompletedCheckpoint> getCompletionFuture() { public CompletableFuture<CompletedCheckpoint> getCompletionFuture() {
return onCompletionPromise; return onCompletionPromise;
} }


Expand Down
Expand Up @@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager
import java.io.IOException import java.io.IOException
import java.net._ import java.net._
import java.util.UUID import java.util.UUID
import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} import java.util.concurrent.{Future => JavaFuture, _}
import java.util.function.BiFunction


import akka.actor.Status.{Failure, Success} import akka.actor.Status.{Failure, Success}
import akka.actor._ import akka.actor._
Expand All @@ -38,13 +39,13 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.{BlobServer, BlobStore} import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.checkpoint._
import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore} import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.client._ import org.apache.flink.runtime.client._
import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.messages._
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors} import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, Executors => FlinkExecutors}
import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
Expand All @@ -58,7 +59,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
import org.apache.flink.runtime.jobmaster.JobMaster import org.apache.flink.runtime.jobmaster.JobMaster
import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService} import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._
Expand All @@ -80,12 +80,11 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskexecutor.TaskExecutor import org.apache.flink.runtime.taskexecutor.TaskExecutor
import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME
import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._ import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} import org.apache.flink.util.{InstantiationUtil, NetUtils}


import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -76,6 +75,7 @@
import java.util.Random; import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -1446,7 +1446,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {


// trigger the first checkpoint. this should succeed // trigger the first checkpoint. this should succeed
String savepointDir = tmpFolder.newFolder().getAbsolutePath(); String savepointDir = tmpFolder.newFolder().getAbsolutePath();
Future<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir); CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
assertFalse(savepointFuture.isDone()); assertFalse(savepointFuture.isDone());


// validate that we have a pending savepoint // validate that we have a pending savepoint
Expand Down Expand Up @@ -1601,7 +1601,7 @@ public void testSavepointsAreNotSubsumed() throws Exception {
String savepointDir = tmpFolder.newFolder().getAbsolutePath(); String savepointDir = tmpFolder.newFolder().getAbsolutePath();


// Trigger savepoint and checkpoint // Trigger savepoint and checkpoint
Future<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir); CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
long savepointId1 = counter.getLast(); long savepointId1 = counter.getLast();
assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfPendingCheckpoints());


Expand All @@ -1626,7 +1626,7 @@ public void testSavepointsAreNotSubsumed() throws Exception {
long checkpointId3 = counter.getLast(); long checkpointId3 = counter.getLast();
assertEquals(2, coord.getNumberOfPendingCheckpoints()); assertEquals(2, coord.getNumberOfPendingCheckpoints());


Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir); CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
long savepointId2 = counter.getLast(); long savepointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints()); assertEquals(3, coord.getNumberOfPendingCheckpoints());


Expand Down Expand Up @@ -1911,7 +1911,7 @@ public void testConcurrentSavepoints() throws Exception {
null, null,
Executors.directExecutor()); Executors.directExecutor());


List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>(); List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();


int numSavepoints = 5; int numSavepoints = 5;


Expand All @@ -1923,7 +1923,7 @@ public void testConcurrentSavepoints() throws Exception {
} }


// After triggering multiple savepoints, all should in progress // After triggering multiple savepoints, all should in progress
for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) { for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
assertFalse(savepointFuture.isDone()); assertFalse(savepointFuture.isDone());
} }


Expand All @@ -1934,7 +1934,7 @@ public void testConcurrentSavepoints() throws Exception {
} }


// After ACKs, all should be completed // After ACKs, all should be completed
for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) { for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
assertTrue(savepointFuture.isDone()); assertTrue(savepointFuture.isDone());
} }
} }
Expand Down Expand Up @@ -1966,10 +1966,10 @@ public void testMinDelayBetweenSavepoints() throws Exception {


String savepointDir = tmpFolder.newFolder().getAbsolutePath(); String savepointDir = tmpFolder.newFolder().getAbsolutePath();


Future<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir); CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
assertFalse("Did not trigger savepoint", savepoint0.isDone()); assertFalse("Did not trigger savepoint", savepoint0.isDone());


Future<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir); CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
assertFalse("Did not trigger savepoint", savepoint1.isDone()); assertFalse("Did not trigger savepoint", savepoint1.isDone());
} }


Expand Down Expand Up @@ -3600,7 +3600,7 @@ public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Excepti
assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints()); assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints());


// trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore // trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore
Future<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir); CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);


checkpointCoordinator.receiveAcknowledgeMessage( checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint( new AcknowledgeCheckpoint(
Expand Down
Expand Up @@ -20,7 +20,6 @@


import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex;
Expand All @@ -40,6 +39,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;


Expand Down Expand Up @@ -134,7 +134,7 @@ public void testCompletionFuture() throws Exception {


// Abort declined // Abort declined
PendingCheckpoint pending = createPendingCheckpoint(props, "ignored"); PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
Future<CompletedCheckpoint> future = pending.getCompletionFuture(); CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture();


assertFalse(future.isDone()); assertFalse(future.isDone());
pending.abortDeclined(); pending.abortDeclined();
Expand Down
Expand Up @@ -18,14 +18,15 @@


package org.apache.flink.runtime.jobmanager package org.apache.flink.runtime.jobmanager


import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit} import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout import akka.util.Timeout
import org.apache.flink.api.common.JobID import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint} import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings} import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings}
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode} import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
Expand Down Expand Up @@ -913,7 +914,7 @@ class JobManagerITCase(_system: ActorSystem)
doThrow(new Exception("Expected Test Exception")) doThrow(new Exception("Expected Test Exception"))
.when(checkpointCoordinator) .when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
val savepointPathPromise = new FlinkCompletableFuture[CompletedCheckpoint]() val savepointPathPromise = new CompletableFuture[CompletedCheckpoint]()
doReturn(savepointPathPromise) doReturn(savepointPathPromise)
.when(checkpointCoordinator) .when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
Expand Down Expand Up @@ -982,7 +983,7 @@ class JobManagerITCase(_system: ActorSystem)
.when(checkpointCoordinator) .when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())


val savepointPromise = new FlinkCompletableFuture[CompletedCheckpoint]() val savepointPromise = new CompletableFuture[CompletedCheckpoint]()
doReturn(savepointPromise) doReturn(savepointPromise)
.when(checkpointCoordinator) .when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
Expand Down
Expand Up @@ -18,14 +18,15 @@


package org.apache.flink.runtime.testingUtils package org.apache.flink.runtime.testingUtils


import java.util.function.BiFunction

import akka.actor.{ActorRef, Cancellable, Terminated} import akka.actor.{ActorRef, Cancellable, Terminated}
import akka.pattern.{ask, pipe} import akka.pattern.{ask, pipe}
import org.apache.flink.api.common.JobID import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.FlinkActor import org.apache.flink.runtime.FlinkActor
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.concurrent.BiFunction
import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.JobManager
Expand Down

0 comments on commit 4378ac3

Please sign in to comment.