Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@

@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract
class AbstractDelegationTokenSecretManager<TokenIdent
extends AbstractDelegationTokenIdentifier>
public abstract
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little worried that changes in this class may affect many sub classes
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, this is a fairly fundamental class in Hadoop in general.
I would propose to do a separate JIRA to clean this class up.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think any of these changes will affect any other functionality in any sub class - I verified the same .

And the changes are either very generic like encapsulation, exception handling, concurrency handling, etc - let me know if this still warrants a seperate JIRA.

class AbstractDelegationTokenSecretManager<TokenIdent
extends AbstractDelegationTokenIdentifier>
extends SecretManager<TokenIdent> {
private static final Logger LOG = LoggerFactory
.getLogger(AbstractDelegationTokenSecretManager.class);
Expand All @@ -84,11 +84,11 @@ private String formatTokenId(TokenIdent id) {
return "(" + id + ")";
}

/**
* Cache of currently valid tokens, mapping from DelegationTokenIdentifier
/**
* Cache of currently valid tokens, mapping from DelegationTokenIdentifier
* to DelegationTokenInformation. Protected by this object lock.
*/
protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
= new ConcurrentHashMap<>();

/**
Expand All @@ -102,26 +102,26 @@ private String formatTokenId(TokenIdent id) {
* Protected by this object lock.
*/
protected int delegationTokenSequenceNumber = 0;

/**
* Access to allKeys is protected by this object lock
*/
protected final Map<Integer, DelegationKey> allKeys
protected final Map<Integer, DelegationKey> allKeys
= new ConcurrentHashMap<>();

/**
* Access to currentId is protected by this object lock.
*/
protected int currentId = 0;
/**
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
private long keyUpdateInterval;
private long tokenMaxLifetime;
private long tokenRemoverScanInterval;
private long tokenRenewInterval;
private volatile DelegationKey currentKey;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have set it to volatile to make sure currentKey changes are visible across threads


private final long keyUpdateInterval;
private final long tokenMaxLifetime;
private final long tokenRemoverScanInterval;
private final long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
Expand Down Expand Up @@ -170,7 +170,7 @@ public void startThreads() throws IOException {
tokenRemoverThread.start();
}
}

/**
* Reset all data structures and mutable state.
*/
Expand All @@ -190,14 +190,14 @@ public long getCurrentTokensSize() {
return currentTokens.size();
}

/**
* Add a previously used master key to cache (when NN restarts),
/**
* Add a previously used master key to cache (when NN restarts),
* should be called before activate().
*
* @param key delegation key.
* @throws IOException raised on errors performing I/O.
*/
public synchronized void addKey(DelegationKey key) throws IOException {
protected synchronized void addKey(DelegationKey key) throws IOException {
if (running) // a safety check
throw new IOException("Can't add delegation key to a running SecretManager.");
if (key.getKeyId() > getCurrentKeyId()) {
Expand Down Expand Up @@ -237,7 +237,7 @@ protected void storeNewToken(TokenIdent ident, long renewDate) throws IOExceptio

// RM
protected void removeStoredToken(TokenIdent ident) throws IOException {

return;
}
// RM
protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOException {
Expand Down Expand Up @@ -310,8 +310,9 @@ protected synchronized void setDelegationTokenSeqNum(int seqNum) {
*
* @param keyId keyId.
* @return DelegationKey.
* @throws IOException raised on errors performing I/O.
*/
protected DelegationKey getDelegationKey(int keyId) {
protected DelegationKey getDelegationKey(int keyId) throws IOException {
return allKeys.get(keyId);
}

Expand Down Expand Up @@ -344,8 +345,9 @@ protected void updateDelegationKey(DelegationKey key) throws IOException {
*
* @param ident ident.
* @return DelegationTokenInformation.
* @throws IOException raised on errors performing I/O.
*/
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) throws IOException {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not just a KV lookup but actually compares all attributes of TokenIdent (maxDate, masterKeyId, owner, etc) - this is important because if we provide just a KV lookup, any user can create a TokenIdent object with a random key (sequence no) & get authenticated (RM just checks for presence of token for authentication).

Corresponding change needs to be done in stateless secret manager

return currentTokens.get(ident);
}

Expand Down Expand Up @@ -378,6 +380,15 @@ protected void updateToken(TokenIdent ident,
updateStoredToken(ident, tokenInfo.getRenewDate());
}

protected void removeToken(TokenIdent ident) throws IOException {
DelegationTokenInformation info = currentTokens.remove(ident);
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(ident));
}
removeTokenForOwnerStats(ident);
removeStoredToken(ident);
}

/**
* This method is intended to be used for recovering persisted delegation
* tokens. Tokens that have an unknown <code>DelegationKey</code> are
Expand Down Expand Up @@ -419,12 +430,13 @@ public synchronized void addPersistedDelegationToken(
}
}

/**
* Update the current master key
* This is called once by startThreads before tokenRemoverThread is created,
/**
* Update the current master key
* This is called once by startThreads before tokenRemoverThread is created,
* and only by tokenRemoverThread afterwards.
* @throws IOException raised on errors performing I/O.
*/
private void updateCurrentKey() throws IOException {
protected void updateCurrentKey() throws IOException {
LOG.info("Updating the current master key for generating delegation tokens");
/* Create a new currentKey with an estimated expiry date. */
int newCurrentId;
Expand All @@ -434,16 +446,17 @@ private void updateCurrentKey() throws IOException {
DelegationKey newKey = new DelegationKey(newCurrentId, System
.currentTimeMillis()
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
//Log must be invoked outside the lock on 'this'
logUpdateMasterKey(newKey);
synchronized (this) {
storeDelegationKey(newKey);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I understand that we shouldn't update currentKey when some other thread is using it, but does the storeDelegationKey(newKey) also need to be inside the synchronized block?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually even updating currentKey needn't be in synchronized block because currentKey is volatile so the update will be visible.

I had originally removed synchronized here but added it back to avoid the change in this PR and to make it seperately.

wanted to keep the PR focused specifically on whats required for reliable stateless setup - in this particular case, trying to store in DB first before updating currentKey

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't updating currentKey need to be synchronized? Because if some other thread is using this in createPassword() and in between lines identifier.setMasterKeyId(currentKey.getKeyId()) and createPassword(identifier.getBytes(), currentKey.getKey()); the currentKey changes, won't that result in inconsistencies?

// Set current key only if storeDelegationKey is successful (doesn't throw an exception)
currentKey = newKey;
storeDelegationKey(currentKey);
}
//Log must be invoked outside the lock on 'this'
logUpdateMasterKey(newKey);
}
/**
* Update the current master key for generating delegation tokens

/**
* Update the current master key for generating delegation tokens
* It should be called only by tokenRemoverThread.
* @throws IOException raised on errors performing I/O.
*/
Expand Down Expand Up @@ -476,11 +489,19 @@ private synchronized void removeExpiredKeys() {
}
}
}

@Override
protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();

// If creating new keys repeatedly fails due to state store issues, key can be expired but not yet rolled over.
// In this case, fail new token creation
if (currentKey.getExpiryDate() < now) {
String err = "Master key is expired, so unable to generate new tokens";
LOG.error(err);
throw new RuntimeException(err);
}
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
Expand All @@ -496,6 +517,9 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
} catch (IOException ioe) {
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
ioe);
// TODO - Throw IOException & handle in all clients appropriately
// Currently wrapping this as runtime exception to preserve createPassword contracts
throw new RuntimeException(ioe);
}
return password;
}
Expand All @@ -504,17 +528,23 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {

/**
* Find the DelegationTokenInformation for the given token id, and verify that
* if the token is expired. Note that this method should be called with
* acquiring the secret manager's monitor.
* if the token is expired.
*
* @param identifier identifier.
* @throws InvalidToken invalid token exception.
* @return DelegationTokenInformation.
*/
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = getTokenInfo(identifier);
DelegationTokenInformation info = null;
try {
info = getTokenInfo(identifier);
} catch (IOException e) {
e.printStackTrace();
// Converting IOException to InvalidTokenException as that's the current contract with checkToken
// TODO - Change this so that clients can handle accordingly
throw new InvalidToken("Token for real user: " + identifier.getRealUser() + ", can't be found in database");
}
String err;
if (info == null) {
err = "Token for real user: " + identifier.getRealUser() + ", can't be found in cache";
Expand All @@ -531,7 +561,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
}
return info;
}

@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
Expand All @@ -546,7 +576,12 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
}

public synchronized String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = getTokenInfo(identifier);
DelegationTokenInformation info = null;
try {
info = getTokenInfo(identifier);
} catch (IOException e) {
e.printStackTrace();
}
if (info == null) {
return null;
}
Expand All @@ -567,7 +602,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
+ " is invalid, password doesn't match");
}
}

/**
* Renew a delegation token.
* @param token the token to renew
Expand Down Expand Up @@ -628,7 +663,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
METRICS.trackUpdateToken(() -> updateToken(id, info));
return renewTime;
}

/**
* Cancel a token by removing it from cache.
*
Expand All @@ -646,7 +681,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
id.readFields(in);
LOG.info("Token cancellation requested for identifier: "
+ formatTokenId(id));

if (id.getUser() == null) {
throw new InvalidToken("Token with no owner " + formatTokenId(id));
}
Expand All @@ -660,17 +695,12 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
throw new AccessControlException(canceller
+ " is not authorized to cancel the token " + formatTokenId(id));
}
DelegationTokenInformation info = currentTokens.remove(id);
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(id));
}
METRICS.trackRemoveToken(() -> {
removeTokenForOwnerStats(id);
removeStoredToken(id);
removeToken(id);
});
return id;
}

/**
* Convert the byte[] to a secret key
* @param key the byte[] to create the secret key from
Expand All @@ -683,9 +713,9 @@ public static SecretKey createSecretKey(byte[] key) {
/** Class to encapsulate a token's renew date and password. */
@InterfaceStability.Evolving
public static class DelegationTokenInformation implements Writable {
long renewDate;
byte[] password;
String trackingId;
private long renewDate;
private byte[] password;
private String trackingId;

public DelegationTokenInformation() {
this(0, null);
Expand Down Expand Up @@ -744,7 +774,7 @@ public void readFields(DataInput in) throws IOException {
trackingId = WritableUtils.readString(in);
}
}

/** Remove expired delegation tokens from cache */
private void removeExpiredToken() throws IOException {
long now = Time.now();
Expand Down Expand Up @@ -779,7 +809,7 @@ public void stopThreads() {
if (LOG.isDebugEnabled())
LOG.debug("Stopping expired delegation token remover thread");
running = false;

if (tokenRemoverThread != null) {
synchronized (noInterruptsLock) {
tokenRemoverThread.interrupt();
Expand All @@ -792,15 +822,15 @@ public void stopThreads() {
}
}
}

/**
* is secretMgr running
* @return true if secret mgr is running
*/
public synchronized boolean isRunning() {
return running;
}

private class ExpiredTokenRemover extends Thread {
private long lastMasterKeyUpdate;
private long lastTokenCacheCleanup;
Expand Down Expand Up @@ -841,7 +871,7 @@ public void run() {
/**
* Decode the token identifier. The subclass can customize the way to decode
* the token identifier.
*
*
* @param token the token where to extract the identifier
* @return the delegation token identifier
* @throws IOException raised on errors performing I/O.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected void removeStoredToken(TokenIdent ident) throws IOException {
* null if it doesn't exist in the database.
*/
@Override
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) throws IOException {
// Look for token in local cache
DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);

Expand Down Expand Up @@ -302,7 +302,7 @@ protected void removeStoredMasterKey(DelegationKey key) {
* if it doesn't exist in the database.
*/
@Override
protected DelegationKey getDelegationKey(int keyId) {
protected DelegationKey getDelegationKey(int keyId) throws IOException {
// Look for delegation key in local cache
DelegationKey delegationKey = super.getDelegationKey(keyId);

Expand Down
Loading