diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 7e4c2b71e7908..97b209e6136a9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -28,6 +28,13 @@ @PublicEvolving public class AkkaOptions { + /** + * Timeout for Akka ask calls, given as a duration string (default: "10 s"). + */ + public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions + .key("akka.ask.timeout") + .defaultValue("10 s"); + /** * The Akka tcp connection timeout. */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java new file mode 100644 index 0000000000000..6a09f197851f2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * The set of configuration options relating to the ResourceManager. + */ +@PublicEvolving +public class ResourceManagerOptions { + + /** + * Timeout for jobs which don't have a job manager as leader assigned. + */ + public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions + .key("resourcemanager.job.timeout") + .defaultValue("5 minutes"); + + // --------------------------------------------------------------------------------------------- + + /** Not intended to be instantiated. */ + private ResourceManagerOptions() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java index 58777ef261f9c..4ca62090a098b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java @@ -36,11 +36,13 @@ public interface JobLeaderIdActions { void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId); /** - * Request to remove the job from the {@link JobLeaderIdService}. + * Notify a job timeout. The job is identified by the given JobID. In order to check + * for the validity of the timeout the timeout id of the triggered timeout is provided. * - * @param jobId identifying the job to remove + * @param jobId JobID which identifies the timed out job + * @param timeoutId Id of the calling timeout to differentiate valid from invalid timeouts */ - void removeJob(JobID jobId); + void notifyJobTimeout(JobID jobId, UUID timeoutId); /** * Callback to report occurring errors. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java index 7ef39de94504f..8bffcd094386e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.RunningJobsRegistry; -import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.ExceptionUtils; @@ -32,11 +32,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Service which retrieves for a registered job the current job leader id (the leader id of the @@ -51,8 +54,9 @@ public class JobLeaderIdService { /** High availability services to use by this service */ private final HighAvailabilityServices highAvailabilityServices; - /** Registry to retrieve running jobs */ - private final RunningJobsRegistry 
runningJobsRegistry; + private final ScheduledExecutor scheduledExecutor; + + private final Time jobTimeout; /** Map of currently monitored jobs */ private final Map jobLeaderIdListeners; @@ -60,10 +64,13 @@ public class JobLeaderIdService { /** Actions to call when the job leader changes */ private JobLeaderIdActions jobLeaderIdActions; - public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception { - this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); - - this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry(); + public JobLeaderIdService( + HighAvailabilityServices highAvailabilityServices, + ScheduledExecutor scheduledExecutor, + Time jobTimeout) throws Exception { + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices"); + this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor"); + this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout"); jobLeaderIdListeners = new HashMap<>(4); @@ -142,8 +149,8 @@ public void addJob(JobID jobId) throws Exception { if (!jobLeaderIdListeners.containsKey(jobId)) { LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(jobId); - JobLeaderIdListener jobidListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService); - jobLeaderIdListeners.put(jobId, jobidListener); + JobLeaderIdListener jobIdListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService); + jobLeaderIdListeners.put(jobId, jobIdListener); } } @@ -183,6 +190,16 @@ public Future getLeaderId(JobID jobId) throws Exception { return listener.getLeaderIdFuture(); } + public boolean isValidTimeout(JobID jobId, UUID timeoutId) { + JobLeaderIdListener jobLeaderIdListener = jobLeaderIdListeners.get(jobId); + + if (null != jobLeaderIdListener) { + return Objects.equals(timeoutId, 
jobLeaderIdListener.getTimeoutId()); + } else { + return false; + } + } + // -------------------------------------------------------------------------------- // Static utility classes // -------------------------------------------------------------------------------- @@ -193,6 +210,7 @@ public Future getLeaderId(JobID jobId) throws Exception { * listener. */ private final class JobLeaderIdListener implements LeaderRetrievalListener { + private final Object timeoutLock = new Object(); private final JobID jobId; private final JobLeaderIdActions listenerJobLeaderIdActions; private final LeaderRetrievalService leaderRetrievalService; @@ -200,6 +218,15 @@ private final class JobLeaderIdListener implements LeaderRetrievalListener { private volatile CompletableFuture leaderIdFuture; private volatile boolean running = true; + /** Null if no timeout has been scheduled; otherwise non null */ + @Nullable + private volatile ScheduledFuture timeoutFuture; + + /** Null if no timeout has been scheduled; otherwise non null */ + @Nullable + private volatile UUID timeoutId; + + private JobLeaderIdListener( JobID jobId, JobLeaderIdActions listenerJobLeaderIdActions, @@ -210,6 +237,8 @@ private JobLeaderIdListener( leaderIdFuture = new FlinkCompletableFuture<>(); + activateTimeout(); + // start the leader service we're listening to leaderRetrievalService.start(this); } @@ -218,9 +247,15 @@ public Future getLeaderIdFuture() { return leaderIdFuture; } + @Nullable + public UUID getTimeoutId() { + return timeoutId; + } + public void stop() throws Exception { running = false; leaderRetrievalService.stop(); + cancelTimeout(); leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped.")); } @@ -244,29 +279,22 @@ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) { leaderIdFuture.complete(leaderSessionId); } - try { - final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); - if (jobStatus == 
JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) { - if (leaderSessionId == null) { - // there is no new leader - if (previousJobLeaderId != null) { - // we had a previous job leader, so notify about his lost leadership - listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId); - } - } else { - if (previousJobLeaderId != null && !leaderSessionId.equals(previousJobLeaderId)) { - // we had a previous leader and he's not the same as the new leader - listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId); - } + if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) { + // we had a previous job leader, so notify about his lost leadership + listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId); + + if (null == leaderSessionId) { + // No current leader active ==> Set a timeout for the job + activateTimeout(); + + // check if we got stopped asynchronously + if (!running) { + cancelTimeout(); } - } else { - // the job is no longer running so remove it - listenerJobLeaderIdActions.removeJob(jobId); } - } catch (IOException e) { - // cannot tell whether the job is still running or not so just remove the listener - LOG.debug("Encountered an error while checking the job registry for running jobs.", e); - listenerJobLeaderIdActions.removeJob(jobId); + } else if (null != leaderSessionId) { + // Cancel timeout because we've found an active leader for it + cancelTimeout(); } } else { LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.", @@ -283,5 +311,32 @@ public void handleError(Exception exception) { JobLeaderIdListener.class.getSimpleName(), exception); } } + + private void activateTimeout() { + synchronized (timeoutLock) { + cancelTimeout(); + + final UUID newTimeoutId = UUID.randomUUID(); + + timeoutId = newTimeoutId; + timeoutFuture = scheduledExecutor.schedule(new Runnable() { + @Override + public void run() { + 
listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId); + } + }, jobTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + } + + private void cancelTimeout() { + synchronized (timeoutLock) { + if (timeoutFuture != null) { + timeoutFuture.cancel(true); + } + + timeoutFuture = null; + timeoutId = null; + } + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3bcbfda7853cb..badfbe2c7e44f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -817,11 +817,13 @@ public void jobLeaderLostLeadership(final JobID jobId, final UUID oldJobLeaderId } @Override - public void removeJob(final JobID jobId) { + public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) { runAsync(new Runnable() { @Override public void run() { - ResourceManager.this.removeJob(jobId); + if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) { + removeJob(jobId); + } } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java index 920f1fc210514..d04d852c972da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java @@ -19,10 +19,9 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.AkkaOptions; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; import org.apache.flink.util.Preconditions; import scala.concurrent.duration.Duration; @@ -34,10 +33,15 @@ public class ResourceManagerConfiguration { private final Time timeout; private final Time heartbeatInterval; + private final Time jobTimeout; - public ResourceManagerConfiguration(Time timeout, Time heartbeatInterval) { - this.timeout = Preconditions.checkNotNull(timeout); - this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval); + public ResourceManagerConfiguration( + Time timeout, + Time heartbeatInterval, + Time jobTimeout) { + this.timeout = Preconditions.checkNotNull(timeout, "timeout"); + this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval"); + this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout"); } public Time getTimeout() { @@ -48,39 +52,45 @@ public Time getHeartbeatInterval() { return heartbeatInterval; } + public Time getJobTimeout() { + return jobTimeout; + } + // -------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------- public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { - ConfigOption timeoutOption = ConfigOptions - .key(ConfigConstants.AKKA_ASK_TIMEOUT) - .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); - - final String strTimeout = configuration.getString(timeoutOption); + final String strTimeout = configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT); final Time timeout; try { timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + 
timeoutOption + '.', e); + "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', e); } - ConfigOption heartbeatIntervalOption = ConfigOptions - .key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL) - .defaultValue(timeout.toString()); - - final String strHeartbeatInterval = configuration.getString(heartbeatIntervalOption); + final String strHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL); final Time heartbeatInterval; try { heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " + - "value " + timeoutOption + '.', e); + "value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e); + } + + final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT); + final Time jobTimeout; + + try { + jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis()); + } catch (NumberFormatException e) { + throw new ConfigurationException("Could not parse the resource manager's job timeout " + + "value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e); } - return new ResourceManagerConfiguration(timeout, heartbeatInterval); + return new ResourceManagerConfiguration(timeout, heartbeatInterval, jobTimeout); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index e0dee0bcb7e70..749b4075223bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -54,7 +54,10 @@ public ResourceManagerRunner( final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final SlotManagerFactory 
slotManagerFactory = new DefaultSlotManager.Factory(); - final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); this.resourceManager = new StandaloneResourceManager( rpcService, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java new file mode 100644 index 0000000000000..d5e99bd5b1a08 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class JobLeaderIdServiceTest extends TestLogger { + + /** + * Tests adding a job and finding out its leader id + */ + @Test(timeout = 10000) + public void testAddingJob() throws Exception { + final JobID jobId = new JobID(); + final String address = "foobar"; + final UUID leaderId = UUID.randomUUID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + 
highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + Future leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + + // notify the leader id service about the new leader + leaderRetrievalService.notifyListener(address, leaderId); + + assertEquals(leaderId, leaderIdFuture.get()); + + assertTrue(jobLeaderIdService.containsJob(jobId)); + } + + /** + * Tests that removing a job completes the job leader id future exceptionally + */ + @Test(timeout = 10000) + public void testRemovingJob() throws Exception { + final JobID jobId = new JobID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + Future leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + + // remove the job before we could find a leader + jobLeaderIdService.removeJob(jobId); + + assertFalse(jobLeaderIdService.containsJob(jobId)); + + try { + leaderIdFuture.get(); + + fail("The leader id future should be completed exceptionally."); + } 
catch (ExecutionException ignored) { + // expected exception + } + } + + /** + * Tests that the initial job registration registers a timeout which will call + * {@link JobLeaderIdActions#notifyJobTimeout(JobID, UUID)} when executed. + */ + @Test + public void testInitialJobTimeout() throws Exception { + final JobID jobId = new JobID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + assertTrue(jobLeaderIdService.containsJob(jobId)); + + ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class)); + + Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + timeoutRunnable.run(); + + ArgumentCaptor<UUID> timeoutIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class); + + verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), timeoutIdArgumentCaptor.capture()); + + assertTrue(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue())); + } + + /** + * Tests that a timeout gets cancelled once a job leader has been found. Furthermore, it tests + * that a new timeout is registered after the jobmanager has lost leadership. 
+ */ + @Test(timeout = 10000) + public void jobTimeoutAfterLostLeadership() throws Exception { + final JobID jobId = new JobID(); + final String address = "foobar"; + final UUID leaderId = UUID.randomUUID(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); + + ScheduledFuture<?> timeout1 = mock(ScheduledFuture.class); + ScheduledFuture<?> timeout2 = mock(ScheduledFuture.class); + final Queue<ScheduledFuture<?>> timeoutQueue = new ArrayDeque<>(Arrays.asList(timeout1, timeout2)); + ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + + final AtomicReference<Runnable> lastRunnable = new AtomicReference<>(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + lastRunnable.set((Runnable) invocation.getArguments()[0]); + + return timeoutQueue.poll(); + } + }).when(scheduledExecutor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + Time timeout = Time.milliseconds(5000L); + JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); + + final AtomicReference<UUID> lastTimeoutId = new AtomicReference<>(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + lastTimeoutId.set((UUID) invocation.getArguments()[1]); + return null; + } + }).when(jobLeaderIdActions).notifyJobTimeout(eq(jobId), any(UUID.class)); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + scheduledExecutor, + timeout); + + jobLeaderIdService.start(jobLeaderIdActions); + + jobLeaderIdService.addJob(jobId); + + Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + + // notify the leader id service about the new leader + leaderRetrievalService.notifyListener(address, leaderId); + + 
assertEquals(leaderId, leaderIdFuture.get()); + + assertTrue(jobLeaderIdService.containsJob(jobId)); + + // check that the first timeout got cancelled + verify(timeout1, times(1)).cancel(anyBoolean()); + + verify(scheduledExecutor, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + // initial timeout runnable which should no longer have an effect + Runnable runnable = lastRunnable.get(); + + assertNotNull(runnable); + + runnable.run(); + + verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), any(UUID.class)); + + // the timeout should no longer be valid + assertFalse(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())); + + // lose leadership + leaderRetrievalService.notifyListener("", null); + + verify(scheduledExecutor, times(2)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + + // the second runnable should be the new timeout + runnable = lastRunnable.get(); + + assertNotNull(runnable); + + runnable.run(); + + verify(jobLeaderIdActions, times(2)).notifyJobTimeout(eq(jobId), any(UUID.class)); + + // the new timeout should be valid + assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 2e52eeb101044..58dedc33f3ae8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -46,10 +46,16 @@ public void testGrantAndRevokeLeadership() throws Exception { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); - ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 2622634dfcb78..031f76ec753f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -193,10 +193,16 @@ private ResourceManager createAndStartResourceManager( highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); ResourceManager resourceManager = new StandaloneResourceManager( rpcService, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 10161817dc621..4456235f5aab0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -146,9 +146,15 @@ private StandaloneResourceManager createAndStartResourceManager(TestingLeaderEle TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); MetricRegistry metricRegistry = mock(MetricRegistry.class); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); StandaloneResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index a3ba436a74c34..1e5edbed364d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -107,8 +107,15 @@ public void testSlotsUnavailableRequest() throws Exception { TestingLeaderElectionService rmLeaderElectionService = configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); + + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + testingHaServices, + testRpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); SpiedResourceManager resourceManager = @@ -208,9 +215,15 @@ public void testSlotAvailableRequest() throws Exception { .thenReturn(new FlinkCompletableFuture()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L), + Time.minutes(5L)); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + testingHaServices, + testRpcService.getScheduledExecutor(), + 
resourceManagerConfiguration.getJobTimeout()); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManager resourceManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 0f884f26ec505..898584c8df6a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -93,9 +93,15 @@ public void testSlotAllocation() throws Exception { testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId)); TestingSerialRpcService rpcService = new TestingSerialRpcService(); - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L)); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.milliseconds(500L), + Time.milliseconds(500L), + Time.minutes(5L)); SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHAServices); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + testingHAServices, + rpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); MetricRegistry metricRegistry = mock(MetricRegistry.class); final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index e2aa6eccdca93..ddeb02e4fe768 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -187,7 +187,10 @@ protected RpcService createRpcService( private ResourceManager createResourceManager(Configuration config) throws Exception { final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config); final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); - final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices); + final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + haServices, + commonRpcService.getScheduledExecutor(), + resourceManagerConfiguration.getJobTimeout()); return new YarnResourceManager(config, ENV,