common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (0 additions & 5 deletions)
@@ -5158,11 +5158,6 @@ public static enum ConfVars {
LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT("hive.llap.external.splits.temp.table.storage.format",
"orc", new StringSet("default", "text", "orc"),
"Storage format for temp tables created using LLAP external client"),
-LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT("hive.llap.external.splits.order.by.force.single.split",
-    true,
-    "If LLAP external clients submits ORDER BY queries, force return a single split to guarantee reading\n" +
-    "data out in ordered way. Setting this to false will let external clients read data out in parallel\n" +
-    "losing the ordering (external clients are responsible for guaranteeing the ordering)"),
LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR("hive.llap.external.client.use.hybrid.calendar",
false,
"Whether to use hybrid calendar for parsing of data/timestamps."),
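Note: the deleted ConfVars entry above was the only switch controlling this behavior for LLAP external clients. As a rough illustration of what goes away (the property name and boolean type come from the removed entry; the JobConf usage is illustrative, not a prescribed client API), a client could previously opt out of the forced single split like this:

```java
import org.apache.hadoop.mapred.JobConf;

// Illustrative only: the knob removed by this patch.
public class LegacyOrderBySplitConfig {
  public static JobConf withParallelOrderByReads() {
    JobConf conf = new JobConf();
    // false = let ORDER BY queries produce multiple splits and read them in parallel,
    // leaving the ordering guarantee to the external client (wording per the removed description).
    conf.setBoolean("hive.llap.external.splits.order.by.force.single.split", false);
    return conf;
  }
}
```

With the property gone, that parallel behavior is the only one: get_splits no longer collapses an ORDER BY query to a single split, and ordering is always the client's responsibility.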
@@ -201,13 +201,13 @@ protected void testGenericUDTFOrderBySplitCount1OnPartitionedTable(String udtfName
runQuery(query, getConfigs(), expectedCounts[1]);

query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " limit 2', 5)";
-runQuery(query, getConfigs(), expectedCounts[1]);
+runQuery(query, getConfigs(), expectedCounts[2]);

query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " where id != 0 limit 2', 5)";
-runQuery(query, getConfigs(), expectedCounts[1]);
+runQuery(query, getConfigs(), expectedCounts[3]);

query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " group by id limit 2', 5)";
-runQuery(query, getConfigs(), expectedCounts[1]);
+runQuery(query, getConfigs(), expectedCounts[4]);

}

@@ -45,7 +45,7 @@ public void testGenericUDTFOrderBySplitCount1() throws Exception {

@Test(timeout = 200000)
public void testGenericUDTFOrderBySplitCount1OnPartitionedTable() throws Exception {
-super.testGenericUDTFOrderBySplitCount1OnPartitionedTable("get_splits", new int[]{5, 1, 2, 2, 2});
+super.testGenericUDTFOrderBySplitCount1OnPartitionedTable("get_splits", new int[]{5, 5, 1, 1, 1});
}


@@ -32,7 +32,7 @@ public void testGenericUDTFOrderBySplitCount1() throws Exception {

@Test(timeout = 200000)
public void testGenericUDTFOrderBySplitCount1OnPartitionedTable() throws Exception {
-super.testGenericUDTFOrderBySplitCount1OnPartitionedTable("get_llap_splits", new int[]{7, 3, 4, 4, 4});
+super.testGenericUDTFOrderBySplitCount1OnPartitionedTable("get_llap_splits", new int[]{7, 7, 3, 3, 3});
}

}
@@ -140,9 +140,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
protected transient StringObjectInspector stringOI;
protected transient IntObjectInspector intOI;
protected transient JobConf jc;
-private boolean orderByQuery;
private boolean limitQuery;
-private boolean forceSingleSplit;
protected ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
protected DataOutput dos = new DataOutputStream(bos);
protected String inputArgQuery;
@@ -253,28 +251,19 @@ protected SplitResult getSplitResult(boolean generateLightWeightSplits)
TezWork tezWork = fragment.work;
Schema schema = fragment.schema;

-boolean generateSingleSplit = forceSingleSplit && orderByQuery;
-
-SplitResult splitResult = getSplits(jc, tezWork, schema, extClientAppId, generateSingleSplit,
-    generateLightWeightSplits);
-validateSplitResult(splitResult, generateLightWeightSplits, generateSingleSplit);
+SplitResult splitResult = getSplits(jc, tezWork, schema, extClientAppId, generateLightWeightSplits);
+validateSplitResult(splitResult, generateLightWeightSplits);
return splitResult;
}

-private void validateSplitResult(SplitResult splitResult, boolean generateLightWeightSplits,
-    boolean generateSingleSplit) throws HiveException {
+private void validateSplitResult(SplitResult splitResult, boolean generateLightWeightSplits) {
Preconditions.checkNotNull(splitResult.schemaSplit, "schema split cannot be null");
if (!schemaSplitOnly) {
InputSplit[] splits = splitResult.actualSplits;
if (splits.length > 0 && generateLightWeightSplits) {
Preconditions.checkNotNull(splitResult.planSplit, "plan split cannot be null");
}
LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, inputArgQuery,
orderByQuery, forceSingleSplit);
if (generateSingleSplit && splits.length > 1) {
throw new HiveException("Got more than one split (Got: " + splits.length
+ ") for order by query: " + inputArgQuery);
}
LOG.info("Generated {} splits for query {}", splits.length, inputArgQuery);
}
}
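Since validateSplitResult no longer throws when an ORDER BY query produces more than one split, a consumer that still needs globally ordered output has to restore the order itself. A minimal sketch of that client-side step (method and parameter names are hypothetical; it only assumes the rows of each split can be iterated):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical client-side re-ordering once several splits can come back for an ORDER BY query.
public final class ClientSideOrderingSketch {
  public static <T> List<T> readOrdered(List<Iterator<T>> perSplitRows, Comparator<T> orderByKey) {
    List<T> rows = new ArrayList<>();
    for (Iterator<T> split : perSplitRows) {   // splits may be read in parallel;
      split.forEachRemaining(rows::add);       // a sequential loop keeps the sketch short
    }
    rows.sort(orderByKey);                     // the ordering guarantee now lives on the client
    return rows;
  }
}
```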

@@ -294,9 +283,6 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId)
// Tez/LLAP requires RPC query plan
HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
HiveConf.setBoolVar(conf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false);
-// spark-llap always wraps query under a subquery, until that is removed from spark-llap
-// hive compiler is going to remove inner order by. disable that optimization until then.
-HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false);

if (schemaSplitOnly) {
//Schema only
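For context on the HIVE_REMOVE_ORDERBY_IN_SUBQUERY override deleted above: the old comment says spark-llap submits the user query wrapped in an outer subquery, and forcing the flag off kept the compiler from dropping the inner ORDER BY. A hypothetical illustration of that wrapping (table and alias names invented):

```java
// Hypothetical illustration of the wrapping the removed comment refers to. Once
// HIVE_REMOVE_ORDERBY_IN_SUBQUERY is no longer forced off here, the optimizer may
// drop the inner ORDER BY of such a wrapped query.
public final class WrappedQueryExample {
  public static String wrap(String userQuery) {
    return "select * from (" + userQuery + ") q";
  }

  public static void main(String[] args) {
    System.out.println(wrap("select id from example_table order by id"));
  }
}
```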
@@ -333,10 +319,7 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId)
}

QueryPlan plan = driver.getPlan();
-orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy();
limitQuery = plan.getQueryProperties().getOuterQueryLimit() != -1;
-forceSingleSplit = orderByQuery &&
-    HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT);
List<Task<?>> roots = plan.getRootTasks();
Schema schema = convertSchema(plan.getResultSchema());
boolean fetchTask = plan.getFetchTask() != null;
@@ -435,8 +418,7 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId extClientAppId,
// 1) schema and planBytes[] in each LlapInputSplit are not populated
// 2) schemaSplit(contains only schema) and planSplit(contains only planBytes[]) are populated in SplitResult
private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId extClientAppId,
-    final boolean generateSingleSplit, boolean generateLightWeightSplits)
-    throws IOException {
+    boolean generateLightWeightSplits) throws IOException {

SplitResult splitResult = new SplitResult();
splitResult.schemaSplit = new LlapInputSplit(
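The comment at the top of this hunk describes the two split layouts. A stand-in sketch of the difference (the class below only mirrors the field names used here, schemaSplit / planSplit / actualSplits; it is not the real SplitResult type):

```java
import org.apache.hadoop.mapred.InputSplit;

// Stand-in for illustration only; mirrors the field names used by GenericUDTFGetSplits.
final class ResultShape {
  InputSplit schemaSplit;    // always populated (schema-only split)
  InputSplit planSplit;      // lightweight mode: the plan is carried once, here
  InputSplit[] actualSplits; // data splits

  String describe(boolean generateLightWeightSplits) {
    if (generateLightWeightSplits) {
      // per-split schema and planBytes[] are left empty; schemaSplit and planSplit carry them once
      return "schema split + plan split + " + actualSplits.length + " lightweight data splits";
    }
    // each data split embeds its own schema and planBytes[]
    return actualSplits.length + " self-contained data splits";
  }
}
```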
@@ -483,8 +465,7 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId extClientAppId,
Preconditions.checkState(HiveConf.getBoolVar(wxConf,
ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));

-HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork,
-    generateSingleSplit, inputArgNumSplits);
+HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, false, inputArgNumSplits);
List<Event> eventList = splitGenerator.initialize();
int numGroupedSplitsGenerated = eventList.size() - 1;
InputSplit[] result = new InputSplit[numGroupedSplitsGenerated];