From fcd264a707d3dd8ef4247825752c8639732c943c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 6 Mar 2017 16:57:43 +0100 Subject: [PATCH] [FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive if there is no active leader known for this job. In case that a job times out, it will be removed from the ResourceManager. Additionally, this PR removes the dependency of the JobLeaderIdService on the RunningJobsRegistry. Fix YarnFlinkApplicationMasterRunner to use correct arguments for JobLeaderIdService Fix race condition in JobLeaderIdListener#cancelTimeout This closes #3488. --- .../flink/configuration/AkkaOptions.java | 7 + .../configuration/ResourceManagerOptions.java | 40 +++ .../resourcemanager/JobLeaderIdActions.java | 8 +- .../resourcemanager/JobLeaderIdService.java | 119 +++++--- .../resourcemanager/ResourceManager.java | 6 +- .../ResourceManagerConfiguration.java | 48 ++-- .../ResourceManagerRunner.java | 5 +- .../JobLeaderIdServiceTest.java | 269 ++++++++++++++++++ .../ResourceManagerHATest.java | 10 +- .../ResourceManagerJobMasterTest.java | 10 +- .../ResourceManagerTaskExecutorTest.java | 10 +- .../slotmanager/SlotProtocolTest.java | 21 +- .../taskexecutor/TaskExecutorITCase.java | 10 +- .../YarnFlinkApplicationMasterRunner.java | 5 +- 14 files changed, 498 insertions(+), 70 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 7e4c2b71e7908..97b209e6136a9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -28,6 +28,13 @@ @PublicEvolving public class AkkaOptions { + /** + * Timeout for akka ask calls + */ + public static final ConfigOption AKKA_ASK_TIMEOUT = ConfigOptions + .key("akka.ask.timeout") + .defaultValue("10 s"); + /** * The Akka tcp connection timeout. */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java new file mode 100644 index 0000000000000..6a09f197851f2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * The set of configuration options relating to the ResourceManager + */ +@PublicEvolving +public class ResourceManagerOptions { + + /** + * Timeout for jobs which don't have a job manager as leader assigned. + */ + public static final ConfigOption JOB_TIMEOUT = ConfigOptions + .key("resourcemanager.job.timeout") + .defaultValue("5 minutes"); + + // --------------------------------------------------------------------------------------------- + + /** Not intended to be instantiated */ + private ResourceManagerOptions() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java index 58777ef261f9c..4ca62090a098b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java @@ -36,11 +36,13 @@ public interface JobLeaderIdActions { void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId); /** - * Request to remove the job from the {@link JobLeaderIdService}. + * Notify a job timeout. The job is identified by the given JobID. In order to check + * for the validity of the timeout the timeout id of the triggered timeout is provided. * - * @param jobId identifying the job to remove + * @param jobId JobID which identifies the timed out job + * @param timeoutId Id of the calling timeout to differentiate valid from invalid timeouts */ - void removeJob(JobID jobId); + void notifyJobTimeout(JobID jobId, UUID timeoutId); /** * Callback to report occurring errors. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java index 7ef39de94504f..8bffcd094386e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.RunningJobsRegistry; -import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.ExceptionUtils; @@ -32,11 +32,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Service which retrieves for a registered job the current job leader id (the leader id of the @@ -51,8 +54,9 @@ public class JobLeaderIdService { /** High availability services to use by this service */ private final HighAvailabilityServices highAvailabilityServices; - /** Registry to retrieve running jobs */ - private final RunningJobsRegistry runningJobsRegistry; + private final ScheduledExecutor scheduledExecutor; + + private final Time jobTimeout; /** Map of currently monitored jobs */ private final Map jobLeaderIdListeners; @@ -60,10 +64,13 @@ public class JobLeaderIdService { /** Actions to call when the job leader changes */ private JobLeaderIdActions jobLeaderIdActions; - public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception { - this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); - - this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry(); + public JobLeaderIdService( + HighAvailabilityServices highAvailabilityServices, + ScheduledExecutor scheduledExecutor, + Time jobTimeout) throws Exception { + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices"); + this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor"); + this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout"); jobLeaderIdListeners = new HashMap<>(4); @@ -142,8 +149,8 @@ public void addJob(JobID jobId) throws Exception { if (!jobLeaderIdListeners.containsKey(jobId)) { LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(jobId); - JobLeaderIdListener jobidListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService); - jobLeaderIdListeners.put(jobId, jobidListener); + JobLeaderIdListener jobIdListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService); + jobLeaderIdListeners.put(jobId, jobIdListener); } } @@ -183,6 +190,16 @@ public Future getLeaderId(JobID jobId) throws Exception { return listener.getLeaderIdFuture(); } + public boolean isValidTimeout(JobID jobId, UUID timeoutId) { + JobLeaderIdListener jobLeaderIdListener = jobLeaderIdListeners.get(jobId); + + if (null != jobLeaderIdListener) { + return Objects.equals(timeoutId, jobLeaderIdListener.getTimeoutId()); + } else { + return false; + } + } + // -------------------------------------------------------------------------------- // Static utility classes // -------------------------------------------------------------------------------- @@ -193,6 +210,7 @@ public Future getLeaderId(JobID jobId) throws Exception { * listener. */ private final class JobLeaderIdListener implements LeaderRetrievalListener { + private final Object timeoutLock = new Object(); private final JobID jobId; private final JobLeaderIdActions listenerJobLeaderIdActions; private final LeaderRetrievalService leaderRetrievalService; @@ -200,6 +218,15 @@ private final class JobLeaderIdListener implements LeaderRetrievalListener { private volatile CompletableFuture leaderIdFuture; private volatile boolean running = true; + /** Null if no timeout has been scheduled; otherwise non null */ + @Nullable + private volatile ScheduledFuture timeoutFuture; + + /** Null if no timeout has been scheduled; otherwise non null */ + @Nullable + private volatile UUID timeoutId; + + private JobLeaderIdListener( JobID jobId, JobLeaderIdActions listenerJobLeaderIdActions, @@ -210,6 +237,8 @@ private JobLeaderIdListener( leaderIdFuture = new FlinkCompletableFuture<>(); + activateTimeout(); + // start the leader service we're listening to leaderRetrievalService.start(this); } @@ -218,9 +247,15 @@ public Future getLeaderIdFuture() { return leaderIdFuture; } + @Nullable + public UUID getTimeoutId() { + return timeoutId; + } + public void stop() throws Exception { running = false; leaderRetrievalService.stop(); + cancelTimeout(); leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped.")); } @@ -244,29 +279,22 @@ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) { leaderIdFuture.complete(leaderSessionId); } - try { - final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); - if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) { - if (leaderSessionId == null) { - // there is no new leader - if (previousJobLeaderId != null) { - // we had a previous job leader, so notify about his lost leadership - listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId); - } - } else { - if (previousJobLeaderId != null && !leaderSessionId.equals(previousJobLeaderId)) { - // we had a previous leader and he's not the same as the new leader - listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId); - } + if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) { + // we had a previous job leader, so notify about his lost leadership + listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId); + + if (null == leaderSessionId) { + // No current leader active ==> Set a timeout for the job + activateTimeout(); + + // check if we got stopped asynchronously + if (!running) { + cancelTimeout(); } - } else { - // the job is no longer running so remove it - listenerJobLeaderIdActions.removeJob(jobId); } - } catch (IOException e) { - // cannot tell whether the job is still running or not so just remove the listener - LOG.debug("Encountered an error while checking the job registry for running jobs.", e); - listenerJobLeaderIdActions.removeJob(jobId); + } else if (null != leaderSessionId) { + // Cancel timeout because we've found an active leader for it + cancelTimeout(); } } else { LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.", @@ -283,5 +311,32 @@ public void handleError(Exception exception) { JobLeaderIdListener.class.getSimpleName(), exception); } } + + private void activateTimeout() { + synchronized (timeoutLock) { + cancelTimeout(); + + final UUID newTimeoutId = UUID.randomUUID(); + + timeoutId = newTimeoutId; + timeoutFuture = scheduledExecutor.schedule(new Runnable() { + @Override + public void run() { + listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId); + } + }, jobTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + } + + private void cancelTimeout() { + synchronized (timeoutLock) { + if (timeoutFuture != null) { + timeoutFuture.cancel(true); + } + + timeoutFuture = null; + timeoutId = null; + } + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3bcbfda7853cb..badfbe2c7e44f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -817,11 +817,13 @@ public void jobLeaderLostLeadership(final JobID jobId, final UUID oldJobLeaderId } @Override - public void removeJob(final JobID jobId) { + public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) { runAsync(new Runnable() { @Override public void run() { - ResourceManager.this.removeJob(jobId); + if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) { + removeJob(jobId); + } } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java index 920f1fc210514..d04d852c972da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java @@ -19,10 +19,9 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; import org.apache.flink.util.Preconditions; import scala.concurrent.duration.Duration; @@ -34,10 +33,15 @@ public class ResourceManagerConfiguration { private final Time timeout; private final Time heartbeatInterval; + private final Time jobTimeout; - public ResourceManagerConfiguration(Time timeout, Time heartbeatInterval) { - this.timeout = Preconditions.checkNotNull(timeout); - this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval); + public ResourceManagerConfiguration( + Time timeout, + Time heartbeatInterval, + Time jobTimeout) { + this.timeout = Preconditions.checkNotNull(timeout, "timeout"); + this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval"); + this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout"); } public Time getTimeout() { @@ -48,39 +52,45 @@ public Time getHeartbeatInterval() { return heartbeatInterval; } + public Time getJobTimeout() { + return jobTimeout; + } + // -------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------- public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { - ConfigOption timeoutOption = ConfigOptions - .key(ConfigConstants.AKKA_ASK_TIMEOUT) - .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); - - final String strTimeout = configuration.getString(timeoutOption); + final String strTimeout = configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT); final Time timeout; try { timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + timeoutOption + '.', e); + "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', e); } - ConfigOption heartbeatIntervalOption = ConfigOptions - .key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL) - .defaultValue(timeout.toString()); - - final String strHeartbeatInterval = configuration.getString(heartbeatIntervalOption); + final String strHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL); final Time heartbeatInterval; try { heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " + - "value " + timeoutOption + '.', e); + "value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e); + } + + final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT); + final Time jobTimeout; + + try { + jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis()); + } catch (NumberFormatException e) { + throw new ConfigurationException("Could not parse the resource manager's job timeout " + + "value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e); } - return new ResourceManagerConfiguration(timeout, heartbeatInterval); + return new ResourceManagerConfiguration(timeout, heartbeatInterval, jobTimeout); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index e0dee0bcb7e70..749b4075223bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -54,7 +54,10 @@ public ResourceManagerRunner( final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); this.resourceManager = new StandaloneResourceManager( rpcService, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java new file mode 100644 index 0000000000000..d5e99bd5b1a08 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class JobLeaderIdServiceTest extends TestLogger { + + /** + * Tests adding a job and finding out its leader id + */ + @Test(timeout = 10000) + public void testAddingJob() throws Exception { + final JobID jobId = new JobID(); + final String address = "foobar"; + final UUID leaderId = UUID.randomUUID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + Future leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + + // notify the leader id service about the new leader + leaderRetrievalService.notifyListener(address, leaderId); + + assertEquals(leaderId, leaderIdFuture.get()); + + assertTrue(jobLeaderIdService.containsJob(jobId)); + } + + /** + * Tests that removing a job completes the job leader id future exceptionally + */ + @Test(timeout = 10000) + public void testRemovingJob() throws Exception { + final JobID jobId = new JobID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + Future leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + + // remove the job before we could find a leader + jobLeaderIdService.removeJob(jobId); + + assertFalse(jobLeaderIdService.containsJob(jobId)); + + try { + leaderIdFuture.get(); + + fail("The leader id future should be completed exceptionally."); + } catch (ExecutionException ignored) { + // expected exception + } + } + + /** + * Tests that the initial job registration registers a timeout which will call + * {@link JobLeaderIdActions#notifyJobTimeout(JobID, UUID)} when executed. + */ + @Test + public void testInitialJobTimeout() throws Exception { + final JobID jobId = new JobID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + assertTrue(jobLeaderIdService.containsJob(jobId)); + + ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class)); + + Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + timeoutRunnable.run(); + + ArgumentCaptor timeoutIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class); + + verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), timeoutIdArgumentCaptor.capture()); + + assertTrue(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue())); + } + + /** + * Tests that a timeout get cancelled once a job leader has been found. Furthermore, it tests + * that a new timeout is registered after the jobmanager has lost leadership. + */ + @Test(timeout = 10000) + public void jobTimeoutAfterLostLeadership() throws Exception { + final JobID jobId = new JobID(); + final String address = "foobar"; + final UUID leaderId = UUID.randomUUID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledFuture timeout1 = mock(ScheduledFuture.class); + ScheduledFuture timeout2 = mock(ScheduledFuture.class); + final Queue> timeoutQueue = new ArrayDeque<>(Arrays.asList(timeout1, timeout2)); + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + + final AtomicReference lastRunnable = new AtomicReference<>(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + lastRunnable.set((Runnable) invocation.getArguments()[0]); + + return timeoutQueue.poll(); + } + }).when(scheduledExecutor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + final AtomicReference lastTimeoutId = new AtomicReference<>(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + lastTimeoutId.set((UUID) invocation.getArguments()[1]); + return null; + } + }).when(jobLeaderIdActions).notifyJobTimeout(eq(jobId), any(UUID.class)); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + Future leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + + // notify the leader id service about the new leader + leaderRetrievalService.notifyListener(address, leaderId); + + assertEquals(leaderId, leaderIdFuture.get()); + + assertTrue(jobLeaderIdService.containsJob(jobId)); + + // check that the first timeout got cancelled + verify(timeout1, times(1)).cancel(anyBoolean()); + + verify(scheduledExecutor, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + // initial timeout runnable which should no longer have an effect + Runnable runnable = lastRunnable.get(); + + assertNotNull(runnable); + + runnable.run(); + + verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), any(UUID.class)); + + // the timeout should no longer be valid + assertFalse(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())); + + // lose leadership + leaderRetrievalService.notifyListener("", null); + + verify(scheduledExecutor, times(2)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + + // the second runnable should be the new timeout + runnable = lastRunnable.get(); + + assertNotNull(runnable); + + runnable.run(); + + verify(jobLeaderIdActions, times(2)).notifyJobTimeout(eq(jobId), any(UUID.class)); + + // the new timeout should be valid + assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 2e52eeb101044..58dedc33f3ae8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -46,10 +46,16 @@ public void testGrantAndRevokeLeadership() throws Exception { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 2622634dfcb78..031f76ec753f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -193,10 +193,16 @@ private ResourceManager createAndStartResourceManager( highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); ResourceManager resourceManager = new StandaloneResourceManager( rpcService, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 10161817dc621..4456235f5aab0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -146,9 +146,15 @@ private StandaloneResourceManager createAndStartResourceManager(TestingLeaderEle TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); StandaloneResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index a3ba436a74c34..1e5edbed364d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -107,8 +107,15 @@ public void testSlotsUnavailableRequest() throws Exception { TestingLeaderElectionService rmLeaderElectionService = configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + testingHaServices, + testRpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); SpiedResourceManager resourceManager = @@ -208,9 +215,15 @@ public void testSlotAvailableRequest() throws Exception { .thenReturn(new FlinkCompletableFuture()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + testingHaServices, + testRpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 0f884f26ec505..898584c8df6a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -93,9 +93,15 @@ public void testSlotAllocation() throws Exception { testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId)); TestingSerialRpcService rpcService = new TestingSerialRpcService(); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.milliseconds(500L), + Time.milliseconds(500L), + Time.minutes(5L)); SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHAServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + testingHAServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); MetricRegistry metricRegistry = mock(MetricRegistry.class); final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index e2aa6eccdca93..ddeb02e4fe768 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -187,7 +187,10 @@ protected RpcService createRpcService( private ResourceManager createResourceManager(Configuration config) throws Exception { final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config); final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices); + final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + haServices, + commonRpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); return new YarnResourceManager(config, ENV,