Skip to content

Commit

Permalink
HIVE-25829: Tez exec mode support for credential provider for jobs (a…
Browse files Browse the repository at this point in the history
…pache#2911) (Laszlo Bodor reviewed by Panagiotis Garefalakis, Denys Kuzmenko)
  • Loading branch information
abstractdog authored and DongWei-4 committed Oct 28, 2022
1 parent 85e7613 commit dcf06e1
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 13 deletions.
5 changes: 5 additions & 0 deletions common/pom.xml
Expand Up @@ -194,6 +194,11 @@
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
</dependency>
<!-- test inter-project -->
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Expand Up @@ -4348,7 +4348,7 @@ public static enum ConfVars {
HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH("hive.server2.job.credential.provider.path", "",
"If set, this configuration property should provide a comma-separated list of URLs that indicates the type and " +
"location of providers to be used by hadoop credential provider API. It provides HiveServer2 the ability to provide job-specific " +
"credential providers for jobs run using MR and Spark execution engines. This functionality has not been tested against Tez."),
"credential providers for jobs run using Tez, MR, Spark execution engines."),
HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads"
+ " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
+ " MSCK to check tables."),
Expand Down
36 changes: 29 additions & 7 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -195,20 +196,31 @@ public static void updateJobCredentialProviders(Configuration jobConf) {
+ " previous location was " + oldKeyStoreLocation);
}

updateCredentialProviderPasswordForJobs(jobConf);
}

public static void updateCredentialProviderPasswordForJobs(Configuration jobConf) {
String credstorePassword = getJobCredentialProviderPassword(jobConf);
if (credstorePassword != null) {
String execEngine = jobConf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname);

if ("mr".equalsIgnoreCase(execEngine)) {
// if the execution engine is MR set the map/reduce env with the credential store password

if ("mr".equalsIgnoreCase(execEngine) || "tez".equalsIgnoreCase(execEngine)) {
// if the execution engine is MR/Tez set the map/reduce env with the credential store password
Collection<String> redactedProperties =
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES);

/*
* There are AM + task related environment props below, used for both MR and Tez.
* Hiveserver2 copies some of them while creating the vertex in
* DagUtils.createVertex -> setTaskEnvironment(getContainerEnvironment(conf)).
* So for clarity's sake, TEZ_TASK_LAUNCH_ENV is not added here to avoid confusion of
* taking care of task env twice. Comments below clarifies which execution engine relies on which property.
* "MR -> Tez" means that DagUtils copies them to tez tasks' environment.
*/
Stream.of(
JobConf.MAPRED_MAP_TASK_ENV,
JobConf.MAPRED_REDUCE_TASK_ENV,
MRJobConfig.MR_AM_ADMIN_USER_ENV)
JobConf.MAPRED_MAP_TASK_ENV, // MR -> Tez
JobConf.MAPRED_REDUCE_TASK_ENV, // MR -> Tez
MRJobConfig.MR_AM_ADMIN_USER_ENV, // MR
TezConfiguration.TEZ_AM_LAUNCH_ENV) // Tez

.forEach(property -> {
addKeyValuePair(jobConf, property,
Expand Down Expand Up @@ -244,6 +256,16 @@ public static String getJobCredentialProviderPassword(Configuration conf) {
return null;
}

/**
* Sets a "keyName=newKeyValue" pair to a jobConf to a given property.
* If the property is empty, it simply inserts keyName=newKeyValue,
* if it's already filled, it takes care of appending or replacing it in the currently present value.
* The property in jobConf contains a value like: "key1=value1,key2=value2".
* @param jobConf
* @param property
* @param keyName
* @param newKeyValue
*/
private static void addKeyValuePair(Configuration jobConf, String property, String keyName, String newKeyValue) {
String existingValue = jobConf.get(property);

Expand Down
4 changes: 3 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -727,7 +728,8 @@ public static Resource getContainerResource(Configuration conf) {
/*
* Helper to setup default environment for a task in YARN.
*/
private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
@VisibleForTesting
Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
Map<String, String> environment = new HashMap<String, String>();
MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
return environment;
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
Expand Down Expand Up @@ -350,6 +351,16 @@ protected void openInternal(String[] additionalFilesNotFromConf,

setupSessionAcls(tezConfig, conf);

/*
* Update HADOOP_CREDSTORE_PASSWORD for the TezAM.
* If there is a job specific credential store, it will be set.
* HiveConfUtil.updateJobCredentialProviders should not be used here,
* as it changes the credential store path too, which causes the dag submission fail,
* as this config has an effect in HS2 (on TezClient codepath), and the original hadoop
* credential store should be used.
*/
HiveConfUtil.updateCredentialProviderPasswordForJobs(tezConfig);

String tezJobNameFormat = HiveConf.getVar(conf, ConfVars.HIVETEZJOBNAME);
final TezClient session = TezClient.newBuilder(String.format(tezJobNameFormat, sessionId), tezConfig)
.setIsSession(true).setLocalResources(commonLocalResources)
Expand Down
3 changes: 3 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Expand Up @@ -43,6 +43,7 @@
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
Expand Down Expand Up @@ -189,6 +190,8 @@ public int execute() {
// jobConf will hold all the configuration for hadoop, tez, and hive, which are not set in AM defaults
JobConf jobConf = utils.createConfiguration(conf, false);

// Setup the job specific keystore path if exists and put the password into the environment variables of tez am/tasks.
HiveConfUtil.updateJobCredentialProviders(jobConf);

// Get all user jars from work (e.g. input format stuff).
String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -51,7 +51,8 @@ public class TestHiveCredentialProviders {
private static final Collection<String> REDACTED_PROPERTIES = Arrays.asList(
JobConf.MAPRED_MAP_TASK_ENV,
JobConf.MAPRED_REDUCE_TASK_ENV,
MRJobConfig.MR_AM_ADMIN_USER_ENV);
MRJobConfig.MR_AM_ADMIN_USER_ENV,
TezConfiguration.TEZ_AM_LAUNCH_ENV);

private Configuration jobConf;

Expand Down Expand Up @@ -106,6 +107,10 @@ public void testJobCredentialProvider() throws Exception {
Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

// make sure TEZ AM environment points to HIVE_JOB_CREDSTORE_PASSWORD
Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
Expand Down Expand Up @@ -133,6 +138,10 @@ public void testHadoopCredentialProvider() throws Exception {
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

// make sure TEZ AM environment points to HADOOP_CREDSTORE_PASSWORD
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
Expand All @@ -156,6 +165,9 @@ public void testNoCredentialProviderWithPassword() throws Exception {
Assert.assertNull(getValueFromJobConf(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV),
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertNull(getValueFromJobConf(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV),
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.contains(property)));
Expand All @@ -182,6 +194,9 @@ public void testJobCredentialProviderWithDefaultPassword() throws Exception {
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
Expand All @@ -200,6 +215,7 @@ public void testCredentialProviderWithNoPasswords() throws Exception {
Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
Assert.assertNull(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
Assert.assertNull(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV));

REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
Expand All @@ -214,6 +230,7 @@ public void testCredentialProviderWithNoPasswords() throws Exception {
Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
Assert.assertNull(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
Assert.assertNull(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV));

REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
Expand Down Expand Up @@ -241,6 +258,9 @@ public void testJobCredentialProviderUnset() throws Exception {
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
Expand All @@ -264,6 +284,9 @@ public void testNoCredentialProvider() throws Exception {
assertNull(getValueFromJobConf(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV),
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

assertNull(getValueFromJobConf(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV),
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));

REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.contains(property)));
Expand Down
34 changes: 32 additions & 2 deletions ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDagUtils.java
Expand Up @@ -32,13 +32,16 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
import org.junit.Test;

import org.junit.Assert;

/**
*
*/
public class TestDagUtils {

@Test
public void testCredentialsNotOverwritten() throws Exception {
final UserGroupInformation testUser = UserGroupInformation.createUserForTesting("test_user", new String[0]);
Expand Down Expand Up @@ -92,5 +95,32 @@ public void outputCommitterNotOverriddenIfPresent() throws IOException {
assertEquals(TestTezOutputCommitter.CountingOutputCommitter.class.getName(),
configuration.get("mapred.output.committer.class"));
}


@Test
public void testMapTezTaskEnvIsCopiedFromMrProperties() {
final DagUtils dagUtils = DagUtils.getInstance();

Vertex map = Vertex.create("mapWorkName", null);
HiveConf conf = new HiveConf();
Assert.assertNull(map.getTaskEnvironment().get("key"));

conf.set(JobConf.MAPRED_MAP_TASK_ENV, "key=value");
map.setTaskEnvironment(dagUtils.getContainerEnvironment(conf, true));

Assert.assertEquals("value", map.getTaskEnvironment().get("key"));
}

@Test
public void testReduceTezTaskEnvIsCopiedFromMrProperties() {
final DagUtils dagUtils = DagUtils.getInstance();

Vertex reduce = Vertex.create("reduceWorkName", null);
HiveConf conf = new HiveConf();
Assert.assertNull(reduce.getTaskEnvironment().get("key"));

conf.set(JobConf.MAPRED_REDUCE_TASK_ENV, "key=value");
reduce.setTaskEnvironment(dagUtils.getContainerEnvironment(conf, false));

Assert.assertEquals("value", reduce.getTaskEnvironment().get("key"));
}
}

0 comments on commit dcf06e1

Please sign in to comment.