diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java index f9cc3f4524ff5..f3fd623e7ce0c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java @@ -44,6 +44,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.KMSUtil; @@ -109,13 +110,13 @@ private LoadBalancingKMSClientProvider(URI uri, // canonical service (credentials alias) will be the first underlying // provider's service. must be deterministic before shuffle so multiple // calls for a token do not obtain another unnecessary token. - canonicalService = new Text(providers[0].getCanonicalServiceName()); + canonicalService = KMSClientProvider.getDtService(KMSUtil.getKeyProviderUri(conf)); } // shuffle unless seed is 0 which is used by tests for determinism. this.providers = (seed != 0) ? shuffle(providers) : providers; for (KMSClientProvider provider : providers) { - provider.setClientTokenProvider(this); + provider.setClientTokenProvider(provider); } this.currentIdx = new AtomicInteger((int)(seed % providers.length)); int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic. @@ -571,4 +572,31 @@ private static KMSClientProvider[] shuffle(KMSClientProvider[] providers) { Collections.shuffle(list); return list.toArray(providers); } + + @Override + public DelegationTokenIssuer[] getAdditionalTokenIssuers() throws IOException { + KMSClientProvider[] providers = getProviders(); + DelegationTokenIssuer[] result = new DelegationTokenIssuer[providers.length]; + for (int i = 0; i < providers.length; ++i) { + final KMSClientProvider provider = providers[i]; + result[i] = new DelegationTokenIssuer() { + @Override + public String getCanonicalServiceName() { + return provider.getCanonicalServiceName(); + } + + @Override + public Token getDelegationToken(String renewer) { + LOG.debug("KMS provider at [{}] call getDelegationToken()", provider.getKMSUrl()); + try { + return provider.getDelegationToken(renewer); + } catch (IOException ioe) { + LOG.warn("KMS provider at [{}] threw an IOException", provider.getKMSUrl(), ioe); + return null; + } + } + }; + } + return result; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java index 3bc96c3e2fce0..a0b4b1cee1133 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java @@ -38,6 +38,7 @@ import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -61,6 +62,7 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -888,8 +890,9 @@ public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails() public void testTokenServiceCreationWithLegacyFormat() throws Exception { Configuration conf = new Configuration(); // Create keyprovider with old token format (ip:port) + String confStrUri = "kms:/something"; conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, - "kms:/something"); + confStrUri); String authority = "host1:9600"; URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo"); KeyProvider kp = @@ -897,7 +900,7 @@ public void testTokenServiceCreationWithLegacyFormat() throws Exception { assertTrue(kp instanceof LoadBalancingKMSClientProvider); LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp; assertEquals(1, lbkp.getProviders().length); - assertEquals(authority, lbkp.getCanonicalServiceName()); + assertEquals(confStrUri, lbkp.getCanonicalServiceName()); for (KMSClientProvider provider : lbkp.getProviders()) { assertEquals(authority, provider.getCanonicalServiceName()); } @@ -935,9 +938,7 @@ private void testTokenSelectionWithConf(Configuration conf) throws Exception { final URI kmsUri = URI.create(providerUriString); // create a fake kms dt - final Token token = new Token(); - token.setKind(TOKEN_KIND); - token.setService(new Text(providerUriString)); + List tokenList = getMockTokenList(providerUriString); // call getActualUgi() with the current user. UserGroupInformation actualUgi = ugi.doAs(new PrivilegedExceptionAction(){ @@ -948,7 +949,9 @@ public UserGroupInformation run() throws Exception { final LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp; final Credentials creds = new Credentials(); - creds.addToken(token.getService(), token); + for (Token t : tokenList) { + creds.addToken(t.getService(), t); + } UserGroupInformation.getCurrentUser().addCredentials(creds); KMSClientProvider[] providers = lbkp.getProviders(); @@ -983,4 +986,52 @@ public void testGetActualUGI() throws Exception { // test client without hadoop.security.key.provider.path configured. testTokenSelectionWithConf(conf); } + + public List getMockTokenList(String providerUriString) throws Exception { + final URI kmsUri = URI.create(providerUriString); + final List tokenList = new ArrayList<>(); + String protocal = ""; + Token token = new Token(); + token.setKind(TOKEN_KIND); + token.setService(new Text(providerUriString)); + tokenList.add(token); + + String authority = kmsUri.getAuthority(); + // check for ';' which delimits the backup hosts + if (Strings.isNullOrEmpty(authority)) { + throw new IOException( + "No valid authority in kms uri [" + kmsUri + "]"); + } + // Check if port is present in authority + // In the current scheme, all hosts have to run on the same port + int port = -1; + String hostsPart = authority; + if (hostsPart.contains("@")) { + String[] tmps = authority.split("@"); + if (tmps.length > 1) { + authority = tmps[1]; + protocal = tmps[0]; + } + } + if (authority.contains(":")) { + String[] t = authority.split(":"); + try { + port = Integer.parseInt(t[1]); + } catch (Exception e) { + throw new IOException( + "Could not parse port in kms uri [" + kmsUri + "]"); + } + hostsPart = t[0]; + } + String[] hosts = hostsPart.split(";"); + for (int i = 0; i < hosts.length; i++) { + token = new Token(); + token.setKind(TOKEN_KIND); + URI tmpUri = new URI(kmsUri.getScheme(), protocal, hosts[i], port, + kmsUri.getPath(), null, null); + token.setService(new Text(tmpUri.toString())); + tokenList.add(token); + } + return tokenList; + } } \ No newline at end of file