From 4b0bc85d066f8582b55d76092c391bad04bd48a5 Mon Sep 17 00:00:00 2001 From: randgalt Date: Mon, 31 Dec 2018 03:24:02 -0800 Subject: [PATCH 1/2] CURATOR-498 - include session ID in log message for injecting session expiration --- .../curator/framework/state/ConnectionStateManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 5e28b3d8f7..bbcb5c2d12 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; @@ -302,10 +303,11 @@ private void checkSessionExpiration() if ( elapsedMs >= useSessionTimeoutMs ) { startOfSuspendedEpoch = System.currentTimeMillis(); // reset startOfSuspendedEpoch to avoid spinning on this session expiration injection CURATOR-405 - log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs)); try { - Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper()); + ZooKeeper zooKeeper = client.getZookeeperClient().getZooKeeper(); + log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d. SessionId: 0x%s", elapsedMs, useSessionTimeoutMs, Long.toHexString(zooKeeper.getSessionId()))); + Compatibility.injectSessionExpiration(zooKeeper); } catch ( Exception e ) { From dafd091412a834a128c9882d2b9534d1a0ff7735 Mon Sep 17 00:00:00 2001 From: randgalt Date: Tue, 1 Jan 2019 22:34:41 -0500 Subject: [PATCH 2/2] CURATOR-498 "Protection" has a potential bug. If the connection is lost for long enough, Curator will want to kill the session. Session deletions must be handled by the Leader ZK instance. At the same time that the session kill is being processed, Curator's protection mode handling could be calling the follower that it's connected to get the current list of children - this can be handled directly by the follower instance without needing to call the leader. So, in this scenario, the client will get a list of children that includes the ZNode that will get deleted as part of killing the session. This bug has been in Curator since we added the protection feature to it more than 6 years ago. The fix is to include the session ID in the protection ID that is generated for the node name when the create mode is an ephemeral type. Then, if findProtectedNodeInForeground() finds the node in the use-case we've been discussing, it can compare the session ID to the current ZooKeeper handle's session ID and disregard the found node if they don't match. --- .../framework/imps/CreateBuilderImpl.java | 55 +++++++++---- ...indAndDeleteProtectedNodeInBackground.java | 2 +- .../framework/imps/TestFrameworkEdges.java | 82 ++++++++++++++++++- 3 files changed, 121 insertions(+), 18 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ce82542f75..0108063433 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -674,7 +674,7 @@ public void processResult(int rc, String path, Object ctx, String name) }; client.getZooKeeper().create ( - operationAndData.getData().getPath(), + insertSessionIdIfNeeded(operationAndData.getData().getPath()), data, acling.getAclList(operationAndData.getData().getPath()), createMode, @@ -687,7 +687,7 @@ public void processResult(int rc, String path, Object ctx, String name) CreateZK35.create ( client.getZooKeeper(), - operationAndData.getData().getPath(), + insertSessionIdIfNeeded(operationAndData.getData().getPath()), data, acling.getAclList(operationAndData.getData().getPath()), createMode, @@ -784,9 +784,13 @@ public ProtectACLCreateModePathAndBytesable creatingParentContainersIfNe }; } - private static String getProtectedPrefix(String protectedId) + private static String getProtectedPrefix(String protectedId, long sessionId) { - return PROTECTED_PREFIX + protectedId + "-"; + if ( sessionId != 0 ) + { + return PROTECTED_PREFIX + protectedId + '-' + sessionId + '-'; + } + return PROTECTED_PREFIX + protectedId + '-'; } static void backgroundCreateParentsThenNode(final CuratorFrameworkImpl client, final OperationAndData mainOperationAndData, final String path, Backgrounding backgrounding, final InternalACLProvider aclProvider, final boolean createParentsAsContainers) @@ -1134,8 +1138,8 @@ void callPerformBackgroundOperation() throws Exception if ( failNextCreateForTesting ) { - pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client failNextCreateForTesting = false; + pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client throw new KeeperException.ConnectionLossException(); } @@ -1172,29 +1176,30 @@ public String call() throws Exception if ( createdPath == null ) { + String sessionAdjustedPath = insertSessionIdIfNeeded(path); try { if ( client.isZk34CompatibilityMode() ) { - createdPath = client.getZooKeeper().create(path, data, aclList, createMode); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, aclList, createMode); } else { - createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, aclList, createMode, storingStat, ttl); } } catch ( KeeperException.NoNodeException e ) { if ( createParentsIfNeeded ) { - ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers); + ZKPaths.mkdirs(client.getZooKeeper(), sessionAdjustedPath, false, acling.getACLProviderForParents(), createParentsAsContainers); if ( client.isZk34CompatibilityMode() ) { - createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, acling.getAclList(sessionAdjustedPath), createMode); } else { - createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, acling.getAclList(sessionAdjustedPath), createMode, storingStat, ttl); } } else @@ -1206,12 +1211,12 @@ public String call() throws Exception { if ( setDataIfExists ) { - Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion); + Stat setStat = client.getZooKeeper().setData(sessionAdjustedPath, data, setDataIfExistsVersion); if(storingStat != null) { DataTree.copyStat(setStat, storingStat); } - createdPath = path; + createdPath = sessionAdjustedPath; } else { @@ -1252,7 +1257,7 @@ public String call() throws Exception final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); List children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false); - foundNode = findNode(children, pathAndNode.getPath(), protectedId); + foundNode = findNode(children, pathAndNode.getPath(), protectedId, createMode.isEphemeral() ? client.getZooKeeper().getSessionId() : 0); } catch ( KeeperException.NoNodeException ignore ) { @@ -1273,7 +1278,7 @@ String adjustPath(String path) throws Exception if ( doProtected ) { ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - String name = getProtectedPrefix(protectedId) + pathAndNode.getNode(); + String name = getProtectedPrefix(protectedId, 0) + pathAndNode.getNode(); path = ZKPaths.makePath(pathAndNode.getPath(), name); } return path; @@ -1285,11 +1290,12 @@ String adjustPath(String path) throws Exception * @param children a list of candidates znodes * @param path the path * @param protectedId the protected id + * @param sessionId if a session id should be included, 0 if not * @return the absolute path of the znode or null if it is not found */ - static String findNode(final List children, final String path, final String protectedId) + static String findNode(final List children, final String path, final String protectedId, final long sessionId) { - final String protectedPrefix = getProtectedPrefix(protectedId); + final String protectedPrefix = getProtectedPrefix(protectedId, sessionId); String foundNode = Iterables.find ( children, @@ -1309,4 +1315,21 @@ public boolean apply(String node) } return foundNode; } + + private String insertSessionIdIfNeeded(String path) throws Exception + { + if ( doProtected && createMode.isEphemeral() ) + { + // per CURATOR-498 - it's been discovered that the protected mode search can discover + // a created ephemeral node that will get deleted due to session timeouts. To work around + // this include the session ID in the ZNode name and search for the node with the current + // session ID thus ignoring created ZNodes with stale session IDs. A UUID is still included + // in the node name so that the protection mode contract is maintained + ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + String unadjustedNode = pathAndNode.getNode().substring(PROTECTED_PREFIX.length() + protectedId.length() + 1); + String newNode = getProtectedPrefix(protectedId, client.getZooKeeper().getSessionId()) + unadjustedNode; + return ZKPaths.makePath(pathAndNode.getPath(), newNode); + } + return path; + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java index de91552c6c..928a550377 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java @@ -81,7 +81,7 @@ public void processResult(int rc, String path, Object o, List strings, S if ( rc == KeeperException.Code.OK.intValue() ) { - final String node = CreateBuilderImpl.findNode(strings, "/", protectedId); // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path + final String node = CreateBuilderImpl.findNode(strings, "/", protectedId, 0); // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path if ( node != null ) { try diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index a28d6c5504..1cd96707c3 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -30,11 +30,15 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingServer; import org.apache.curator.test.compatibility.KillSession2; import org.apache.curator.test.compatibility.Timing2; @@ -50,11 +54,15 @@ import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +70,6 @@ public class TestFrameworkEdges extends BaseClassForTests { - private final Logger log = LoggerFactory.getLogger(getClass()); private final Timing2 timing = new Timing2(); @@ -72,6 +79,79 @@ public static void setUpClass() System.setProperty("zookeeper.extendedTypesEnabled", "true"); } + @Test + public void testProtectionWithKilledSession() throws Exception + { + server.stop(); // not needed + + // see CURATOR-498 + // attempt to re-create the state described in the bug report: create a 3 Instance ensemble; + // have Curator connect to only 1 one of those instances; set failNextCreateForTesting to + // simulate protection mode searching; kill the connected server when this happens; + // wait for session timeout to elapse and then restart the instance. In most cases + // this will cause the scenario as Curator will send the session cancel and do protection mode + // search around the same time. The protection mode search should return first as it can be resolved + // by the Instance Curator is connected to but the session kill needs a quorum vote (it's a + // transaction) + + try ( TestingCluster cluster = new TestingCluster(3) ) + { + cluster.start(); + InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec(); + + CountDownLatch serverStoppedLatch = new CountDownLatch(1); + RetryPolicy retryPolicy = new RetryForever(100) + { + @Override + public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) + { + if ( serverStoppedLatch.getCount() > 0 ) + { + try + { + cluster.killServer(instanceSpec0); + } + catch ( Exception e ) + { + // ignore + } + serverStoppedLatch.countDown(); + } + return super.allowRetry(retryCount, elapsedTimeMs, sleeper); + } + }; + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy)) + { + BlockingQueue createdNode = new LinkedBlockingQueue<>(); + BackgroundCallback callback = (__, event) -> { + if ( event.getType() == CuratorEventType.CREATE ) + { + createdNode.offer(event.getPath()); + } + }; + + client.start(); + client.create().forPath("/test"); + + ErrorListenerPathAndBytesable builder = client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback); + ((CreateBuilderImpl)builder).failNextCreateForTesting = true; + + builder.forPath("/test/hey"); + + Assert.assertTrue(timing.awaitLatch(serverStoppedLatch)); + timing.forSessionSleep().sleep(); // wait for session to expire + cluster.restartServer(instanceSpec0); + + String path = timing.takeFromQueue(createdNode); + timing.sleepABit(); + List children = client.getChildren().forPath("/test"); + Assert.assertEquals(children, Collections.singletonList(ZKPaths.getNodeFromPath(path)), path + " is not equal to getChildren: " + children); + Assert.assertTrue(path.contains(Long.toString(client.getZookeeperClient().getZooKeeper().getSessionId()))); + } + } + } + @Test public void testBackgroundLatencyUnSleep() throws Exception {