Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -613,6 +614,10 @@ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
return false;
}

private synchronized boolean shouldAuthenticateUsingDelegationTokens() throws IOException {
return UserGroupInformation.getCurrentUser().isFromDelegationToken();
}

private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
throws IOException {
// Do not use Client.conf here! We must use ConnectionId.conf, since the
Expand Down Expand Up @@ -691,7 +696,7 @@ private synchronized void setupConnection(
InetSocketAddress bindAddr = null;
if (ticket != null && ticket.hasKerberosCredentials()) {
KerberosInfo krbInfo =
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
if (krbInfo != null) {
String principal = ticket.getUserName();
String host = SecurityUtil.getHostFromPrincipal(principal);
Expand Down Expand Up @@ -755,7 +760,7 @@ public Object run() throws IOException, InterruptedException {
final short MAX_BACKOFF = 5000;
closeConnection();
disposeSasl();
if (shouldAuthenticateOverKrb()) {
if (shouldAuthenticateOverKrb() || shouldAuthenticateUsingDelegationTokens()) {
if (currRetries < maxRetries) {
if(LOG.isDebugEnabled()) {
LOG.debug("Exception encountered while connecting to "
Expand All @@ -766,6 +771,19 @@ public Object run() throws IOException, InterruptedException {
UserGroupInformation.getLoginUser().reloginFromKeytab();
} else if (UserGroupInformation.isLoginTicketBased()) {
UserGroupInformation.getLoginUser().reloginFromTicketCache();
} else if (shouldAuthenticateUsingDelegationTokens()) {
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
for (AbstractDelegationTokenIdentifier delegationToken:
currUser.getAllDelegationTokens(currUser.getCredentials())){
LOG.debug("Delegation token for current user after SASL failure " +
"and before refresh ugi is {}", delegationToken.toString());
}
currUser.reloginFromDelegationTokens();
for (AbstractDelegationTokenIdentifier delegationToken:
currUser.getAllDelegationTokens(currUser.getCredentials())){
LOG.debug("Delegation token for current user after SASL failure " +
"and after refresh ugi is {}", delegationToken.toString());
}
}
// have granularity of milliseconds
//we are sleeping with the Connection lock held but since this
Expand Down Expand Up @@ -1609,6 +1627,26 @@ private Writable getRpcResponse(final Call call, final Connection connection,

if (call.error != null) {
if (call.error instanceof RemoteException) {
//We got a delegation token expired error and we want to retry to refresh it
//Since the delegation token's can be externally managed we want the fail
//call to be ignored and retried
Exception unwrapped = ((RemoteException)call.error).unwrapRemoteException(
org.apache.hadoop.security.token.SecretManager.InvalidToken.class);
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
if(unwrapped instanceof org.apache.hadoop.security.token.SecretManager.InvalidToken &&
currUser.isFromDelegationToken()) {
for (AbstractDelegationTokenIdentifier delegationToken:
currUser.getAllDelegationTokens(currUser.getCredentials())){
LOG.debug("Delegation Token before refresh is {}", delegationToken.getTrackingId());
}
currUser.reloginFromDelegationTokens();
call.error = new RetriableException(unwrapped);
for (AbstractDelegationTokenIdentifier delegationToken:
currUser.getAllDelegationTokens(currUser.getCredentials())){
LOG.debug("Delegation Token after refresh is {} {}", delegationToken.getTrackingId(),
delegationToken.toString());
}
}
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,8 @@ public Map<Text, byte[]> getSecretKeyMap() {
* @param conf
* @throws IOException
*/
public static Credentials readTokenStorageFile(Path filename,
Configuration conf)
throws IOException {
public static Credentials readTokenStorageFile(Path filename, Configuration conf)
throws IOException {
FSDataInputStream in = null;
Credentials credentials = new Credentials();
try {
Expand Down Expand Up @@ -461,4 +460,21 @@ private void addAll(Credentials other, boolean overwrite) {
}
}
}

/**
* Update the token map to synchronize between HA pair servers
*/
public void synchTokens(Token<? extends TokenIdentifier> token) {
for(Map.Entry<Text, Token<?>> entry: tokenMap.entrySet()){
LOG.debug("synching token.to_s");
tokenMap.forEach((key, value) -> LOG.debug("Before: " + key + ":" + value));
if (entry.getValue().getKind().equals(token.getKind())){
LOG.debug("matched " + entry.getValue().getKind());
Token<? extends TokenIdentifier> clone = new Token<>(token.getIdentifier(),
token.getPassword(), token.getKind(), entry.getValue().getService());
tokenMap.put(entry.getKey(), clone);
}
tokenMap.forEach((key, value) -> LOG.debug("After: " + key + ":" + value));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security;

import org.apache.hadoop.conf.Configuration;

import java.io.File;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelegationTokenUtil {
public static final String HADOOP_TOKEN_FILE_LOCATION =
"HADOOP_TOKEN_FILE_LOCATION";

static final Logger LOG = LoggerFactory.getLogger(
DelegationTokenUtil.class);

private DelegationTokenUtil() {
}

public static synchronized Credentials readDelegationTokens(Configuration conf)
throws IOException {
String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
// Load the token storage file and put all of the tokens into the
// user. Don't use the FileSystem API for reading since it has a lock
// cycle (HADOOP-9212).
File source = new File(fileLocation);
Credentials creds = Credentials.readTokenStorageFile(
source, conf);
LOG.info("Loaded {} tokens", creds.numberOfTokens());
return creds;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Time;

Expand Down Expand Up @@ -186,12 +187,25 @@ private <T extends Principal> T getCanonicalUser(Class<T> cls) {
return null;
}

private void addDelegationTokensToSubject() throws LoginException {
try {
Credentials creds = DelegationTokenUtil.readDelegationTokens(conf);
if (creds != null) {
subject.getPrivateCredentials().add(creds);
}
} catch (IOException e) {
throw new LoginException("Failed to load token file from " +
HADOOP_TOKEN_FILE_LOCATION);
}
}

@Override
public boolean commit() throws LoginException {
LOG.debug("hadoop login commit");
// if we already have a user, we are done.
if (!subject.getPrincipals(User.class).isEmpty()) {
LOG.debug("Using existing subject: {}", subject.getPrincipals());
addDelegationTokensToSubject();
return true;
}
Principal user = getCanonicalUser(KerberosPrincipal.class);
Expand Down Expand Up @@ -229,6 +243,7 @@ public boolean commit() throws LoginException {
LOG.debug("User entry: \"{}\"", userEntry);

subject.getPrincipals().add(userEntry);
addDelegationTokensToSubject();
return true;
}
throw new LoginException("Failed to find user in name " + subject);
Expand Down Expand Up @@ -740,11 +755,12 @@ UserGroupInformation createLoginUser(Subject subject) throws IOException {
LOG.debug("Reading credentials from location {}",
tokenFile.getCanonicalPath());
if (tokenFile.exists() && tokenFile.isFile()) {
Credentials cred = Credentials.readTokenStorageFile(
tokenFile, conf);
LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
tokenFile.getCanonicalPath());
loginUser.addCredentials(cred);
Credentials cred = DelegationTokenUtil.readDelegationTokens(conf);
if (cred != null ) {
LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
tokenFile.getCanonicalPath());
loginUser.addCredentials(cred);
}
} else {
LOG.info("Token file {} does not exist",
tokenFile.getCanonicalPath());
Expand Down Expand Up @@ -851,6 +867,59 @@ private long getRefreshTime(KerberosTicket tgt) {
return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
}

/**
* Re-Login a user in from delegation tokens
* method assumes that login had happened already.
* The Subject field of this UserGroupInformation object is updated to have
* the new credentials.
* @throws IOException
* @throws IOException on a failure
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public synchronized void reloginFromDelegationTokens() throws IOException {

if (!isFromDelegationToken()) {
throw new IOException("User has not logged on using delegation token");
}

synchronized(UserGroupInformation.class){
Credentials cred = DelegationTokenUtil.readDelegationTokens(conf);
if (cred != null ) {
addCredentials(cred);
}

for (Token<? extends TokenIdentifier> token: cred.getAllTokens()) {
for ( Credentials currentCreds : subject.getPrivateCredentials(Credentials.class)) {
currentCreds.synchTokens(token);
}
}
}
}

public boolean isFromDelegationToken () {
return !isFromKeytab() && getTGT() == null && !subject.getPrivateCredentials(Credentials.class).isEmpty();
}

public Collection<AbstractDelegationTokenIdentifier> getAllDelegationTokens(Credentials cred) {
cred.getAllTokens();
List<AbstractDelegationTokenIdentifier> delegToks = new ArrayList<>();

for(Token<? extends TokenIdentifier> t: getCredentials().getAllTokens()) {
try {
TokenIdentifier identifier = t.decodeIdentifier();
if (identifier == null || !AbstractDelegationTokenIdentifier.class.isAssignableFrom(identifier.getClass())) {
continue;
}
delegToks.add((AbstractDelegationTokenIdentifier) identifier);
} catch (IOException e) {
LOG.warn("Error decoding token identifier of kind " + t.getKind(), e);
}
}
return delegToks;
}


@InterfaceAudience.Private
@InterfaceStability.Unstable
public boolean shouldRelogin() {
Expand Down
Loading