diff --git a/common/pom.xml b/common/pom.xml
index 26f422659d95..12a4338bfbad 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -194,6 +194,11 @@
commons-logging
+
+
+ org.apache.tez
+ tez-api
+ ${tez.version}
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fe88e04a1521..c384583c13ef 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4348,7 +4348,7 @@ public static enum ConfVars {
HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH("hive.server2.job.credential.provider.path", "",
"If set, this configuration property should provide a comma-separated list of URLs that indicates the type and " +
"location of providers to be used by hadoop credential provider API. It provides HiveServer2 the ability to provide job-specific " +
- "credential providers for jobs run using MR and Spark execution engines. This functionality has not been tested against Tez."),
+ "credential providers for jobs run using Tez, MR, Spark execution engines."),
HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads"
+ " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
+ " MSCK to check tables."),
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
index a28580cba1ac..798575893245 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,20 +196,31 @@ public static void updateJobCredentialProviders(Configuration jobConf) {
+ " previous location was " + oldKeyStoreLocation);
}
+ updateCredentialProviderPasswordForJobs(jobConf);
+ }
+
+ public static void updateCredentialProviderPasswordForJobs(Configuration jobConf) {
String credstorePassword = getJobCredentialProviderPassword(jobConf);
if (credstorePassword != null) {
String execEngine = jobConf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname);
- if ("mr".equalsIgnoreCase(execEngine)) {
- // if the execution engine is MR set the map/reduce env with the credential store password
-
+ if ("mr".equalsIgnoreCase(execEngine) || "tez".equalsIgnoreCase(execEngine)) {
+ // if the execution engine is MR/Tez set the map/reduce env with the credential store password
Collection redactedProperties =
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES);
-
+ /*
+ * There are AM + task related environment props below, used for both MR and Tez.
+ * Hiveserver2 copies some of them while creating the vertex in
+ * DagUtils.createVertex -> setTaskEnvironment(getContainerEnvironment(conf)).
+ * So for clarity's sake, TEZ_TASK_LAUNCH_ENV is not added here to avoid confusion of
+ * taking care of task env twice. Comments below clarify which execution engine relies on which property.
+ * "MR -> Tez" means that DagUtils copies them to tez tasks' environment.
+ */
Stream.of(
- JobConf.MAPRED_MAP_TASK_ENV,
- JobConf.MAPRED_REDUCE_TASK_ENV,
- MRJobConfig.MR_AM_ADMIN_USER_ENV)
+ JobConf.MAPRED_MAP_TASK_ENV, // MR -> Tez
+ JobConf.MAPRED_REDUCE_TASK_ENV, // MR -> Tez
+ MRJobConfig.MR_AM_ADMIN_USER_ENV, // MR
+ TezConfiguration.TEZ_AM_LAUNCH_ENV) // Tez
.forEach(property -> {
addKeyValuePair(jobConf, property,
@@ -244,6 +256,16 @@ public static String getJobCredentialProviderPassword(Configuration conf) {
return null;
}
+ /**
+ * Sets a "keyName=newKeyValue" pair into the given property of a jobConf.
+ * If the property is empty, it simply inserts keyName=newKeyValue,
+ * if it's already filled, it takes care of appending or replacing it in the currently present value.
+ * The property in jobConf contains a value like: "key1=value1,key2=value2".
+ * @param jobConf
+ * @param property
+ * @param keyName
+ * @param newKeyValue
+ */
private static void addKeyValuePair(Configuration jobConf, String property, String keyName, String newKeyValue) {
String existingValue = jobConf.get(property);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f6b0bba1faed..28dd4dff7457 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
@@ -727,7 +728,8 @@ public static Resource getContainerResource(Configuration conf) {
/*
* Helper to setup default environment for a task in YARN.
*/
- private Map getContainerEnvironment(Configuration conf, boolean isMap) {
+ @VisibleForTesting
+ Map getContainerEnvironment(Configuration conf, boolean isMap) {
Map environment = new HashMap();
MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
return environment;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1a7fc5bc3eba..ba8d53fbfb25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
@@ -350,6 +351,16 @@ protected void openInternal(String[] additionalFilesNotFromConf,
setupSessionAcls(tezConfig, conf);
+ /*
+ * Update HADOOP_CREDSTORE_PASSWORD for the TezAM.
+ * If there is a job specific credential store, it will be set.
+ * HiveConfUtil.updateJobCredentialProviders should not be used here,
+ * as it changes the credential store path too, which causes the dag submission to fail,
+ * as this config has an effect in HS2 (on TezClient codepath), and the original hadoop
+ * credential store should be used.
+ */
+ HiveConfUtil.updateCredentialProviderPasswordForJobs(tezConfig);
+
String tezJobNameFormat = HiveConf.getVar(conf, ConfVars.HIVETEZJOBNAME);
final TezClient session = TezClient.newBuilder(String.format(tezJobNameFormat, sessionId), tezConfig)
.setIsSession(true).setLocalResources(commonLocalResources)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index cc129707f9d9..6e76a339fa11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -189,6 +190,8 @@ public int execute() {
// jobConf will hold all the configuration for hadoop, tez, and hive, which are not set in AM defaults
JobConf jobConf = utils.createConfiguration(conf, false);
+ // Set up the job-specific keystore path if it exists and put the password into the environment variables of tez am/tasks.
+ HiveConfUtil.updateJobCredentialProviders(jobConf);
// Get all user jars from work (e.g. input format stuff).
String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
index 603005510878..dd8b1ef1e88b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
@@ -30,7 +30,7 @@
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
-
+import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +51,8 @@ public class TestHiveCredentialProviders {
private static final Collection REDACTED_PROPERTIES = Arrays.asList(
JobConf.MAPRED_MAP_TASK_ENV,
JobConf.MAPRED_REDUCE_TASK_ENV,
- MRJobConfig.MR_AM_ADMIN_USER_ENV);
+ MRJobConfig.MR_AM_ADMIN_USER_ENV,
+ TezConfiguration.TEZ_AM_LAUNCH_ENV);
private Configuration jobConf;
@@ -106,6 +107,10 @@ public void testJobCredentialProvider() throws Exception {
Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+ // make sure TEZ AM environment points to HIVE_JOB_CREDSTORE_PASSWORD
+ Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+ jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
@@ -133,6 +138,10 @@ public void testHadoopCredentialProvider() throws Exception {
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+ // make sure TEZ AM environment points to HADOOP_CREDSTORE_PASSWORD
+ Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+ jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
@@ -156,6 +165,9 @@ public void testNoCredentialProviderWithPassword() throws Exception {
Assert.assertNull(getValueFromJobConf(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV),
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+ Assert.assertNull(getValueFromJobConf(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV),
+ HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.contains(property)));
@@ -182,6 +194,9 @@ public void testJobCredentialProviderWithDefaultPassword() throws Exception {
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+ Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+ jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
@@ -200,6 +215,7 @@ public void testCredentialProviderWithNoPasswords() throws Exception {
Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
Assert.assertNull(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
+ Assert.assertNull(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV));
REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
@@ -214,6 +230,7 @@ public void testCredentialProviderWithNoPasswords() throws Exception {
Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
Assert.assertNull(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
+ Assert.assertNull(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV));
REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
@@ -241,6 +258,9 @@ public void testJobCredentialProviderUnset() throws Exception {
Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+ Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+ jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
Assert.assertTrue(jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.containsAll(REDACTED_PROPERTIES));
}
@@ -264,6 +284,9 @@ public void testNoCredentialProvider() throws Exception {
assertNull(getValueFromJobConf(jobConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV),
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+ assertNull(getValueFromJobConf(jobConf.get(TezConfiguration.TEZ_AM_LAUNCH_ENV),
+ HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
REDACTED_PROPERTIES.forEach(property -> Assert.assertFalse(
jobConf.getStringCollection(MRJobConfig.MR_JOB_REDACTED_PROPERTIES)
.contains(property)));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDagUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDagUtils.java
index 1a83c64504e7..960caafcced2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDagUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDagUtils.java
@@ -32,13 +32,16 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
import org.junit.Test;
+import org.junit.Assert;
+
/**
*
*/
public class TestDagUtils {
-
+
@Test
public void testCredentialsNotOverwritten() throws Exception {
final UserGroupInformation testUser = UserGroupInformation.createUserForTesting("test_user", new String[0]);
@@ -92,5 +95,32 @@ public void outputCommitterNotOverriddenIfPresent() throws IOException {
assertEquals(TestTezOutputCommitter.CountingOutputCommitter.class.getName(),
configuration.get("mapred.output.committer.class"));
}
-
+
+ @Test
+ public void testMapTezTaskEnvIsCopiedFromMrProperties() {
+ final DagUtils dagUtils = DagUtils.getInstance();
+
+ Vertex map = Vertex.create("mapWorkName", null);
+ HiveConf conf = new HiveConf();
+ Assert.assertNull(map.getTaskEnvironment().get("key"));
+
+ conf.set(JobConf.MAPRED_MAP_TASK_ENV, "key=value");
+ map.setTaskEnvironment(dagUtils.getContainerEnvironment(conf, true));
+
+ Assert.assertEquals("value", map.getTaskEnvironment().get("key"));
+ }
+
+ @Test
+ public void testReduceTezTaskEnvIsCopiedFromMrProperties() {
+ final DagUtils dagUtils = DagUtils.getInstance();
+
+ Vertex reduce = Vertex.create("reduceWorkName", null);
+ HiveConf conf = new HiveConf();
+ Assert.assertNull(reduce.getTaskEnvironment().get("key"));
+
+ conf.set(JobConf.MAPRED_REDUCE_TASK_ENV, "key=value");
+ reduce.setTaskEnvironment(dagUtils.getContainerEnvironment(conf, false));
+
+ Assert.assertEquals("value", reduce.getTaskEnvironment().get("key"));
+ }
}