Skip to content

Commit

Permalink
[FLINK-19557] Trigger LeaderRetrievalListener notification upon ZooKe…
Browse files Browse the repository at this point in the history
…eper reconnection in ZooKeeperLeaderRetrievalService

We have to trigger the LeaderRetrievalListener notification upon reconnecting to ZooKeeper
because the NodeCache might not trigger the nodeChanged call if the server state is the
same as the state cached in the NodeCache. Without this notification, we would fail to tell
the listener that the old leader (from before the connection loss) is still the valid leader.
  • Loading branch information
tillrohrmann committed Oct 21, 2020
1 parent 05f67a5 commit a606f61
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 2 deletions.
Expand Up @@ -138,7 +138,11 @@ public void stop() throws Exception {
}

@Override
public void nodeChanged() throws Exception {
public void nodeChanged() {
retrieveLeaderInformationFromZooKeeper();
}

private void retrieveLeaderInformationFromZooKeeper() {
synchronized (lock) {
if (running) {
try {
Expand Down Expand Up @@ -170,7 +174,6 @@ public void nodeChanged() throws Exception {
notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
} catch (Exception e) {
leaderListener.handleError(new Exception("Could not handle node changed event.", e));
throw e;
}
} else {
LOG.debug("Ignoring node change notification since the service has already been stopped.");
Expand All @@ -192,6 +195,7 @@ protected void handleStateChange(ConnectionState newState) {
break;
case RECONNECTED:
LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
onReconnectedConnectionState();
break;
case LOST:
LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
Expand All @@ -203,6 +207,11 @@ protected void handleStateChange(ConnectionState newState) {
}
}

/**
 * Called from {@code handleStateChange} when the connection state becomes
 * {@code RECONNECTED}.
 *
 * <p>Delegates to {@link #retrieveLeaderInformationFromZooKeeper()} so that the leader
 * information is looked up again after the reconnect. Per the commit description, this is
 * needed because the NodeCache might not trigger {@code nodeChanged} when the server state
 * equals its cached state, in which case the listener would never learn that the previous
 * leader is still valid.
 */
private void onReconnectedConnectionState() {
// check whether we find some new leader information in ZooKeeper
retrieveLeaderInformationFromZooKeeper();
}

@Override
public void unhandledError(String s, Throwable throwable) {
leaderListener.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable));
Expand Down
Expand Up @@ -18,10 +18,12 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -32,7 +34,9 @@
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -117,6 +121,97 @@ public void testConnectionSuspendedHandling() throws Exception {
assertThat("The next result is expected to be null.", secondAddress.get(), is(nullValue()));
}

/**
 * Tests that the listener is notified about the unchanged leader again once the connection
 * to ZooKeeper has been re-established.
 *
 * <p>Sequence: publish leader information, consume the resulting notification, stop the
 * testing server and wait for the follow-up notification, restart the server and expect the
 * next notification to carry the originally published leader address.
 *
 * @throws Exception if interacting with ZooKeeper or the retrieval service fails
 */
@Test
public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception {
    final String retrievalPath = "/testSameLeaderAfterReconnectTriggersListenerNotification/leaderAddress";
    final ZooKeeperLeaderRetrievalService leaderRetrievalService = new ZooKeeperLeaderRetrievalService(zooKeeperClient, retrievalPath);

    final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(2);
    leaderRetrievalService.start(queueLeaderElectionListener);

    try {
        final String leaderAddress = "foobar";
        final UUID sessionId = UUID.randomUUID();
        writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId);

        // pop new leader
        queueLeaderElectionListener.next();

        testingServer.stop();

        final CompletableFuture<String> connectionSuspension = queueLeaderElectionListener.next();

        // wait until the ZK connection is suspended
        connectionSuspension.join();

        testingServer.restart();

        // the unchanged (old) leader information should be announced again after the reconnect
        final CompletableFuture<String> connectionReconnect = queueLeaderElectionListener.next();
        assertThat(connectionReconnect.get(), is(leaderAddress));
    } finally {
        // always stop the service so it does not stay registered on the shared
        // ZooKeeper client after this test finishes (also on assertion failures)
        leaderRetrievalService.stop();
    }
}

/**
 * Stores the given leader information under {@code retrievalPath} in ZooKeeper.
 *
 * <p>If no node exists at the path yet, it is created together with any missing parent
 * nodes; otherwise the data of the existing node is overwritten.
 *
 * @param retrievalPath ZooKeeper path to write the leader information to
 * @param leaderAddress leader address to serialize into the node data
 * @param sessionId leader session id to serialize into the node data
 * @throws Exception if serializing the data or talking to ZooKeeper fails
 */
private void writeLeaderInformationToZooKeeper(
        String retrievalPath,
        String leaderAddress,
        UUID sessionId) throws Exception {
    final byte[] leaderInformation = createLeaderInformation(leaderAddress, sessionId);
    final boolean nodeMissing = zooKeeperClient.checkExists().forPath(retrievalPath) == null;

    if (nodeMissing) {
        // first write for this path: create the node and its parents in one go
        zooKeeperClient.create().creatingParentsIfNeeded().forPath(retrievalPath, leaderInformation);
        return;
    }

    zooKeeperClient.setData().forPath(retrievalPath, leaderInformation);
}

/**
 * Serializes a leader address and session id into a byte array.
 *
 * <p>The address is written with {@link ObjectOutputStream#writeUTF(String)} followed by
 * the session id via {@link ObjectOutputStream#writeObject(Object)}.
 * NOTE(review): presumably this mirrors the layout the retrieval service reads back from
 * ZooKeeper; confirm against the reader.
 *
 * @param leaderAddress leader address to serialize
 * @param sessionId leader session id to serialize
 * @return the serialized leader information
 * @throws IOException if writing to the in-memory stream fails
 */
private byte[] createLeaderInformation(String leaderAddress, UUID sessionId) throws IOException {
    try (final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            final ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {

        objectStream.writeUTF(leaderAddress);
        objectStream.writeObject(sessionId);

        // push everything buffered by the object stream into byteStream before copying it out
        objectStream.flush();

        final byte[] serializedLeaderInformation = byteStream.toByteArray();
        return serializedLeaderInformation;
    }
}

/**
 * Tests that the listener eventually learns about a leader that was published after the
 * connection to ZooKeeper has been re-established.
 *
 * <p>Sequence: publish leader information, consume the resulting notification, stop the
 * testing server and wait for the follow-up notification, restart the server, publish
 * different leader information and poll the listener until the new address shows up.
 *
 * @throws Exception if interacting with ZooKeeper or the retrieval service fails, or if the
 *     new leader is not observed before the deadline
 */
@Test
public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception {
    final String retrievalPath = "/testNewLeaderAfterReconnectTriggersListenerNotification/leaderAddress";
    final ZooKeeperLeaderRetrievalService leaderRetrievalService = new ZooKeeperLeaderRetrievalService(zooKeeperClient, retrievalPath);

    final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(2);
    leaderRetrievalService.start(queueLeaderElectionListener);

    try {
        final String leaderAddress = "foobar";
        final UUID sessionId = UUID.randomUUID();
        writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId);

        // pop new leader
        queueLeaderElectionListener.next();

        testingServer.stop();

        final CompletableFuture<String> connectionSuspension = queueLeaderElectionListener.next();

        // wait until the ZK connection is suspended
        connectionSuspension.join();

        testingServer.restart();

        final String newLeaderAddress = "barfoo";
        final UUID newSessionId = UUID.randomUUID();
        writeLeaderInformationToZooKeeper(retrievalPath, newLeaderAddress, newSessionId);

        // check that we find the new leader information eventually
        CommonTestUtils.waitUntilCondition(
            () -> {
                final CompletableFuture<String> afterConnectionReconnect = queueLeaderElectionListener.next();
                // compare from the non-null side: intermediate notifications may carry
                // a null address and must not abort the polling with an NPE
                return newLeaderAddress.equals(afterConnectionReconnect.get());
            },
            Deadline.fromNow(Duration.ofSeconds(30L)));
    } finally {
        // always stop the service so it does not stay registered on the shared
        // ZooKeeper client after this test finishes (also on failures/timeouts)
        leaderRetrievalService.stop();
    }
}

private void closeTestServer() throws IOException {
if (testingServer != null) {
testingServer.close();
Expand Down

0 comments on commit a606f61

Please sign in to comment.