HIVE-27056: Ensure that MR distcp goes to the same Yarn queue as other parts of the query (#4037) (Laszlo Bodor reviewed by Ayush Saxena)
abstractdog committed Feb 15, 2023
1 parent 9680c30 commit ca9e6c2
Showing 4 changed files with 89 additions and 9 deletions.
3 changes: 3 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4020,6 +4020,9 @@ public static enum ConfVars {
"true", new StringSet("true", "false", "ignore"),
"Whether Tez session pool should allow submitting queries to custom queues. The options\n" +
"are true, false (error out), ignore (accept the query but ignore the queue setting)."),
HIVE_MAPRED_JOB_FOLLOW_TEZ_QUEUE("hive.mapred.job.follow.tez.queue", false,
"Whether the MR jobs initiated by a query should be enforced to run in the queue denoted by "
+ "'tez.queue.name', e.g. DistCp jobs."),

// Operation log configuration
HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,
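For reference, a minimal sketch (not part of the commit; the class name, the main method, and the queue name "etl" are illustrative assumptions) of enabling the new flag programmatically next to an explicit Tez queue:

import org.apache.hadoop.hive.conf.HiveConf;

public class FollowTezQueueExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Queue that the Tez DAGs of the query will run in.
    conf.set("tez.queue.name", "etl");
    // New flag from this commit: MR jobs spawned by the query (e.g. DistCp)
    // should follow tez.queue.name instead of landing in the default queue.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_MAPRED_JOB_FOLLOW_TEZ_QUEUE, true);

    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_MAPRED_JOB_FOLLOW_TEZ_QUEUE)); // true
  }
}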
@@ -488,7 +488,10 @@ private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
// Unset this after opening the session so that a reopened session uses the correct queue
// name: i.e., if the client has not died and the user has explicitly set a queue name,
// then the reopened session will use the user-specified queue name, else the default cluster queue names.
-      conf.unset(TezConfiguration.TEZ_QUEUE_NAME);
+      if (conf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+        LOG.debug("Unsetting tez.queue.name (from: {})", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
+        conf.unset(TezConfiguration.TEZ_QUEUE_NAME);
+      }
return session;
} finally {
if (isOnThread && !isSuccessful) {
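A small sketch of the guarded unset above (illustrative, not from the commit; the class name and the queue name "etl" are assumptions): the Tez queue name is cleared after the session is opened only when HS2 initializes a default session pool, so an explicitly chosen queue otherwise survives for a later reopen.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.tez.dag.api.TezConfiguration;

public class GuardedQueueUnsetSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.set(TezConfiguration.TEZ_QUEUE_NAME, "etl");

    // Mirrors the new guard: only clear the queue for pooled default sessions.
    if (conf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
      conf.unset(TezConfiguration.TEZ_QUEUE_NAME);
    }

    // hive.server2.tez.initialize.default.sessions defaults to false, so the
    // explicitly set queue is still present for a session reopen.
    System.out.println(conf.get(TezConfiguration.TEZ_QUEUE_NAME)); // prints "etl"
  }
}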
@@ -1146,11 +1146,7 @@ public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) thro

// HIVE-13704 states that we should use run() instead of execute() due to a known hadoop issue
// introduced by HADOOP-10459
-      if (distcp.run(params.toArray(new String[0])) == 0) {
-        return true;
-      } else {
-        return false;
-      }
+      return runDistCpInternal(distcp, params) == 0;
} catch (Exception e) {
throw new IOException("Cannot execute DistCp process: ", e);
} finally {
@@ -1172,7 +1168,7 @@ public boolean runDistCpWithSnapshots(String oldSnapshot, String newSnapshot, Li
try {
DistCp distcp = new DistCp(conf, null);
distcp.getConf().setBoolean("mapred.mapper.new-api", true);
-      int returnCode = distcp.run(params.toArray(new String[0]));
+      int returnCode = runDistCpInternal(distcp, params);
if (returnCode == 0) {
return true;
} else if (returnCode == DistCpConstants.INVALID_ARGUMENT) {
@@ -1188,13 +1184,13 @@ public boolean runDistCpWithSnapshots(String oldSnapshot, String newSnapshot, Li
+ "snapshot: {}", srcPaths, dst, oldSnapshot);
List<String> rParams = constructDistCpWithSnapshotParams(srcPaths, dst, ".", oldSnapshot, conf, "-rdiff");
DistCp rDistcp = new DistCp(conf, null);
-        returnCode = rDistcp.run(rParams.toArray(new String[0]));
+        returnCode = runDistCpInternal(rDistcp, rParams);
if (returnCode == 0) {
LOG.info("Target restored to previous state. source: {} target: {} snapshot: {}. Reattempting to copy.",
srcPaths, dst, oldSnapshot);
dst.getFileSystem(conf).deleteSnapshot(dst, oldSnapshot);
dst.getFileSystem(conf).createSnapshot(dst, oldSnapshot);
-          returnCode = distcp.run(params.toArray(new String[0]));
+          returnCode = runDistCpInternal(distcp, params);
if (returnCode == 0) {
return true;
} else {
@@ -1211,6 +1207,39 @@ public boolean runDistCpWithSnapshots(String oldSnapshot, String newSnapshot, Li
return false;
}

protected int runDistCpInternal(DistCp distcp, List<String> params) {
ensureMapReduceQueue(distcp.getConf());
return distcp.run(params.toArray(new String[0]));
}

/**
* This method ensures that, if an explicit tez.queue.name is set, the hadoop shim submits MR jobs
* to the same yarn queue. This addresses a security issue where e.g. the settings have the following values:
* tez.queue.name=sample
* hive.server2.tez.queue.access.check=true
* In this case, when a query submits Tez DAGs, the tez client layer checks via YarnQueueHelper whether the
* end user has access to the yarn queue 'sample', but that check is not applied to MR jobs, which can run
* even when the query execution engine is Tez. E.g. an EXPORT TABLE can submit DistCp MR jobs at some stages
* when certain criteria are met. To close this security gap we restrict the setting of mapreduce.job.queuename,
* and even the default queue is unexpected when tez.queue.name is explicitly set.
* Under the hood, the desired behavior is to run DistCp jobs in the same yarn queue as the other parts
* of the query. Most of the time, the user isn't aware that a query involves DistCp jobs, hence isn't aware
* of these details.
*/
protected void ensureMapReduceQueue(Configuration conf) {
String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
boolean isTez = "tez".equalsIgnoreCase(conf.get("hive.execution.engine"));
boolean shouldMapredJobsFollowTezQueue = conf.getBoolean("hive.mapred.job.follow.tez.queue", false);

LOG.debug("Checking tez.queue.name {}, isTez: {}, shouldMapredJobsFollowTezQueue: {}", queueName, isTez,
shouldMapredJobsFollowTezQueue);
if (isTez && shouldMapredJobsFollowTezQueue && queueName != null && queueName.length() > 0) {
LOG.info("Setting mapreduce.job.queuename (current: '{}') to become tez.queue.name: '{}'",
conf.get(MRJobConfig.QUEUE_NAME), queueName);
conf.set(MRJobConfig.QUEUE_NAME, queueName);
}
}

/**
* Checks whether reverse diff on the snapshot should be performed or not.
* @param p path where snapshot exists.
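To make the effect of ensureMapReduceQueue concrete, here is a self-contained sketch (not part of the commit; the class name and the queue "etl" are made up) that mirrors the same condition on a plain Hadoop Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.tez.dag.api.TezConfiguration;

// Standalone illustration of the queue-following rule implemented in
// ensureMapReduceQueue above: mapreduce.job.queuename is only overridden when
// the execution engine is Tez, the new flag is enabled, and an explicit
// tez.queue.name is present.
public class QueueFollowSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hive.execution.engine", "tez");
    conf.setBoolean("hive.mapred.job.follow.tez.queue", true);
    conf.set(TezConfiguration.TEZ_QUEUE_NAME, "etl");

    String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
    boolean isTez = "tez".equalsIgnoreCase(conf.get("hive.execution.engine"));
    boolean follow = conf.getBoolean("hive.mapred.job.follow.tez.queue", false);
    if (isTez && follow && queueName != null && !queueName.isEmpty()) {
      conf.set(MRJobConfig.QUEUE_NAME, queueName);
    }
    // With all three conditions met, the MR queue now matches the Tez queue.
    System.out.println(conf.get(MRJobConfig.QUEUE_NAME)); // prints "etl"
  }
}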
@@ -23,6 +23,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.tools.DistCp;
import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Test;

import java.io.FileNotFoundException;
@@ -190,4 +193,46 @@ public void testGetFileIdForNonexistingPath() throws Exception {
shims.getFileId(fs, "badpath");
}

@Test
public void testMapReduceQueueIsSetToTezQueue() throws Exception {
Configuration conf = new Configuration();
// there is a tez.queue.name, but hive.mapred.job.follow.tez.queue is not allowed
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "helloQ");
conf.set("hive.execution.engine", "tez");
DistCp distCp = runMockDistCp(conf);
assertEquals("default", distCp.getConf().get(MRJobConfig.QUEUE_NAME));

// there is a tez.queue.name, and hive.mapred.job.follow.tez.queue is allowed
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "helloQ");
conf.setBoolean("hive.mapred.job.follow.tez.queue", true);
conf.set("hive.execution.engine", "tez");
distCp = runMockDistCp(conf);
assertEquals("helloQ", distCp.getConf().get(MRJobConfig.QUEUE_NAME));

// there is a tez.queue.name, also hive.mapred.job.follow.tez.queue is allowed,
// but execution engine is set to legacy 'mr': queue follow is not activated
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "helloQ");
conf.setBoolean("hive.mapred.job.follow.tez.queue", true);
conf.set("hive.execution.engine", "mr");
distCp = runMockDistCp(conf);
assertEquals("default", distCp.getConf().get(MRJobConfig.QUEUE_NAME));

// there is no tez.queue.name set at all
conf = new Configuration();
conf.setBoolean("hive.mapred.job.follow.tez.queue", true);
conf.set("hive.execution.engine", "tez");
distCp = runMockDistCp(conf);
assertEquals("default", distCp.getConf().get(MRJobConfig.QUEUE_NAME));
}

private DistCp runMockDistCp(Configuration conf) throws Exception {
Path copySrc = getMockedPath(false);
Path copyDst = getMockedPath(false);
Hadoop23Shims shims = new Hadoop23Shims();
List<String> params = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);

DistCp distCp = new DistCp(conf, null);
shims.runDistCpInternal(distCp, params);
return distCp;
}
}
