From 839071367754338b83096e9d1f908f859e9d5493 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 19 Sep 2018 10:05:23 -0500 Subject: [PATCH 1/2] STORM-3230: Add in sync if key not found --- .../apache/storm/cluster/StormClusterStateImpl.java | 11 ++++++++++- .../auth/workertoken/WorkerTokenAuthorizer.java | 9 ++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 69d25f4c744..14688688986 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -840,7 +840,14 @@ public void disconnect() { @Override public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion) { String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion); - return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), PrivateWorkerKey.class); + byte[] data = stateStorage.get_data(path, false); + if (data == null) { + LOG.debug("Could not find entry at {} will sync to see if that fixes it", path); + //We didn't find it, but there are races, so we want to check again after a sync + stateStorage.sync_path(path); + data = stateStorage.get_data(path, false); + } + return ClusterUtils.maybeDeserialize(data, PrivateWorkerKey.class); } @Override @@ -879,6 +886,7 @@ public void removeExpiredPrivateWorkerKeys(String topologyId) { PrivateWorkerKey key = ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false), PrivateWorkerKey.class); if (Time.currentTimeMillis() > key.get_expirationTimeMillis()) { + LOG.debug("Removing expired worker key {}", fullPath); stateStorage.delete_node(fullPath); } } catch (RuntimeException e) { @@ -903,6 +911,7 @@ public void removeAllPrivateWorkerKeys(String topologyId) { for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) { String path = ClusterUtils.secretKeysPath(type, topologyId); try { + LOG.debug("Removing worker keys under {}", path); stateStorage.delete_node(path); } catch (RuntimeException e) { //This should never happen because only the primary nimbus is active, but just in case diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java index 7144c79a2f7..3919cb7ec53 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java @@ -95,9 +95,12 @@ byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) { throw new IllegalArgumentException("Token is not valid, token has expired."); } - PrivateWorkerKey key = keyCache.getUnchecked(deser); - if (key == null) { - throw new IllegalArgumentException("Token is not valid, private key not found."); + PrivateWorkerKey key; + try { + key = keyCache.getUnchecked(deser); + } catch (CacheLoader.InvalidCacheLoadException e) { + //This happens when the cache has a null returned to it. + throw new IllegalArgumentException("Token is not valid, private key not found.", e); } if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) { From f89349ca83662d088f02a3ea1382109982d8cf06 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 19 Sep 2018 13:54:12 -0500 Subject: [PATCH 2/2] STORM-3230: Addressed review comments --- .../jvm/org/apache/storm/cluster/StormClusterStateImpl.java | 6 +++--- .../security/auth/workertoken/WorkerTokenAuthorizer.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 14688688986..644f465ba7a 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -856,7 +856,7 @@ public void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, stateStorage.mkdirs(ClusterUtils.SECRET_KEYS_SUBTREE, defaultAcls); List secretAcls = context.getZkSecretAcls(type); String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion); - LOG.debug("Storing private key for {} connecting to a {} at {} with ACL {}\n\n", topologyId, type, path, secretAcls); + LOG.info("Storing private key for {} connecting to a {} at {} with ACL {}", topologyId, type, path, secretAcls); stateStorage.set_data(path, Utils.serialize(key), secretAcls); } @@ -886,7 +886,7 @@ public void removeExpiredPrivateWorkerKeys(String topologyId) { PrivateWorkerKey key = ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false), PrivateWorkerKey.class); if (Time.currentTimeMillis() > key.get_expirationTimeMillis()) { - LOG.debug("Removing expired worker key {}", fullPath); + LOG.info("Removing expired worker key {}", fullPath); stateStorage.delete_node(fullPath); } } catch (RuntimeException e) { @@ -911,7 +911,7 @@ public void removeAllPrivateWorkerKeys(String topologyId) { for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) { String path = ClusterUtils.secretKeysPath(type, topologyId); try { - LOG.debug("Removing worker keys under {}", path); + LOG.info("Removing worker keys under {}", path); stateStorage.delete_node(path); } catch (RuntimeException e) { //This should never happen because only the primary nimbus is active, but just in case diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java index 3919cb7ec53..fcc5eebc8e3 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java @@ -99,7 +99,8 @@ byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) { try { key = keyCache.getUnchecked(deser); } catch (CacheLoader.InvalidCacheLoadException e) { - //This happens when the cache has a null returned to it. + //This happens when the key is not found, the cache loader returns a null and this exception is thrown. + // because the cache cannot store a null. throw new IllegalArgumentException("Token is not valid, private key not found.", e); }