[FLINK-8887] Wait for JobMaster leader election in Dispatcher
Before sending requests from the Dispatcher to the JobMasters, the Dispatcher must
wait until the respective JobMaster has gained leadership. Otherwise the messages
might be ignored because no fencing token has been set yet.

This is solved by letting the JobManagerRunner expose a CompletableFuture<JobMasterGateway>
that is completed only after the JobMaster has gained leadership. The future is cleared
once leadership is revoked.
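
A minimal, illustrative sketch of that pattern follows. It is not the actual JobManagerRunner
implementation; the names LeaderAwareRunner, grantLeadership, revokeLeadership and the nested
JobMasterGateway interface with requestJobStatus are stand-ins invented for this example.

    import java.util.concurrent.CompletableFuture;

    public class LeaderAwareRunner {

        /** Stand-in for the gateway used to talk to the job master (illustrative only). */
        public interface JobMasterGateway {
            CompletableFuture<String> requestJobStatus();
        }

        // Completed only once leadership has been granted; replaced on revocation
        // so that later callers wait for the next leader session.
        private volatile CompletableFuture<JobMasterGateway> leaderGatewayFuture = new CompletableFuture<>();

        /** Callers obtain the gateway through this future instead of contacting the job master directly. */
        public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
            return leaderGatewayFuture;
        }

        /** Invoked by the leader election service once leadership has been granted. */
        public synchronized void grantLeadership(JobMasterGateway gateway) {
            leaderGatewayFuture.complete(gateway);
        }

        /** Invoked when leadership is revoked: clear the future so new requests wait again. */
        public synchronized void revokeLeadership() {
            leaderGatewayFuture = new CompletableFuture<>();
        }

        public static void main(String[] args) {
            LeaderAwareRunner runner = new LeaderAwareRunner();

            // The caller (the Dispatcher in the scenario above) chains its request onto
            // the gateway future instead of firing it immediately.
            CompletableFuture<String> status =
                runner.getJobMasterGateway().thenCompose(JobMasterGateway::requestJobStatus);

            // Only once leadership is granted does the pending request go out.
            runner.grantLeadership(() -> CompletableFuture.completedFuture("RUNNING"));

            System.out.println(status.join()); // prints RUNNING
        }
    }

Chaining on the future means a caller never races the leader election: its request is simply
deferred until a fencing token exists.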

This closes #5767.
tillrohrmann committed Mar 28, 2018
1 parent e6b1d5c commit c0dddc1
Showing 5 changed files with 233 additions and 172 deletions.
@@ -20,27 +20,18 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

@@ -54,20 +45,15 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Supplier;

/**
* Client to interact with a {@link MiniCluster}.
*/
public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> {

private final MiniCluster miniCluster;
-private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, new ExecutorThreadFactory("Flink-MiniClusterClient"));
-private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(scheduledExecutorService);

-public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster) throws Exception {
+public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster) {
super(configuration, miniCluster.getHighAvailabilityServices(), true);

this.miniCluster = miniCluster;
@@ -76,7 +62,6 @@ public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniClus
@Override
public void shutdown() throws Exception {
super.shutdown();
-scheduledExecutorService.shutdown();
}

@Override
@@ -104,32 +89,32 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)

@Override
public void cancel(JobID jobId) throws Exception {
-guardWithSingleRetry(() -> miniCluster.cancelJob(jobId), scheduledExecutor);
+miniCluster.cancelJob(jobId).get();
}

@Override
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
-return guardWithSingleRetry(() -> miniCluster.triggerSavepoint(jobId, savepointDirectory, true), scheduledExecutor).get();
+return miniCluster.triggerSavepoint(jobId, savepointDirectory, true).get();
}

@Override
public void stop(JobID jobId) throws Exception {
-guardWithSingleRetry(() -> miniCluster.stopJob(jobId), scheduledExecutor).get();
+miniCluster.stopJob(jobId).get();
}

@Override
-public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws FlinkException {
-return guardWithSingleRetry(() -> miniCluster.triggerSavepoint(jobId, savepointDirectory, false), scheduledExecutor);
+public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) {
+return miniCluster.triggerSavepoint(jobId, savepointDirectory, false);
}

@Override
-public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException {
-return guardWithSingleRetry(() -> miniCluster.disposeSavepoint(savepointPath), scheduledExecutor);
+public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
+return miniCluster.disposeSavepoint(savepointPath);
}

@Override
-public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
-return guardWithSingleRetry(miniCluster::listJobs, scheduledExecutor);
+public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+return miniCluster.listJobs();
}

@Override
@@ -139,7 +124,7 @@ public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws

@Override
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
-AccessExecutionGraph executionGraph = guardWithSingleRetry(() -> miniCluster.getExecutionGraph(jobID), scheduledExecutor).get();
+AccessExecutionGraph executionGraph = miniCluster.getExecutionGraph(jobID).get();
Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulatorsSerialized.size());
for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulatorsSerialized.entrySet()) {
@@ -150,7 +135,7 @@ public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLo

@Override
public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
-return guardWithSingleRetry(() -> miniCluster.getJobStatus(jobId), scheduledExecutor);
+return miniCluster.getJobStatus(jobId);
}

@Override
@@ -202,16 +187,4 @@ public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
enum MiniClusterId {
INSTANCE
}

-private static <X> CompletableFuture<X> guardWithSingleRetry(Supplier<CompletableFuture<X>> operation, ScheduledExecutor executor) {
-return FutureUtils.retryWithDelay(
-operation,
-1,
-Time.milliseconds(500),
-throwable -> {
-Throwable actualException = ExceptionUtils.stripCompletionException(throwable);
-return actualException instanceof FencingTokenException || actualException instanceof AkkaRpcException;
-},
-executor);
-}
}
