diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java
index 84ae54157eed..170c1a70dd64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,26 +61,13 @@ public static Configuration copyInto(Configuration target, Configuration src,
       Map.Entry<String, String> entry = iter.next();
       String name = entry.getKey();
       String value = entry.getValue();
-      String[] sources;
-      try {
-        sources = ((Map<String, String[]>)updatingResource.get(src)).get(name);
-      } catch (IllegalArgumentException | IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-      final String source;
-      if (sources == null || sources.length == 0) {
-        source = null;
-      } else {
-        /*
-         * If the property or its source wasn't found. Otherwise, returns a list of the sources of
-         * the resource. The older sources are the first ones in the list.
-         */
-        source = sources[sources.length - 1];
-      }
+      String[] sources = getPropertyUpdatingResources(src, name);
+      final String source = resolveFirstUpdatingResource(sources);
 
       if (sourceFilter == null || sourceFilter.test(source)) {
-        target.set(name, value);
+        target.set(name, value, source);
       } else {
+        LOG.debug("'{}' didn't pass filter, skipping adding it", name);
       }
     }
     return target;
@@ -91,4 +79,54 @@ public static JobConf wrapWithJobConf(Configuration conf, Predicate<String> sour
     copyInto(jc, conf, sourceFilter);
     return jc;
   }
+
+  public static void addProgrammaticallyAddedTezOptsToDagConf(Map<String, String> dagConf, JobConf srcConf) {
+    Iterator<Map.Entry<String, String>> iter = srcConf.iterator();
+    while (iter.hasNext()) {
+      Map.Entry<String, String> entry = iter.next();
+      String name = entry.getKey();
+
+      if (name.startsWith("tez")) {
+        String value = entry.getValue();
+        String[] sources = getPropertyUpdatingResources(srcConf, name);
+        final String source = resolveFirstUpdatingResource(sources);
+
+        if ("programmatically".equalsIgnoreCase(source)) {
+          try {
+            TezConfiguration.validateProperty(name, Scope.DAG);
+            LOG.debug("Adding programmatically set config to dag: {}={}", name, value);
+            dagConf.put(name, value);
+          } catch (IllegalStateException e) {
+            // DAGImpl will throw an exception if dagConf contains an AM scoped property
+            // let's not add it here programmatically (even if user added it by accident)
+            LOG.warn("Skip adding '{}' to dagConf, as it's an AM scoped property", name);
+          }
+        }
+      }
+    }
+  }
+
+  private static String[] getPropertyUpdatingResources(Configuration src, String name) {
+    String[] sources;
+    try {
+      sources = ((Map<String, String[]>) updatingResource.get(src)).get(name);
+    } catch (IllegalArgumentException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+    return sources;
+  }
+
+  private static String resolveFirstUpdatingResource(String[] sources) {
+    final String source;
+    if (sources == null || sources.length == 0) {
+      source = null;
+    } else {
+      /*
+       * If the property or its source wasn't found. Otherwise, returns a list of the sources of
+       * the resource. The older sources are the first ones in the list.
+       */
+      source = sources[sources.length - 1];
+    }
+    return source;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index c9562c24de29..32942ef98a71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -491,6 +491,7 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
 
     LOG.debug("DagInfo: {}", dagInfo);
 
+    TezConfigurationFactory.addProgrammaticallyAddedTezOptsToDagConf(dag.getDagConf(), conf);
     dag.setDAGInfo(dagInfo);
 
     dag.setCredentials(conf.getCredentials());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezConfigurationFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezConfigurationFactory.java
new file mode 100644
index 000000000000..c11384b6fb6e
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezConfigurationFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.Map;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+
+public class TestTezConfigurationFactory {
+  private static final Field updatingResource;
+
+  static {
+    try {
+      //Cache the field handle so that we can avoid expensive conf.getPropertySources(key) later
+      updatingResource = Configuration.class.getDeclaredField("updatingResource");
+    } catch (NoSuchFieldException | SecurityException e) {
+      throw new RuntimeException(e);
+    }
+    updatingResource.setAccessible(true);
+  }
+
+  @Test
+  public void testAddProgrammaticallyAddedTezOptsToDagConf() {
+    Map<String, String> dagConf = new HashMap<>();
+    JobConf conf = new JobConf();
+
+    conf.set(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS, "true");
+    TezConfigurationFactory.addProgrammaticallyAddedTezOptsToDagConf(dagConf, conf);
+    Assert.assertTrue("dagConfig should contain value for " + TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS,
+        dagConf.containsKey(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS));
+
+    conf.clear();
+    dagConf.clear();
+
+    conf.set(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS, "true", "tez-site.xml");
+    TezConfigurationFactory.addProgrammaticallyAddedTezOptsToDagConf(dagConf, conf);
+    Assert.assertFalse(
+        "dagConfig should not contain value for xml sourced config " + TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS,
+        dagConf.containsKey(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS));
+
+    conf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, "asdf");
+    TezConfigurationFactory.addProgrammaticallyAddedTezOptsToDagConf(dagConf, conf);
+    // filtered out because it's AM scoped
+    Assert.assertFalse("dagConfig should not contain value for " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+        dagConf.containsKey(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
+
+    conf.set("hive.something", "asdf");
+    TezConfigurationFactory.addProgrammaticallyAddedTezOptsToDagConf(dagConf, conf);
+    // only tez options are added
+    Assert.assertFalse("dagConfig should not contain value for a hive config", dagConf.containsKey("hive.something"));
+  }
+}