Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.KMSUtil;
Expand Down Expand Up @@ -109,13 +110,13 @@ private LoadBalancingKMSClientProvider(URI uri,
// canonical service (credentials alias) will be the first underlying
// provider's service. must be deterministic before shuffle so multiple
// calls for a token do not obtain another unnecessary token.
canonicalService = new Text(providers[0].getCanonicalServiceName());
canonicalService = KMSClientProvider.getDtService(KMSUtil.getKeyProviderUri(conf));
}

// shuffle unless seed is 0 which is used by tests for determinism.
this.providers = (seed != 0) ? shuffle(providers) : providers;
for (KMSClientProvider provider : providers) {
provider.setClientTokenProvider(this);
provider.setClientTokenProvider(provider);
}
this.currentIdx = new AtomicInteger((int)(seed % providers.length));
int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
Expand Down Expand Up @@ -571,4 +572,31 @@ private static KMSClientProvider[] shuffle(KMSClientProvider[] providers) {
Collections.shuffle(list);
return list.toArray(providers);
}

@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers() throws IOException {
KMSClientProvider[] providers = getProviders();
DelegationTokenIssuer[] result = new DelegationTokenIssuer[providers.length];
for (int i = 0; i < providers.length; ++i) {
final KMSClientProvider provider = providers[i];
result[i] = new DelegationTokenIssuer() {
@Override
public String getCanonicalServiceName() {
return provider.getCanonicalServiceName();
}

@Override
public Token<?> getDelegationToken(String renewer) {
LOG.debug("KMS provider at [{}] call getDelegationToken()", provider.getKMSUrl());
try {
return provider.getDelegationToken(renewer);
} catch (IOException ioe) {
LOG.warn("KMS provider at [{}] threw an IOException", provider.getKMSUrl(), ioe);
return null;
}
}
};
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -61,6 +62,7 @@
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -888,16 +890,17 @@ public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails()
public void testTokenServiceCreationWithLegacyFormat() throws Exception {
Configuration conf = new Configuration();
// Create keyprovider with old token format (ip:port)
String confStrUri = "kms:/something";
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"kms:/something");
confStrUri);
String authority = "host1:9600";
URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo");
KeyProvider kp =
new KMSClientProvider.Factory().createProvider(kmsUri, conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp;
assertEquals(1, lbkp.getProviders().length);
assertEquals(authority, lbkp.getCanonicalServiceName());
assertEquals(confStrUri, lbkp.getCanonicalServiceName());
for (KMSClientProvider provider : lbkp.getProviders()) {
assertEquals(authority, provider.getCanonicalServiceName());
}
Expand Down Expand Up @@ -935,9 +938,7 @@ private void testTokenSelectionWithConf(Configuration conf) throws Exception {

final URI kmsUri = URI.create(providerUriString);
// create a fake kms dt
final Token token = new Token();
token.setKind(TOKEN_KIND);
token.setService(new Text(providerUriString));
List<Token> tokenList = getMockTokenList(providerUriString);
// call getActualUgi() with the current user.
UserGroupInformation actualUgi =
ugi.doAs(new PrivilegedExceptionAction<UserGroupInformation>(){
Expand All @@ -948,7 +949,9 @@ public UserGroupInformation run() throws Exception {
final LoadBalancingKMSClientProvider lbkp =
(LoadBalancingKMSClientProvider) kp;
final Credentials creds = new Credentials();
creds.addToken(token.getService(), token);
for (Token t : tokenList) {
creds.addToken(t.getService(), t);
}
UserGroupInformation.getCurrentUser().addCredentials(creds);

KMSClientProvider[] providers = lbkp.getProviders();
Expand Down Expand Up @@ -983,4 +986,52 @@ public void testGetActualUGI() throws Exception {
// test client without hadoop.security.key.provider.path configured.
testTokenSelectionWithConf(conf);
}

public List<Token> getMockTokenList(String providerUriString) throws Exception {
final URI kmsUri = URI.create(providerUriString);
final List<Token> tokenList = new ArrayList<>();
String protocal = "";
Token token = new Token();
token.setKind(TOKEN_KIND);
token.setService(new Text(providerUriString));
tokenList.add(token);

String authority = kmsUri.getAuthority();
// check for ';' which delimits the backup hosts
if (Strings.isNullOrEmpty(authority)) {
throw new IOException(
"No valid authority in kms uri [" + kmsUri + "]");
}
// Check if port is present in authority
// In the current scheme, all hosts have to run on the same port
int port = -1;
String hostsPart = authority;
if (hostsPart.contains("@")) {
String[] tmps = authority.split("@");
if (tmps.length > 1) {
authority = tmps[1];
protocal = tmps[0];
}
}
if (authority.contains(":")) {
String[] t = authority.split(":");
try {
port = Integer.parseInt(t[1]);
} catch (Exception e) {
throw new IOException(
"Could not parse port in kms uri [" + kmsUri + "]");
}
hostsPart = t[0];
}
String[] hosts = hostsPart.split(";");
for (int i = 0; i < hosts.length; i++) {
token = new Token();
token.setKind(TOKEN_KIND);
URI tmpUri = new URI(kmsUri.getScheme(), protocal, hosts[i], port,
kmsUri.getPath(), null, null);
token.setService(new Text(tmpUri.toString()));
tokenList.add(token);
}
return tokenList;
}
}