From 042638d89d94f1ca9a99115f460181bcdd2e762c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 29 Mar 2018 11:40:19 +0200 Subject: [PATCH 1/6] [hotfix] Add FutureUtils.supplyAsync with SupplierWithException --- .../flink/runtime/concurrent/FutureUtils.java | 22 +++++++++++++++ .../runtime/concurrent/FutureUtilsTest.java | 28 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 51740e3a1aba5..1cffaab112f90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.util.function.SupplierWithException; import akka.dispatch.OnComplete; @@ -32,6 +33,7 @@ import java.util.Collections; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -748,6 +750,26 @@ public static CompletableFuture completedExceptionally(Throwable cause) { return result; } + /** + * Returns a future which is completed with the result of the {@link SupplierWithException}. 
+ * + * @param supplier to provide the future's value + * @param executor to execute the supplier + * @param type of the result + * @return Future which is completed with the value of the supplier + */ + public static CompletableFuture supplyAsync(SupplierWithException supplier, Executor executor) { + return CompletableFuture.supplyAsync( + () -> { + try { + return supplier.get(); + } catch (Throwable e) { + throw new CompletionException(e); + } + }, + executor); + } + /** * Converts Flink time into a {@link FiniteDuration}. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index df2a0c748c343..07bc4c19be403 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -577,4 +578,31 @@ private void cancelConjunctFuture(Function future = FutureUtils.supplyAsync( + () -> { + throw testException; + }, + TestingUtils.defaultExecutor()); + + try { + future.get(); + fail("Expected an exception."); + } catch (ExecutionException e) { + assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), is(true)); + } + } + + @Test + public void testSupplyAsync() throws Exception { + final CompletableFuture future = FutureUtils.supplyAsync( + Acknowledge::get, + TestingUtils.defaultExecutor()); + + assertThat(future.get(), is(Acknowledge.get())); + } } From ab38937ee62d7e7350e05a48bd385da5940ba68a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 29 Mar 2018 11:43:03 +0200 
Subject: [PATCH 2/6] [FLINK-9106] [rpc] Add UnfencedMainThreadExecutor to FencedRpcEndpoint The UnfencedMainThreadExecutor executes Runnables in the main thread context without checking the fencing token. This is important, for example, when setting a new fencing token. This closes #5784. --- .../flink/runtime/rpc/FencedRpcEndpoint.java | 42 +++++++++++++++++++ .../flink/runtime/rpc/AsyncCallsTest.java | 22 ++++++++-- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java index d078d58f2d3a7..36f5c68f60eb0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java @@ -19,13 +19,16 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.Serializable; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Base class for fenced {@link RpcEndpoint}. 
A fenced rpc endpoint expects all rpc messages @@ -37,14 +40,21 @@ */ public abstract class FencedRpcEndpoint extends RpcEndpoint { + private final UnfencedMainThreadExecutor unfencedMainThreadExecutor; private volatile F fencingToken; private volatile MainThreadExecutor fencedMainThreadExecutor; protected FencedRpcEndpoint(RpcService rpcService, String endpointId) { super(rpcService, endpointId); + Preconditions.checkArgument( + rpcServer instanceof FencedMainThreadExecutable, + "The rpcServer must be of type %s.", + FencedMainThreadExecutable.class.getSimpleName()); + // no fencing token == no leadership this.fencingToken = null; + this.unfencedMainThreadExecutor = new UnfencedMainThreadExecutor((FencedMainThreadExecutable) rpcServer); this.fencedMainThreadExecutor = new MainThreadExecutor( getRpcService().fenceRpcServer( rpcServer, @@ -86,6 +96,17 @@ protected MainThreadExecutor getMainThreadExecutor() { return fencedMainThreadExecutor; } + /** + * Returns a main thread executor which is not bound to the fencing token. + * This means that every {@link Runnable} submitted to this executor will always + * be executed, regardless of the current fencing token. + * + * @return Executor which runs in the main thread and is not bound to the fencing token + */ + protected Executor getUnfencedMainThreadExecutor() { + return unfencedMainThreadExecutor; + } + /** * Run the given runnable in the main thread of the RpcEndpoint without checking the fencing * token. This allows to run operations outside of the fencing token scope. @@ -115,4 +136,25 @@ protected CompletableFuture callAsyncWithoutFencing(Callable callable, throw new RuntimeException("FencedRpcEndpoint has not been started with a FencedMainThreadExecutable RpcServer."); } } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Executor which executes {@link Runnable}s in the main thread context without fencing. 
+ */ + private static class UnfencedMainThreadExecutor implements Executor { + + private final FencedMainThreadExecutable gateway; + + UnfencedMainThreadExecutor(FencedMainThreadExecutable gateway) { + this.gateway = Preconditions.checkNotNull(gateway); + } + + @Override + public void execute(@Nonnull Runnable runnable) { + gateway.runAsyncWithoutFencing(runnable); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 66f8d9fd644f8..1f9d9e3f73736 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.rpc; -import akka.actor.ActorSystem; - import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; @@ -31,6 +29,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import akka.actor.ActorSystem; import akka.actor.Terminated; import org.junit.AfterClass; import org.junit.Test; @@ -46,7 +45,12 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @Category(Flip6.class) public class AsyncCallsTest extends TestLogger { @@ -259,6 +263,18 @@ public void testCallAsyncWithoutFencing() throws Exception { assertTrue(resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); } + @Test + public void testUnfencedMainThreadExecutor() throws Exception { + final UUID newFencingToken = UUID.randomUUID(); + + final boolean value = true; + 
final CompletableFuture resultFuture = testRunAsync( + endpoint -> CompletableFuture.supplyAsync(() -> value, endpoint.getUnfencedMainThreadExecutor()), + newFencingToken); + + assertThat(resultFuture.get(), is(value)); + } + private static CompletableFuture testRunAsync(Function> runAsyncCall, UUID newFencingToken) throws Exception { final UUID initialFencingToken = UUID.randomUUID(); final OneShotLatch enterSetNewFencingToken = new OneShotLatch(); From d15e21fce85707294ce486c6f936e9c904159dff Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 27 Mar 2018 09:45:13 +0200 Subject: [PATCH 3/6] [hotfix] Re-introduce FatalErrorHandler to JobManagerRunner --- .../flink/runtime/dispatcher/Dispatcher.java | 14 ++++--- .../jobmanager/OnCompletionActions.java | 6 +++ .../runtime/jobmaster/JobManagerRunner.java | 38 ++++++++++--------- .../flink/runtime/jobmaster/JobMaster.java | 24 ++++++------ .../runtime/dispatcher/DispatcherTest.java | 6 ++- .../dispatcher/MiniDispatcherTest.java | 4 +- .../jobmaster/JobManagerRunnerTest.java | 12 ++++-- .../runtime/jobmaster/JobMasterTest.java | 5 +++ 8 files changed, 67 insertions(+), 42 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 38d63cfa23eda..6aeebf4cc7b8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -267,7 +267,8 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) heartbeatServices, blobServer, jobManagerSharedServices, - new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup)); + new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), + fatalErrorHandler); jobManagerRunner.getResultFuture().whenCompleteAsync( (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { @@ -587,8 +588,6 @@ 
void recoverJobs() { } protected void onFatalError(Throwable throwable) { - log.error("Fatal error occurred in dispatcher {}.", getAddress(), throwable); - fatalErrorHandler.onFatalError(throwable); } @@ -790,7 +789,8 @@ JobManagerRunner createJobManagerRunner( HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, - JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory) throws Exception; + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler) throws Exception; } /** @@ -809,7 +809,8 @@ public JobManagerRunner createJobManagerRunner( HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, - JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory) throws Exception { + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler) throws Exception { return new JobManagerRunner( resourceId, jobGraph, @@ -819,7 +820,8 @@ public JobManagerRunner createJobManagerRunner( heartbeatServices, blobServer, jobManagerServices, - jobManagerJobMetricGroupFactory); + jobManagerJobMetricGroupFactory, + fatalErrorHandler); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java index 66ca4ee628ba4..5b6a50079963e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; /** * Interface for completion actions once a Flink job has reached @@ -37,4 +38,9 @@ public interface OnCompletionActions { * Job was finished by 
another JobMaster. */ void jobFinishedByOther(); + + /** + * The {@link JobMaster} failed while executing the job. + */ + void jobMasterFailed(Throwable cause); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 034232e17603b..9b80820935cb0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -56,7 +56,7 @@ * The runner for the job manager. It deals with job level leader election and make underlying job manager * properly reacted. */ -public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler, AutoCloseableAsync { +public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync { private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); @@ -78,6 +78,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F private final JobMaster jobMaster; + private final FatalErrorHandler fatalErrorHandler; + private final Time rpcTimeout; private final CompletableFuture resultFuture; @@ -107,7 +109,8 @@ public JobManagerRunner( final HeartbeatServices heartbeatServices, final BlobServer blobServer, final JobManagerSharedServices jobManagerSharedServices, - final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory) throws Exception { + final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + final FatalErrorHandler fatalErrorHandler) throws Exception { this.resultFuture = new CompletableFuture<>(); this.terminationFuture = new CompletableFuture<>(); @@ -116,6 +119,7 @@ public JobManagerRunner( try { this.jobGraph = checkNotNull(jobGraph); this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices); + this.fatalErrorHandler = 
checkNotNull(fatalErrorHandler); checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); @@ -155,7 +159,7 @@ public JobManagerRunner( blobServer, jobManagerJobMetricGroupFactory, this, - this, + fatalErrorHandler, userCodeLoader); } catch (Throwable t) { @@ -255,19 +259,17 @@ public void jobFinishedByOther() { resultFuture.completeExceptionally(new JobNotFinishedException(jobGraph.getJobID())); } - /** - * Job completion notification triggered by JobManager or self. - */ @Override - public void onFatalError(Throwable exception) { - // we log first to make sure an explaining message goes into the log - // we even guard the log statement here to increase chances that the error handler - // gets the notification on hard critical situations like out-of-memory errors - try { - log.error("JobManager runner encountered a fatal error.", exception); - } catch (Throwable ignored) {} + public void jobMasterFailed(Throwable cause) { + handleJobManagerRunnerError(cause); + } - resultFuture.completeExceptionally(exception); + private void handleJobManagerRunnerError(Throwable cause) { + if (ExceptionUtils.isJvmFatalError(cause)) { + fatalErrorHandler.onFatalError(cause); + } else { + resultFuture.completeExceptionally(cause); + } } /** @@ -302,7 +304,7 @@ public void grantLeadership(final UUID leaderSessionID) { try { verifyJobSchedulingStatusAndStartJobManager(leaderSessionID); } catch (Exception e) { - onFatalError(new FlinkException("Could not start the JobMaster.", e)); + handleJobManagerRunnerError(e); } } } @@ -325,7 +327,7 @@ private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) t startFuture.whenCompleteAsync( (Acknowledge ack, Throwable throwable) -> { if (throwable != null) { - onFatalError(new FlinkException("Could not start the job manager.", throwable)); + handleJobManagerRunnerError(new FlinkException("Could not start the job manager.", throwable)); } else { confirmLeaderSessionIdIfStillLeader(leaderSessionId, 
currentLeaderGatewayFuture); } @@ -361,7 +363,7 @@ public void revokeLeadership() { suspendFuture.whenCompleteAsync( (Acknowledge ack, Throwable throwable) -> { if (throwable != null) { - onFatalError(new Exception("Could not start the job manager.", throwable)); + handleJobManagerRunnerError(new FlinkException("Could not suspend the job manager.", throwable)); } }, jobManagerSharedServices.getScheduledExecutorService()); @@ -393,7 +395,7 @@ public String getAddress() { @Override public void handleError(Exception exception) { log.error("Leader Election Service encountered a fatal error.", exception); - onFatalError(exception); + handleJobManagerRunnerError(exception); } //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1e38792034f6c..934c0b2ecd850 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -172,7 +172,7 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final OnCompletionActions jobCompletionActions; - private final FatalErrorHandler errorHandler; + private final FatalErrorHandler fatalErrorHandler; private final ClassLoader userCodeLoader; @@ -223,7 +223,7 @@ public JobMaster( BlobServer blobServer, JobManagerJobMetricGroupFactory jobMetricGroupFactory, OnCompletionActions jobCompletionActions, - FatalErrorHandler errorHandler, + FatalErrorHandler fatalErrorHandler, ClassLoader userCodeLoader) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME)); @@ -238,7 +238,7 @@ public JobMaster( this.blobServer = checkNotNull(blobServer); this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService(); this.jobCompletionActions = 
checkNotNull(jobCompletionActions); - this.errorHandler = checkNotNull(errorHandler); + this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory); @@ -1202,14 +1202,14 @@ private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraph //---------------------------------------------------------------------------------------------- - private void handleFatalError(final Throwable cause) { - - try { + private void handleJobMasterError(final Throwable cause) { + if (ExceptionUtils.isJvmFatalError(cause)) { log.error("Fatal error occurred on JobManager.", cause); - } catch (Throwable ignore) {} - - // The fatal error handler implementation should make sure that this call is non-blocking - errorHandler.onFatalError(cause); + // The fatal error handler implementation should make sure that this call is non-blocking + fatalErrorHandler.onFatalError(cause); + } else { + jobCompletionActions.jobMasterFailed(cause); + } } private void jobStatusChanged( @@ -1452,7 +1452,7 @@ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSes @Override public void handleError(final Exception exception) { - handleFatalError(new Exception("Fatal error in the ResourceManager leader service", exception)); + handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception)); } } @@ -1516,7 +1516,7 @@ protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) @Override protected void onRegistrationFailure(final Throwable failure) { - handleFatalError(failure); + handleJobMasterError(failure); } public ResourceID getResourceManagerResourceID() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index f6d3e357221b3..9ff901a5a44be 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -585,7 +585,8 @@ public JobManagerRunner createJobManagerRunner( HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, - JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory) throws Exception { + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner( @@ -597,7 +598,8 @@ public JobManagerRunner createJobManagerRunner( heartbeatServices, blobServer, jobManagerSharedServices, - jobManagerJobMetricGroupFactory); + jobManagerJobMetricGroupFactory, + fatalErrorHandler); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index d58160e9ad707..e3633935b7d42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -283,7 +284,8 @@ public JobManagerRunner createJobManagerRunner( HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, - 
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory) throws Exception { + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler) throws Exception { jobGraphFuture.complete(jobGraph); final JobManagerRunner mock = mock(JobManagerRunner.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java index aa3f153b4b873..a85d3516e0af6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -81,6 +82,8 @@ public class JobManagerRunnerTest extends TestLogger { private TestingHighAvailabilityServices haServices; + private TestingFatalErrorHandler fatalErrorHandler; + @BeforeClass public static void setupClass() throws Exception { configuration = new Configuration(); @@ -110,11 +113,13 @@ public void setup() { haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService()); haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + + fatalErrorHandler = new TestingFatalErrorHandler(); } @After - public void tearDown() { - + public void tearDown() throws Exception { + fatalErrorHandler.rethrowError(); } @AfterClass @@ -210,6 +215,7 @@ private JobManagerRunner createJobManagerRunner() throws Exception { heartbeatServices, 
blobServer, jobManagerSharedServices, - UnregisteredJobManagerJobMetricGroupFactory.INSTANCE); + UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, + fatalErrorHandler); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 7bf831cb03009..62f5ee2173805 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -431,6 +431,11 @@ public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGrap public void jobFinishedByOther() { } + + @Override + public void jobMasterFailed(Throwable cause) { + + } } private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation { From 1aa101b0ea67b36cad0e835e3ebfa047333a21f2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 27 Mar 2018 10:00:56 +0200 Subject: [PATCH 4/6] [hotfix] Correct JavaDocs in SubmittedJobGraphStore and add Nullable annotation --- .../flink/runtime/jobmanager/SubmittedJobGraphStore.java | 8 +++++--- .../jobmanager/ZooKeeperSubmittedJobGraphStore.java | 5 ++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java index 6e91f80ebeb54..7e624ec6e1db4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java @@ -20,6 +20,8 @@ import org.apache.flink.api.common.JobID; +import javax.annotation.Nullable; + import java.util.Collection; /** @@ -38,10 +40,10 @@ public interface SubmittedJobGraphStore { void stop() throws Exception; /** - * Returns the {@link 
SubmittedJobGraph} with the given {@link JobID}. - * - *

An Exception is thrown, if no job graph with the given ID exists. + * Returns the {@link SubmittedJobGraph} with the given {@link JobID} or + * {@code null} if no job was registered. */ + @Nullable SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index dfa931b2cce8a..7ba5d481177bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -158,6 +160,7 @@ public void stop() throws Exception { } @Override + @Nullable public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { checkNotNull(jobId, "Job ID"); final String path = getPathForJob(jobId); @@ -179,7 +182,7 @@ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { return null; } catch (Exception e) { throw new FlinkException("Could not retrieve the submitted job graph state handle " + - "for " + path + "from the submitted job graph store.", e); + "for " + path + " from the submitted job graph store.", e); } SubmittedJobGraph jobGraph; From e66fce7027d83273de061ed5438f9bea321c10ab Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 27 Mar 2018 10:59:54 +0200 Subject: [PATCH 5/6] [FLINK-9097] Fail fatally if job submission fails when recovering jobs In order to not drop jobs, we have to fail fatally if a job submission fails when recovering jobs. In HA mode, this will restart the Dispatcher and let it retry to recover all jobs. This closes #5774. 
--- .../flink/runtime/dispatcher/Dispatcher.java | 194 ++++++++++-------- .../runtime/dispatcher/DispatcherTest.java | 114 ++++++---- 2 files changed, 180 insertions(+), 128 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 6aeebf4cc7b8e..b0e9985fc38c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -230,90 +230,89 @@ public void start() throws Exception { @Override public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) { - final JobID jobId = jobGraph.getJobID(); - log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); - - final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus; + log.info("Submitting job {} ({}).", jobId, jobGraph.getName()); try { - jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); - } catch (IOException e) { - log.warn("Cannot retrieve job status for {}.", jobId, e); - return FutureUtils.completedExceptionally( - new JobSubmissionException(jobId, "Could not retrieve the job status.", e)); - } + final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); - if (jobSchedulingStatus != RunningJobsRegistry.JobSchedulingStatus.DONE && - !jobManagerRunners.containsKey(jobId)) { - try { - submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null)); - } catch (Exception e) { - log.warn("Cannot persist JobGraph.", e); + if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) { return FutureUtils.completedExceptionally( - new JobSubmissionException(jobId, "Could not persist JobGraph.", e)); + new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state 
%s.", jobSchedulingStatus))); + } else { + persistAndRunJob(jobGraph); + + return CompletableFuture.completedFuture(Acknowledge.get()); } + } catch (Exception e) { + return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to submit job %s.", jobId), e)); + } + } - final JobManagerRunner jobManagerRunner; + private void persistAndRunJob(JobGraph jobGraph) throws Exception { + submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null)); + try { + runJob(jobGraph); + } catch (Exception e) { try { - jobManagerRunner = jobManagerRunnerFactory.createJobManagerRunner( - ResourceID.generate(), - jobGraph, - configuration, - getRpcService(), - highAvailabilityServices, - heartbeatServices, - blobServer, - jobManagerSharedServices, - new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), - fatalErrorHandler); - - jobManagerRunner.getResultFuture().whenCompleteAsync( - (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { - // check if we are still the active JobManagerRunner by checking the identity - //noinspection ObjectEquality - if (jobManagerRunner == jobManagerRunners.get(jobId)) { - if (archivedExecutionGraph != null) { - jobReachedGloballyTerminalState(archivedExecutionGraph); - } else { - final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - - if (strippedThrowable instanceof JobNotFinishedException) { - jobNotFinished(jobId); - } else { - onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " failed.", strippedThrowable)); - } - } - } else { - log.debug("There is a newer JobManagerRunner for the job {}.", jobId); - } - }, getMainThreadExecutor()); + submittedJobGraphStore.removeJobGraph(jobGraph.getJobID()); + } catch (Exception ie) { + e.addSuppressed(ie); + } - jobManagerRunner.start(); - } catch (Exception e) { - try { - // We should only remove a job from the submitted job graph store - // if the initial submission failed. 
Never in case of a recovery - submittedJobGraphStore.removeJobGraph(jobId); - } catch (Throwable t) { - log.warn("Cannot remove job graph from submitted job graph store.", t); - e.addSuppressed(t); - } + throw e; + } + } - return FutureUtils.completedExceptionally( - new JobSubmissionException(jobId, "Could not start JobManager.", e)); - } + private void runJob(JobGraph jobGraph) throws Exception { + Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID())); - jobManagerRunners.put(jobId, jobManagerRunner); + final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph); - return CompletableFuture.completedFuture(Acknowledge.get()); - } else { - return FutureUtils.completedExceptionally( - new JobSubmissionException(jobId, "Job has already been submitted and " + - "is currently in state " + jobSchedulingStatus + '.')); - } + jobManagerRunner.start(); + + jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner); + } + + private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Exception { + final JobID jobId = jobGraph.getJobID(); + + final JobManagerRunner jobManagerRunner = jobManagerRunnerFactory.createJobManagerRunner( + ResourceID.generate(), + jobGraph, + configuration, + getRpcService(), + highAvailabilityServices, + heartbeatServices, + blobServer, + jobManagerSharedServices, + new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), + fatalErrorHandler); + + jobManagerRunner.getResultFuture().whenCompleteAsync( + (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { + // check if we are still the active JobManagerRunner by checking the identity + //noinspection ObjectEquality + if (jobManagerRunner == jobManagerRunners.get(jobId)) { + if (archivedExecutionGraph != null) { + jobReachedGloballyTerminalState(archivedExecutionGraph); + } else { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + + if (strippedThrowable instanceof 
JobNotFinishedException) { + jobNotFinished(jobId); + } else { + jobMasterFailed(jobId, strippedThrowable); + } + } + } else { + log.debug("There is a newer JobManagerRunner for the job {}.", jobId); + } + }, getMainThreadExecutor()); + + return jobManagerRunner; } @Override @@ -526,7 +525,7 @@ public CompletableFuture triggerSavepoint( * @param jobId JobID identifying the job to clean up * @param cleanupHA True iff HA data shall also be cleaned up */ - private void removeJob(JobID jobId, boolean cleanupHA) throws Exception { + private void removeJob(JobID jobId, boolean cleanupHA) { JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId); if (jobManagerRunner != null) { @@ -537,7 +536,11 @@ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception { jobManagerMetricGroup.removeJob(jobId); if (cleanupHA) { - submittedJobGraphStore.removeJobGraph(jobId); + try { + submittedJobGraphStore.removeJobGraph(jobId); + } catch (Exception e) { + log.warn("Could not properly remove job {} from submitted job graph store.", jobId); + } } // TODO: remove job related files from blob server @@ -577,9 +580,7 @@ void recoverJobs() { for (JobID jobId : jobIds) { try { - SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); - - runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT)); + recoverJob(jobId); } catch (Exception e) { onFatalError(new FlinkException("Could not recover the job graph for " + jobId + '.', e)); } @@ -587,6 +588,31 @@ void recoverJobs() { }); } + private void recoverJob(JobID jobId) throws Exception { + SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + + if (submittedJobGraph != null) { + final CompletableFuture runJobFuture = callAsync( + () -> { + runJob(submittedJobGraph.getJobGraph()); + return null; + }, + RpcUtils.INF_TIMEOUT); + + runJobFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + 
onFatalError(new FlinkException( + String.format("Could not run the recovered job %s.", jobId), + throwable)); + } + }); + } else { + log.warn("Could not find job {} in submitted job graph store. Ignoring recover call.", jobId); + } + + } + protected void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } @@ -612,21 +638,19 @@ protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedEx final JobID jobId = archivedExecutionGraph.getJobID(); - try { - removeJob(jobId, true); - } catch (Exception e) { - log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); - } + removeJob(jobId, true); } protected void jobNotFinished(JobID jobId) { log.info("Job {} was not finished by JobManager.", jobId); - try { - removeJob(jobId, false); - } catch (Exception e) { - log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); - } + removeJob(jobId, false); + } + + private void jobMasterFailed(JobID jobId, Throwable cause) { + // we fail fatally in case of a JobMaster failure in order to restart the + // dispatcher to recover the jobs again. 
This only works in HA mode, though + onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause)); } private void registerOrphanedJobManagerTerminationFuture(CompletableFuture jobManagerRunnerTerminationFuture) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 9ff901a5a44be..6d8392eec0357 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -144,6 +144,8 @@ public class DispatcherTest extends TestLogger { private RunningJobsRegistry runningJobsRegistry; + private CountDownLatch createdJobManagerRunnerLatch; + private Configuration configuration; /** Instance under test. */ @@ -191,6 +193,7 @@ public void setUp() throws Exception { BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + createdJobManagerRunnerLatch = new CountDownLatch(2); dispatcher = new TestingDispatcher( rpcService, Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), @@ -202,8 +205,8 @@ public void setUp() throws Exception { UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), null, new MemoryArchivedExecutionGraphStore(), - fatalErrorHandler, - TEST_JOB_ID); + new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch), + fatalErrorHandler); dispatcher.start(); } @@ -251,16 +254,7 @@ public void testLeaderElection() throws Exception { return jobIds; }); - UUID expectedLeaderSessionId = UUID.randomUUID(); - - assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - - dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); - - UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() - .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - - 
assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + electDispatcher(); // wait that we asked the SubmittedJobGraphStore for the stored jobs jobIdsFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -292,7 +286,7 @@ public void testSubmittedJobGraphListener() throws Exception { runningJobsRegistry.clearJob(TEST_JOB_ID); submittedJobGraphStore.putJobGraph(submittedJobGraph); dispatcher.onAddedJobGraph(TEST_JOB_ID); - dispatcher.submitJobLatch.await(); + createdJobManagerRunnerLatch.await(); assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1)); } @@ -364,7 +358,7 @@ public void testJobRecovery() throws Exception { dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); // wait until we have recovered the job - dispatcher.submitJobLatch.await(); + createdJobManagerRunnerLatch.await(); // check whether the job has been recovered final Collection jobIds = dispatcherGateway.listJobs(TIMEOUT).get(); @@ -450,16 +444,7 @@ public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception { throw testException; }); - UUID expectedLeaderSessionId = UUID.randomUUID(); - - assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - - dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); - - UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() - .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - - assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + electDispatcher(); // we expect that a fatal error occurred final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -485,6 +470,39 @@ public void testFatalErrorAfterJobRecoveryFailure() throws Exception { throw testException; }); + electDispatcher(); + + // we expect that a fatal error occurred + final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + 
assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true)); + + fatalErrorHandler.clearError(); + } + + /** + * Tests that the {@link Dispatcher} fails fatally if the job submission of a recovered job fails. + * See FLINK-9097. + */ + @Test + public void testJobSubmissionErrorAfterJobRecovery() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + + final JobGraph failingJobGraph = createFailingJobGraph(testException); + + final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(failingJobGraph, null); + submittedJobGraphStore.putJobGraph(submittedJobGraph); + + electDispatcher(); + + final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true)); + + fatalErrorHandler.clearError(); + } + + private void electDispatcher() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException { UUID expectedLeaderSessionId = UUID.randomUUID(); assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); @@ -495,19 +513,33 @@ public void testFatalErrorAfterJobRecoveryFailure() throws Exception { .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + } - // we expect that a fatal error occurred - final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + private JobGraph createFailingJobGraph(Exception failureCause) { + final FailingJobVertex jobVertex = new FailingJobVertex("Failing JobVertex", failureCause); + jobVertex.setInvokableClass(NoOpInvokable.class); + return new JobGraph(jobGraph.getJobID(), "Failing JobGraph", jobVertex); + } - assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true)); + private static 
class FailingJobVertex extends JobVertex { - fatalErrorHandler.clearError(); + private static final long serialVersionUID = 3218428829168840760L; + + private final Exception failure; + + private FailingJobVertex(String name, Exception failure) { + super(name); + this.failure = failure; + } + + @Override + public void initializeOnMaster(ClassLoader loader) throws Exception { + throw failure; + } } private static class TestingDispatcher extends Dispatcher { - private final CountDownLatch submitJobLatch = new CountDownLatch(2); - /** * Controls whether existing jobs in {@link SubmittedJobGraphStore} should be recovered * when {@link TestingDispatcher} is granted leadership. @@ -525,8 +557,8 @@ private TestingDispatcher( JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, - FatalErrorHandler fatalErrorHandler, - JobID expectedJobId) throws Exception { + JobManagerRunnerFactory jobManagerRunnerFactory, + FatalErrorHandler fatalErrorHandler) throws Exception { super( rpcService, endpointId, @@ -539,20 +571,11 @@ private TestingDispatcher( jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, - new ExpectedJobIdJobManagerRunnerFactory(expectedJobId), + jobManagerRunnerFactory, fatalErrorHandler, null); } - @Override - public CompletableFuture submitJob(final JobGraph jobGraph, final Time timeout) { - final CompletableFuture submitJobFuture = super.submitJob(jobGraph, timeout); - - submitJobFuture.thenAccept(ignored -> submitJobLatch.countDown()); - - return submitJobFuture; - } - @Override void recoverJobs() { if (recoverJobsEnabled.get()) { @@ -571,8 +594,11 @@ private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispa private final JobID expectedJobId; - private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId) { + private final CountDownLatch createdJobManagerRunnerLatch; + + private ExpectedJobIdJobManagerRunnerFactory(JobID 
expectedJobId, CountDownLatch createdJobManagerRunnerLatch) { this.expectedJobId = expectedJobId; + this.createdJobManagerRunnerLatch = createdJobManagerRunnerLatch; } @Override @@ -589,6 +615,8 @@ public JobManagerRunner createJobManagerRunner( FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); + createdJobManagerRunnerLatch.countDown(); + return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner( resourceId, jobGraph, From c7dc3712e0a281591e7c9300201db699cc721d9e Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 29 Mar 2018 11:37:09 +0200 Subject: [PATCH 6/6] [hotfix] Recover jobs before setting fencing token --- .../flink/runtime/dispatcher/Dispatcher.java | 110 +++++++++--------- .../runtime/dispatcher/DispatcherTest.java | 33 ++---- 2 files changed, 65 insertions(+), 78 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index b0e9985fc38c6..41efaab7989d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -564,53 +564,32 @@ private CompletableFuture terminateJobManagerRunners() { * Recovers all jobs persisted via the submitted job graph store. */ @VisibleForTesting - void recoverJobs() { + CompletableFuture> recoverJobs() { log.info("Recovering all persisted jobs."); - - getRpcService().execute( + return FutureUtils.supplyAsync( () -> { - final Collection jobIds; + final Collection jobIds = submittedJobGraphStore.getJobIds(); - try { - jobIds = submittedJobGraphStore.getJobIds(); - } catch (Exception e) { - onFatalError(new FlinkException("Could not recover job ids from the submitted job graph store. 
Aborting recovery.", e)); - return; - } + final List jobGraphs = new ArrayList<>(jobIds.size()); for (JobID jobId : jobIds) { - try { - recoverJob(jobId); - } catch (Exception e) { - onFatalError(new FlinkException("Could not recover the job graph for " + jobId + '.', e)); - } + jobGraphs.add(recoverJob(jobId)); } - }); + + return jobGraphs; + }, + getRpcService().getExecutor()); } - private void recoverJob(JobID jobId) throws Exception { + private JobGraph recoverJob(JobID jobId) throws Exception { + log.debug("Recover job {}.", jobId); SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); if (submittedJobGraph != null) { - final CompletableFuture runJobFuture = callAsync( - () -> { - runJob(submittedJobGraph.getJobGraph()); - return null; - }, - RpcUtils.INF_TIMEOUT); - - runJobFuture.whenComplete( - (Void ignored, Throwable throwable) -> { - if (throwable != null) { - onFatalError(new FlinkException( - String.format("Could not run the recovered job %s.", jobId), - throwable)); - } - }); + return submittedJobGraph.getJobGraph(); } else { - log.warn("Could not find job {} in submitted job graph store. 
Ignoring recover call.", jobId); + throw new FlinkJobNotFoundException(jobId); } - } protected void onFatalError(Throwable throwable) { @@ -713,29 +692,55 @@ private List>> queryJobMastersForInformation(F */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - runAsyncWithoutFencing( - () -> { - final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID); + final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID); + log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId); - log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId); + final CompletableFuture> recoveredJobsFuture = recoverJobs(); - // clear the state if we've been the leader before - if (getFencingToken() != null) { - final CompletableFuture jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); - registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); - jobManagerRunners.clear(); - } + final CompletableFuture fencingTokenFuture = recoveredJobsFuture.thenAcceptAsync( + (Collection recoveredJobs) -> { + setNewFencingToken(dispatcherId); - setFencingToken(dispatcherId); + for (JobGraph recoveredJob : recoveredJobs) { + try { + runJob(recoveredJob); + } catch (Exception e) { + throw new CompletionException( + new FlinkException( + String.format("Failed to recover job %s.", recoveredJob.getJobID()), + e)); + } + } + }, + getUnfencedMainThreadExecutor()); - // confirming the leader session ID might be blocking, - getRpcService().execute( - () -> leaderElectionService.confirmLeaderSessionID(newLeaderSessionID)); + final CompletableFuture confirmationFuture = fencingTokenFuture.thenRunAsync( + () -> leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), + getRpcService().getExecutor()); - recoverJobs(); + confirmationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + 
onFatalError(ExceptionUtils.stripCompletionException(throwable)); + } }); } + private void setNewFencingToken(@Nullable DispatcherId dispatcherId) { + // clear the state if we've been the leader before + if (getFencingToken() != null) { + clearDispatcherState(); + } + + setFencingToken(dispatcherId); + } + + private void clearDispatcherState() { + final CompletableFuture jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); + jobManagerRunners.clear(); + } + /** * Callback method when current resourceManager loses leadership. */ @@ -745,12 +750,7 @@ public void revokeLeadership() { () -> { log.info("Dispatcher {} was revoked leadership.", getAddress()); - final CompletableFuture jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); - registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); - jobManagerRunners.clear(); - - // clear the fencing token indicating that we don't have the leadership right now - setFencingToken(null); + setNewFencingToken(null); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 6d8392eec0357..e87e1ced25b35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -99,7 +99,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; @@ -148,6 +147,8 @@ public class DispatcherTest extends TestLogger { private Configuration configuration; + private BlobServer blobServer; + /** Instance under test. 
*/ private TestingDispatcher dispatcher; @@ -194,13 +195,15 @@ public void setUp() throws Exception { temporaryFolder.newFolder().getAbsolutePath()); createdJobManagerRunnerLatch = new CountDownLatch(2); + blobServer = new BlobServer(configuration, new VoidBlobStore()); + dispatcher = new TestingDispatcher( rpcService, Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), configuration, haServices, new TestingResourceManagerGateway(), - new BlobServer(configuration, new VoidBlobStore()), + blobServer, heartbeatServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), null, @@ -218,6 +221,10 @@ public void tearDown() throws Exception { } finally { RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT); } + + if (blobServer != null) { + blobServer.close(); + } } /** @@ -266,8 +273,6 @@ public void testLeaderElection() throws Exception { */ @Test public void testSubmittedJobGraphListener() throws Exception { - dispatcher.recoverJobsEnabled.set(false); - dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -502,17 +507,12 @@ public void testJobSubmissionErrorAfterJobRecovery() throws Exception { fatalErrorHandler.clearError(); } - private void electDispatcher() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException { + private void electDispatcher() { UUID expectedLeaderSessionId = UUID.randomUUID(); assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); - - UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() - .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - - assertEquals(expectedLeaderSessionId, actualLeaderSessionId); } private JobGraph createFailingJobGraph(Exception failureCause) { @@ -540,12 +540,6 @@ public void initializeOnMaster(ClassLoader loader) throws Exception { private 
static class TestingDispatcher extends Dispatcher { - /** - * Controls whether existing jobs in {@link SubmittedJobGraphStore} should be recovered - * when {@link TestingDispatcher} is granted leadership. - * */ - private final AtomicBoolean recoverJobsEnabled = new AtomicBoolean(true); - private TestingDispatcher( RpcService rpcService, String endpointId, @@ -576,13 +570,6 @@ private TestingDispatcher( null); } - @Override - void recoverJobs() { - if (recoverJobsEnabled.get()) { - super.recoverJobs(); - } - } - @VisibleForTesting void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) { runAsync(