Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18205] Mitigate the use of reflection in Utils and HadoopModule #12624

Merged
merged 4 commits into from Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -83,42 +83,23 @@ public void install() throws SecurityInstallException {
// supplement with any available tokens
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
// Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
// used in the context of reading the stored tokens from UGI.
// Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
// loginUser.addCredentials(cred);
try {
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred =
(Credentials) readTokenStorageFileMethod.invoke(
null,
new File(fileLocation),
hadoopConfiguration);

// if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
// the UGI would prefer the delegation token instead, which eventually expires
// and does not fallback to using Kerberos tickets
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
//If UGI use keytab for login, do not load HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
if (!token.getKind().equals(hdfsDelegationTokenKind)) {
final Text id = new Text(token.getIdentifier());
credentials.addToken(id, token);
}
Credentials credentialsFromTokenStorageFile = Credentials.readTokenStorageFile(new File(fileLocation), hadoopConfiguration);

// if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
// the UGI would prefer the delegation token instead, which eventually expires
// and does not fallback to using Kerberos tickets
Credentials credentialsToBeAdded = new Credentials();
final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok = credentialsFromTokenStorageFile.getAllTokens();
//If UGI use keytab for login, do not load HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
if (!token.getKind().equals(hdfsDelegationTokenKind)) {
final Text id = new Text(token.getIdentifier());
credentialsToBeAdded.addToken(id, token);
}

Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
Credentials.class);
addCredentialsMethod.invoke(loginUser, credentials);
} catch (NoSuchMethodException e) {
LOG.warn("Could not find method implementations in the shaded jar.", e);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}

loginUser.addCredentials(credentialsToBeAdded);
}
} else {
// login with current user credentials (e.g. ticket cache, OS login)
Expand Down Expand Up @@ -150,7 +131,7 @@ public void install() throws SecurityInstallException {
}

@Override
public void uninstall() throws SecurityInstallException {
public void uninstall() {
throw new UnsupportedOperationException();
}
}
10 changes: 1 addition & 9 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Expand Up @@ -51,7 +51,6 @@
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -460,14 +459,7 @@ static ContainerLaunchContext createTaskExecutorContext(
log.debug("Adding security tokens to TaskExecutor's container launch context.");

try (DataOutputBuffer dob = new DataOutputBuffer()) {
Method readTokenStorageFileMethod = Credentials.class.getMethod(
"readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);

Credentials cred =
(Credentials) readTokenStorageFileMethod.invoke(
null,
new File(fileLocation),
HadoopUtils.getHadoopConfiguration(flinkConfig));
Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig));

// Filter out AMRMToken before setting the tokens to the TaskManager container context.
Credentials taskManagerCred = new Credentials();
Expand Down