diff --git a/LICENSE-binary b/LICENSE-binary
index 84ca5078aa376..bb1b1427737b1 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -319,6 +319,9 @@ org.apache.commons:commons-validator:1.6
org.apache.curator:curator-client:4.2.0
org.apache.curator:curator-framework:4.2.0
org.apache.curator:curator-recipes:4.2.0
+org.apache.curator:curator-client:5.2.0
+org.apache.curator:curator-framework:5.2.0
+org.apache.curator:curator-recipes:5.2.0
org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1
org.apache.hbase:hbase-annotations:1.4.8
org.apache.hbase:hbase-client:1.4.8
@@ -345,8 +348,6 @@ org.apache.kerby:kerby-util:1.0.1
org.apache.kerby:kerby-xdr:1.0.1
org.apache.kerby:token-provider:1.0.1
org.apache.yetus:audience-annotations:0.5.0
-org.apache.zookeeper:zookeeper:3.5.6
-org.apache.zookeeper:zookeeper-jute:3.5.6
org.codehaus.jettison:jettison:1.5.1
org.eclipse.jetty:jetty-annotations:9.4.48.v20220622
org.eclipse.jetty:jetty-http:9.4.48.v20220622
@@ -362,6 +363,7 @@ org.eclipse.jetty:jetty-webapp:9.4.48.v20220622
org.eclipse.jetty:jetty-xml:9.4.48.v20220622
org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.48.v20220622
org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.48.v20220622
+org.apache.zookeeper:zookeeper:3.6.3
org.ehcache:ehcache:3.3.1
org.lz4:lz4-java:1.7.1
org.objenesis:objenesis:2.6
diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml
index 6405a8feba7c7..d93372908a956 100644
--- a/hadoop-common-project/hadoop-auth/pom.xml
+++ b/hadoop-common-project/hadoop-auth/pom.xml
@@ -128,6 +128,15 @@
org.apache.zookeeper
zookeeper
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
org.apache.curator
curator-framework
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 2b0e7c8079fa0..f31d9afa3bf67 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -336,6 +336,10 @@
+
+ io.dropwizard.metrics
+ metrics-core
+
org.apache.zookeeper
zookeeper
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index 56249960c7ce4..d7a38bf608529 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -23,11 +23,11 @@
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.SecretKey;
@@ -80,7 +80,7 @@ private String formatTokenId(TokenIdent id) {
* to DelegationTokenInformation. Protected by this object lock.
*/
protected final Map currentTokens
- = new HashMap();
+ = new ConcurrentHashMap<>();
/**
* Sequence number to create DelegationTokenIdentifier.
@@ -92,7 +92,7 @@ private String formatTokenId(TokenIdent id) {
* Access to allKeys is protected by this object lock
*/
protected final Map allKeys
- = new HashMap();
+ = new ConcurrentHashMap<>();
/**
* Access to currentId is protected by this object lock.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index c66e77ee47b50..0666389d948a8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -23,12 +23,11 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
@@ -37,14 +36,12 @@
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.retry.RetryNTimes;
-import org.apache.curator.utils.EnsurePath;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -54,6 +51,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.hadoop.util.curator.ZKCuratorManager;
+import static org.apache.hadoop.util.Time.now;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -61,6 +59,7 @@
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +77,7 @@
public abstract class ZKDelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager {
- private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
+ public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+ "zkNumRetries";
public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
@@ -101,6 +100,9 @@ public abstract class ZKDelegationTokenSecretManager {
+ try {
+ processKeyAddOrUpdate(node.getData());
+ } catch (IOException e) {
+ LOG.error("Error while processing Curator keyCacheListener "
+ + "NODE_CREATED / NODE_CHANGED event");
+ throw new UncheckedIOException(e);
}
- }
- }, listenerThreadPool);
- loadFromZKCache(false);
- }
+ })
+ .forDeletes(childData -> processKeyRemoved(childData.getPath()))
+ .build();
+ keyCache.listenable().addListener(keyCacheListener);
+ keyCache.start();
+ loadFromZKCache(false);
} catch (Exception e) {
- throw new IOException("Could not start PathChildrenCache for keys", e);
+ throw new IOException("Could not start Curator keyCacheListener for keys", e);
}
- try {
- tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
- if (tokenCache != null) {
- tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
- tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
-
- @Override
- public void childEvent(CuratorFramework client,
- PathChildrenCacheEvent event) throws Exception {
- switch (event.getType()) {
- case CHILD_ADDED:
- processTokenAddOrUpdate(event.getData());
- break;
- case CHILD_UPDATED:
- processTokenAddOrUpdate(event.getData());
- break;
- case CHILD_REMOVED:
- processTokenRemoved(event.getData());
- break;
- default:
- break;
- }
- }
- }, listenerThreadPool);
+ if (isTokenWatcherEnabled) {
+ LOG.info("TokenCache is enabled");
+ try {
+ tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT).build();
+ CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder()
+ .forCreatesAndChanges((oldNode, node) -> {
+ try {
+ processTokenAddOrUpdate(node.getData());
+ } catch (IOException e) {
+ LOG.error("Error while processing Curator tokenCacheListener "
+ + "NODE_CREATED / NODE_CHANGED event");
+ throw new UncheckedIOException(e);
+ }
+ })
+ .forDeletes(childData -> {
+ try {
+ processTokenRemoved(childData);
+ } catch (IOException e) {
+ LOG.error("Error while processing Curator tokenCacheListener "
+ + "NODE_DELETED event");
+ throw new UncheckedIOException(e);
+ }
+ })
+ .build();
+ tokenCache.listenable().addListener(tokenCacheListener);
+ tokenCache.start();
loadFromZKCache(true);
+ } catch (Exception e) {
+ throw new IOException("Could not start Curator tokenCacheListener for tokens", e);
}
- } catch (Exception e) {
- throw new IOException("Could not start PathChildrenCache for tokens", e);
}
super.startThreads();
}
-
/**
- * Load the PathChildrenCache into the in-memory map. Possible caches to be
+ * Load the CuratorCache into the in-memory map. Possible caches to be
* loaded are keyCache and tokenCache.
*
* @param isTokenCache true if loading tokenCache, false if loading keyCache.
@@ -365,29 +365,29 @@ public void childEvent(CuratorFramework client,
private void loadFromZKCache(final boolean isTokenCache) {
final String cacheName = isTokenCache ? "token" : "key";
LOG.info("Starting to load {} cache.", cacheName);
- final List children;
+ final Stream children;
if (isTokenCache) {
- children = tokenCache.getCurrentData();
+ children = tokenCache.stream();
} else {
- children = keyCache.getCurrentData();
+ children = keyCache.stream();
}
- int count = 0;
- for (ChildData child : children) {
+ final AtomicInteger count = new AtomicInteger(0);
+ children.forEach(childData -> {
try {
if (isTokenCache) {
- processTokenAddOrUpdate(child);
+ processTokenAddOrUpdate(childData.getData());
} else {
- processKeyAddOrUpdate(child.getData());
+ processKeyAddOrUpdate(childData.getData());
}
} catch (Exception e) {
LOG.info("Ignoring node {} because it failed to load.",
- child.getPath());
+ childData.getPath());
LOG.debug("Failure exception:", e);
- ++count;
+ count.getAndIncrement();
}
- }
- if (count > 0) {
+ });
+ if (count.get() > 0) {
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName);
}
LOG.info("Loaded {} cache.", cacheName);
@@ -398,9 +398,7 @@ private void processKeyAddOrUpdate(byte[] data) throws IOException {
DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey();
key.readFields(din);
- synchronized (this) {
- allKeys.put(key.getKeyId(), key);
- }
+ allKeys.put(key.getKeyId(), key);
}
private void processKeyRemoved(String path) {
@@ -410,15 +408,13 @@ private void processKeyRemoved(String path) {
int j = tokSeg.indexOf('_');
if (j > 0) {
int keyId = Integer.parseInt(tokSeg.substring(j + 1));
- synchronized (this) {
- allKeys.remove(keyId);
- }
+ allKeys.remove(keyId);
}
}
}
- private void processTokenAddOrUpdate(ChildData data) throws IOException {
- ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
+ protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
@@ -429,12 +425,10 @@ private void processTokenAddOrUpdate(ChildData data) throws IOException {
if (numRead > -1) {
DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password);
- synchronized (this) {
- currentTokens.put(ident, tokenInfo);
- // The cancel task might be waiting
- notifyAll();
- }
+ currentTokens.put(ident, tokenInfo);
+ return ident;
}
+ return null;
}
private void processTokenRemoved(ChildData data) throws IOException {
@@ -442,11 +436,7 @@ private void processTokenRemoved(ChildData data) throws IOException {
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
- synchronized (this) {
- currentTokens.remove(ident);
- // The cancel task might be waiting
- notifyAll();
- }
+ currentTokens.remove(ident);
}
@Override
@@ -487,20 +477,6 @@ public void stopThreads() {
} catch (Exception e) {
LOG.error("Could not stop Curator Framework", e);
}
- if (listenerThreadPool != null) {
- listenerThreadPool.shutdown();
- try {
- // wait for existing tasks to terminate
- if (!listenerThreadPool.awaitTermination(shutdownTimeout,
- TimeUnit.MILLISECONDS)) {
- LOG.error("Forcing Listener threadPool to shutdown !!");
- listenerThreadPool.shutdownNow();
- }
- } catch (InterruptedException ie) {
- listenerThreadPool.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
}
private void createPersistentNode(String nodePath) throws Exception {
@@ -647,7 +623,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
*
* @param ident Identifier of the token
*/
- private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
+ protected void syncLocalCacheWithZk(TokenIdent ident) {
try {
DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident);
if (tokenInfo != null && !currentTokens.containsKey(ident)) {
@@ -661,16 +637,21 @@ private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
}
}
- private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
+ protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
throws IOException {
return getTokenInfoFromZK(ident, false);
}
- private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
+ protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
boolean quiet) throws IOException {
String nodePath =
getNodePath(ZK_DTSM_TOKENS_ROOT,
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
+ return getTokenInfoFromZK(nodePath, quiet);
+ }
+
+ protected DelegationTokenInformation getTokenInfoFromZK(String nodePath,
+ boolean quiet) throws IOException {
try {
byte[] data = zkClient.getData().forPath(nodePath);
if ((data == null) || (data.length == 0)) {
@@ -805,15 +786,30 @@ protected void updateToken(TokenIdent ident,
@Override
protected void removeStoredToken(TokenIdent ident)
throws IOException {
+ removeStoredToken(ident, false);
+ }
+
+ protected void removeStoredToken(TokenIdent ident,
+ boolean checkAgainstZkBeforeDeletion) throws IOException {
String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing ZKDTSMDelegationToken_"
- + ident.getSequenceNumber());
- }
try {
- if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+ DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true);
+ if (dtInfo != null) {
+ // For the case there is no sync or watch miss, it is possible that the
+ // local storage has expired tokens which have been renewed by peer
+ // so double check again to avoid accidental delete
+ if (checkAgainstZkBeforeDeletion
+ && dtInfo.getRenewDate() > now()) {
+ LOG.info("Node already renewed by peer " + nodeRemovePath +
+ " so this token should not be deleted");
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing ZKDTSMDelegationToken_"
+ + ident.getSequenceNumber());
+ }
while(zkClient.checkExists().forPath(nodeRemovePath) != null){
try {
zkClient.delete().guaranteed().forPath(nodeRemovePath);
@@ -836,7 +832,7 @@ protected void removeStoredToken(TokenIdent ident)
}
@Override
- public synchronized TokenIdent cancelToken(Token token,
+ public TokenIdent cancelToken(Token token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
@@ -847,7 +843,7 @@ public synchronized TokenIdent cancelToken(Token token,
return super.cancelToken(token, canceller);
}
- private void addOrUpdateToken(TokenIdent ident,
+ protected void addOrUpdateToken(TokenIdent ident,
DelegationTokenInformation info, boolean isUpdate) throws Exception {
String nodeCreatePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
@@ -874,6 +870,10 @@ private void addOrUpdateToken(TokenIdent ident,
}
}
+ public boolean isTokenWatcherEnabled() {
+ return isTokenWatcherEnabled;
+ }
+
/**
* Simple implementation of an {@link ACLProvider} that simply returns an ACL
* that gives all permissions only to a single principal.
@@ -905,11 +905,6 @@ static String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
- @VisibleForTesting
- public ExecutorService getListenerThreadPool() {
- return listenerThreadPool;
- }
-
@VisibleForTesting
DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
return currentTokens.get(ident);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
index 666000b2f48ae..7650c9fbf8e6a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -41,7 +41,6 @@
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -437,9 +436,7 @@ protected void stopServer() throws Exception {
protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
- ZooKeeperServer zs = ServerCnxnFactoryAccessor.getZkServer(fac);
-
- return zs;
+ return fac.getZooKeeperServer();
}
protected void tearDownAll() throws Exception {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
index bdbf1d9c2c286..4dcc74b86d151 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.server.ServerCnxn;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -131,7 +132,7 @@ public void testRandomHealthAndDisconnects() throws Exception {
long st = Time.now();
while (Time.now() - st < runFor) {
cluster.getTestContext().checkException();
- serverFactory.closeAll();
+ serverFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
Thread.sleep(50);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index 50d3dcfee757b..f94e04a1f40fd 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -21,14 +21,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
@@ -39,9 +39,12 @@
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.After;
import org.junit.Assert;
@@ -59,15 +62,15 @@ public class TestZKDelegationTokenSecretManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);
- private static final int TEST_RETRIES = 2;
+ protected static final int TEST_RETRIES = 2;
- private static final int RETRY_COUNT = 5;
+ protected static final int RETRY_COUNT = 5;
- private static final int RETRY_WAIT = 1000;
+ protected static final int RETRY_WAIT = 1000;
- private static final long DAY_IN_SECS = 86400;
+ protected static final long DAY_IN_SECS = 86400;
- private TestingServer zkServer;
+ protected TestingServer zkServer;
@Rule
public Timeout globalTimeout = new Timeout(300000);
@@ -317,19 +320,13 @@ public void testCancelTokenSingleManager() throws Exception {
@SuppressWarnings("rawtypes")
protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
throws Exception {
- AbstractDelegationTokenSecretManager sm =
- tm.getDelegationTokenSecretManager();
- ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
- ExecutorService es = zksm.getListenerThreadPool();
tm.destroy();
- Assert.assertTrue(es.isShutdown());
// wait for the pool to terminate
long timeout =
conf.getLong(
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
Thread.sleep(timeout * 3);
- Assert.assertTrue(es.isTerminated());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -356,17 +353,6 @@ public void testStopThreads() throws Exception {
(Token)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
-
- AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
- ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
- ExecutorService es = zksm.getListenerThreadPool();
- es.submit(new Callable() {
- public Void call() throws Exception {
- Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
- return null;
- }
- });
-
tm1.destroy();
}
@@ -425,7 +411,7 @@ private void verifyACL(CuratorFramework curatorFramework,
// cancelled but.. that would mean having to make an RPC call for every
// verification request.
// Thus, the eventual consistency tradef-off should be acceptable here...
- private void verifyTokenFail(DelegationTokenManager tm,
+ protected void verifyTokenFail(DelegationTokenManager tm,
Token token) throws IOException,
InterruptedException {
verifyTokenFailWithRetry(tm, token, RETRY_COUNT);
@@ -525,4 +511,65 @@ public Boolean get() {
}
}, 1000, 5000);
}
+
+ @Test
+ public void testCreatingParentContainersIfNeeded() throws Exception {
+
+ String connectString = zkServer.getConnectString();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ Configuration conf = getSecretConf(connectString);
+ CuratorFramework curatorFramework =
+ CuratorFrameworkFactory.builder()
+ .connectString(connectString)
+ .retryPolicy(retryPolicy)
+ .build();
+ curatorFramework.start();
+ ZKDelegationTokenSecretManager.setCurator(curatorFramework);
+ DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("foo"));
+
+ // When the init method is called,
+ // the ZKDelegationTokenSecretManager#startThread method will be called,
+ // and the creatingParentContainersIfNeeded will be called to create the nameSpace.
+ tm1.init();
+
+ String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH,
+ ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot";
+
+ // Check if the created NameSpace exists.
+ Stat stat = curatorFramework.checkExists().forPath(workingPath);
+ Assert.assertNotNull(stat);
+
+ tm1.destroy();
+ curatorFramework.close();
+ }
+
+ @Test
+ public void testCreateNameSpaceRepeatedly() throws Exception {
+
+ String connectString = zkServer.getConnectString();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ Configuration conf = getSecretConf(connectString);
+ CuratorFramework curatorFramework =
+ CuratorFrameworkFactory.builder().
+ connectString(connectString).
+ retryPolicy(retryPolicy).
+ build();
+ curatorFramework.start();
+
+ String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH,
+ ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot-Test";
+ CreateBuilder createBuilder = curatorFramework.create();
+ ProtectACLCreateModeStatPathAndBytesable createModeStat =
+ createBuilder.creatingParentContainersIfNeeded();
+ createModeStat.forPath(workingPath);
+
+ // Check if the created NameSpace exists.
+ Stat stat = curatorFramework.checkExists().forPath(workingPath);
+ Assert.assertNotNull(stat);
+
+ // Repeated creation will throw NodeExists exception
+ LambdaTestUtils.intercept(KeeperException.class,
+ "KeeperErrorCode = NodeExists for "+workingPath,
+ () -> createModeStat.forPath(workingPath));
+ }
}
diff --git a/hadoop-common-project/hadoop-registry/pom.xml b/hadoop-common-project/hadoop-registry/pom.xml
index d7de0c49cc67a..204fbbe9da554 100644
--- a/hadoop-common-project/hadoop-registry/pom.xml
+++ b/hadoop-common-project/hadoop-registry/pom.xml
@@ -135,6 +135,17 @@
dnsjava
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
+
diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
index a01b7151b6989..3457fa28634a6 100644
--- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
+++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.registry.client.impl.zk;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.curator.ensemble.EnsembleProvider;
@@ -28,9 +31,6 @@
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -56,6 +56,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
/**
@@ -109,9 +110,9 @@ public class CuratorService extends CompositeService
private EnsembleProvider ensembleProvider;
/**
- * Registry tree cache.
+ * Registry Curator cache.
*/
- private TreeCache treeCache;
+ private CuratorCacheBridge curatorCacheBridge;
/**
* Construct the service.
@@ -189,8 +190,8 @@ protected void serviceStart() throws Exception {
protected void serviceStop() throws Exception {
IOUtils.closeStream(curator);
- if (treeCache != null) {
- treeCache.close();
+ if (curatorCacheBridge != null) {
+ curatorCacheBridge.close();
}
super.serviceStop();
}
@@ -824,73 +825,54 @@ protected String dumpRegistryRobustly(boolean verbose) {
*
* @param listener the listener.
* @return a handle allowing for the management of the listener.
- * @throws Exception if registration fails due to error.
*/
- public ListenerHandle registerPathListener(final PathListener listener)
- throws Exception {
-
- final TreeCacheListener pathChildrenCacheListener =
- new TreeCacheListener() {
-
- public void childEvent(CuratorFramework curatorFramework,
- TreeCacheEvent event)
- throws Exception {
- String path = null;
- if (event != null && event.getData() != null) {
- path = event.getData().getPath();
- }
- assert event != null;
- switch (event.getType()) {
- case NODE_ADDED:
- LOG.info("Informing listener of added node {}", path);
- listener.nodeAdded(path);
-
- break;
-
- case NODE_REMOVED:
- LOG.info("Informing listener of removed node {}", path);
- listener.nodeRemoved(path);
-
- break;
-
- case NODE_UPDATED:
- LOG.info("Informing listener of updated node {}", path);
- listener.nodeAdded(path);
-
- break;
-
- default:
- // do nothing
- break;
-
- }
+ public ListenerHandle registerPathListener(final PathListener listener) {
+
+ CuratorCacheListener cacheListener = CuratorCacheListener.builder()
+ .forCreatesAndChanges((oldNode, node) -> {
+ final String path = node.getPath();
+ LOG.info("Informing listener of added/updated node {}", path);
+ try {
+ listener.nodeAdded(path);
+ } catch (IOException e) {
+ LOG.error("Error while processing Curator listener "
+ + "NODE_CREATED / NODE_CHANGED event");
+ throw new UncheckedIOException(e);
}
- };
- treeCache.getListenable().addListener(pathChildrenCacheListener);
-
- return new ListenerHandle() {
- @Override
- public void remove() {
- treeCache.getListenable().removeListener(pathChildrenCacheListener);
- }
- };
+ })
+ .forDeletes(childData -> {
+ final String path = childData.getPath();
+ LOG.info("Informing listener of removed node {}", path);
+ try {
+ listener.nodeRemoved(path);
+ } catch (IOException e) {
+ LOG.error("Error while processing Curator listener "
+ + "NODE_DELETED event");
+ throw new UncheckedIOException(e);
+ }
+ })
+ .build();
+ curatorCacheBridge.listenable().addListener(cacheListener);
+ return () -> curatorCacheBridge.listenable().removeListener(cacheListener);
}
// TODO: should caches be stopped and then restarted if need be?
/**
- * Create the tree cache that monitors the registry for node addition, update,
- * and deletion.
- *
- * @throws Exception if any issue arises during monitoring.
+ * Instantiate the Curator cache that monitors the registry for node
+ * addition, update and deletion.
*/
- public void monitorRegistryEntries()
- throws Exception {
+ public void instantiateCacheForRegistry() {
String registryPath =
getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
- treeCache = new TreeCache(curator, registryPath);
- treeCache.start();
+ curatorCacheBridge = CuratorCache.bridgeBuilder(curator, registryPath)
+ .build();
+ }
+
+ public void startCache() {
+ curatorCacheBridge.start();
}
+
}
diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java
index 1ff5f26b47207..8d0a38cfd47f9 100644
--- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java
+++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java
@@ -106,7 +106,7 @@ protected void serviceStart() throws Exception {
private void manageRegistryDNS() {
try {
- registryOperations.monitorRegistryEntries();
+ registryOperations.instantiateCacheForRegistry();
registryOperations.registerPathListener(new PathListener() {
private String registryRoot = getConfig().
get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
@@ -157,6 +157,7 @@ public void nodeRemoved(String path) throws IOException {
}
});
+ registryOperations.startCache();
// create listener for record deletions
diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
index 0ab4cd2f3bfac..994a2565c309a 100644
--- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
+++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
@@ -229,7 +229,7 @@ protected void serviceStart() throws Exception {
setupSecurity();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
- ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime);
+ ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime, "");
LOG.info("Starting Local Zookeeper service");
factory = ServerCnxnFactory.createFactory();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
index 03ac256ace86e..fbf73e123e783 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
@@ -83,6 +83,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
test-jar
test
+
+ io.dropwizard.metrics
+ metrics-core
+ provided
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
org.apache.hadoop.thirdparty
hadoop-shaded-guava
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
index 4a111187ac46a..dcb05159500ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
@@ -19,13 +19,24 @@
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper based router delegation token store implementation.
@@ -33,24 +44,181 @@
public class ZKDelegationTokenSecretManagerImpl extends
ZKDelegationTokenSecretManager {
+ public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+ ZK_CONF_PREFIX + "router.token.sync.interval";
+ public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
+
private static final Logger LOG =
LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
- private Configuration conf = null;
+ private Configuration conf;
+
+ private final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor();
+
+ // Local cache of delegation tokens, used for deprecating tokens from
+ // currentTokenMap
+ private final Set localTokenCache =
+ new HashSet<>();
+ // Native zk client for getting all tokens
+ private ZooKeeper zookeeper;
+ private final String TOKEN_PATH = "/" + zkClient.getNamespace()
+ + ZK_DTSM_TOKENS_ROOT;
+ // The flag used to issue an extra check before deletion
+ // Since cancel token and token remover thread use the same
+ // API here and one router could have a token that is renewed
+ // by another router, thus token remover should always check ZK
+ // to confirm whether it has been renewed or not
+ private ThreadLocal checkAgainstZkBeforeDeletion =
+ new ThreadLocal() {
+ @Override
+ protected Boolean initialValue() {
+ return true;
+ }
+ };
public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
super(conf);
this.conf = conf;
try {
- super.startThreads();
+ startThreads();
} catch (IOException e) {
LOG.error("Error starting threads for zkDelegationTokens", e);
}
LOG.info("Zookeeper delegation token secret manager instantiated");
}
+ @Override
+ public void startThreads() throws IOException {
+ super.startThreads();
+ // start token cache related work when watcher is disabled
+ if (!isTokenWatcherEnabled()) {
+ LOG.info("Watcher for tokens is disabled in this secret manager");
+ try {
+ // By default set this variable
+ checkAgainstZkBeforeDeletion.set(true);
+ // Ensure the token root path exists
+ if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) {
+ zkClient.create().creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(ZK_DTSM_TOKENS_ROOT);
+ }
+ // Set up zookeeper client
+ try {
+ zookeeper = zkClient.getZookeeperClient().getZooKeeper();
+ } catch (Exception e) {
+ LOG.info("Cannot get zookeeper client ", e);
+ } finally {
+ if (zookeeper == null) {
+ throw new IOException("Zookeeper client is null");
+ }
+ }
+
+ LOG.info("Start loading token cache");
+ long start = Time.now();
+ rebuildTokenCache(true);
+ LOG.info("Loaded token cache in {} milliseconds", Time.now() - start);
+
+ int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL,
+ ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT);
+ scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ rebuildTokenCache(false);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }, syncInterval, syncInterval, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Error rebuilding local cache for zkDelegationTokens ", e);
+ }
+ }
+ }
+
+ @Override
+ public void stopThreads() {
+ super.stopThreads();
+ scheduler.shutdown();
+ }
+
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier();
}
+
+ /**
+ * This function will rebuild local token cache from zk storage.
+ * It is first called when the secret manager is initialized and
+ * then regularly at a configured interval.
+ *
+ * @param initial whether this is called during initialization
+ * @throws IOException
+ */
+ private void rebuildTokenCache(boolean initial) throws IOException {
+ localTokenCache.clear();
+ // Use bare zookeeper client to get all children since curator will
+ // wrap the same API with a sorting process. This is time consuming given
+ // millions of tokens
+ List zkTokens;
+ try {
+ zkTokens = zookeeper.getChildren(TOKEN_PATH, false);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IOException("Tokens cannot be fetched from path "
+ + TOKEN_PATH, e);
+ }
+ byte[] data;
+ for (String tokenPath : zkTokens) {
+ try {
+ data = zkClient.getData().forPath(
+ ZK_DTSM_TOKENS_ROOT + "/" + tokenPath);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.debug("No node in path [" + tokenPath + "]");
+ continue;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ // Store data to currentTokenMap
+ AbstractDelegationTokenIdentifier ident = processTokenAddOrUpdate(data);
+ // Store data to localTokenCache for sync
+ localTokenCache.add(ident);
+ }
+ if (!initial) {
+ // Sync zkTokens with local cache, specifically
+ // 1) add/update tokens to local cache from zk, which is done through
+ // processTokenAddOrUpdate above
+ // 2) remove tokens in local cache but not in zk anymore
+ for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) {
+ if (!localTokenCache.contains(ident)) {
+ currentTokens.remove(ident);
+ }
+ }
+ }
+ }
+
+ @Override
+ public AbstractDelegationTokenIdentifier cancelToken(
+ Token token, String canceller)
+ throws IOException {
+ checkAgainstZkBeforeDeletion.set(false);
+ AbstractDelegationTokenIdentifier ident = super.cancelToken(token,
+ canceller);
+ checkAgainstZkBeforeDeletion.set(true);
+ return ident;
+ }
+
+ @Override
+ protected void removeStoredToken(AbstractDelegationTokenIdentifier ident)
+ throws IOException {
+ super.removeStoredToken(ident, checkAgainstZkBeforeDeletion.get());
+ }
+
+ @Override
+ protected void addOrUpdateToken(AbstractDelegationTokenIdentifier ident,
+ DelegationTokenInformation info, boolean isUpdate) throws Exception {
+ // Store the data in local memory first
+ currentTokens.put(ident, info);
+ super.addOrUpdateToken(ident, info, isUpdate);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java
new file mode 100644
index 0000000000000..63c9b2e422ea0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.security.token;
+
+import static org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl.ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL;
+import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_WATCHER_ENABLED;
+import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.REMOVAL_SCAN_INTERVAL;
+import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.RENEW_INTERVAL;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.TestZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestZKDelegationTokenSecretManagerImpl
+ extends TestZKDelegationTokenSecretManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestZKDelegationTokenSecretManagerImpl.class);
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMultiNodeOperationWithoutWatch() throws Exception {
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ // disable watch
+ conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
+ conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3);
+
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ ZKDelegationTokenSecretManagerImpl dtsm1 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ ZKDelegationTokenSecretManagerImpl dtsm2 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ DelegationTokenManager tm1, tm2;
+ tm1 = new DelegationTokenManager(conf, new Text("bla"));
+ tm1.setExternalDelegationTokenSecretManager(dtsm1);
+ tm2 = new DelegationTokenManager(conf, new Text("bla"));
+ tm2.setExternalDelegationTokenSecretManager(dtsm2);
+
+ // common token operation without watchers should still be working
+ Token token =
+ (Token) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm2.verifyToken(token);
+ tm2.renewToken(token, "foo");
+ tm1.verifyToken(token);
+ tm1.cancelToken(token, "foo");
+ try {
+ verifyTokenFail(tm2, token);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ // Ignore
+ }
+
+ token = (Token) tm2.createToken(
+ UserGroupInformation.getCurrentUser(), "bar");
+ Assert.assertNotNull(token);
+ tm1.verifyToken(token);
+ tm1.renewToken(token, "bar");
+ tm2.verifyToken(token);
+ tm2.cancelToken(token, "bar");
+ try {
+ verifyTokenFail(tm1, token);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ // Ignore
+ }
+
+ dtsm1.stopThreads();
+ dtsm2.stopThreads();
+ verifyDestroy(tm1, conf);
+ verifyDestroy(tm2, conf);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMultiNodeTokenRemovalShortSyncWithoutWatch()
+ throws Exception {
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ // disable watch
+ conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
+ // make sync quick
+ conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3);
+ // set the renew window and removal interval to be a
+ // short time to trigger the background cleanup
+ conf.setInt(RENEW_INTERVAL, 10);
+ conf.setInt(REMOVAL_SCAN_INTERVAL, 10);
+
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ ZKDelegationTokenSecretManagerImpl dtsm1 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ ZKDelegationTokenSecretManagerImpl dtsm2 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ DelegationTokenManager tm1, tm2;
+ tm1 = new DelegationTokenManager(conf, new Text("bla"));
+ tm1.setExternalDelegationTokenSecretManager(dtsm1);
+ tm2 = new DelegationTokenManager(conf, new Text("bla"));
+ tm2.setExternalDelegationTokenSecretManager(dtsm2);
+
+ // time: X
+ // token expiry time:
+ // tm1: X + 10
+ // tm2: X + 10
+ Token token =
+ (Token) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm2.verifyToken(token);
+
+ // time: X + 9
+ // token expiry time:
+ // tm1: X + 10
+ // tm2: X + 19
+ Thread.sleep(9 * 1000);
+ tm2.renewToken(token, "foo");
+ tm1.verifyToken(token);
+
+ // time: X + 13
+ // token expiry time: (sync happened)
+ // tm1: X + 19
+ // tm2: X + 19
+ Thread.sleep(4 * 1000);
+ tm1.verifyToken(token);
+ tm2.verifyToken(token);
+
+ dtsm1.stopThreads();
+ dtsm2.stopThreads();
+ verifyDestroy(tm1, conf);
+ verifyDestroy(tm2, conf);
+ }
+ }
+
+ // This is very unlikely to happen in real case, but worth putting
+ // the case out
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMultiNodeTokenRemovalLongSyncWithoutWatch()
+ throws Exception {
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ // disable watch
+ conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
+ // make sync quick
+ conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 20);
+ // set the renew window and removal interval to be a
+ // short time to trigger the background cleanup
+ conf.setInt(RENEW_INTERVAL, 10);
+ conf.setInt(REMOVAL_SCAN_INTERVAL, 10);
+
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ ZKDelegationTokenSecretManagerImpl dtsm1 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ ZKDelegationTokenSecretManagerImpl dtsm2 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ ZKDelegationTokenSecretManagerImpl dtsm3 =
+ new ZKDelegationTokenSecretManagerImpl(conf);
+ DelegationTokenManager tm1, tm2, tm3;
+ tm1 = new DelegationTokenManager(conf, new Text("bla"));
+ tm1.setExternalDelegationTokenSecretManager(dtsm1);
+ tm2 = new DelegationTokenManager(conf, new Text("bla"));
+ tm2.setExternalDelegationTokenSecretManager(dtsm2);
+ tm3 = new DelegationTokenManager(conf, new Text("bla"));
+ tm3.setExternalDelegationTokenSecretManager(dtsm3);
+
+ // time: X
+ // token expiry time:
+ // tm1: X + 10
+ // tm2: X + 10
+ // tm3: No token due to no sync
+ Token token =
+ (Token) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm2.verifyToken(token);
+
+ // time: X + 9
+ // token expiry time:
+ // tm1: X + 10
+ // tm2: X + 19
+ // tm3: No token due to no sync
+ Thread.sleep(9 * 1000);
+ long renewalTime = tm2.renewToken(token, "foo");
+ LOG.info("Renew for token {} at current time {} renewal time {}",
+ token.getIdentifier(), Time.formatTime(Time.now()),
+ Time.formatTime(renewalTime));
+ tm1.verifyToken(token);
+
+ // time: X + 13
+ // token expiry time: (sync din't happen)
+ // tm1: X + 10
+ // tm2: X + 19
+ // tm3: X + 19 due to fetch from zk
+ Thread.sleep(4 * 1000);
+ tm2.verifyToken(token);
+ tm3.verifyToken(token);
+
+ dtsm1.stopThreads();
+ dtsm2.stopThreads();
+ dtsm3.stopThreads();
+ verifyDestroy(tm1, conf);
+ verifyDestroy(tm2, conf);
+ verifyDestroy(tm3, conf);
+ }
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index d0aac40646dfc..aef9bb8f35fde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -62,6 +62,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
test-jar
test
+
+ io.dropwizard.metrics
+ metrics-core
+ provided
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
org.apache.hadoop.thirdparty
hadoop-shaded-guava
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 034df4c037baa..17d6b598e6b00 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -99,8 +99,8 @@
${hadoop-thirdparty-shaded-prefix}.protobuf
${hadoop-thirdparty-shaded-prefix}.com.google.common
- 3.5.6
- 4.2.0
+ 3.6.3
+ 5.2.0
3.0.5
2.1.7
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml
index 2e0781e352716..a19b87c87dc93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml
@@ -104,6 +104,17 @@
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
+
org.slf4j
slf4j-api
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 0e52269041797..91c946baba2be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -89,6 +89,16 @@
test-jar
test
+
+ io.dropwizard.metrics
+ metrics-core
+ provided
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
junit
junit
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 9e51d4ec048d9..6101467a4056e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -109,6 +109,15 @@
org.apache.zookeeper
zookeeper
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
${leveldbjni.group}
leveldbjni-all
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index b2e46b8b6c69d..566f6bcc6839a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -217,6 +217,15 @@
org.apache.zookeeper
zookeeper
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+ org.xerial.snappy
+ snappy-java
+ provided
+
${leveldbjni.group}
leveldbjni-all