Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -47,6 +48,8 @@ public class HBaseDelegationTokenProvider implements HadoopDelegationTokenProvid

org.apache.hadoop.conf.Configuration hbaseConf;

private KerberosLoginProvider kerberosLoginProvider;

@Override
public String serviceName() {
return "hbase";
Expand All @@ -55,6 +58,7 @@ public String serviceName() {
@Override
public void init(Configuration configuration) throws Exception {
hbaseConf = getHBaseConfiguration(configuration);
kerberosLoginProvider = new KerberosLoginProvider(configuration);
}

private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
Expand Down Expand Up @@ -100,68 +104,91 @@ public boolean delegationTokensRequired() {

@Override
public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
Token<?> token;
try {
Preconditions.checkNotNull(hbaseConf);
try {
LOG.info("Obtaining Kerberos security token for HBase");
// ----
// Intended call: Token<AuthenticationTokenIdentifier> token =
// TokenUtil.obtainToken(conf);
token =
(Token<?>)
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
.getMethod(
"obtainToken",
org.apache.hadoop.conf.Configuration.class)
.invoke(null, hbaseConf);
} catch (NoSuchMethodException e) {
// for HBase 2
if (kerberosLoginProvider.isLoginPossible()) {
UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
return freshUGI.doAs(
(PrivilegedExceptionAction<Optional<Long>>)
() -> {
Token<?> token;
try {
Preconditions.checkNotNull(hbaseConf);
try {
LOG.info("Obtaining Kerberos security token for HBase");
// ----
// Intended call: Token<AuthenticationTokenIdentifier> token
// =
// TokenUtil.obtainToken(conf);
token =
(Token<?>)
Class.forName(
"org.apache.hadoop.hbase.security.token.TokenUtil")
.getMethod(
"obtainToken",
org.apache.hadoop.conf
.Configuration
.class)
.invoke(null, hbaseConf);
} catch (NoSuchMethodException e) {
// for HBase 2

// ----
// Intended call: ConnectionFactory connectionFactory =
// ConnectionFactory.createConnection(conf);
Closeable connectionFactory =
(Closeable)
Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
.getMethod(
"createConnection",
org.apache.hadoop.conf.Configuration.class)
.invoke(null, hbaseConf);
// ----
Class<?> connectionClass =
Class.forName("org.apache.hadoop.hbase.client.Connection");
// ----
// Intended call: Token<AuthenticationTokenIdentifier> token =
// TokenUtil.obtainToken(connectionFactory);
token =
(Token<?>)
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
.getMethod("obtainToken", connectionClass)
.invoke(null, connectionFactory);
if (null != connectionFactory) {
connectionFactory.close();
}
}
if (token == null) {
LOG.error("No Kerberos security token for HBase available");
} else {
credentials.addToken(token.getService(), token);
LOG.info("Added HBase Kerberos security token to credentials.");
}
} catch (ClassNotFoundException
| NoSuchMethodException
| IllegalAccessException
| InvocationTargetException
| IOException e) {
LOG.info(
"HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
e.getClass().getSimpleName(),
e.getMessage());
}
// ----
// Intended call: ConnectionFactory connectionFactory =
// ConnectionFactory.createConnection(conf);
Closeable connectionFactory =
(Closeable)
Class.forName(
"org.apache.hadoop.hbase.client.ConnectionFactory")
.getMethod(
"createConnection",
org.apache.hadoop.conf
.Configuration
.class)
.invoke(null, hbaseConf);
// ----
Class<?> connectionClass =
Class.forName(
"org.apache.hadoop.hbase.client.Connection");
// ----
// Intended call: Token<AuthenticationTokenIdentifier> token
// =
// TokenUtil.obtainToken(connectionFactory);
token =
(Token<?>)
Class.forName(
"org.apache.hadoop.hbase.security.token.TokenUtil")
.getMethod(
"obtainToken",
connectionClass)
.invoke(null, connectionFactory);
if (null != connectionFactory) {
connectionFactory.close();
}
}
if (token == null) {
LOG.error("No Kerberos security token for HBase available");
} else {
credentials.addToken(token.getService(), token);
LOG.info(
"Added HBase Kerberos security token to credentials.");
}
} catch (ClassNotFoundException
| NoSuchMethodException
| IllegalAccessException
| InvocationTargetException
| IOException e) {
LOG.info(
"HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
e.getClass().getSimpleName(),
e.getMessage());
}

// HBase does not support to renew the delegation token currently
// https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
return Optional.empty();
// HBase does not support to renew the delegation token currently
// https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
return Optional.empty();
});
} else {
LOG.info("Real user has no kerberos credentials so no tokens obtained");
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
*
* <h2>Important Notes</h2>
*
* <p>"obtainDelegationTokens" function is running in "UserGroupInformation.doAs" context so
* authentication is handled inside {@link DelegationTokenManager}
*
* <p>Tokens are stored in {@link UserGroupInformation}
*/
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.time.Clock;
import java.util.HashSet;
import java.util.Optional;
Expand All @@ -55,6 +56,8 @@ public class HadoopFSDelegationTokenProvider implements HadoopDelegationTokenPro

private org.apache.hadoop.conf.Configuration hadoopConfiguration;

private KerberosLoginProvider kerberosLoginProvider;

private Optional<Long> tokenRenewalInterval;

@Override
Expand All @@ -66,6 +69,7 @@ public String serviceName() {
public void init(Configuration configuration) throws Exception {
flinkConfiguration = configuration;
hadoopConfiguration = HadoopUtils.getHadoopConfiguration(configuration);
kerberosLoginProvider = new KerberosLoginProvider(configuration);
}

@Override
Expand All @@ -75,17 +79,32 @@ public boolean delegationTokensRequired() throws Exception {

@Override
public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
Clock clock = Clock.systemDefaultZone();
Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();

obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials);

// Get the token renewal interval if it is not set. It will be called only once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = getTokenRenewalInterval(clock, fileSystemsToAccess);
if (kerberosLoginProvider.isLoginPossible()) {
UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
return freshUGI.doAs(
(PrivilegedExceptionAction<Optional<Long>>)
() -> {
Clock clock = Clock.systemDefaultZone();
Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();

obtainDelegationTokens(
getRenewer(), fileSystemsToAccess, credentials);

// Get the token renewal interval if it is not set. It will be
// called
// only once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval =
getTokenRenewalInterval(clock, fileSystemsToAccess);
}
return tokenRenewalInterval.flatMap(
interval ->
getTokenRenewalDate(clock, credentials, interval));
});
} else {
LOG.info("Real user has no kerberos credentials so no tokens obtained");
return Optional.empty();
}
return tokenRenewalInterval.flatMap(
interval -> getTokenRenewalDate(clock, credentials, interval));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -91,23 +90,11 @@ public KerberosDelegationTokenManager(
Configuration configuration,
@Nullable ScheduledExecutor scheduledExecutor,
@Nullable ExecutorService ioExecutor) {
this(
configuration,
scheduledExecutor,
ioExecutor,
new KerberosLoginProvider(configuration));
}

public KerberosDelegationTokenManager(
Configuration configuration,
@Nullable ScheduledExecutor scheduledExecutor,
@Nullable ExecutorService ioExecutor,
KerberosLoginProvider kerberosLoginProvider) {
this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
this.renewalRetryBackoffPeriod =
configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
this.kerberosLoginProvider = kerberosLoginProvider;
this.kerberosLoginProvider = new KerberosLoginProvider(configuration);
this.delegationTokenProviders = loadProviders();
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
Expand Down Expand Up @@ -166,21 +153,8 @@ boolean isProviderLoaded(String serviceName) {
@Override
public void obtainDelegationTokens(Credentials credentials) throws Exception {
LOG.info("Obtaining delegation tokens");

// Delegation tokens can only be obtained if the real user has Kerberos credentials, so
// skip creation when those are not available.
if (kerberosLoginProvider.isLoginPossible()) {
UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
freshUGI.doAs(
(PrivilegedExceptionAction<Void>)
() -> {
obtainDelegationTokensAndGetNextRenewal(credentials);
return null;
});
LOG.info("Delegation tokens obtained successfully");
} else {
LOG.info("Real user has no kerberos credentials so no tokens obtained");
}
obtainDelegationTokensAndGetNextRenewal(credentials);
LOG.info("Delegation tokens obtained successfully");
}

protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
Expand Down Expand Up @@ -292,11 +266,7 @@ void startTokensUpdate() {
try {
LOG.info("Starting tokens update task");
Credentials credentials = new Credentials();
UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
Optional<Long> nextRenewal =
freshUGI.doAs(
(PrivilegedExceptionAction<Optional<Long>>)
() -> obtainDelegationTokensAndGetNextRenewal(credentials));
Optional<Long> nextRenewal = obtainDelegationTokensAndGetNextRenewal(credentials);

if (credentials.numberOfTokens() > 0) {
byte[] credentialsBytes = DelegationTokenConverter.serialize(credentials);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.configuration.Configuration;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;

import java.util.Optional;

Expand All @@ -31,9 +33,16 @@
public class ExceptionThrowingHadoopDelegationTokenProvider
implements HadoopDelegationTokenProvider {

public static volatile boolean enabled = false;
public static volatile boolean throwInInit = false;
public static volatile boolean throwInUsage = false;
public static volatile boolean constructed = false;

public static void reset() {
throwInInit = false;
throwInUsage = false;
constructed = false;
}

public ExceptionThrowingHadoopDelegationTokenProvider() {
constructed = true;
}
Expand All @@ -45,24 +54,28 @@ public String serviceName() {

@Override
public void init(Configuration configuration) {
if (enabled) {
if (throwInInit) {
throw new IllegalArgumentException();
}
}

@Override
public boolean delegationTokensRequired() {
if (enabled) {
if (throwInUsage) {
throw new IllegalArgumentException();
}
return false;
return true;
}

@Override
public Optional<Long> obtainDelegationTokens(Credentials credentials) {
if (enabled) {
if (throwInUsage) {
throw new IllegalArgumentException();
}
final Text tokenKind = new Text("TEST_TOKEN_KIND");
final Text tokenService = new Text("TEST_TOKEN_SERVICE");
credentials.addToken(
tokenService, new Token<>(new byte[4], new byte[4], tokenKind, tokenService));
return Optional.empty();
}
}
Loading