From d5982b183652caf4fc72c9fdd6bdaaf2bf304b7c Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 3 Aug 2021 11:14:00 +0530 Subject: [PATCH 01/13] HADOOP-17612. Upgrade Zookeeper to 3.6.3 and Curator to 5.2.0 (#3241) Signed-off-by: Akira Ajisaka --- LICENSE-binary | 9 +++++++-- hadoop-common-project/hadoop-auth/pom.xml | 9 +++++++++ hadoop-common-project/hadoop-common/pom.xml | 4 ++++ .../org/apache/hadoop/ha/ClientBaseWithFixes.java | 5 +---- .../hadoop/ha/TestZKFailoverControllerStress.java | 3 ++- hadoop-common-project/hadoop-registry/pom.xml | 11 +++++++++++ .../server/services/MicroZookeeperService.java | 2 +- hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml | 10 ++++++++++ hadoop-hdfs-project/hadoop-hdfs/pom.xml | 10 ++++++++++ hadoop-project/pom.xml | 4 ++-- .../hadoop-yarn-applications-mawo-core/pom.xml | 11 +++++++++++ .../hadoop-yarn/hadoop-yarn-client/pom.xml | 10 ++++++++++ .../hadoop-yarn-server-common/pom.xml | 9 +++++++++ .../hadoop-yarn-server-resourcemanager/pom.xml | 9 +++++++++ 14 files changed, 96 insertions(+), 10 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 84ca5078aa376..a6fafd2e145af 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -316,9 +316,15 @@ org.apache.commons:commons-lang3:3.12.0 org.apache.commons:commons-math3:3.1.1 org.apache.commons:commons-text:1.10.0 org.apache.commons:commons-validator:1.6 +<<<<<<< HEAD org.apache.curator:curator-client:4.2.0 org.apache.curator:curator-framework:4.2.0 org.apache.curator:curator-recipes:4.2.0 +======= +org.apache.curator:curator-client:5.2.0 +org.apache.curator:curator-framework:5.2.0 +org.apache.curator:curator-recipes:5.2.0 +>>>>>>> ccfa072dc77 (HADOOP-17612. Upgrade Zookeeper to 3.6.3 and Curator to 5.2.0 (#3241)) org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1 org.apache.hbase:hbase-annotations:1.4.8 org.apache.hbase:hbase-client:1.4.8 @@ -345,8 +351,6 @@ org.apache.kerby:kerby-util:1.0.1 org.apache.kerby:kerby-xdr:1.0.1 org.apache.kerby:token-provider:1.0.1 org.apache.yetus:audience-annotations:0.5.0 -org.apache.zookeeper:zookeeper:3.5.6 -org.apache.zookeeper:zookeeper-jute:3.5.6 org.codehaus.jettison:jettison:1.5.1 org.eclipse.jetty:jetty-annotations:9.4.48.v20220622 org.eclipse.jetty:jetty-http:9.4.48.v20220622 @@ -362,6 +366,7 @@ org.eclipse.jetty:jetty-webapp:9.4.48.v20220622 org.eclipse.jetty:jetty-xml:9.4.48.v20220622 org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.48.v20220622 org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.48.v20220622 +org.apache.zookeeper:zookeeper:3.6.3 org.ehcache:ehcache:3.3.1 org.lz4:lz4-java:1.7.1 org.objenesis:objenesis:2.6 diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml index 6405a8feba7c7..d93372908a956 100644 --- a/hadoop-common-project/hadoop-auth/pom.xml +++ b/hadoop-common-project/hadoop-auth/pom.xml @@ -128,6 +128,15 @@ org.apache.zookeeper zookeeper + + io.dropwizard.metrics + metrics-core + + + org.xerial.snappy + snappy-java + provided + org.apache.curator curator-framework diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 2b0e7c8079fa0..f31d9afa3bf67 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -336,6 +336,10 @@ + + io.dropwizard.metrics + metrics-core + org.apache.zookeeper zookeeper diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java index 666000b2f48ae..7650c9fbf8e6a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java @@ -41,7 +41,6 @@ import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerCnxnFactoryAccessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnLog; @@ -437,9 +436,7 @@ protected void stopServer() throws Exception { protected static ZooKeeperServer getServer(ServerCnxnFactory fac) { - ZooKeeperServer zs = ServerCnxnFactoryAccessor.getZkServer(fac); - - return zs; + return fac.getZooKeeperServer(); } protected void tearDownAll() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java index bdbf1d9c2c286..4dcc74b86d151 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.util.Time; +import org.apache.zookeeper.server.ServerCnxn; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -131,7 +132,7 @@ public void testRandomHealthAndDisconnects() throws Exception { long st = Time.now(); while (Time.now() - st < runFor) { cluster.getTestContext().checkException(); - serverFactory.closeAll(); + serverFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); Thread.sleep(50); } } diff --git a/hadoop-common-project/hadoop-registry/pom.xml b/hadoop-common-project/hadoop-registry/pom.xml index d7de0c49cc67a..204fbbe9da554 100644 --- a/hadoop-common-project/hadoop-registry/pom.xml +++ b/hadoop-common-project/hadoop-registry/pom.xml @@ -135,6 +135,17 @@ dnsjava + + io.dropwizard.metrics + metrics-core + + + + org.xerial.snappy + snappy-java + provided + + diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java index 0ab4cd2f3bfac..994a2565c309a 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java @@ -229,7 +229,7 @@ protected void serviceStart() throws Exception { setupSecurity(); FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir); - ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime); + ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime, ""); LOG.info("Starting Local Zookeeper service"); factory = ServerCnxnFactory.createFactory(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml index 03ac256ace86e..fbf73e123e783 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml @@ -83,6 +83,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> test-jar test + + io.dropwizard.metrics + metrics-core + provided + + + org.xerial.snappy + snappy-java + provided + org.apache.hadoop.thirdparty hadoop-shaded-guava diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index d0aac40646dfc..aef9bb8f35fde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -62,6 +62,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> test-jar test + + io.dropwizard.metrics + metrics-core + provided + + + org.xerial.snappy + snappy-java + provided + org.apache.hadoop.thirdparty hadoop-shaded-guava diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 034df4c037baa..17d6b598e6b00 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -99,8 +99,8 @@ ${hadoop-thirdparty-shaded-prefix}.protobuf ${hadoop-thirdparty-shaded-prefix}.com.google.common - 3.5.6 - 4.2.0 + 3.6.3 + 5.2.0 3.0.5 2.1.7 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml index 2e0781e352716..a19b87c87dc93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml @@ -104,6 +104,17 @@ + + io.dropwizard.metrics + metrics-core + + + + org.xerial.snappy + snappy-java + provided + + org.slf4j slf4j-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index 0e52269041797..91c946baba2be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -89,6 +89,16 @@ test-jar test + + io.dropwizard.metrics + metrics-core + provided + + + org.xerial.snappy + snappy-java + provided + junit junit diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 9e51d4ec048d9..6101467a4056e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -109,6 +109,15 @@ org.apache.zookeeper zookeeper + + io.dropwizard.metrics + metrics-core + + + org.xerial.snappy + snappy-java + provided + ${leveldbjni.group} leveldbjni-all diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index b2e46b8b6c69d..566f6bcc6839a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -217,6 +217,15 @@ org.apache.zookeeper zookeeper + + io.dropwizard.metrics + metrics-core + + + org.xerial.snappy + snappy-java + provided + ${leveldbjni.group} leveldbjni-all From faa80dd397f9029439ffdcb0aa6eeeeed417cd62 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sat, 7 Aug 2021 07:50:35 +0530 Subject: [PATCH 02/13] HADOOP-17835. Use CuratorCache implementation instead of PathChildrenCache / TreeCache (#3266) Signed-off-by: Akira Ajisaka --- .../ZKDelegationTokenSecretManager.java | 165 +++++++----------- .../TestZKDelegationTokenSecretManager.java | 19 -- .../client/impl/zk/CuratorService.java | 108 +++++------- .../server/dns/RegistryDNSServer.java | 3 +- 4 files changed, 114 insertions(+), 181 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index c66e77ee47b50..27ef84b955501 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -23,12 +23,12 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; import org.apache.curator.framework.CuratorFramework; @@ -37,10 +37,9 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.retry.RetryNTimes; @@ -110,7 +109,7 @@ public abstract class ZKDelegationTokenSecretManager { + try { + processKeyAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator keyCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - } - }, listenerThreadPool); - loadFromZKCache(false); - } + }) + .forDeletes(childData -> processKeyRemoved(childData.getPath())) + .build(); + keyCache.listenable().addListener(keyCacheListener); + keyCache.start(); + loadFromZKCache(false); } catch (Exception e) { - throw new IOException("Could not start PathChildrenCache for keys", e); + throw new IOException("Could not start Curator keyCacheListener for keys", + e); } try { - tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); - if (tokenCache != null) { - tokenCache.start(StartMode.BUILD_INITIAL_CACHE); - tokenCache.getListenable().addListener(new PathChildrenCacheListener() { - - @Override - public void childEvent(CuratorFramework client, - PathChildrenCacheEvent event) throws Exception { - switch (event.getType()) { - case CHILD_ADDED: - processTokenAddOrUpdate(event.getData()); - break; - case CHILD_UPDATED: - processTokenAddOrUpdate(event.getData()); - break; - case CHILD_REMOVED: - processTokenRemoved(event.getData()); - break; - default: - break; + tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT) + .build(); + CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + try { + processTokenAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - } - }, listenerThreadPool); - loadFromZKCache(true); - } + }) + .forDeletes(childData -> { + try { + processTokenRemoved(childData); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }) + .build(); + tokenCache.listenable().addListener(tokenCacheListener); + tokenCache.start(); + loadFromZKCache(true); } catch (Exception e) { - throw new IOException("Could not start PathChildrenCache for tokens", e); + throw new IOException( + "Could not start Curator tokenCacheListener for tokens", e); } super.startThreads(); } /** - * Load the PathChildrenCache into the in-memory map. Possible caches to be + * Load the CuratorCache into the in-memory map. Possible caches to be * loaded are keyCache and tokenCache. * * @param isTokenCache true if loading tokenCache, false if loading keyCache. @@ -365,29 +353,29 @@ public void childEvent(CuratorFramework client, private void loadFromZKCache(final boolean isTokenCache) { final String cacheName = isTokenCache ? "token" : "key"; LOG.info("Starting to load {} cache.", cacheName); - final List children; + final Stream children; if (isTokenCache) { - children = tokenCache.getCurrentData(); + children = tokenCache.stream(); } else { - children = keyCache.getCurrentData(); + children = keyCache.stream(); } - int count = 0; - for (ChildData child : children) { + final AtomicInteger count = new AtomicInteger(0); + children.forEach(childData -> { try { if (isTokenCache) { - processTokenAddOrUpdate(child); + processTokenAddOrUpdate(childData.getData()); } else { - processKeyAddOrUpdate(child.getData()); + processKeyAddOrUpdate(childData.getData()); } } catch (Exception e) { LOG.info("Ignoring node {} because it failed to load.", - child.getPath()); + childData.getPath()); LOG.debug("Failure exception:", e); - ++count; + count.getAndIncrement(); } - } - if (count > 0) { + }); + if (count.get() > 0) { LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); } LOG.info("Loaded {} cache.", cacheName); @@ -417,8 +405,8 @@ private void processKeyRemoved(String path) { } } - private void processTokenAddOrUpdate(ChildData data) throws IOException { - ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); + private void processTokenAddOrUpdate(byte[] data) throws IOException { + ByteArrayInputStream bin = new ByteArrayInputStream(data); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); @@ -487,20 +475,6 @@ public void stopThreads() { } catch (Exception e) { LOG.error("Could not stop Curator Framework", e); } - if (listenerThreadPool != null) { - listenerThreadPool.shutdown(); - try { - // wait for existing tasks to terminate - if (!listenerThreadPool.awaitTermination(shutdownTimeout, - TimeUnit.MILLISECONDS)) { - LOG.error("Forcing Listener threadPool to shutdown !!"); - listenerThreadPool.shutdownNow(); - } - } catch (InterruptedException ie) { - listenerThreadPool.shutdownNow(); - Thread.currentThread().interrupt(); - } - } } private void createPersistentNode(String nodePath) throws Exception { @@ -905,11 +879,6 @@ static String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); } - @VisibleForTesting - public ExecutorService getListenerThreadPool() { - return listenerThreadPool; - } - @VisibleForTesting DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) { return currentTokens.get(ident); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index 50d3dcfee757b..77b7e57eba396 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.apache.curator.RetryPolicy; @@ -317,19 +315,13 @@ public void testCancelTokenSingleManager() throws Exception { @SuppressWarnings("rawtypes") protected void verifyDestroy(DelegationTokenManager tm, Configuration conf) throws Exception { - AbstractDelegationTokenSecretManager sm = - tm.getDelegationTokenSecretManager(); - ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm; - ExecutorService es = zksm.getListenerThreadPool(); tm.destroy(); - Assert.assertTrue(es.isShutdown()); // wait for the pool to terminate long timeout = conf.getLong( ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); Thread.sleep(timeout * 3); - Assert.assertTrue(es.isTerminated()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -356,17 +348,6 @@ public void testStopThreads() throws Exception { (Token) tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); Assert.assertNotNull(token); - - AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager(); - ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm; - ExecutorService es = zksm.getListenerThreadPool(); - es.submit(new Callable() { - public Void call() throws Exception { - Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow - return null; - } - }); - tm1.destroy(); } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java index a01b7151b6989..3457fa28634a6 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java @@ -18,6 +18,9 @@ package org.apache.hadoop.registry.client.impl.zk; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.curator.ensemble.EnsembleProvider; @@ -28,9 +31,6 @@ import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.GetChildrenBuilder; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; /** @@ -109,9 +110,9 @@ public class CuratorService extends CompositeService private EnsembleProvider ensembleProvider; /** - * Registry tree cache. + * Registry Curator cache. */ - private TreeCache treeCache; + private CuratorCacheBridge curatorCacheBridge; /** * Construct the service. @@ -189,8 +190,8 @@ protected void serviceStart() throws Exception { protected void serviceStop() throws Exception { IOUtils.closeStream(curator); - if (treeCache != null) { - treeCache.close(); + if (curatorCacheBridge != null) { + curatorCacheBridge.close(); } super.serviceStop(); } @@ -824,73 +825,54 @@ protected String dumpRegistryRobustly(boolean verbose) { * * @param listener the listener. * @return a handle allowing for the management of the listener. - * @throws Exception if registration fails due to error. */ - public ListenerHandle registerPathListener(final PathListener listener) - throws Exception { - - final TreeCacheListener pathChildrenCacheListener = - new TreeCacheListener() { - - public void childEvent(CuratorFramework curatorFramework, - TreeCacheEvent event) - throws Exception { - String path = null; - if (event != null && event.getData() != null) { - path = event.getData().getPath(); - } - assert event != null; - switch (event.getType()) { - case NODE_ADDED: - LOG.info("Informing listener of added node {}", path); - listener.nodeAdded(path); - - break; - - case NODE_REMOVED: - LOG.info("Informing listener of removed node {}", path); - listener.nodeRemoved(path); - - break; - - case NODE_UPDATED: - LOG.info("Informing listener of updated node {}", path); - listener.nodeAdded(path); - - break; - - default: - // do nothing - break; - - } + public ListenerHandle registerPathListener(final PathListener listener) { + + CuratorCacheListener cacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + final String path = node.getPath(); + LOG.info("Informing listener of added/updated node {}", path); + try { + listener.nodeAdded(path); + } catch (IOException e) { + LOG.error("Error while processing Curator listener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - }; - treeCache.getListenable().addListener(pathChildrenCacheListener); - - return new ListenerHandle() { - @Override - public void remove() { - treeCache.getListenable().removeListener(pathChildrenCacheListener); - } - }; + }) + .forDeletes(childData -> { + final String path = childData.getPath(); + LOG.info("Informing listener of removed node {}", path); + try { + listener.nodeRemoved(path); + } catch (IOException e) { + LOG.error("Error while processing Curator listener " + + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }) + .build(); + curatorCacheBridge.listenable().addListener(cacheListener); + return () -> curatorCacheBridge.listenable().removeListener(cacheListener); } // TODO: should caches be stopped and then restarted if need be? /** - * Create the tree cache that monitors the registry for node addition, update, - * and deletion. - * - * @throws Exception if any issue arises during monitoring. + * Instantiate the Curator cache that monitors the registry for node + * addition, update and deletion. */ - public void monitorRegistryEntries() - throws Exception { + public void instantiateCacheForRegistry() { String registryPath = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); - treeCache = new TreeCache(curator, registryPath); - treeCache.start(); + curatorCacheBridge = CuratorCache.bridgeBuilder(curator, registryPath) + .build(); + } + + public void startCache() { + curatorCacheBridge.start(); } + } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java index 1ff5f26b47207..8d0a38cfd47f9 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java @@ -106,7 +106,7 @@ protected void serviceStart() throws Exception { private void manageRegistryDNS() { try { - registryOperations.monitorRegistryEntries(); + registryOperations.instantiateCacheForRegistry(); registryOperations.registerPathListener(new PathListener() { private String registryRoot = getConfig(). get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, @@ -157,6 +157,7 @@ public void nodeRemoved(String path) throws IOException { } }); + registryOperations.startCache(); // create listener for record deletions From a9a99109fa5b911982ae168f0050734128a5c1cc Mon Sep 17 00:00:00 2001 From: Melissa You Date: Sat, 22 Oct 2022 19:21:50 -0700 Subject: [PATCH 03/13] remove dangling lines of merge conflict --- LICENSE-binary | 3 --- 1 file changed, 3 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index a6fafd2e145af..bb1b1427737b1 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -316,15 +316,12 @@ org.apache.commons:commons-lang3:3.12.0 org.apache.commons:commons-math3:3.1.1 org.apache.commons:commons-text:1.10.0 org.apache.commons:commons-validator:1.6 -<<<<<<< HEAD org.apache.curator:curator-client:4.2.0 org.apache.curator:curator-framework:4.2.0 org.apache.curator:curator-recipes:4.2.0 -======= org.apache.curator:curator-client:5.2.0 org.apache.curator:curator-framework:5.2.0 org.apache.curator:curator-recipes:5.2.0 ->>>>>>> ccfa072dc77 (HADOOP-17612. Upgrade Zookeeper to 3.6.3 and Curator to 5.2.0 (#3241)) org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1 org.apache.hbase:hbase-annotations:1.4.8 org.apache.hbase:hbase-client:1.4.8 From 42a4f8e518f0ed0bf7f86a39e360117ff296f230 Mon Sep 17 00:00:00 2001 From: lfengnan Date: Tue, 23 Jun 2020 13:12:29 -0700 Subject: [PATCH 04/13] HDFS-15383. RBF: Add support for router delegation token without watch (#2047) Improving router's performance for delegation tokens related operations. It achieves the goal by removing watchers from router on tokens since based on our experience. The huge number of watches inside Zookeeper is degrading Zookeeper's performance pretty hard. The current limit is about 1.2-1.5 million. --- .../AbstractDelegationTokenSecretManager.java | 6 +- .../ZKDelegationTokenSecretManager.java | 171 +++++++------ .../TestZKDelegationTokenSecretManager.java | 12 +- .../ZKDelegationTokenSecretManagerImpl.java | 174 ++++++++++++- ...estZKDelegationTokenSecretManagerImpl.java | 234 ++++++++++++++++++ 5 files changed, 505 insertions(+), 92 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 56249960c7ce4..d7a38bf608529 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -23,11 +23,11 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.crypto.SecretKey; @@ -80,7 +80,7 @@ private String formatTokenId(TokenIdent id) { * to DelegationTokenInformation. Protected by this object lock. */ protected final Map currentTokens - = new HashMap(); + = new ConcurrentHashMap<>(); /** * Sequence number to create DelegationTokenIdentifier. @@ -92,7 +92,7 @@ private String formatTokenId(TokenIdent id) { * Access to allKeys is protected by this object lock */ protected final Map allKeys - = new HashMap(); + = new ConcurrentHashMap<>(); /** * Access to currentId is protected by this object lock. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 27ef84b955501..dba9b2e6a0bf1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -53,6 +53,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.util.curator.ZKCuratorManager; +import static org.apache.hadoop.util.Time.now; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -77,7 +78,7 @@ public abstract class ZKDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { - private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager."; + public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager."; public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX + "zkNumRetries"; public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX @@ -100,6 +101,9 @@ public abstract class ZKDelegationTokenSecretManager { - try { - processKeyAddOrUpdate(node.getData()); - } catch (IOException e) { - LOG.error("Error while processing Curator keyCacheListener " - + "NODE_CREATED / NODE_CHANGED event"); - throw new UncheckedIOException(e); - } - }) - .forDeletes(childData -> processKeyRemoved(childData.getPath())) - .build(); + keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT).build(); + CuratorCacheListener keyCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> { + try { + processKeyAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator keyCacheListener " + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); + } + }).forDeletes(childData -> processKeyRemoved(childData.getPath())).build(); keyCache.listenable().addListener(keyCacheListener); keyCache.start(); loadFromZKCache(false); } catch (Exception e) { - throw new IOException("Could not start Curator keyCacheListener for keys", - e); + throw new IOException("Could not start Curator keyCacheListener for keys", e); } - try { - tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT) - .build(); - CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() - .forCreatesAndChanges((oldNode, node) -> { - try { - processTokenAddOrUpdate(node.getData()); - } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " - + "NODE_CREATED / NODE_CHANGED event"); - throw new UncheckedIOException(e); - } - }) - .forDeletes(childData -> { - try { - processTokenRemoved(childData); - } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " - + "NODE_DELETED event"); - throw new UncheckedIOException(e); - } - }) - .build(); - tokenCache.listenable().addListener(tokenCacheListener); - tokenCache.start(); - loadFromZKCache(true); - } catch (Exception e) { - throw new IOException( - "Could not start Curator tokenCacheListener for tokens", e); + if (isTokenWatcherEnabled) { + LOG.info("TokenCache is enabled"); + try { + tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT).build(); + CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> { + try { + processTokenAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); + } + }).forDeletes(childData -> { + try { + processTokenRemoved(childData); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }).build(); + tokenCache.listenable().addListener(tokenCacheListener); + tokenCache.start(); + loadFromZKCache(true); + } catch (Exception e) { + throw new IOException("Could not start Curator tokenCacheListener for tokens", e); + } } super.startThreads(); - } - + } /** * Load the CuratorCache into the in-memory map. Possible caches to be * loaded are keyCache and tokenCache. @@ -386,9 +381,7 @@ private void processKeyAddOrUpdate(byte[] data) throws IOException { DataInputStream din = new DataInputStream(bin); DelegationKey key = new DelegationKey(); key.readFields(din); - synchronized (this) { - allKeys.put(key.getKeyId(), key); - } + allKeys.put(key.getKeyId(), key); } private void processKeyRemoved(String path) { @@ -398,14 +391,12 @@ private void processKeyRemoved(String path) { int j = tokSeg.indexOf('_'); if (j > 0) { int keyId = Integer.parseInt(tokSeg.substring(j + 1)); - synchronized (this) { - allKeys.remove(keyId); - } + allKeys.remove(keyId); } } } - private void processTokenAddOrUpdate(byte[] data) throws IOException { + protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException { ByteArrayInputStream bin = new ByteArrayInputStream(data); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); @@ -417,12 +408,10 @@ private void processTokenAddOrUpdate(byte[] data) throws IOException { if (numRead > -1) { DelegationTokenInformation tokenInfo = new DelegationTokenInformation(renewDate, password); - synchronized (this) { - currentTokens.put(ident, tokenInfo); - // The cancel task might be waiting - notifyAll(); - } + currentTokens.put(ident, tokenInfo); + return ident; } + return null; } private void processTokenRemoved(ChildData data) throws IOException { @@ -430,11 +419,7 @@ private void processTokenRemoved(ChildData data) throws IOException { DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); - synchronized (this) { - currentTokens.remove(ident); - // The cancel task might be waiting - notifyAll(); - } + currentTokens.remove(ident); } @Override @@ -621,7 +606,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) { * * @param ident Identifier of the token */ - private synchronized void syncLocalCacheWithZk(TokenIdent ident) { + protected void syncLocalCacheWithZk(TokenIdent ident) { try { DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident); if (tokenInfo != null && !currentTokens.containsKey(ident)) { @@ -635,16 +620,21 @@ private synchronized void syncLocalCacheWithZk(TokenIdent ident) { } } - private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) + protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) throws IOException { return getTokenInfoFromZK(ident, false); } - private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, + protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, boolean quiet) throws IOException { String nodePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); + return getTokenInfoFromZK(nodePath, quiet); + } + + protected DelegationTokenInformation getTokenInfoFromZK(String nodePath, + boolean quiet) throws IOException { try { byte[] data = zkClient.getData().forPath(nodePath); if ((data == null) || (data.length == 0)) { @@ -779,15 +769,30 @@ protected void updateToken(TokenIdent ident, @Override protected void removeStoredToken(TokenIdent ident) throws IOException { + removeStoredToken(ident, false); + } + + protected void removeStoredToken(TokenIdent ident, + boolean checkAgainstZkBeforeDeletion) throws IOException { String nodeRemovePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing ZKDTSMDelegationToken_" - + ident.getSequenceNumber()); - } try { - if (zkClient.checkExists().forPath(nodeRemovePath) != null) { + DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true); + if (dtInfo != null) { + // For the case there is no sync or watch miss, it is possible that the + // local storage has expired tokens which have been renewed by peer + // so double check again to avoid accidental delete + if (checkAgainstZkBeforeDeletion + && dtInfo.getRenewDate() > now()) { + LOG.info("Node already renewed by peer " + nodeRemovePath + + " so this token should not be deleted"); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Removing ZKDTSMDelegationToken_" + + ident.getSequenceNumber()); + } while(zkClient.checkExists().forPath(nodeRemovePath) != null){ try { zkClient.delete().guaranteed().forPath(nodeRemovePath); @@ -810,7 +815,7 @@ protected void removeStoredToken(TokenIdent ident) } @Override - public synchronized TokenIdent cancelToken(Token token, + public TokenIdent cancelToken(Token token, String canceller) throws IOException { ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); @@ -821,7 +826,7 @@ public synchronized TokenIdent cancelToken(Token token, return super.cancelToken(token, canceller); } - private void addOrUpdateToken(TokenIdent ident, + protected void addOrUpdateToken(TokenIdent ident, DelegationTokenInformation info, boolean isUpdate) throws Exception { String nodeCreatePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX @@ -848,6 +853,10 @@ private void addOrUpdateToken(TokenIdent ident, } } + public boolean isTokenWatcherEnabled() { + return isTokenWatcherEnabled; + } + /** * Simple implementation of an {@link ACLProvider} that simply returns an ACL * that gives all permissions only to a single principal. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index 77b7e57eba396..f821b4526bcf0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -57,15 +57,15 @@ public class TestZKDelegationTokenSecretManager { private static final Logger LOG = LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class); - private static final int TEST_RETRIES = 2; + protected static final int TEST_RETRIES = 2; - private static final int RETRY_COUNT = 5; + protected static final int RETRY_COUNT = 5; - private static final int RETRY_WAIT = 1000; + protected static final int RETRY_WAIT = 1000; - private static final long DAY_IN_SECS = 86400; + protected static final long DAY_IN_SECS = 86400; - private TestingServer zkServer; + protected TestingServer zkServer; @Rule public Timeout globalTimeout = new Timeout(300000); @@ -406,7 +406,7 @@ private void verifyACL(CuratorFramework curatorFramework, // cancelled but.. that would mean having to make an RPC call for every // verification request. // Thus, the eventual consistency tradef-off should be acceptable here... - private void verifyTokenFail(DelegationTokenManager tm, + protected void verifyTokenFail(DelegationTokenManager tm, Token token) throws IOException, InterruptedException { verifyTokenFailWithRetry(tm, token, RETRY_COUNT); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java index 4a111187ac46a..2d55026c807af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java @@ -19,13 +19,26 @@ package org.apache.hadoop.hdfs.server.federation.router.security.token; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager; +import org.apache.hadoop.util.Time; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Zookeeper based router delegation token store implementation. @@ -33,24 +46,181 @@ public class ZKDelegationTokenSecretManagerImpl extends ZKDelegationTokenSecretManager { + public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL = + ZK_CONF_PREFIX + "router.token.sync.interval"; + public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5; + private static final Logger LOG = LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class); - private Configuration conf = null; + private Configuration conf; + + private final ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(); + + // Local cache of delegation tokens, used for deprecating tokens from + // currentTokenMap + private final Set localTokenCache = + new HashSet<>(); + // Native zk client for getting all tokens + private ZooKeeper zookeeper; + private final String TOKEN_PATH = "/" + zkClient.getNamespace() + + ZK_DTSM_TOKENS_ROOT; + // The flag used to issue an extra check before deletion + // Since cancel token and token remover thread use the same + // API here and one router could have a token that is renewed + // by another router, thus token remover should always check ZK + // to confirm whether it has been renewed or not + private ThreadLocal checkAgainstZkBeforeDeletion = + new ThreadLocal() { + @Override + protected Boolean initialValue() { + return true; + } + }; public ZKDelegationTokenSecretManagerImpl(Configuration conf) { super(conf); this.conf = conf; try { - super.startThreads(); + startThreads(); } catch (IOException e) { LOG.error("Error starting threads for zkDelegationTokens", e); } LOG.info("Zookeeper delegation token secret manager instantiated"); } + @Override + public void startThreads() throws IOException { + super.startThreads(); + // start token cache related work when watcher is disabled + if (!isTokenWatcherEnabled()) { + LOG.info("Watcher for tokens is disabled in this secret manager"); + try { + // By default set this variable + checkAgainstZkBeforeDeletion.set(true); + // Ensure the token root path exists + if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) { + zkClient.create().creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(ZK_DTSM_TOKENS_ROOT); + } + // Set up zookeeper client + try { + zookeeper = zkClient.getZookeeperClient().getZooKeeper(); + } catch (Exception e) { + LOG.info("Cannot get zookeeper client ", e); + } finally { + if (zookeeper == null) { + throw new IOException("Zookeeper client is null"); + } + } + + LOG.info("Start loading token cache"); + long start = Time.now(); + rebuildTokenCache(true); + LOG.info("Loaded token cache in {} milliseconds", Time.now() - start); + + int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, + ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT); + scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + rebuildTokenCache(false); + } catch (Exception e) { + // ignore + } + } + }, syncInterval, syncInterval, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Error rebuilding local cache for zkDelegationTokens ", e); + } + } + } + + @Override + public void stopThreads() { + super.stopThreads(); + scheduler.shutdown(); + } + @Override public DelegationTokenIdentifier createIdentifier() { return new DelegationTokenIdentifier(); } + + /** + * This function will rebuild local token cache from zk storage. + * It is first called when the secret manager is initialized and + * then regularly at a configured interval. + * + * @param initial whether this is called during initialization + * @throws IOException + */ + private void rebuildTokenCache(boolean initial) throws IOException { + localTokenCache.clear(); + // Use bare zookeeper client to get all children since curator will + // wrap the same API with a sorting process. This is time consuming given + // millions of tokens + List zkTokens; + try { + zkTokens = zookeeper.getChildren(TOKEN_PATH, false); + } catch (KeeperException | InterruptedException e) { + throw new IOException("Tokens cannot be fetched from path " + + TOKEN_PATH, e); + } + byte[] data; + for (String tokenPath : zkTokens) { + try { + data = zkClient.getData().forPath( + ZK_DTSM_TOKENS_ROOT + "/" + tokenPath); + } catch (KeeperException.NoNodeException e) { + LOG.debug("No node in path [" + tokenPath + "]"); + continue; + } catch (Exception ex) { + throw new IOException(ex); + } + // Store data to currentTokenMap + AbstractDelegationTokenIdentifier ident = processTokenAddOrUpdate(data); + // Store data to localTokenCache for sync + localTokenCache.add(ident); + } + if (!initial) { + // Sync zkTokens with local cache, specifically + // 1) add/update tokens to local cache from zk, which is done through + // processTokenAddOrUpdate above + // 2) remove tokens in local cache but not in zk anymore + for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) { + if (!localTokenCache.contains(ident)) { + currentTokens.remove(ident); + } + } + } + } + + @Override + public AbstractDelegationTokenIdentifier cancelToken( + Token token, String canceller) + throws IOException { + checkAgainstZkBeforeDeletion.set(false); + AbstractDelegationTokenIdentifier ident = super.cancelToken(token, + canceller); + checkAgainstZkBeforeDeletion.set(true); + return ident; + } + + @Override + protected void removeStoredToken(AbstractDelegationTokenIdentifier ident) + throws IOException { + super.removeStoredToken(ident, checkAgainstZkBeforeDeletion.get()); + } + + @Override + protected void addOrUpdateToken(AbstractDelegationTokenIdentifier ident, + DelegationTokenInformation info, boolean isUpdate) throws Exception { + // Store the data in local memory first + currentTokens.put(ident, info); + super.addOrUpdateToken(ident, info, isUpdate); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java new file mode 100644 index 0000000000000..3c7f8e88a91d1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.security.token; + +import static org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl.ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL; +import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_WATCHER_ENABLED; +import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.REMOVAL_SCAN_INTERVAL; +import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.RENEW_INTERVAL; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.TestZKDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestZKDelegationTokenSecretManagerImpl + extends TestZKDelegationTokenSecretManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestZKDelegationTokenSecretManagerImpl.class); + + @SuppressWarnings("unchecked") + @Test + public void testMultiNodeOperationWithoutWatch() throws Exception { + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + // disable watch + conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false); + conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3); + + for (int i = 0; i < TEST_RETRIES; i++) { + ZKDelegationTokenSecretManagerImpl dtsm1 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm2 = + new ZKDelegationTokenSecretManagerImpl(conf); + DelegationTokenManager tm1, tm2; + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.setExternalDelegationTokenSecretManager(dtsm1); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.setExternalDelegationTokenSecretManager(dtsm2); + + // common token operation without watchers should still be working + Token token = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + tm2.renewToken(token, "foo"); + tm1.verifyToken(token); + tm1.cancelToken(token, "foo"); + try { + verifyTokenFail(tm2, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + + token = (Token) tm2.createToken( + UserGroupInformation.getCurrentUser(), "bar"); + Assert.assertNotNull(token); + tm1.verifyToken(token); + tm1.renewToken(token, "bar"); + tm2.verifyToken(token); + tm2.cancelToken(token, "bar"); + try { + verifyTokenFail(tm1, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + + dtsm1.stopThreads(); + dtsm2.stopThreads(); + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + } + + @Test + public void testMultiNodeTokenRemovalShortSyncWithoutWatch() + throws Exception { + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + // disable watch + conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false); + // make sync quick + conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3); + // set the renew window and removal interval to be a + // short time to trigger the background cleanup + conf.setInt(RENEW_INTERVAL, 10); + conf.setInt(REMOVAL_SCAN_INTERVAL, 10); + + for (int i = 0; i < TEST_RETRIES; i++) { + ZKDelegationTokenSecretManagerImpl dtsm1 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm2 = + new ZKDelegationTokenSecretManagerImpl(conf); + DelegationTokenManager tm1, tm2; + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.setExternalDelegationTokenSecretManager(dtsm1); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.setExternalDelegationTokenSecretManager(dtsm2); + + // time: X + // token expiry time: + // tm1: X + 10 + // tm2: X + 10 + Token token = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + + // time: X + 9 + // token expiry time: + // tm1: X + 10 + // tm2: X + 19 + Thread.sleep(9 * 1000); + tm2.renewToken(token, "foo"); + tm1.verifyToken(token); + + // time: X + 13 + // token expiry time: (sync happened) + // tm1: X + 19 + // tm2: X + 19 + Thread.sleep(4 * 1000); + tm1.verifyToken(token); + tm2.verifyToken(token); + + dtsm1.stopThreads(); + dtsm2.stopThreads(); + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + } + + // This is very unlikely to happen in real case, but worth putting + // the case out + @Test + public void testMultiNodeTokenRemovalLongSyncWithoutWatch() + throws Exception { + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + // disable watch + conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false); + // make sync quick + conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 20); + // set the renew window and removal interval to be a + // short time to trigger the background cleanup + conf.setInt(RENEW_INTERVAL, 10); + conf.setInt(REMOVAL_SCAN_INTERVAL, 10); + + for (int i = 0; i < TEST_RETRIES; i++) { + ZKDelegationTokenSecretManagerImpl dtsm1 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm2 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm3 = + new ZKDelegationTokenSecretManagerImpl(conf); + DelegationTokenManager tm1, tm2, tm3; + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.setExternalDelegationTokenSecretManager(dtsm1); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.setExternalDelegationTokenSecretManager(dtsm2); + tm3 = new DelegationTokenManager(conf, new Text("bla")); + tm3.setExternalDelegationTokenSecretManager(dtsm3); + + // time: X + // token expiry time: + // tm1: X + 10 + // tm2: X + 10 + // tm3: No token due to no sync + Token token = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + + // time: X + 9 + // token expiry time: + // tm1: X + 10 + // tm2: X + 19 + // tm3: No token due to no sync + Thread.sleep(9 * 1000); + long renewalTime = tm2.renewToken(token, "foo"); + LOG.info("Renew for token {} at current time {} renewal time {}", + token.getIdentifier(), Time.formatTime(Time.now()), + Time.formatTime(renewalTime)); + tm1.verifyToken(token); + + // time: X + 13 + // token expiry time: (sync din't happen) + // tm1: X + 10 + // tm2: X + 19 + // tm3: X + 19 due to fetch from zk + Thread.sleep(4 * 1000); + tm2.verifyToken(token); + tm3.verifyToken(token); + + dtsm1.stopThreads(); + dtsm2.stopThreads(); + dtsm3.stopThreads(); + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + verifyDestroy(tm3, conf); + } + } + +} From b0f6cc135eb7fe9a1ab726f8b15cfda9c1100625 Mon Sep 17 00:00:00 2001 From: Melissa You Date: Sun, 23 Oct 2022 14:08:17 -0700 Subject: [PATCH 05/13] fix checkstyle --- .../src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java index 7650c9fbf8e6a..eca860b80aecf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java @@ -436,7 +436,7 @@ protected void stopServer() throws Exception { protected static ZooKeeperServer getServer(ServerCnxnFactory fac) { - return fac.getZooKeeperServer(); + return fac.getZooKeeperServer(); } protected void tearDownAll() throws Exception { From 7f8300c5d84ed1f314001836700f23f9045d33cf Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 9 Sep 2022 02:41:21 +0800 Subject: [PATCH 06/13] HADOOP-18427. Improve ZKDelegationTokenSecretManager#startThead With recommended methods. (#4812) --- .../ZKDelegationTokenSecretManager.java | 9 ++++-- .../TestZKDelegationTokenSecretManager.java | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index dba9b2e6a0bf1..ee6b2050df457 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -43,7 +43,6 @@ import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.retry.RetryNTimes; -import org.apache.curator.utils.EnsurePath; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -135,6 +134,11 @@ public static void setCurator(CuratorFramework curator) { CURATOR_TL.set(curator); } + @VisibleForTesting + protected static CuratorFramework getCurator() { + return CURATOR_TL.get(); + } + private final boolean isExternalClient; protected final CuratorFramework zkClient; private SharedCount delTokSeqCounter; @@ -261,9 +265,8 @@ public void startThreads() throws IOException { // If namespace parents are implicitly created, they won't have ACLs. // So, let's explicitly create them. CuratorFramework nullNsFw = zkClient.usingNamespace(null); - EnsurePath ensureNs = nullNsFw.newNamespaceAwareEnsurePath("/" + zkClient.getNamespace()); try { - ensureNs.ensure(nullNsFw.getZookeeperClient()); + nullNsFw.create().creatingParentContainersIfNeeded().forPath("/" + zkClient.getNamespace()); } catch (Exception e) { throw new IOException("Could not create namespace", e); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index f821b4526bcf0..47173b250f92f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -40,6 +40,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.junit.After; import org.junit.Assert; @@ -506,4 +507,32 @@ public Boolean get() { } }, 1000, 5000); } + + @Test + public void testCreatingParentContainersIfNeeded() throws Exception { + + String connectString = zkServer.getConnectString(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + Configuration conf = getSecretConf(connectString); + CuratorFramework curatorFramework = + CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(retryPolicy) + .build(); + curatorFramework.start(); + ZKDelegationTokenSecretManager.setCurator(curatorFramework); + DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("foo")); + + // When the init method is called, + // the ZKDelegationTokenSecretManager#startThread method will be called, + // and the creatingParentContainersIfNeeded will be called to create the nameSpace. + tm1.init(); + + String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, + ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot"; + + // Check if the created NameSpace exists. + Stat stat = curatorFramework.checkExists().forPath(workingPath); + Assert.assertNotNull(stat); + } } From bf18e6d9b0b258eb84f18ee60732faa286a3a8dd Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 15 Sep 2022 00:13:58 +0800 Subject: [PATCH 07/13] HADOOP-18452. Fix TestKMS#testKMSHAZooKeeperDelegationToken Failed By Hadoop-18427. (#4885) --- .../ZKDelegationTokenSecretManager.java | 7 +++- .../TestZKDelegationTokenSecretManager.java | 37 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index ee6b2050df457..b95a31b16d3e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -60,6 +60,7 @@ import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -266,7 +267,11 @@ public void startThreads() throws IOException { // So, let's explicitly create them. CuratorFramework nullNsFw = zkClient.usingNamespace(null); try { - nullNsFw.create().creatingParentContainersIfNeeded().forPath("/" + zkClient.getNamespace()); + String nameSpace = "/" + zkClient.getNamespace(); + Stat stat = nullNsFw.checkExists().forPath(nameSpace); + if (stat == null) { + nullNsFw.create().creatingParentContainersIfNeeded().forPath(nameSpace); + } } catch (Exception e) { throw new IOException("Could not create namespace", e); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index 47173b250f92f..f94e04a1f40fd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -27,6 +27,8 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; @@ -37,6 +39,8 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -534,5 +538,38 @@ public void testCreatingParentContainersIfNeeded() throws Exception { // Check if the created NameSpace exists. Stat stat = curatorFramework.checkExists().forPath(workingPath); Assert.assertNotNull(stat); + + tm1.destroy(); + curatorFramework.close(); + } + + @Test + public void testCreateNameSpaceRepeatedly() throws Exception { + + String connectString = zkServer.getConnectString(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + Configuration conf = getSecretConf(connectString); + CuratorFramework curatorFramework = + CuratorFrameworkFactory.builder(). + connectString(connectString). + retryPolicy(retryPolicy). + build(); + curatorFramework.start(); + + String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, + ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot-Test"; + CreateBuilder createBuilder = curatorFramework.create(); + ProtectACLCreateModeStatPathAndBytesable createModeStat = + createBuilder.creatingParentContainersIfNeeded(); + createModeStat.forPath(workingPath); + + // Check if the created NameSpace exists. + Stat stat = curatorFramework.checkExists().forPath(workingPath); + Assert.assertNotNull(stat); + + // Repeated creation will throw NodeExists exception + LambdaTestUtils.intercept(KeeperException.class, + "KeeperErrorCode = NodeExists for "+workingPath, + () -> createModeStat.forPath(workingPath)); } } From 51e995ef711f3ecfdc8562177d52d50430fb38b9 Mon Sep 17 00:00:00 2001 From: Melissa You Date: Tue, 25 Oct 2022 14:28:34 -0700 Subject: [PATCH 08/13] fix checkstyle and add supress warning for unchecked --- .../token/delegation/ZKDelegationTokenSecretManager.java | 4 ++-- .../token/TestZKDelegationTokenSecretManagerImpl.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index b95a31b16d3e6..fb7ef111694bc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -343,10 +343,10 @@ public void startThreads() throws IOException { loadFromZKCache(true); } catch (Exception e) { throw new IOException("Could not start Curator tokenCacheListener for tokens", e); - } + } } super.startThreads(); - } + } /** * Load the CuratorCache into the in-memory map. Possible caches to be * loaded are keyCache and tokenCache. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java index 3c7f8e88a91d1..d4b49b7cd64d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java @@ -101,6 +101,7 @@ public void testMultiNodeOperationWithoutWatch() throws Exception { } } + @SuppressWarnings("unchecked") @Test public void testMultiNodeTokenRemovalShortSyncWithoutWatch() throws Exception { From 8c681fb14cef71f53a63e8beab03d85eed69c47c Mon Sep 17 00:00:00 2001 From: Melissa You Date: Tue, 25 Oct 2022 15:52:43 -0700 Subject: [PATCH 09/13] Trigger Build From 0c088e8e6a5a2d5708b25ab9f0fd85048e9deca1 Mon Sep 17 00:00:00 2001 From: Melissa You Date: Wed, 26 Oct 2022 17:57:39 -0700 Subject: [PATCH 10/13] checkstyle fix --- .../ZKDelegationTokenSecretManager.java | 39 ++++++++++++------- .../apache/hadoop/ha/ClientBaseWithFixes.java | 2 +- .../ZKDelegationTokenSecretManagerImpl.java | 4 +- ...estZKDelegationTokenSecretManagerImpl.java | 1 + 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index fb7ef111694bc..2e4980df39919 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -26,7 +26,6 @@ import java.io.UncheckedIOException; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -285,7 +284,8 @@ public void startThreads() throws IOException { // by calling the incrSharedCount currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched initial range of seq num, from {} to {} ", currentSeqNum + 1, currentMaxSeqNum); + LOG.info("Fetched initial range of seq num, from {} to {} ", + currentSeqNum + 1, currentMaxSeqNum); } catch (Exception e) { throw new IOException("Could not start Sequence Counter", e); } @@ -305,14 +305,18 @@ public void startThreads() throws IOException { } try { keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT).build(); - CuratorCacheListener keyCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> { - try { - processKeyAddOrUpdate(node.getData()); - } catch (IOException e) { - LOG.error("Error while processing Curator keyCacheListener " + "NODE_CREATED / NODE_CHANGED event"); - throw new UncheckedIOException(e); - } - }).forDeletes(childData -> processKeyRemoved(childData.getPath())).build(); + CuratorCacheListener keyCacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + try { + processKeyAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator keyCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); + } + }) + .forDeletes(childData -> processKeyRemoved(childData.getPath())) + .build(); keyCache.listenable().addListener(keyCacheListener); keyCache.start(); loadFromZKCache(false); @@ -323,21 +327,26 @@ public void startThreads() throws IOException { LOG.info("TokenCache is enabled"); try { tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT).build(); - CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> { + CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { try { processTokenAddOrUpdate(node.getData()); } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " + "NODE_CREATED / NODE_CHANGED event"); + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); throw new UncheckedIOException(e); } - }).forDeletes(childData -> { + }) + .forDeletes(childData -> { try { processTokenRemoved(childData); } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " + "NODE_DELETED event"); + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_DELETED event"); throw new UncheckedIOException(e); } - }).build(); + }) + .build(); tokenCache.listenable().addListener(tokenCacheListener); tokenCache.start(); loadFromZKCache(true); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java index eca860b80aecf..7650c9fbf8e6a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java @@ -436,7 +436,7 @@ protected void stopServer() throws Exception { protected static ZooKeeperServer getServer(ServerCnxnFactory fac) { - return fac.getZooKeeperServer(); + return fac.getZooKeeperServer(); } protected void tearDownAll() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java index 2d55026c807af..dcb05159500ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java @@ -30,8 +30,6 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; import java.util.HashSet; import java.util.List; @@ -193,7 +191,7 @@ private void rebuildTokenCache(boolean initial) throws IOException { // 2) remove tokens in local cache but not in zk anymore for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) { if (!localTokenCache.contains(ident)) { - currentTokens.remove(ident); + currentTokens.remove(ident); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java index d4b49b7cd64d2..63c9b2e422ea0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java @@ -162,6 +162,7 @@ public void testMultiNodeTokenRemovalShortSyncWithoutWatch() // This is very unlikely to happen in real case, but worth putting // the case out + @SuppressWarnings("unchecked") @Test public void testMultiNodeTokenRemovalLongSyncWithoutWatch() throws Exception { From 97dac96ccd59953eb0d7f2cb242e4f2a842785e4 Mon Sep 17 00:00:00 2001 From: Melissa You Date: Thu, 27 Oct 2022 12:23:23 -0700 Subject: [PATCH 11/13] space fix --- .../token/delegation/ZKDelegationTokenSecretManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 2e4980df39919..0666389d948a8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -284,7 +284,7 @@ public void startThreads() throws IOException { // by calling the incrSharedCount currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched initial range of seq num, from {} to {} ", + LOG.info("Fetched initial range of seq num, from {} to {} ", currentSeqNum + 1, currentMaxSeqNum); } catch (Exception e) { throw new IOException("Could not start Sequence Counter", e); @@ -310,7 +310,7 @@ public void startThreads() throws IOException { try { processKeyAddOrUpdate(node.getData()); } catch (IOException e) { - LOG.error("Error while processing Curator keyCacheListener " + LOG.error("Error while processing Curator keyCacheListener " + "NODE_CREATED / NODE_CHANGED event"); throw new UncheckedIOException(e); } @@ -332,7 +332,7 @@ public void startThreads() throws IOException { try { processTokenAddOrUpdate(node.getData()); } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " + LOG.error("Error while processing Curator tokenCacheListener " + "NODE_CREATED / NODE_CHANGED event"); throw new UncheckedIOException(e); } @@ -341,7 +341,7 @@ public void startThreads() throws IOException { try { processTokenRemoved(childData); } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " + LOG.error("Error while processing Curator tokenCacheListener " + "NODE_DELETED event"); throw new UncheckedIOException(e); } From 7880263da5484d7492ccb6a2ca370dfcb5c03f95 Mon Sep 17 00:00:00 2001 From: Melissa You Date: Fri, 28 Oct 2022 09:30:01 -0700 Subject: [PATCH 12/13] Trigger Build From 9429be98288ad7150ce88212428fe683410056a9 Mon Sep 17 00:00:00 2001 From: Melissa You Date: Mon, 31 Oct 2022 09:26:30 -0700 Subject: [PATCH 13/13] Trigger Build