diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 49432aff11789..af2d995c3e513 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -613,6 +614,10 @@ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
       return false;
     }
 
+    private synchronized boolean shouldAuthenticateUsingDelegationTokens() throws IOException {
+      return UserGroupInformation.getCurrentUser().isFromDelegationToken();
+    }
+
     private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
         throws IOException {
       // Do not use Client.conf here! We must use ConnectionId.conf, since the
@@ -691,7 +696,7 @@ private synchronized void setupConnection(
       InetSocketAddress bindAddr = null;
       if (ticket != null && ticket.hasKerberosCredentials()) {
         KerberosInfo krbInfo =
-          remoteId.getProtocol().getAnnotation(KerberosInfo.class);
+            remoteId.getProtocol().getAnnotation(KerberosInfo.class);
         if (krbInfo != null) {
           String principal = ticket.getUserName();
           String host = SecurityUtil.getHostFromPrincipal(principal);
@@ -755,7 +760,7 @@ public Object run() throws IOException, InterruptedException {
           final short MAX_BACKOFF = 5000;
           closeConnection();
           disposeSasl();
-          if (shouldAuthenticateOverKrb()) {
+          if (shouldAuthenticateOverKrb() || shouldAuthenticateUsingDelegationTokens()) {
             if (currRetries < maxRetries) {
               if(LOG.isDebugEnabled()) {
                 LOG.debug("Exception encountered while connecting to "
@@ -766,6 +771,19 @@ public Object run() throws IOException, InterruptedException {
                 UserGroupInformation.getLoginUser().reloginFromKeytab();
               } else if (UserGroupInformation.isLoginTicketBased()) {
                 UserGroupInformation.getLoginUser().reloginFromTicketCache();
+              } else if (shouldAuthenticateUsingDelegationTokens()) {
+                UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+                for (AbstractDelegationTokenIdentifier delegationToken :
+                    currUser.getAllDelegationTokens(currUser.getCredentials())) {
+                  LOG.debug("Delegation token for current user after SASL failure "
+                      + "and before refreshing the UGI is {}", delegationToken);
+                }
+                currUser.reloginFromDelegationTokens();
+                for (AbstractDelegationTokenIdentifier delegationToken :
+                    currUser.getAllDelegationTokens(currUser.getCredentials())) {
+                  LOG.debug("Delegation token for current user after SASL failure "
+                      + "and after refreshing the UGI is {}", delegationToken);
+                }
               }
               // have granularity of milliseconds
               //we are sleeping with the Connection lock held but since this
@@ -1609,6 +1627,26 @@ private Writable getRpcResponse(final Call call, final Connection connection,
 
     if (call.error != null) {
       if (call.error instanceof RemoteException) {
+        // The server reported an invalid (likely expired) delegation token.
+        // Delegation tokens can be managed externally, so refresh them and
+        // surface the failure as a RetriableException so the call is retried.
+        Exception unwrapped = ((RemoteException) call.error).unwrapRemoteException(
+            org.apache.hadoop.security.token.SecretManager.InvalidToken.class);
+        UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+        if (unwrapped instanceof org.apache.hadoop.security.token.SecretManager.InvalidToken
+            && currUser.isFromDelegationToken()) {
+          for (AbstractDelegationTokenIdentifier delegationToken :
+              currUser.getAllDelegationTokens(currUser.getCredentials())) {
+            LOG.debug("Delegation token before refresh is {}", delegationToken.getTrackingId());
+          }
+          currUser.reloginFromDelegationTokens();
+          call.error = new RetriableException(unwrapped);
+          for (AbstractDelegationTokenIdentifier delegationToken :
+              currUser.getAllDelegationTokens(currUser.getCredentials())) {
+            LOG.debug("Delegation token after refresh is {} {}", delegationToken.getTrackingId(),
+                delegationToken);
+          }
+        }
         call.error.fillInStackTrace();
         throw call.error;
       } else { // local exception
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
index e71bf6d40dd21..d6d38e5b2f54a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
@@ -213,9 +213,8 @@ public Map<Text, byte[]> getSecretKeyMap() {
    * @param conf
    * @throws IOException
    */
-  public static Credentials readTokenStorageFile(Path filename,
-                                                 Configuration conf)
-      throws IOException {
+  public static Credentials readTokenStorageFile(Path filename, Configuration conf)
+      throws IOException {
     FSDataInputStream in = null;
     Credentials credentials = new Credentials();
     try {
@@ -461,4 +460,21 @@ private void addAll(Credentials other, boolean overwrite) {
       }
     }
   }
+
+  /**
+   * Replace every stored token of the given token's kind with a copy that
+   * carries the new identifier and password but keeps the existing service.
+   * This keeps the token map synchronized between HA pair servers.
+   * @param token the freshly loaded token to propagate
+   */
+  public void synchTokens(Token<? extends TokenIdentifier> token) {
+    for (Map.Entry<Text, Token<? extends TokenIdentifier>> entry : tokenMap.entrySet()) {
+      if (entry.getValue().getKind().equals(token.getKind())) {
+        LOG.debug("Matched token kind " + entry.getValue().getKind());
+        Token<TokenIdentifier> clone = new Token<>(token.getIdentifier(),
+            token.getPassword(), token.getKind(), entry.getValue().getService());
+        tokenMap.put(entry.getKey(), clone);
+      }
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/DelegationTokenUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/DelegationTokenUtil.java
new file mode 100644
index 0000000000000..c83be8657952e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/DelegationTokenUtil.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DelegationTokenUtil {
+  public static final String HADOOP_TOKEN_FILE_LOCATION =
+      "HADOOP_TOKEN_FILE_LOCATION";
+
+  static final Logger LOG = LoggerFactory.getLogger(
+      DelegationTokenUtil.class);
+
+  private DelegationTokenUtil() {
+  }
+
+  public static synchronized Credentials readDelegationTokens(Configuration conf)
+      throws IOException {
+    String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+    if (fileLocation != null) {
+      // Load the token storage file and put all of the tokens into the
+      // user. Don't use the FileSystem API for reading since it has a lock
+      // cycle (HADOOP-9212).
+      File source = new File(fileLocation);
+      Credentials creds = Credentials.readTokenStorageFile(
+          source, conf);
+      LOG.info("Loaded {} tokens", creds.numberOfTokens());
+      return creds;
+    }
+    return null;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index a3b1cbd14d822..062a7d654e1f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -87,6 +87,7 @@
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Time;
@@ -186,12 +187,25 @@ private <T extends Principal> T getCanonicalUser(Class<T> cls) {
       return null;
     }
 
+    private void addDelegationTokensToSubject() throws LoginException {
+      try {
+        Credentials creds = DelegationTokenUtil.readDelegationTokens(conf);
+        if (creds != null) {
+          subject.getPrivateCredentials().add(creds);
+        }
+      } catch (IOException e) {
+        throw new LoginException("Failed to load token file from "
+            + HADOOP_TOKEN_FILE_LOCATION);
+      }
+    }
+
     @Override
     public boolean commit() throws LoginException {
       LOG.debug("hadoop login commit");
       // if we already have a user, we are done.
       if (!subject.getPrincipals(User.class).isEmpty()) {
         LOG.debug("Using existing subject: {}", subject.getPrincipals());
+        addDelegationTokensToSubject();
         return true;
       }
       Principal user = getCanonicalUser(KerberosPrincipal.class);
@@ -229,6 +243,7 @@ public boolean commit() throws LoginException {
       LOG.debug("User entry: \"{}\"", userEntry);
 
       subject.getPrincipals().add(userEntry);
+      addDelegationTokensToSubject();
       return true;
     }
     throw new LoginException("Failed to find user in name " + subject);
@@ -740,11 +755,12 @@ UserGroupInformation createLoginUser(Subject subject) throws IOException {
         LOG.debug("Reading credentials from location {}",
             tokenFile.getCanonicalPath());
         if (tokenFile.exists() && tokenFile.isFile()) {
-          Credentials cred = Credentials.readTokenStorageFile(
-              tokenFile, conf);
-          LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
-              tokenFile.getCanonicalPath());
-          loginUser.addCredentials(cred);
+          Credentials cred = DelegationTokenUtil.readDelegationTokens(conf);
+          if (cred != null) {
+            LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
+                tokenFile.getCanonicalPath());
+            loginUser.addCredentials(cred);
+          }
         } else {
           LOG.info("Token file {} does not exist",
               tokenFile.getCanonicalPath());
@@ -851,6 +867,63 @@ private long getRefreshTime(KerberosTicket tgt) {
     return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
   }
 
+  /**
+   * Re-login a user from delegation tokens. This method assumes that a
+   * login has already happened. The Subject field of this
+   * UserGroupInformation object is updated with the new credentials.
+   * @throws IOException on a failure
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public synchronized void reloginFromDelegationTokens() throws IOException {
+    if (!isFromDelegationToken()) {
+      throw new IOException("User has not logged in using delegation tokens");
+    }
+
+    synchronized (UserGroupInformation.class) {
+      Credentials cred = DelegationTokenUtil.readDelegationTokens(conf);
+      if (cred == null) {
+        return;
+      }
+      addCredentials(cred);
+
+      // Propagate the freshly read tokens into every Credentials object
+      // already attached to the Subject.
+      for (Token<? extends TokenIdentifier> token : cred.getAllTokens()) {
+        for (Credentials currentCreds : subject.getPrivateCredentials(Credentials.class)) {
+          currentCreds.synchTokens(token);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if this user was logged in from delegation tokens rather
+   * than from a keytab or a Kerberos ticket cache.
+   */
+  public boolean isFromDelegationToken() {
+    return !isFromKeytab() && getTGT() == null
+        && !subject.getPrivateCredentials(Credentials.class).isEmpty();
+  }
+
+  /**
+   * @return the delegation token identifiers carried by the given credentials.
+   */
+  public Collection<AbstractDelegationTokenIdentifier> getAllDelegationTokens(Credentials cred) {
+    List<AbstractDelegationTokenIdentifier> delegToks = new ArrayList<>();
+    for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+      try {
+        TokenIdentifier identifier = t.decodeIdentifier();
+        if (identifier instanceof AbstractDelegationTokenIdentifier) {
+          delegToks.add((AbstractDelegationTokenIdentifier) identifier);
+        }
+      } catch (IOException e) {
+        LOG.warn("Error decoding token identifier of kind {}", t.getKind(), e);
+      }
+    }
+    return delegToks;
+  }
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public boolean shouldRelogin() {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
index 1327561cf4356..fa918a07a4558 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -54,16 +55,21 @@
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -110,6 +116,11 @@ public class TestUserGroupInformation {
   // Rollover interval of percentile metrics (in seconds)
   private static final int PERCENTILES_INTERVAL = 1;
   private static Configuration conf;
+
+  private static FileContext localFS = null;
+  private static final File testDir = new File("target",
+      TestUserGroupInformation.class.getName() + "-tmpDir").getAbsoluteFile();
+
 
   /**
    * UGI should not use the default security conf, else it will collide
@@ -135,7 +146,8 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
 
   /** configure ugi */
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws
+      FileNotFoundException, IllegalArgumentException, IOException {
     javax.security.auth.login.Configuration.setConfiguration(
         new DummyLoginConfiguration());
     // doesn't matter what it is, but getGroups needs it set...
@@ -143,10 +155,18 @@ public static void setup() {
     // that finds winutils.exe
     String home = System.getenv("HADOOP_HOME");
     System.setProperty("hadoop.home.dir", (home != null ? home : "."));
+
+    localFS = FileContext.getLocalFSFileContext();
+    localFS.delete(new Path(testDir.getAbsolutePath()), true);
+    testDir.mkdir();
   }
 
   @Before
-  public void setupUgi() {
+  public void setupUgi() throws Exception {
+    // Start each test with a clean environment so a leftover
+    // HADOOP_TOKEN_FILE_LOCATION cannot leak between tests.
+    Map<String, String> newEnv = new HashMap<String, String>();
+    setNewEnvironmentHack(newEnv);
     conf = new Configuration();
     UserGroupInformation.reset();
     UserGroupInformation.setConfiguration(conf);
@@ -1363,4 +1383,108 @@ public void testImportTokensFromProperty() throws IOException {
     // Cleanup
     System.clearProperty(CommonConfigurationKeysPublic.HADOOP_TOKENS);
   }
+
+  /**
+   * This test checks that we can load a delegation token off disk.
+   */
+  @Test
+  public void testTokenRead() throws Exception {
+    // Simulate a credentials file
+    Credentials credentials = new Credentials();
+
+    Text serviceName = new Text("service1");
+    TestTokenIdentifier tokenId = new TestTokenIdentifier();
+    Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(
+        tokenId.getBytes(), "password".getBytes(),
+        tokenId.getKind(), serviceName);
+    credentials.addToken(new Text("regular-token"), token);
+
+    Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
+    credentials.writeTokenStorageFile(tokenFilePath, conf);
+
+    Map<String, String> newEnv = new HashMap<String, String>();
+    newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath
+        .toUri().getPath());
+    setNewEnvironmentHack(newEnv);
+
+    // Reset the login user so that UGI re-reads the token file on the
+    // next access.
+    UserGroupInformation.setLoginUser(null);
+    UserGroupInformation.setConfiguration(conf); // pick up changed auth
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    Collection<Token<? extends TokenIdentifier>> tokens =
+        ugi.getCredentials().getAllTokens();
+    assertEquals(1, tokens.size());
+    Token<?> tok = (Token<?>) tokens.toArray()[0];
+    assertEquals(token.encodeToUrlString(), tok.encodeToUrlString());
+  }
+
+  /**
+   * This test checks that we can reload a delegation token off disk.
+   */
+  @Test
+  public void testTokenReload() throws Exception {
+    Text serviceName = new Text("service1");
+
+    // Write the initial token file and load it.
+    testTokenRead();
+
+    // Write out a replacement token file.
+    Credentials credentials = new Credentials();
+    TestTokenIdentifier tokenId = new TestTokenIdentifier();
+    Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(
+        tokenId.getBytes(), "new-password".getBytes(),
+        tokenId.getKind(), serviceName);
+    credentials.addToken(new Text("regular-token"), token);
+
+    Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
+    credentials.writeTokenStorageFile(tokenFilePath, conf);
+
+    // Relogin and verify that the new token replaced the old one.
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    ugi.reloginFromDelegationTokens();
+
+    Collection<Token<? extends TokenIdentifier>> tokens =
+        ugi.getCredentials().getAllTokens();
+    assertEquals(1, tokens.size());
+    Token<?> tok = (Token<?>) tokens.toArray()[0];
+    assertEquals(token.encodeToUrlString(), tok.encodeToUrlString());
+  }
+
+  // A dirty hack to modify the environment of the current JVM itself. Dirty,
+  // but should be okay for testing. (Taken from TestMRAppMaster.java)
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private static void setNewEnvironmentHack(Map<String, String> newenv)
+      throws Exception {
+    try {
+      // Windows JDKs keep the environment in two static maps.
+      Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
+      Field field = cl.getDeclaredField("theEnvironment");
+      field.setAccessible(true);
+      Map<String, String> env = (Map<String, String>) field.get(null);
+      env.clear();
+      env.putAll(newenv);
+      Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
+      ciField.setAccessible(true);
+      Map<String, String> cienv = (Map<String, String>) ciField.get(null);
+      cienv.clear();
+      cienv.putAll(newenv);
+    } catch (NoSuchFieldException e) {
+      // On Unix, patch the unmodifiable map that backs System.getenv().
+      Class[] classes = Collections.class.getDeclaredClasses();
+      Map<String, String> env = System.getenv();
+      for (Class cl : classes) {
+        if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+          Field field = cl.getDeclaredField("m");
+          field.setAccessible(true);
+          Object obj = field.get(env);
+          Map<String, String> map = (Map<String, String>) obj;
+          map.clear();
+          map.putAll(newenv);
+        }
+      }
+    }
+  }
 }