[FLINK-2268] Dynamically load Hadoop security module when available
aljoscha committed Sep 27, 2017
1 parent ed11548 · commit 7f1c233
Showing 21 changed files with 474 additions and 144 deletions.
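
The core of the change: instead of instantiating a fixed list of SecurityModule classes via newInstance(), SecurityUtils now iterates over SecurityModuleFactory instances, and a factory may decline to create its module when the environment cannot support it. The SecurityModuleFactory interface itself is among the 21 changed files but outside this excerpt; the sketch below is a plausible shape inferred from the call sites in the diff (createModule(config), with null meaning "unsupported here"), not the committed source.

    // Hypothetical sketch inferred from the call sites below; not the committed source.
    public interface SecurityModuleFactory {
        // May return null when the module cannot be used in this environment,
        // e.g. because Hadoop is not on the classpath.
        SecurityModule createModule(SecurityUtils.SecurityConfiguration config);
    }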
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -45,6 +46,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;


@@ -124,7 +126,10 @@ public static void startSecureCluster() throws Exception {
         flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
             SecureTestEnvironment.getHadoopServicePrincipal());

-        SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
+        SecurityUtils.SecurityConfiguration ctx =
+            new SecurityUtils.SecurityConfiguration(
+                flinkConfig,
+                Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, conf)));
         try {
             TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
         } catch (Exception e) {
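
The lambda in the singletonList call above compiles only if SecurityModuleFactory is a functional (single-method) interface; spelled out, with the method name assumed from the rest of this commit rather than shown in this diff, the test's factory is equivalent to:

    // Anonymous-class form of the lambda above; signature assumed, not from this diff.
    new SecurityModuleFactory() {
        @Override
        public SecurityModule createModule(SecurityUtils.SecurityConfiguration securityConfig) {
            // 'conf' is the test's Hadoop Configuration, captured from the enclosing scope.
            return new HadoopModule(securityConfig, conf);
        }
    };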
@@ -23,16 +23,11 @@
 import org.apache.flink.configuration.GlobalConfiguration;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
-import java.util.Collection;
 import java.util.Map;

 /**
@@ -43,8 +38,6 @@ public final class HadoopUtils {
     private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);

-    private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
-
     /**
      * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
      */
@@ -125,20 +118,6 @@ public static Configuration getHadoopConfiguration(org.apache.flink.configuratio
         return retConf;
     }

-    /**
-     * Indicates whether the current user has an HDFS delegation token.
-     */
-    public static boolean hasHDFSDelegationToken() throws Exception {
-        UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-        Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-        for (Token<? extends TokenIdentifier> token : usrTok) {
-            if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     /**
      * Private constructor to prevent instantiation.
      */
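
Note that hasHDFSDelegationToken() is not deleted outright: the import of org.apache.flink.runtime.util.HadoopUtils that this commit adds to HadoopModule further down suggests the delegation-token check is relocated to a runtime-side utility, so the security code no longer depends on this mapred-specific class.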
@@ -20,7 +20,6 @@
 import org.apache.flink.annotation.Internal;

-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


@@ -52,6 +51,13 @@ public class KerberosUtils {
     private static final AppConfigurationEntry userKerberosAce;

+    /* Return the Kerberos login module name */
+    public static String getKrb5LoginModuleName() {
+        return System.getProperty("java.vendor").contains("IBM")
+            ? "com.ibm.security.auth.module.Krb5LoginModule"
+            : "com.sun.security.auth.module.Krb5LoginModule";
+    }
+
     static {

         IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
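
The new helper inlines what was previously obtained from Hadoop's KerberosUtil.getKrb5LoginModuleName() (its import is removed in the first hunk of this file), so building the JAAS entries below no longer requires a Hadoop class. A minimal check of what the helper resolves to on a given JVM; the output depends only on the java.vendor system property:

    // Minimal sketch: prints com.ibm.security.auth.module.Krb5LoginModule on IBM JDKs,
    // and com.sun.security.auth.module.Krb5LoginModule on other vendors' JDKs.
    public class Krb5ModuleNameCheck {
        public static void main(String[] args) {
            System.out.println(KerberosUtils.getKrb5LoginModuleName());
        }
    }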
@@ -80,7 +86,7 @@ public class KerberosUtils {
         kerberosCacheOptions.putAll(debugOptions);

         userKerberosAce = new AppConfigurationEntry(
-            KerberosUtil.getKrb5LoginModuleName(),
+            getKrb5LoginModuleName(),
             AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
             kerberosCacheOptions);


@@ -112,7 +118,7 @@ public static AppConfigurationEntry keytabEntry(String keytab, String principal)
         keytabKerberosOptions.putAll(debugOptions);

         AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
-            KerberosUtil.getKrb5LoginModuleName(),
+            getKrb5LoginModuleName(),
             AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
             keytabKerberosOptions);
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.security;

 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.HadoopModuleFactory;
+import org.apache.flink.runtime.security.modules.JaasModuleFactory;
 import org.apache.flink.runtime.security.modules.SecurityModule;
-import org.apache.flink.runtime.security.modules.ZooKeeperModule;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.runtime.security.modules.SecurityModuleFactory;
+import org.apache.flink.runtime.security.modules.ZookeeperModuleFactory;

 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,8 +40,6 @@
 import java.util.Collections;
 import java.util.List;

-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Utils for configuring security. The following security subsystems are supported:
  * 1. Java Authentication and Authorization Service (JAAS)
@@ -76,29 +73,46 @@ public static void install(SecurityConfiguration config) throws Exception {
         // install the security modules
         List<SecurityModule> modules = new ArrayList<>();
         try {
-            for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
-                SecurityModule module = moduleClass.newInstance();
-                module.install(config);
-                modules.add(module);
+            for (SecurityModuleFactory moduleFactory : config.getSecurityModuleFactories()) {
+                SecurityModule module = moduleFactory.createModule(config);
+                // can be null if a SecurityModule is not supported in the current environment
+                if (module != null) {
+                    module.install();
+                    modules.add(module);
+                }
             }
         }
         catch (Exception ex) {
             throw new Exception("unable to establish the security context", ex);
         }
         installedModules = modules;

-        // install a security context
-        // use the Hadoop login user as the subject of the installed security context
-        if (!(installedContext instanceof NoOpSecurityContext)) {
-            LOG.warn("overriding previous security context");
+        // First check if we have Hadoop in the ClassPath. If not, we simply don't do anything.
+        try {
+            Class.forName(
+                "org.apache.hadoop.security.UserGroupInformation",
+                false,
+                SecurityUtils.class.getClassLoader());
+
+            // install a security context
+            // use the Hadoop login user as the subject of the installed security context
+            if (!(installedContext instanceof NoOpSecurityContext)) {
+                LOG.warn("overriding previous security context");
+            }
+            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+            installedContext = new HadoopSecurityContext(loginUser);
+        } catch (ClassNotFoundException e) {
+            LOG.info("Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.");
+        } catch (LinkageError e) {
+            LOG.error("Cannot install HadoopSecurityContext.", e);
         }
-        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-        installedContext = new HadoopSecurityContext(loginUser);
     }

     static void uninstall() {
         if (installedModules != null) {
-            for (SecurityModule module : Lists.reverse(installedModules)) {
+            // uninstall them in reverse order
+            for (int i = installedModules.size() - 1; i >= 0; i--) {
+                SecurityModule module = installedModules.get(i);
                 try {
                     module.uninstall();
                 }
@@ -121,12 +135,12 @@ static void uninstall() {
      */
     public static class SecurityConfiguration {

-        private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
-            Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
+        private static final List<SecurityModuleFactory> DEFAULT_MODULES = Collections.unmodifiableList(
+            Arrays.asList(new HadoopModuleFactory(), new JaasModuleFactory(), new ZookeeperModuleFactory()));

-        private final List<Class<? extends SecurityModule>> securityModules;
+        private final List<SecurityModuleFactory> securityModuleFactories;

-        private final org.apache.hadoop.conf.Configuration hadoopConf;
+        private final Configuration flinkConfig;

         private final boolean isZkSaslDisable;


@@ -147,37 +161,25 @@ public static class SecurityConfiguration {
      * @param flinkConf the Flink global configuration.
      */
     public SecurityConfiguration(Configuration flinkConf) {
-        this(flinkConf, HadoopUtils.getHadoopConfiguration(flinkConf));
+        this(flinkConf, DEFAULT_MODULES);
     }

     /**
      * Create a security configuration from the global configuration.
      * @param flinkConf the Flink global configuration.
-     * @param hadoopConf the Hadoop configuration.
-     */
-    public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
-        this(flinkConf, hadoopConf, DEFAULT_MODULES);
-    }
-
-    /**
-     * Create a security configuration from the global configuration.
-     * @param flinkConf the Flink global configuration.
-     * @param hadoopConf the Hadoop configuration.
-     * @param securityModules the security modules to apply.
+     * @param securityModuleFactories the security modules to apply.
      */
     public SecurityConfiguration(Configuration flinkConf,
-            org.apache.hadoop.conf.Configuration hadoopConf,
-            List<? extends Class<? extends SecurityModule>> securityModules) {
-        this.hadoopConf = checkNotNull(hadoopConf);
+            List<SecurityModuleFactory> securityModuleFactories) {
         this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
         this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
         this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
         this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
         this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
         this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
         this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
-        this.securityModules = Collections.unmodifiableList(securityModules);
+        this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
+
+        this.flinkConfig = checkNotNull(flinkConf);
         validate();
     }


Expand All @@ -197,12 +199,12 @@ public boolean useTicketCache() {
return useTicketCache; return useTicketCache;
} }


public org.apache.hadoop.conf.Configuration getHadoopConfiguration() { public Configuration getFlinkConfig() {
return hadoopConf; return flinkConfig;
} }


public List<Class<? extends SecurityModule>> getSecurityModules() { public List<SecurityModuleFactory> getSecurityModuleFactories() {
return securityModules; return securityModuleFactories;
} }


public List<String> getLoginContextNames() { public List<String> getLoginContextNames() {
Expand Down
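
After this rework a caller picks either the one-argument constructor, which applies the default Hadoop/JAAS/ZooKeeper factories, or the two-argument form with an explicit factory list; the Hadoop Configuration no longer passes through SecurityConfiguration at all and is instead handed straight to HadoopModule. A sketch of both call sites (imports and a populated Flink configuration assumed):

    Configuration flinkConf = GlobalConfiguration.loadConfiguration();

    // Default factories: Hadoop, JAAS, ZooKeeper.
    SecurityUtils.SecurityConfiguration withDefaults =
        new SecurityUtils.SecurityConfiguration(flinkConf);

    // Explicit factory list, as in the secure-cluster test earlier in this commit.
    SecurityUtils.SecurityConfiguration jaasOnly =
        new SecurityUtils.SecurityConfiguration(
            flinkConf,
            Collections.singletonList(new JaasModuleFactory()));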
@@ -18,10 +18,11 @@
 package org.apache.flink.runtime.security.modules;

-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.HadoopUtils;

 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -37,19 +38,32 @@
 import java.lang.reflect.Method;
 import java.util.Collection;

+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Responsible for installing a Hadoop login user.
  */
 public class HadoopModule implements SecurityModule {

     private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);

-    UserGroupInformation loginUser;
+    private final SecurityUtils.SecurityConfiguration securityConfig;
+
+    private final Configuration hadoopConfiguration;
+
+    public HadoopModule(
+            SecurityUtils.SecurityConfiguration securityConfiguration,
+            Configuration hadoopConfiguration) {
+        this.securityConfig = checkNotNull(securityConfiguration);
+        this.hadoopConfiguration = checkNotNull(hadoopConfiguration);
+    }

     @Override
-    public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+    public void install() throws SecurityInstallException {

-        UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+        UserGroupInformation.setConfiguration(hadoopConfiguration);
+
+        UserGroupInformation loginUser;

         try {
             if (UserGroupInformation.isSecurityEnabled() &&
@@ -70,8 +84,11 @@ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws S
                 try {
                     Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
                         File.class, org.apache.hadoop.conf.Configuration.class);
-                    Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-                        securityConfig.getHadoopConfiguration());
+                    Credentials cred =
+                        (Credentials) readTokenStorageFileMethod.invoke(
+                            null,
+                            new File(fileLocation),
+                            hadoopConfiguration);

                     // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
                     // the UGI would prefer the delegation token instead, which eventually expires
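
The reflective lookup of Credentials.readTokenStorageFile is carried over unchanged from the old code, presumably because the method is not present on every Hadoop version Flink supports; the NoSuchMethodException and invocation failures are presumably handled by catch blocks outside this excerpt. The substantive change in this hunk is only where the Hadoop Configuration comes from: the injected hadoopConfiguration field rather than securityConfig.getHadoopConfiguration().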
