From 8d962f00a7f12675503294116af9d90ee59dbbeb Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 16 Feb 2018 15:04:32 +0100 Subject: [PATCH] [FLINK-8673] [flip6] Use JobManagerRunner#resultFuture for success and failure communication This commit removes the OnCompletionActions and FatalErrorHandler from the JobManagerRunner. Instead it communicates a successful job execution of the failure case through the JobManagerRunner#resultFuture. Furthermore, this commit no longer allows the JobManagerRunner to shut down itself. All shut down logic must be triggered by the owner of the JobManagerRunner. This closes #5510. --- .../flink/runtime/dispatcher/Dispatcher.java | 60 ++---- .../runtime/dispatcher/MiniDispatcher.java | 4 +- .../runtime/jobmaster/JobManagerRunner.java | 86 +++----- .../jobmaster/JobNotFinishedException.java | 33 +++ .../minicluster/MiniClusterJobDispatcher.java | 44 +++- .../apache/flink/runtime/rpc/RpcService.java | 4 +- .../runtime/dispatcher/DispatcherTest.java | 21 +- .../dispatcher/MiniDispatcherTest.java | 55 +++-- .../jobmaster/JobManagerRunnerTest.java | 201 +++++++++++++++++- 9 files changed, 370 insertions(+), 138 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 89b82ef42ceef..e751bc47b0be7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -34,11 +34,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; +import org.apache.flink.runtime.jobmaster.JobNotFinishedException; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; @@ -243,10 +243,23 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) blobServer, jobManagerSharedServices, metricRegistry, - new DispatcherOnCompleteActions(jobGraph.getJobID()), - fatalErrorHandler, restAddress); + jobManagerRunner.getResultFuture().whenCompleteAsync( + (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { + if (archivedExecutionGraph != null) { + jobReachedGloballyTerminalState(archivedExecutionGraph); + } else { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + + if (strippedThrowable instanceof JobNotFinishedException) { + jobNotFinished(jobId); + } else { + onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " failed.", strippedThrowable)); + } + } + }, getMainThreadExecutor()); + jobManagerRunner.start(); } catch (Exception e) { try { @@ -544,6 +557,8 @@ protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedEx archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState()); + log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState()); + try { archivedExecutionGraphStore.put(archivedExecutionGraph); } catch (IOException e) { @@ -563,7 +578,9 @@ protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedEx } } - protected void jobFinishedByOther(JobID jobId) { + protected void jobNotFinished(JobID jobId) { + log.info("Job {} was not finished by JobManager.", jobId); + try { removeJob(jobId, false); } catch (Exception e) { @@ -667,35 +684,6 @@ public void onRemovedJobGraph(final JobID jobId) { }); } - //------------------------------------------------------ - // Utility classes - //------------------------------------------------------ - - @VisibleForTesting - class DispatcherOnCompleteActions implements OnCompletionActions { - - private final JobID jobId; - - DispatcherOnCompleteActions(JobID jobId) { - this.jobId = Preconditions.checkNotNull(jobId); - } - - @Override - public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { - log.info("Job {} reached globally terminal state {}.", jobId, executionGraph.getState()); - - runAsync(() -> Dispatcher.this.jobReachedGloballyTerminalState(executionGraph)); - } - - @Override - public void jobFinishedByOther() { - log.info("Job {} was finished by other JobManager.", jobId); - - runAsync( - () -> Dispatcher.this.jobFinishedByOther(jobId)); - } - } - //------------------------------------------------------ // Factories //------------------------------------------------------ @@ -715,8 +703,6 @@ JobManagerRunner createJobManagerRunner( BlobServer blobServer, JobManagerSharedServices jobManagerServices, MetricRegistry metricRegistry, - OnCompletionActions onCompleteActions, - FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception; } @@ -737,8 +723,6 @@ public JobManagerRunner createJobManagerRunner( BlobServer blobServer, JobManagerSharedServices jobManagerServices, MetricRegistry metricRegistry, - OnCompletionActions onCompleteActions, - FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { return new JobManagerRunner( resourceId, @@ -750,8 +734,6 @@ public JobManagerRunner createJobManagerRunner( blobServer, jobManagerServices, metricRegistry, - onCompleteActions, - fatalErrorHandler, restAddress); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 36732cef8424f..c648131a7c09e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -124,8 +124,8 @@ protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedEx } @Override - protected void jobFinishedByOther(JobID jobId) { - super.jobFinishedByOther(jobId); + protected void jobNotFinished(JobID jobId) { + super.jobNotFinished(jobId); // shut down since we have done our job shutDown(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 5e476bf2e3f63..285cb4af58855 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -51,7 +51,6 @@ import java.io.IOException; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -72,12 +71,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F /** The job graph needs to run. */ private final JobGraph jobGraph; - /** The listener to notify once the job completes - either successfully or unsuccessfully. */ - private final OnCompletionActions toNotifyOnComplete; - - /** The handler to call in case of fatal (unrecoverable) errors. */ - private final FatalErrorHandler errorHandler; - /** Used to check whether a job needs to be run. */ private final RunningJobsRegistry runningJobsRegistry; @@ -94,6 +87,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F private final CompletableFuture resultFuture; + private final CompletableFuture terminationFuture; + /** flag marking the runner as shut down. */ private volatile boolean shutdown; @@ -119,17 +114,16 @@ public JobManagerRunner( final BlobServer blobServer, final JobManagerSharedServices jobManagerSharedServices, final MetricRegistry metricRegistry, - final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler, @Nullable final String restAddress) throws Exception { JobManagerMetricGroup jobManagerMetrics = null; + this.resultFuture = new CompletableFuture<>(); + this.terminationFuture = new CompletableFuture<>(); + // make sure we cleanly shut down out JobManager services if initialization fails try { this.jobGraph = checkNotNull(jobGraph); - this.toNotifyOnComplete = checkNotNull(toNotifyOnComplete); - this.errorHandler = checkNotNull(errorHandler); this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices); checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); @@ -176,8 +170,6 @@ public JobManagerRunner( userCodeLoader, restAddress, metricRegistry.getMetricQueryServicePath()); - - this.resultFuture = new CompletableFuture<>(); } catch (Throwable t) { // clean up everything @@ -185,6 +177,9 @@ public JobManagerRunner( jobManagerMetrics.close(); } + terminationFuture.completeExceptionally(t); + resultFuture.completeExceptionally(t); + throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t); } } @@ -225,34 +220,43 @@ public void shutdown() throws Exception { private CompletableFuture shutdownInternally() { synchronized (lock) { - shutdown = true; + if (!shutdown) { + shutdown = true; - jobManager.shutDown(); + jobManager.shutDown(); - return jobManager.getTerminationFuture() - .thenAccept( - ignored -> { - Throwable exception = null; + final CompletableFuture jobManagerTerminationFuture = jobManager.getTerminationFuture(); + + jobManagerTerminationFuture.whenComplete( + (Boolean ignored, Throwable throwable) -> { try { leaderElectionService.stop(); } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); + throwable = ExceptionUtils.firstOrSuppressed(t, ExceptionUtils.stripCompletionException(throwable)); } // make all registered metrics go away try { jobManagerMetricGroup.close(); } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); + throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } - if (exception != null) { - throw new CompletionException(new FlinkException("Could not properly shut down the JobManagerRunner.", exception)); + if (throwable != null) { + terminationFuture.completeExceptionally( + new FlinkException("Could not properly shut down the JobManagerRunner", throwable)); + } else { + terminationFuture.complete(null); } + }); - // cancel the result future if not already completed - resultFuture.cancel(false); + terminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + resultFuture.completeExceptionally(new JobNotFinishedException(jobGraph.getJobID())); }); + } + + return terminationFuture; } } @@ -267,16 +271,7 @@ private CompletableFuture shutdownInternally() { public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { // complete the result future with the terminal execution graph resultFuture.complete(executionGraph); - - try { - unregisterJobFromHighAvailability(); - shutdownInternally(); - } - finally { - if (toNotifyOnComplete != null) { - toNotifyOnComplete.jobReachedGloballyTerminalState(executionGraph); - } - } + unregisterJobFromHighAvailability(); } /** @@ -284,14 +279,7 @@ public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGrap */ @Override public void jobFinishedByOther() { - try { - shutdownInternally(); - } - finally { - if (toNotifyOnComplete != null) { - toNotifyOnComplete.jobFinishedByOther(); - } - } + resultFuture.completeExceptionally(new JobNotFinishedException(jobGraph.getJobID())); } /** @@ -306,17 +294,7 @@ public void onFatalError(Throwable exception) { log.error("JobManager runner encountered a fatal error.", exception); } catch (Throwable ignored) {} - // in any case, notify our handler, so it can react fast - try { - if (errorHandler != null) { - errorHandler.onFatalError(exception); - } - } - finally { - // the shutdown may not even needed any more, if the fatal error - // handler kills the process. that is fine, a process kill cleans up better than anything. - shutdownInternally(); - } + resultFuture.completeExceptionally(exception); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java new file mode 100644 index 0000000000000..3515f2f9a094b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.JobException; + +/** + * Exception indicating that a Flink job has not been finished. + */ +public class JobNotFinishedException extends JobException { + private static final long serialVersionUID = 611413276562570622L; + + public JobNotFinishedException(JobID jobId) { + super("The job (" + jobId + ") has been not been finished."); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 6fba534a2bbd0..ef7a6e41e9393 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; +import org.apache.flink.runtime.jobmaster.JobNotFinishedException; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -142,10 +143,10 @@ public MiniClusterJobDispatcher( MetricRegistry metricRegistry, int numJobManagers, RpcService[] rpcServices) throws Exception { - + checkArgument(numJobManagers >= 1); checkArgument(rpcServices.length == numJobManagers); - + this.configuration = checkNotNull(config); this.rpcServices = rpcServices; this.haServices = checkNotNull(haServices); @@ -237,7 +238,7 @@ public void runDetached(JobGraph job) throws JobExecutionException { * This method runs a job in blocking mode. The method returns only after the job * completed successfully, or after it failed terminally. * - * @param job The Flink job to execute + * @param job The Flink job to execute * @return The result of the job execution * * @throws JobExecutionException Thrown if anything went amiss during initial job launch, @@ -245,7 +246,7 @@ public void runDetached(JobGraph job) throws JobExecutionException { */ public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException { checkNotNull(job); - + LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID()); final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers); @@ -287,9 +288,32 @@ private JobManagerRunner[] startJobRunners( blobServer, jobManagerSharedServices, metricRegistry, - onCompletion, - errorHandler, null); + + final int index = i; + + runners[i].getResultFuture() + .whenComplete( + (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { + try { + runners[index].shutdown(); + } catch (Exception e) { + errorHandler.onFatalError(e); + } + + if (archivedExecutionGraph != null) { + onCompletion.jobReachedGloballyTerminalState(archivedExecutionGraph); + } else { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + + if (strippedThrowable instanceof JobNotFinishedException) { + onCompletion.jobFinishedByOther(); + } else { + errorHandler.onFatalError(strippedThrowable); + } + } + }); + runners[i].start(); } catch (Throwable t) { @@ -395,8 +419,8 @@ private void decrementCheckAndCleanup() { * This class is used to sync on blocking jobs across multiple runners. * Only after all runners reported back that they are finished, the * result will be released. - * - * That way it is guaranteed that after the blocking job submit call returns, + * + *

That way it is guaranteed that after the blocking job submit call returns, * the dispatcher is immediately free to accept another job. */ private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler { @@ -408,7 +432,7 @@ private static class BlockingJobSync implements OnCompletionActions, FatalErrorH private volatile Throwable runnerException; private volatile JobResult result; - + BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) { this.jobId = jobId; this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor); @@ -430,6 +454,8 @@ public void onFatalError(Throwable exception) { if (runnerException == null) { runnerException = exception; } + + jobMastersToWaitFor.countDown(); } public JobExecutionResult getResult() throws JobExecutionException, InterruptedException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 9b2e318888e63..089e4b0161b56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -130,13 +130,13 @@ > CompletableFuture con /** * Gets the executor, provided by this RPC service. This executor can be used for example for * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. - * + * *

IMPORTANT: This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. - * + * * @return The execution context provided by the RPC service */ Executor getExecutor(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 6ea6383a39504..46cd9e26773e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.BlobServerOptions; @@ -35,7 +36,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; @@ -129,12 +129,12 @@ public class DispatcherTest extends TestLogger { private TestingDispatcher dispatcher; @BeforeClass - public static void setup() { + public static void setupClass() { rpcService = new TestingRpcService(); } @AfterClass - public static void teardown() { + public static void teardownClass() { if (rpcService != null) { rpcService.stopService(); @@ -274,10 +274,7 @@ public void testCacheJobExecutionResult() throws Exception { final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - OnCompletionActions onCompletionActions; - final JobID failedJobId = new JobID(); - onCompletionActions = dispatcher.new DispatcherOnCompleteActions(failedJobId); final JobStatus expectedState = JobStatus.FAILED; final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder() @@ -286,7 +283,7 @@ public void testCacheJobExecutionResult() throws Exception { .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L)) .build(); - onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph); + dispatcher.completeJobExecution(failedExecutionGraph); assertThat( dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(), @@ -398,6 +395,12 @@ void recoverJobs() { super.recoverJobs(); } } + + @VisibleForTesting + void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) { + runAsync( + () -> jobReachedGloballyTerminalState(archivedExecutionGraph)); + } } private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory { @@ -419,8 +422,6 @@ public JobManagerRunner createJobManagerRunner( BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, MetricRegistry metricRegistry, - OnCompletionActions onCompleteActions, - FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); @@ -434,8 +435,6 @@ public JobManagerRunner createJobManagerRunner( blobServer, jobManagerSharedServices, metricRegistry, - onCompleteActions, - fatalErrorHandler, restAddress); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 1040eee2ea2cc..dfabc611f301e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobResult; @@ -40,7 +39,6 @@ import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -67,6 +65,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Tests for the {@link MiniDispatcher}. @@ -74,18 +73,15 @@ @Category(Flip6.class) public class MiniDispatcherTest extends TestLogger { - private static final JobGraph jobGraph = new JobGraph(); - - private static final ArchivedExecutionGraph archivedExecutionGraph = new ArchivedExecutionGraphBuilder() - .setJobID(jobGraph.getJobID()) - .setState(JobStatus.FINISHED) - .build(); - private static final Time timeout = Time.seconds(10L); @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + private static JobGraph jobGraph; + + private static ArchivedExecutionGraph archivedExecutionGraph; + private static TestingRpcService rpcService; private static Configuration configuration; @@ -100,6 +96,8 @@ public class MiniDispatcherTest extends TestLogger { private CompletableFuture jobGraphFuture; + private CompletableFuture resultFuture; + private TestingLeaderElectionService dispatcherLeaderElectionService; private TestingHighAvailabilityServices highAvailabilityServices; @@ -110,6 +108,13 @@ public class MiniDispatcherTest extends TestLogger { @BeforeClass public static void setupClass() throws IOException { + jobGraph = new JobGraph(); + + archivedExecutionGraph = new ArchivedExecutionGraphBuilder() + .setJobID(jobGraph.getJobID()) + .setState(JobStatus.FINISHED) + .build(); + rpcService = new TestingRpcService(); configuration = new Configuration(); @@ -127,8 +132,9 @@ public void setup() throws Exception { highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); jobGraphFuture = new CompletableFuture<>(); + resultFuture = new CompletableFuture<>(); - testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture); + testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture); } @After @@ -180,15 +186,13 @@ public void testTerminationAfterJobCompletion() throws Exception { miniDispatcher.start(); try { - final Dispatcher.DispatcherOnCompleteActions completeActions = miniDispatcher.new DispatcherOnCompleteActions(jobGraph.getJobID()); - // wait until the Dispatcher is the leader dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); // wait until we have submitted the job jobGraphFuture.get(); - completeActions.jobReachedGloballyTerminalState(archivedExecutionGraph); + resultFuture.complete(archivedExecutionGraph); // wait until we terminate miniDispatcher.getTerminationFuture().get(); @@ -208,15 +212,13 @@ public void testJobResultRetrieval() throws Exception { miniDispatcher.start(); try { - final Dispatcher.DispatcherOnCompleteActions completeActions = miniDispatcher.new DispatcherOnCompleteActions(jobGraph.getJobID()); - // wait until the Dispatcher is the leader dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); // wait until we have submitted the job jobGraphFuture.get(); - completeActions.jobReachedGloballyTerminalState(archivedExecutionGraph); + resultFuture.complete(archivedExecutionGraph); final CompletableFuture terminationFuture = miniDispatcher.getTerminationFuture(); @@ -262,16 +264,31 @@ private MiniDispatcher createMiniDispatcher(ClusterEntrypoint.ExecutionMode exec private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory { private final CompletableFuture jobGraphFuture; + private final CompletableFuture resultFuture; - private TestingJobManagerRunnerFactory(CompletableFuture jobGraphFuture) { + private TestingJobManagerRunnerFactory(CompletableFuture jobGraphFuture, CompletableFuture resultFuture) { this.jobGraphFuture = jobGraphFuture; + this.resultFuture = resultFuture; } @Override - public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { + public JobManagerRunner createJobManagerRunner( + ResourceID resourceId, + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + BlobServer blobServer, + JobManagerSharedServices jobManagerSharedServices, + MetricRegistry metricRegistry, + @Nullable String restAddress) throws Exception { jobGraphFuture.complete(jobGraph); - return mock(JobManagerRunner.class); + final JobManagerRunner mock = mock(JobManagerRunner.class); + when(mock.getResultFuture()).thenReturn(resultFuture); + + return mock; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java index 174422f9da001..ba72293c9cfde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java @@ -18,7 +18,204 @@ package org.apache.flink.runtime.jobmaster; -public class JobManagerRunnerTest { +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link JobManagerRunner} + */ +public class JobManagerRunnerTest extends TestLogger { + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static Configuration configuration; + + private static TestingRpcService rpcService; + + private static BlobServer blobServer; + + private static HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); + + private static JobManagerSharedServices jobManagerSharedServices; + + private static MetricRegistry metricRegistry; + + private static JobGraph jobGraph; + + private static ArchivedExecutionGraph archivedExecutionGraph; + + private TestingHighAvailabilityServices haServices; + + @BeforeClass + public static void setupClass() throws Exception { + configuration = new Configuration(); + rpcService = new TestingRpcService(); + + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer( + configuration, + new VoidBlobStore()); + + jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, blobServer); + + metricRegistry = NoOpMetricRegistry.INSTANCE; + + final JobVertex jobVertex = new JobVertex("Test vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobGraph = new JobGraph(jobVertex); + + archivedExecutionGraph = new ArchivedExecutionGraphBuilder() + .setJobID(jobGraph.getJobID()) + .setState(JobStatus.FINISHED) + .build(); + } + + @Before + public void setup() { + haServices = new TestingHighAvailabilityServices(); + haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService()); + haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService()); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + } + + @After + public void tearDown() { + + } + + @AfterClass + public static void tearDownClass() throws Exception { + if (jobManagerSharedServices != null) { + jobManagerSharedServices.shutdown(); + } + + if (blobServer != null) { + blobServer.close(); + } + + if (rpcService != null) { + rpcService.stopService(); + } + } - // TODO: Test that + @Test + public void testJobCompletion() throws Exception { + final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + + try { + jobManagerRunner.start(); + + final CompletableFuture resultFuture = jobManagerRunner.getResultFuture(); + + assertThat(resultFuture.isDone(), is(false)); + + jobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph); + + assertThat(resultFuture.get(), is(archivedExecutionGraph)); + } finally { + jobManagerRunner.shutdown(); + } + } + + @Test + public void testJobFinishedByOther() throws Exception { + final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + + try { + jobManagerRunner.start(); + + final CompletableFuture resultFuture = jobManagerRunner.getResultFuture(); + + assertThat(resultFuture.isDone(), is(false)); + + jobManagerRunner.jobFinishedByOther(); + + try { + resultFuture.get(); + fail("Should have failed."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); + } + } finally { + jobManagerRunner.shutdown(); + } + } + + @Test + public void testShutDown() throws Exception { + final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + + try { + jobManagerRunner.start(); + + final CompletableFuture resultFuture = jobManagerRunner.getResultFuture(); + + assertThat(resultFuture.isDone(), is(false)); + + jobManagerRunner.shutdown(); + + try { + resultFuture.get(); + fail("Should have failed."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); + } + } finally { + jobManagerRunner.shutdown(); + } + } + + @Nonnull + private JobManagerRunner createJobManagerRunner() throws Exception { + return new JobManagerRunner( + ResourceID.generate(), + jobGraph, + configuration, + rpcService, + haServices, + heartbeatServices, + blobServer, + jobManagerSharedServices, + metricRegistry, + null); + } }