diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index eed6f114faed3..702c05e87d138 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -40,9 +40,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -228,20 +231,27 @@ public void reconnect(final JobID jobId) {
 	/**
 	 * Leader listener which tries to establish a connection to a newly detected job leader.
 	 */
+	@ThreadSafe
 	private final class JobManagerLeaderListener implements LeaderRetrievalListener {
 
+		private final Object lock = new Object();
+
 		/** Job id identifying the job to look for a leader. */
 		private final JobID jobId;
 
 		/** Rpc connection to the job leader. */
-		private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+		@GuardedBy("lock")
+		@Nullable
+		private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+
+		/** Leader id of the current job leader. */
+		@GuardedBy("lock")
+		@Nullable
+		private JobMasterId currentJobMasterId;
 
 		/** State of the listener. */
 		private volatile boolean stopped;
 
-		/** Leader id of the current job leader. */
-		private volatile JobMasterId currentJobMasterId;
-
 		private JobManagerLeaderListener(JobID jobId) {
 			this.jobId = Preconditions.checkNotNull(jobId);
 
@@ -250,91 +260,96 @@ private JobManagerLeaderListener(JobID jobId) {
 			currentJobMasterId = null;
 		}
 
+		private JobMasterId getCurrentJobMasterId() {
+			synchronized (lock) {
+				return currentJobMasterId;
+			}
+		}
+
 		public void stop() {
-			stopped = true;
+			synchronized (lock) {
+				if (!stopped) {
+					stopped = true;
 
-			if (rpcConnection != null) {
-				rpcConnection.close();
+					closeRpcConnection();
+				}
 			}
 		}
 
 		public void reconnect() {
-			if (stopped) {
-				LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
-			} else {
-				final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;
-
-				if (currentRpcConnection != null) {
-					if (currentRpcConnection.isConnected()) {
-
-						if (currentRpcConnection.tryReconnect()) {
-							// double check for concurrent stop operation
-							if (stopped) {
-								currentRpcConnection.close();
-							}
-						} else {
-							LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
-						}
+			synchronized (lock) {
+				if (stopped) {
+					LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
+				} else {
+					if (rpcConnection != null) {
+						Preconditions.checkState(
+							rpcConnection.tryReconnect(),
+							"Illegal concurrent modification of the JobManagerLeaderListener rpc connection.");
 					} else {
-						LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
+						LOG.debug("Cannot reconnect to an unknown JobMaster.");
 					}
-				} else {
-					LOG.debug("Cannot reconnect to an unknown JobMaster.");
 				}
 			}
 		}
 
 		@Override
 		public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
-			if (stopped) {
-				LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
" + - "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId); - } else { - final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId); + Optional jobManagerLostLeadership = Optional.empty(); - LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", - jobId, leaderAddress, jobMasterId); - - if (leaderAddress == null || leaderAddress.isEmpty()) { - // the leader lost leadership but there is no other leader yet. - if (rpcConnection != null) { - rpcConnection.close(); - } - - jobLeaderListener.jobManagerLostLeadership(jobId, currentJobMasterId); - - currentJobMasterId = jobMasterId; + synchronized (lock) { + if (stopped) { + LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " + + "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId); } else { - currentJobMasterId = jobMasterId; + final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId); - if (rpcConnection != null) { - // check if we are already trying to connect to this leader - if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) { - rpcConnection.close(); - - rpcConnection = new JobManagerRegisteredRpcConnection( - LOG, - leaderAddress, - jobMasterId, - rpcService.getExecutor()); - } - } else { - rpcConnection = new JobManagerRegisteredRpcConnection( - LOG, - leaderAddress, - jobMasterId, - rpcService.getExecutor()); - } + LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", + jobId, leaderAddress, jobMasterId); - // double check for a concurrent stop operation - if (stopped) { - rpcConnection.close(); + if (leaderAddress == null || leaderAddress.isEmpty()) { + // the leader lost leadership but there is no other leader yet. + jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId); + closeRpcConnection(); } else { - LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId); - rpcConnection.start(); + // check whether we are already connecting to this leader + if (Objects.equals(jobMasterId, currentJobMasterId)) { + LOG.debug("Trying already connecting to leader of job {}. 
+						} else {
+							closeRpcConnection();
+							openRpcConnectionTo(leaderAddress, jobMasterId);
+						}
 					}
 				}
 			}
+
+			// send callbacks outside of the lock scope
+			jobManagerLostLeadership.ifPresent(oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
+		}
+
+		@GuardedBy("lock")
+		private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
+			Preconditions.checkState(
+				currentJobMasterId == null && rpcConnection == null,
+				"Cannot open a new rpc connection if the previous connection has not been closed.");
+
+			currentJobMasterId = jobMasterId;
+			rpcConnection = new JobManagerRegisteredRpcConnection(
+				LOG,
+				leaderAddress,
+				jobMasterId,
+				rpcService.getExecutor());
+
+			LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, jobMasterId.toUUID());
+			rpcConnection.start();
+		}
+
+		@GuardedBy("lock")
+		private void closeRpcConnection() {
+			if (rpcConnection != null) {
+				rpcConnection.close();
+				rpcConnection = null;
+				currentJobMasterId = null;
+			}
 		}
 
 		@Override
@@ -378,7 +393,7 @@ protected RetryingRegistration
+		final BlockingQueue<SettableLeaderRetrievalService> instantiatedLeaderRetrievalServices = new ArrayBlockingQueue<>(numberOperations);
+
+		final HighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+			.setJobMasterLeaderRetrieverFunction(
+				leaderForJobId -> {
+					final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
+					instantiatedLeaderRetrievalServices.offer(leaderRetrievalService);
+					return leaderRetrievalService;
+				})
+			.build();
+
+		jobLeaderService.start(
+			"foobar",
+			rpcServiceResource.getTestingRpcService(),
+			haServices,
+			jobLeaderListener);
+
+		final CheckedThread addJobAction = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				for (int i = 0; i < numberOperations; i++) {
+					final JobID jobId = JobID.generate();
+					jobLeaderService.addJob(jobId, "foobar");
+					Thread.yield();
+					jobLeaderService.removeJob(jobId);
+				}
+			}
+		};
+		addJobAction.start();
+
+		for (int i = 0; i < numberOperations; i++) {
+			final SettableLeaderRetrievalService leaderRetrievalService = instantiatedLeaderRetrievalServices.take();
+			leaderRetrievalService.notifyListener("foobar", UUID.randomUUID());
+		}
+
+		addJobAction.sync();
+	}
+
+	/**
+	 * Tests that the JobLeaderService won't try to reconnect to JobMaster after it
+	 * has lost the leadership. See FLINK-16836.
+	 */
+	@Test
+	public void doesNotReconnectAfterTargetLostLeadership() throws Exception {
+		final JobID jobId = new JobID();
+
+		final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+			.setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
+			.build();
+		final TestingJobMasterGateway jobMasterGateway = registerJobMaster();
+
+		final OneShotLatch jobManagerGainedLeadership = new OneShotLatch();
+		final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(ignored -> jobManagerGainedLeadership.trigger());
+
+		final JobLeaderService jobLeaderService = createAndStartJobLeaderService(haServices, testingJobLeaderListener);
+
+		try {
+			jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
+
+			leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), UUID.randomUUID());
+
+			jobManagerGainedLeadership.await();
+
+			// revoke the leadership
+			leaderRetrievalService.notifyListener(null, null);
+			testingJobLeaderListener.waitUntilJobManagerLostLeadership();
+
+			jobLeaderService.reconnect(jobId);
+		} finally {
+			jobLeaderService.stop();
+		}
+	}
+
+	/**
+	 * Tests that the JobLeaderService can reconnect to an old leader which seemed
+	 * to have lost the leadership in between. See FLINK-14316.
+	 */
+	@Test
+	public void canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
+		final JobID jobId = new JobID();
+
+		final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+			.setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
+			.build();
+
+		final TestingJobMasterGateway jobMasterGateway = registerJobMaster();
+
+		final BlockingQueue<JobID> leadershipQueue = new ArrayBlockingQueue<>(1);
+		final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(leadershipQueue::offer);
+
+		final JobLeaderService jobLeaderService = createAndStartJobLeaderService(haServices, testingJobLeaderListener);
+
+		try {
+			jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
+
+			final UUID leaderSessionId = UUID.randomUUID();
+			leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
+
+			// wait for the first leadership
+			assertThat(leadershipQueue.take(), is(jobId));
+
+			// revoke the leadership
+			leaderRetrievalService.notifyListener(null, null);
+
+			testingJobLeaderListener.waitUntilJobManagerLostLeadership();
+
+			leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
+
+			// check that we obtain the leadership a second time
+			assertThat(leadershipQueue.take(), is(jobId));
+		} finally {
+			jobLeaderService.stop();
+		}
+	}
+
+	private JobLeaderService createAndStartJobLeaderService(HighAvailabilityServices haServices, JobLeaderListener testingJobLeaderListener) {
+		final JobLeaderService jobLeaderService = new JobLeaderService(
+			new LocalTaskManagerLocation(),
+			RetryingRegistrationConfiguration.defaultConfiguration());
+
+		jobLeaderService.start(
+			"foobar",
+			rpcServiceResource.getTestingRpcService(),
+			haServices,
+			testingJobLeaderListener);
+		return jobLeaderService;
+	}
+
+	private TestingJobMasterGateway registerJobMaster() {
+		final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
+		rpcServiceResource.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+		return jobMasterGateway;
+	}
+
+	private static final class TestingJobLeaderListener implements JobLeaderListener {
+
+		private final CountDownLatch jobManagerLostLeadership = new CountDownLatch(1);
+
+		private final Consumer<JobID> jobManagerGainedLeadership;
+
+		private TestingJobLeaderListener() {
+			this(ignored -> {});
+		}
+
+		private TestingJobLeaderListener(Consumer<JobID> jobManagerGainedLeadership) {
+			this.jobManagerGainedLeadership = jobManagerGainedLeadership;
+		}
+
+		@Override
+		public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
+			jobManagerGainedLeadership.accept(jobId);
+		}
+
+		@Override
+		public void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId) {
+			jobManagerLostLeadership.countDown();
+		}
+
+		@Override
+		public void handleError(Throwable throwable) {
+			// ignored
+		}
+
+		private void waitUntilJobManagerLostLeadership() throws InterruptedException {
+			jobManagerLostLeadership.await();
+		}
+	}
+}