From 24a28eaadd17d2594eb88e098455d68aa74380a7 Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Wed, 26 Sep 2018 04:26:54 -0700 Subject: [PATCH] Fix WorkerToken renewal criteria and refactor --- .../apache/storm/daemon/nimbus/Nimbus.java | 26 ++----------- .../auth/workertoken/WorkerTokenManager.java | 37 +++++++++++++++++-- .../auth/workertoken/WorkerTokenTest.java | 16 +++----- 3 files changed, 41 insertions(+), 38 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index d66c0ef859c..732ad9fc203 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -2949,30 +2949,10 @@ public void submitTopology(String name, String uploadedJarLocation, String jsonC private void upsertWorkerTokensInCreds(Map creds, String user, String topologyId) { if (workerTokenManager != null) { - final long renewIfExpirationBefore = workerTokenManager.getMaxExpirationTimeForRenewal(); - for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) { - boolean shouldAdd = true; - WorkerToken oldToken = ClientAuthUtils.readWorkerToken(creds, type); - if (oldToken != null) { - try { - WorkerTokenInfo info = ClientAuthUtils.getWorkerTokenInfo(oldToken); - if (info.is_set_expirationTimeMillis() || info.get_expirationTimeMillis() > renewIfExpirationBefore) { - //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new - // token. - shouldAdd = false; - } - } catch (Exception e) { - //The old token could not be deserialized. This is bad, but we are going to replace it anyways so just keep going. - LOG.error("Could not deserialize token info", e); - } - } - if (shouldAdd) { - ClientAuthUtils.setWorkerToken(creds, workerTokenManager.createOrUpdateTokenFor(type, user, topologyId)); - } - } - //Remove any expired keys after possibly inserting new ones. - stormClusterState.removeExpiredPrivateWorkerKeys(topologyId); + workerTokenManager.upsertWorkerTokensInCredsForTopo(creds, user, topologyId); } + //Remove any expired keys after possibly inserting new ones. + stormClusterState.removeExpiredPrivateWorkerKeys(topologyId); } @Override diff --git a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java index ce60f614d52..63b995c8a56 100644 --- a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java +++ b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java @@ -18,8 +18,10 @@ package org.apache.storm.security.auth.workertoken; +import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.crypto.KeyGenerator; @@ -52,6 +54,7 @@ public class WorkerTokenManager { private final KeyGenerator keyGen; private final IStormClusterState state; private final long tokenLifetimeMillis; + /** * Constructor. This assumes that state can store the tokens securely, and that they should be enabled at all. Please use * ClientAuthUtils.areWorkerTokensEnabledServer to validate this first. @@ -118,11 +121,37 @@ public WorkerToken createOrUpdateTokenFor(WorkerTokenServiceType serviceType, St } /** - * Get the maximum expiration token time that should be renewed. + * Create or renew WorkerToken credentials for a topology. * - * @return any token with an expiration less than the returned value should be renewed. + * @param creds the map of credentials for. + * @param user the user the credentials are for + * @param topologyId the topology the credentials are for */ - public long getMaxExpirationTimeForRenewal() { - return Time.currentTimeMillis() + (tokenLifetimeMillis / 2); + public void upsertWorkerTokensInCredsForTopo(Map creds, String user, String topologyId) { + Arrays.stream(WorkerTokenServiceType.values()).filter(type -> shouldRenewWorkerToken(creds, type)) + .forEach(type -> {ClientAuthUtils.setWorkerToken(creds, createOrUpdateTokenFor(type, user, topologyId)); + }); } + + @VisibleForTesting + public boolean shouldRenewWorkerToken(Map creds, WorkerTokenServiceType type) { + boolean shouldAdd = true; + WorkerToken oldToken = ClientAuthUtils.readWorkerToken(creds, type); + if (oldToken != null) { + try { + WorkerTokenInfo info = ClientAuthUtils.getWorkerTokenInfo(oldToken); + if (!info.is_set_expirationTimeMillis() || info.get_expirationTimeMillis() - Time.currentTimeMillis() > (tokenLifetimeMillis + / 2)) { + //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new + // token. + shouldAdd = false; + } + } catch (Exception e) { + //The old token could not be deserialized. This is bad, but we are going to replace it anyways so just keep going. + LOG.error("Could not deserialize token info", e); + } + } + return shouldAdd; + } + } diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java index 91635032019..3f1fb045162 100644 --- a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java +++ b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java @@ -143,17 +143,11 @@ public void testExpiration() { } catch (IllegalArgumentException ia) { //What we want... } - } - } - - @Test - public void testRenewalTimeDefault() { - try (Time.SimulatedTime sim = new Time.SimulatedTime()) { - IStormClusterState mockState = mock(IStormClusterState.class); - Map conf = new HashMap<>(); - WorkerTokenManager wtm = new WorkerTokenManager(conf, mockState); - assertEquals(ONE_DAY_MILLIS/2, wtm.getMaxExpirationTimeForRenewal()); + //Verify if WorkerTokenManager recognizes the expired WorkerToken. + Map creds = new HashMap<>(); + ClientAuthUtils.setWorkerToken(creds, wt); + assertTrue("Expired WorkerToken should be eligible for renewal", wtm.shouldRenewWorkerToken(creds, type)); } } -} \ No newline at end of file +}