Skip to content

Commit

Permalink
[FLINK-9706] Properly wait for termination of JobManagerRunner before…
Browse files Browse the repository at this point in the history
… restarting jobs

In order to avoid race conditions between resource clean up, we now wait for the proper
termination of a previously running JobMaster responsible for the same job (e.g. originating
from a job recovery or a re-submission).

This closes #6279.
  • Loading branch information
tillrohrmann committed Jul 11, 2018
1 parent dc7d81c commit 3c4e59a
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 46 deletions.
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.function;

import org.apache.flink.util.ExceptionUtils;

import java.util.function.Consumer;

/**
* A checked extension of the {@link Consumer} interface.
*
* @param <T> type of the first argument
* @param <E> type of the thrown exception
*/
public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {

void acceptWithException(T value) throws E;

@Override
default void accept(T value) {
try {
acceptWithException(value);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
}
Expand Up @@ -64,13 +64,13 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ConsumerWithException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -126,7 +126,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Nullable
protected final String restAddress;

private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;

public Dispatcher(
RpcService rpcService,
Expand Down Expand Up @@ -173,6 +173,8 @@ public Dispatcher(
this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);

this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);

this.jobManagerTerminationFutures = new HashMap<>(2);
}

//------------------------------------------------------
Expand All @@ -183,11 +185,7 @@ public Dispatcher(
public CompletableFuture<Void> postStop() {
log.info("Stopping dispatcher {}.", getAddress());

final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();

final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
jobManagerRunnersTerminationFuture,
orphanedJobManagerRunnersTerminationFuture));
final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();

return FutureUtils.runAfterwards(
allJobManagerRunnersTerminationFuture,
Expand Down Expand Up @@ -238,20 +236,26 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
final JobID jobId = jobGraph.getJobID();

log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;

try {
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
} catch (IOException e) {
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
persistAndRunJob(jobGraph);
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());

return CompletableFuture.completedFuture(Acknowledge.get());
}
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to submit job %s.", jobId), e));
return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
throw new CompletionException(
new JobSubmissionException(jobId, "Failed to submit job.", throwable));
});
}
}

Expand Down Expand Up @@ -536,7 +540,25 @@ public CompletableFuture<Acknowledge> shutDownCluster() {
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);

registerOrphanedJobManagerTerminationFuture(cleanupFuture);
registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
}

private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));

jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);

// clean up the pending termination future
jobManagerRunnerTerminationFuture.thenRunAsync(
() -> {
final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);

//noinspection ObjectEquality
if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
jobManagerTerminationFutures.put(jobId, terminationFuture);
}
},
getUnfencedMainThreadExecutor());
}

private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
Expand Down Expand Up @@ -573,19 +595,21 @@ private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {

/**
* Terminate all currently running {@link JobManagerRunner}.
*
* @return Future which is completed once all {@link JobManagerRunner} have terminated
*/
private CompletableFuture<Void> terminateJobManagerRunners() {
private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());

final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());

final List<CompletableFuture<Void>> terminationFutures = jobsToRemove.stream()
.map(jobId -> removeJob(jobId, false))
.collect(Collectors.toList());
for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
}
}

return FutureUtils.completeAll(terminationFutures);
private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
terminateJobManagerRunners();
final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
return FutureUtils.completeAll(values);
}

/**
Expand Down Expand Up @@ -677,12 +701,6 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
}

private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
orphanedJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
orphanedJobManagerRunnersTerminationFuture,
jobManagerRunnerTerminationFuture));
}

private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

Expand Down Expand Up @@ -741,7 +759,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {

final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();

final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenApplyAsync(
final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
getUnfencedMainThreadExecutor());

Expand All @@ -761,31 +779,44 @@ public void grantLeadership(final UUID newLeaderSessionID) {
});
}

private boolean tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);

if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
setNewFencingToken(dispatcherId);

Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());

for (JobGraph recoveredJob : recoveredJobs) {
try {
runJob(recoveredJob);
} catch (Exception e) {
throw new CompletionException(
new FlinkException(
String.format("Failed to recover job %s.", recoveredJob.getJobID()),
e));
}
final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
runFutures.add(runFuture);
}

return true;
return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
} else {
log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
return false;
return CompletableFuture.completedFuture(false);
}
}

private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = jobManagerTerminationFutures
.getOrDefault(jobId, CompletableFuture.completedFuture(null))
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
new DispatcherException(
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
throwable)); });

return jobManagerTerminationFuture.thenRunAsync(
() -> {
jobManagerTerminationFutures.remove(jobId);
action.accept(jobGraph);
},
getMainThreadExecutor());
}

private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
// clear the state if we've been the leader before
if (getFencingToken() != null) {
Expand All @@ -796,8 +827,7 @@ private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
}

private void clearDispatcherState() {
final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
terminateJobManagerRunners();
}

/**
Expand Down
Expand Up @@ -127,6 +127,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private CompletableFuture<JobID> deleteAllFuture;
private CompletableFuture<ArchivedExecutionGraph> resultFuture;
private CompletableFuture<JobID> cleanupJobFuture;
private CompletableFuture<Void> terminationFuture;

@BeforeClass
public static void setupClass() {
Expand Down Expand Up @@ -162,6 +163,7 @@ public void setup() throws Exception {
.createTestingBlobStore();

cleanupJobFuture = new CompletableFuture<>();
terminationFuture = new CompletableFuture<>();

blobServer = new TestingBlobServer(configuration, testingBlobStore, cleanupJobFuture);

Expand All @@ -185,7 +187,7 @@ public void setup() throws Exception {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
fatalErrorHandler);

dispatcher.start();
Expand Down Expand Up @@ -225,6 +227,7 @@ public void testBlobServerCleanupWhenJobFinished() throws Exception {

// complete the job
resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
terminationFuture.complete(null);

assertThat(cleanupJobFuture.get(), equalTo(jobId));

Expand All @@ -245,6 +248,7 @@ public void testBlobServerCleanupWhenJobNotFinished() throws Exception {

// job not finished
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
terminationFuture.complete(null);

assertThat(cleanupJobFuture.get(), equalTo(jobId));

Expand All @@ -266,6 +270,7 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
submitJob();

dispatcher.shutDown();
terminationFuture.complete(null);
dispatcher.getTerminationFuture().get();

assertThat(cleanupJobFuture.get(), equalTo(jobId));
Expand Down Expand Up @@ -295,13 +300,65 @@ public void testRunningJobsRegistryCleanup() throws Exception {
assertThat(runningJobsRegistry.contains(jobId), is(true));

resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
terminationFuture.complete(null);

// wait for the clearing
clearedJobLatch.await();

assertThat(runningJobsRegistry.contains(jobId), is(false));
}

/**
* Tests that the previous JobManager needs to be completely terminated
* before a new job with the same {@link JobID} is started.
*/
@Test
public void testJobSubmissionUnderSameJobId() throws Exception {
submitJob();

runningJobsRegistry.setJobRunning(jobId);
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));

final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);

try {
submissionFuture.get(10L, TimeUnit.MILLISECONDS);
fail("The job submission future should not complete until the previous JobManager " +
"termination future has been completed.");
} catch (TimeoutException ignored) {
// expected
} finally {
terminationFuture.complete(null);
}

assertThat(submissionFuture.get(), equalTo(Acknowledge.get()));
}

/**
* Tests that recovered jobs will only be started after the complete termination of any
* other previously running JobMasters for the same job.
*/
@Test
public void testJobRecoveryWithPendingTermination() throws Exception {
submitJob();
runningJobsRegistry.setJobRunning(jobId);

dispatcherLeaderElectionService.notLeader();
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(leaderSessionId);

try {
leaderFuture.get(10L, TimeUnit.MILLISECONDS);
fail("We should not become leader before all previously running JobMasters have terminated.");
} catch (TimeoutException ignored) {
// expected
} finally {
terminationFuture.complete(null);
}

assertThat(leaderFuture.get(), equalTo(leaderSessionId));
}

private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {

@Nonnull
Expand Down

0 comments on commit 3c4e59a

Please sign in to comment.