Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts";
public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
10;
/** Key for the extra clock-skew allowance of the RM delegation token renewer. */
public static final String RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME =
RM_PREFIX + "delegation-token-renewer.thread-clock-skew-time";
/** Default clock-skew allowance, expressed in milliseconds. */
public static final long DEFAULT_RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME =
TimeUnit.MINUTES.toMillis(10); // 10 minutes

public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,14 @@
<value>10</value>
</property>

<property>
<description>
Extra clock-skew allowance, in milliseconds, added to the current time when the
RM DelegationTokenRenewer decides whether a token is close enough to its expiry
time to be renewed. This keeps the decision from being affected by skew in the
local machine clock. The default is 600000 (10 minutes).
</description>
<name>yarn.resourcemanager.delegation-token-renewer.thread-clock-skew-time</name>
<value>600000</value>
</property>

<property>
<description>
Time interval between each RM DelegationTokenRenewer thread retry attempt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -123,6 +124,7 @@ public class DelegationTokenRenewer extends AbstractService {
private long tokenRenewerThreadTimeout;
private long tokenRenewerThreadRetryInterval;
private int tokenRenewerThreadRetryMaxAttempts;
private long clockSkewExtraTime;
private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
new ConcurrentHashMap<>();
private boolean delegationTokenRenewerPoolTrackerFlag = true;
Expand Down Expand Up @@ -165,6 +167,8 @@ protected void serviceInit(Configuration conf) throws Exception {
tokenRenewerThreadRetryMaxAttempts =
conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
clockSkewExtraTime = conf.getLong(YarnConfiguration.RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME,
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_CLOCK_SKEW_TIME);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
Expand Down Expand Up @@ -491,12 +495,13 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
boolean hasHdfsToken = false;
for (Token<?> token : tokens) {
AtomicLong tokenExpiredTime = new AtomicLong(now);
if (token.isManaged()) {
if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
LOG.info(applicationId + " found existing hdfs token " + token);
hasHdfsToken = true;
}
if (skipTokenRenewal(token)) {
if (skipTokenRenewalAndUpdateExpiredTime(token, tokenExpiredTime)) {
continue;
}

Expand All @@ -523,9 +528,14 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
tokenConf = getConfig();
}
dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
tokenConf, now, shouldCancelAtEnd, evt.getUser());
tokenConf, tokenExpiredTime.get(), shouldCancelAtEnd, evt.getUser());

try {
renewToken(dttr);
// Renew the token only if its expiry time is not later than now plus the
// clock-skew allowance; the allowance guards against clock skew.
if (tokenExpiredTime.get() <= now + clockSkewExtraTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand why a clock-skew condition check is added here. If this is indeed an issue, I think we should file a separate JIRA to fix it.

renewToken(dttr);
}
} catch (IOException ioe) {
if (ioe instanceof SecretManager.InvalidToken
&& dttr.maxDate < Time.now()
Expand Down Expand Up @@ -617,8 +627,9 @@ public boolean cancel() {
* Skip renewing token if the renewer of the token is set to ""
* Caller is expected to have examined that token.isManaged() returns
* true before calling this method.
* If the identifier is not null, its max date is stored in expiredTime.
*/
private boolean skipTokenRenewal(Token<?> token)
private boolean skipTokenRenewalAndUpdateExpiredTime(Token<?> token, AtomicLong expiredTime)
throws IOException {

@SuppressWarnings("unchecked")
Expand All @@ -627,6 +638,7 @@ private boolean skipTokenRenewal(Token<?> token)
if (identifier == null) {
return false;
}
expiredTime.set(identifier.getMaxDate());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reset the expireTime only when the result of skipTokenRenewal is false?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be identifier.getIssueDate()

Text renewer = identifier.getRenewer();
return (renewer != null && renewer.toString().equals(""));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest checking whether the delegation token needs to be renewed when the app is first submitted.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
Expand Down Expand Up @@ -183,7 +184,7 @@ public long renew(Token<?> t, Configuration conf) throws IOException {
}

@Override
public void cancel(Token<?> t, Configuration conf) {
public void cancel(Token<?> t, Configuration conf) throws IOException {
cancelled = true;
if (t instanceof MyToken) {
MyToken token = (MyToken) t;
Expand Down Expand Up @@ -307,7 +308,12 @@ public MyToken(DelegationTokenIdentifier dtId1,

public boolean isCanceled() {return status.equals(CANCELED);}

public void cancelToken() {this.status=CANCELED;}
/**
 * Marks this token as canceled and sets the max date in its identifier to 0.
 *
 * @throws IOException presumably if the identifier cannot be decoded;
 *         confirm against decodeIdentifier()
 */
public void cancelToken() throws IOException {
this.status=CANCELED;
// Decode the identifier, zero its max date, and write the re-encoded
// bytes back as this token's ID.
DelegationTokenIdentifier tokenIdentifier = this.decodeIdentifier();
tokenIdentifier.setMaxDate(0);
this.setID(tokenIdentifier.getBytes());
}

@Override
public long renew(Configuration conf) throws IOException,
Expand Down Expand Up @@ -450,8 +456,15 @@ public void testDTRenewal () throws Exception {
token2 = dfs.getDelegationToken("user2");
token3 = dfs.getDelegationToken("user3");

// Make token3's expiry time short.
AbstractDelegationTokenIdentifier identifier = token3.decodeIdentifier();
identifier.setMaxDate(System.currentTimeMillis());
token3.setID(identifier.getBytes());

//to cause this one to be set for renew in 2 secs
Renewer.tokenToRenewIn2Sec = token1;
AbstractDelegationTokenIdentifier identifier1 = token1.decodeIdentifier();
identifier1.setMaxDate(System.currentTimeMillis() + 2_000);
token1.setID(identifier1.getBytes());
LOG.info("token="+token1+" should be renewed for 2 secs");

// three distinct Namenodes
Expand All @@ -473,8 +486,8 @@ public void testDTRenewal () throws Exception {
new Configuration());
waitForEventsToGetProcessed(delegationTokenRenewer);

// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
// token1 and token3 should be renewed.
int numberOfExpectedRenewals = 2;

int attempts = 10;
while(attempts-- > 0) {
Expand All @@ -489,7 +502,7 @@ public void testDTRenewal () throws Exception {

LOG.info("dfs=" + dfs.hashCode() +
";Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
assertEquals("renew wasn't called as many times as expected(4):",
assertEquals("renew wasn't called as many times as expected(2):",
numberOfExpectedRenewals, Renewer.counter);
assertEquals("most recently renewed token mismatch", Renewer.lastRenewed,
token1);
Expand Down