[FLINK-14316] Properly manage rpcConnection in JobManagerLeaderListener under leader change #11603

Closed
wants to merge 1 commit
@@ -271,9 +271,7 @@ public void stop() {
         if (!stopped) {
             stopped = true;

-            if (rpcConnection != null) {
-                rpcConnection.close();
-            }
+            closeRpcConnection();
         }
     }
 }
@@ -310,37 +308,16 @@ public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable

                 if (leaderAddress == null || leaderAddress.isEmpty()) {
                     // the leader lost leadership but there is no other leader yet.
-                    if (rpcConnection != null) {
-                        rpcConnection.close();
-                        rpcConnection = null;
-                    }
-
                     jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
-                    currentJobMasterId = jobMasterId;
+                    closeRpcConnection();
                 } else {
-                    currentJobMasterId = jobMasterId;
-
-                    if (rpcConnection != null) {
-                        // check if we are already trying to connect to this leader
-                        if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) {
-                            rpcConnection.close();
-
-                            rpcConnection = new JobManagerRegisteredRpcConnection(
-                                LOG,
-                                leaderAddress,
-                                jobMasterId,
-                                rpcService.getExecutor());
-                        }
+                    // check whether we are already connecting to this leader
+                    if (Objects.equals(jobMasterId, currentJobMasterId)) {
+                        LOG.debug("Trying already connecting to leader of job {}. Ignoring duplicate leader information.", jobId);

Review comment (Contributor), on the LOG.debug message above: Reads a bit weird? How about "Ongoing attempt to connect to leader of job{}. ..."

Reply (Contributor, Author): Good point. Will update it.

                     } else {
-                        rpcConnection = new JobManagerRegisteredRpcConnection(
-                            LOG,
-                            leaderAddress,
-                            jobMasterId,
-                            rpcService.getExecutor());
+                        closeRpcConnection();
+                        openRpcConnectionTo(leaderAddress, jobMasterId);
                     }
-
-                    LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
-                    rpcConnection.start();
                 }
             }
         }
@@ -349,6 +326,32 @@ public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable
         jobManagerLostLeadership.ifPresent(oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
     }

+    @GuardedBy("lock")
+    private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
+        Preconditions.checkState(
+            currentJobMasterId == null && rpcConnection == null,
+            "Cannot open a new rpc connection if the previous connection has not been closed.");
+
+        currentJobMasterId = jobMasterId;
+        rpcConnection = new JobManagerRegisteredRpcConnection(
+            LOG,
+            leaderAddress,
+            jobMasterId,
+            rpcService.getExecutor());
+
+        LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, jobMasterId.toUUID());
+        rpcConnection.start();
+    }
+
+    @GuardedBy("lock")
+    private void closeRpcConnection() {
+        if (rpcConnection != null) {
+            rpcConnection.close();
+            rpcConnection = null;
+            currentJobMasterId = null;
+        }
+    }
+
     @Override
     public void handleError(Exception exception) {
         if (stopped) {

@Override
public void handleError(Exception exception) {
if (stopped) {
@@ -20,6 +20,7 @@

 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
@@ -41,6 +42,10 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;

 /**
  * Tests for the {@link JobLeaderService}.
@@ -106,34 +111,25 @@ public void go() throws Exception {
      */
     @Test
     public void doesNotReconnectAfterTargetLostLeadership() throws Exception {
-        final JobLeaderService jobLeaderService = new JobLeaderService(
-            new LocalUnresolvedTaskManagerLocation(),
-            RetryingRegistrationConfiguration.defaultConfiguration());
-
         final JobID jobId = new JobID();

         final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
         final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
             .setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
             .build();
+        final TestingJobMasterGateway jobMasterGateway = registerJobMaster();

-        final String jmAddress = "foobar";
-        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
-        rpcServiceResource.getTestingRpcService().registerGateway(jmAddress, jobMasterGateway);
+        final OneShotLatch jobManagerGainedLeadership = new OneShotLatch();
+        final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(ignored -> jobManagerGainedLeadership.trigger());

-        final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener();
-        jobLeaderService.start(
-            "foobar",
-            rpcServiceResource.getTestingRpcService(),
-            haServices,
-            testingJobLeaderListener);
+        final JobLeaderService jobLeaderService = createAndStartJobLeaderService(haServices, testingJobLeaderListener);

         try {
-            jobLeaderService.addJob(jobId, jmAddress);
+            jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());

-            leaderRetrievalService.notifyListener(jmAddress, UUID.randomUUID());
+            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), UUID.randomUUID());

-            testingJobLeaderListener.waitUntilJobManagerGainedLeadership();
+            jobManagerGainedLeadership.await();

             // revoke the leadership
             leaderRetrievalService.notifyListener(null, null);
@@ -145,14 +141,86 @@ public void doesNotReconnectAfterTargetLostLeadership() throws Exception {
         }
     }

+    /**
+     * Tests that the JobLeaderService can reconnect to an old leader which seemed
+     * to have lost the leadership in between. See FLINK-14316.
+     */
+    @Test
+    public void canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
+        final JobID jobId = new JobID();
+
+        final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
+        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+            .setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
+            .build();
+
+        final TestingJobMasterGateway jobMasterGateway = registerJobMaster();
+
+        final BlockingQueue<JobID> leadershipQueue = new ArrayBlockingQueue<>(1);
+        final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(leadershipQueue::offer);
+
+        final JobLeaderService jobLeaderService = createAndStartJobLeaderService(haServices, testingJobLeaderListener);
+
+        try {
+            jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
+
+            final UUID leaderSessionId = UUID.randomUUID();
+            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
+
+            // wait for the first leadership
+            assertThat(leadershipQueue.take(), is(jobId));
+
+            // revoke the leadership
+            leaderRetrievalService.notifyListener(null, null);
+
+            testingJobLeaderListener.waitUntilJobManagerLostLeadership();
+
+            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
+
+            // check that we obtain the leadership a second time
+            assertThat(leadershipQueue.take(), is(jobId));
+        } finally {
+            jobLeaderService.stop();
+        }
+    }
+
+    private JobLeaderService createAndStartJobLeaderService(HighAvailabilityServices haServices, JobLeaderListener testingJobLeaderListener) {
+        final JobLeaderService jobLeaderService = new JobLeaderService(
+            new LocalUnresolvedTaskManagerLocation(),
+            RetryingRegistrationConfiguration.defaultConfiguration());
+
+        jobLeaderService.start(
+            "foobar",
+            rpcServiceResource.getTestingRpcService(),
+            haServices,
+            testingJobLeaderListener);
+        return jobLeaderService;
+    }
+
+    private TestingJobMasterGateway registerJobMaster() {
+        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
+        rpcServiceResource.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+        return jobMasterGateway;
+    }
+
     private static final class TestingJobLeaderListener implements JobLeaderListener {

-        private final CountDownLatch jobManagerGainedLeadership = new CountDownLatch(1);
         private final CountDownLatch jobManagerLostLeadership = new CountDownLatch(1);

+        private final Consumer<JobID> jobManagerGainedLeadership;
+
+        private TestingJobLeaderListener() {
+            this(ignored -> {});
+        }
+
+        private TestingJobLeaderListener(Consumer<JobID> jobManagerGainedLeadership) {
+            this.jobManagerGainedLeadership = jobManagerGainedLeadership;
+        }
+
         @Override
         public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
-            jobManagerGainedLeadership.countDown();
+            jobManagerGainedLeadership.accept(jobId);
         }

         @Override
Expand All @@ -165,10 +233,6 @@ public void handleError(Throwable throwable) {
// ignored
}

private void waitUntilJobManagerGainedLeadership() throws InterruptedException {
jobManagerGainedLeadership.await();
}

private void waitUntilJobManagerLostLeadership() throws InterruptedException {
jobManagerLostLeadership.await();
}