diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d42562cf6140a..165bb30c8cd2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -814,6 +814,10 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts";
public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
10;
+ public static final String RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME =
+ RM_PREFIX + "delegation-token-renewer.thread-clock-skew-time";
+ public static final long DEFAULT_RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME =
+ TimeUnit.MINUTES.toMillis(10); // 10 minutes
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 407ef74d3d062..28558eef4b833 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1093,6 +1093,14 @@
10
+
+
+ Extra skew time to make sure token expire time is not affected by local machine clock
+
+ yarn.resourcemanager.delegation-token-renewer.thread-clock-skew-time
+ 10
+
+
Time interval between each RM DelegationTokenRenewer thread retry attempt
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index d0e0bf61748c4..fc5e406948375 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -43,6 +43,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -123,6 +124,7 @@ public class DelegationTokenRenewer extends AbstractService {
private long tokenRenewerThreadTimeout;
private long tokenRenewerThreadRetryInterval;
private int tokenRenewerThreadRetryMaxAttempts;
+ private long clockSkewExtraTime;
private final Map> futures =
new ConcurrentHashMap<>();
private boolean delegationTokenRenewerPoolTrackerFlag = true;
@@ -165,6 +167,8 @@ protected void serviceInit(Configuration conf) throws Exception {
tokenRenewerThreadRetryMaxAttempts =
conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
+ clockSkewExtraTime = conf.getLong(YarnConfiguration.RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME,
+ YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue();
@@ -491,12 +495,13 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
Set tokenList = new HashSet();
boolean hasHdfsToken = false;
for (Token> token : tokens) {
+ AtomicLong tokenExpiredTime = new AtomicLong(now);
if (token.isManaged()) {
if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
LOG.info(applicationId + " found existing hdfs token " + token);
hasHdfsToken = true;
}
- if (skipTokenRenewal(token)) {
+ if (skipTokenRenewalAndUpdateExpiredTime(token, tokenExpiredTime)) {
continue;
}
@@ -523,9 +528,14 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
tokenConf = getConfig();
}
dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
- tokenConf, now, shouldCancelAtEnd, evt.getUser());
+ tokenConf, tokenExpiredTime.get(), shouldCancelAtEnd, evt.getUser());
+
try {
- renewToken(dttr);
+ // if expire date is not greater than now, renew token.
+ // add extra time in case of clock skew
+ if (tokenExpiredTime.get() <= now + clockSkewExtraTime) {
+ renewToken(dttr);
+ }
} catch (IOException ioe) {
if (ioe instanceof SecretManager.InvalidToken
&& dttr.maxDate < Time.now()
@@ -617,8 +627,9 @@ public boolean cancel() {
* Skip renewing token if the renewer of the token is set to ""
* Caller is expected to have examined that token.isManaged() returns
* true before calling this method.
+ * if identifier is not null, get the max expired time from token identifier.
*/
- private boolean skipTokenRenewal(Token> token)
+ private boolean skipTokenRenewalAndUpdateExpiredTime(Token> token, AtomicLong expiredTime)
throws IOException {
@SuppressWarnings("unchecked")
@@ -627,6 +638,7 @@ private boolean skipTokenRenewal(Token> token)
if (identifier == null) {
return false;
}
+ expiredTime.set(identifier.getMaxDate());
Text renewer = identifier.getRenewer();
return (renewer != null && renewer.toString().equals(""));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 2856c271f39c4..ca47a80f7e658 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
@@ -183,7 +184,7 @@ public long renew(Token> t, Configuration conf) throws IOException {
}
@Override
- public void cancel(Token> t, Configuration conf) {
+ public void cancel(Token> t, Configuration conf) throws IOException {
cancelled = true;
if (t instanceof MyToken) {
MyToken token = (MyToken) t;
@@ -307,7 +308,12 @@ public MyToken(DelegationTokenIdentifier dtId1,
public boolean isCanceled() {return status.equals(CANCELED);}
- public void cancelToken() {this.status=CANCELED;}
+ public void cancelToken() throws IOException {
+ this.status=CANCELED;
+ DelegationTokenIdentifier tokenIdentifier = this.decodeIdentifier();
+ tokenIdentifier.setMaxDate(0);
+ this.setID(tokenIdentifier.getBytes());
+ }
@Override
public long renew(Configuration conf) throws IOException,
@@ -450,8 +456,15 @@ public void testDTRenewal () throws Exception {
token2 = dfs.getDelegationToken("user2");
token3 = dfs.getDelegationToken("user3");
+ // make token3 expire time short.
+ AbstractDelegationTokenIdentifier identifier = token3.decodeIdentifier();
+ identifier.setMaxDate(System.currentTimeMillis());
+ token3.setID(identifier.getBytes());
+
//to cause this one to be set for renew in 2 secs
- Renewer.tokenToRenewIn2Sec = token1;
+ AbstractDelegationTokenIdentifier identifier1 = token1.decodeIdentifier();
+ identifier1.setMaxDate(System.currentTimeMillis() + 2_000);
+ token1.setID(identifier1.getBytes());
LOG.info("token="+token1+" should be renewed for 2 secs");
// three distinct Namenodes
@@ -473,8 +486,8 @@ public void testDTRenewal () throws Exception {
new Configuration());
waitForEventsToGetProcessed(delegationTokenRenewer);
- // first 3 initial renewals + 1 real
- int numberOfExpectedRenewals = 3+1;
+ // token1 and token3 should be renewed.
+ int numberOfExpectedRenewals = 2;
int attempts = 10;
while(attempts-- > 0) {
@@ -489,7 +502,7 @@ public void testDTRenewal () throws Exception {
LOG.info("dfs=" + dfs.hashCode() +
";Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
- assertEquals("renew wasn't called as many times as expected(4):",
+ assertEquals("renew wasn't called as many times as expected(2):",
numberOfExpectedRenewals, Renewer.counter);
assertEquals("most recently renewed token mismatch", Renewer.lastRenewed,
token1);