HIVE-4281 add hive.map.groupby.sorted.testmode
(Namit via Gang Tim Liu)



git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1464277 13f79535-47bb-0310-9956-ffa450edef68
Namit Jain committed Apr 4, 2013
1 parent 2dc0efc commit 97ddec6
Showing 8 changed files with 193 additions and 6 deletions.
3 changes: 2 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -423,6 +423,7 @@ public static enum ConfVars {
HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
HIVE_MAP_GROUPBY_SORT("hive.map.groupby.sorted", false),
+HIVE_MAP_GROUPBY_SORT_TESTMODE("hive.map.groupby.sorted.testmode", false),
HIVE_GROUPBY_ORDERBY_POSITION_ALIAS("hive.groupby.orderby.position.alias", false),
HIVE_NEW_JOB_GROUPING_SET_CARDINALITY("hive.new.job.grouping.set.cardinality", 30),

@@ -765,7 +766,7 @@ public static enum ConfVars {
// ptf partition constants
HIVE_PTF_PARTITION_PERSISTENCE_CLASS("hive.ptf.partition.persistence",
"org.apache.hadoop.hive.ql.exec.PTFPersistence$PartitionedByteBasedList"),
-HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize",
+HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize",
(int) Math.pow(2, (5 + 10 + 10)) ), // 32MB
;

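The new ConfVars entry pairs the key hive.map.groupby.sorted.testmode with a default of false, so test mode is opt-in. A minimal sketch (not part of the commit) of how code reads such a flag, using the same HiveConf.getBoolVar call the optimizer change below uses:

import org.apache.hadoop.hive.conf.HiveConf;

public class TestModeFlagCheck {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Returns the ConfVars default (false) unless a user has overridden it.
    boolean testMode =
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE);
    System.out.println("map group-by test mode: " + testMode);
  }
}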
9 changes: 9 additions & 0 deletions conf/hive-default.xml.template
@@ -533,6 +533,15 @@
</description>
</property>

+<property>
+  <name>hive.map.groupby.sorted.testmode</name>
+  <value>false</value>
+  <description>If the bucketing/sorting properties of the table exactly match the grouping key, whether to
+  perform the group by in the mapper by using BucketizedHiveInputFormat. If test mode is set, the plan
+  is not converted; instead, a query property is set to record that the conversion would have happened.
+  </description>
+</property>
+
<property>
<name>hive.new.job.grouping.set.cardinality</name>
<value>30</value>
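Test mode only matters when the base optimization is also on: the optimizer consults hive.map.groupby.sorted.testmode only after hive.map.groupby.sorted has triggered a conversion. A small sketch (typical HiveConf usage, not taken from the commit) of toggling both flags programmatically; in a session the equivalent is set hive.map.groupby.sorted.testmode=true; as the new test file below does:

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableTestMode {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Enable the map-side sorted group-by optimization...
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT, true);
    // ...and ask for test mode: record the decision but leave the plan alone.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE, true);
  }
}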
9 changes: 9 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
@@ -46,6 +46,7 @@ public class QueryProperties {
boolean hasDistributeBy = false;
boolean hasClusterBy = false;
boolean mapJoinRemoved = false;
+boolean hasMapGroupBy = false;

public boolean hasJoin() {
return hasJoin;
@@ -134,4 +135,12 @@ public boolean isMapJoinRemoved() {
public void setMapJoinRemoved(boolean mapJoinRemoved) {
this.mapJoinRemoved = mapJoinRemoved;
}
+
+public boolean isHasMapGroupBy() {
+  return hasMapGroupBy;
+}
+
+public void setHasMapGroupBy(boolean hasMapGroupBy) {
+  this.hasMapGroupBy = hasMapGroupBy;
+}
}
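The new flag follows the bean-style pattern of the other QueryProperties fields. A minimal usage sketch (not from the commit), just to show the default and the accessor pair:

import org.apache.hadoop.hive.ql.QueryProperties;

public class QueryPropertiesDemo {
  public static void main(String[] args) {
    QueryProperties props = new QueryProperties();
    System.out.println(props.isHasMapGroupBy());  // false until an optimizer sets it
    props.setHasMapGroupBy(true);                 // what GroupByOptimizer does below
    System.out.println(props.isHasMapGroupBy());  // true
  }
}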
11 changes: 9 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
@@ -178,7 +178,7 @@ protected void processGroupBy(GroupByOptimizerContext ctx,
// Don't remove the operator for distincts
if (useMapperSort && !groupByOp.getConf().isDistinct() &&
(match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-  convertGroupByMapSideSortedGroupBy(groupByOp, depth);
+  convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
}
else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
(match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
@@ -455,7 +455,14 @@ private GroupByOptimizerSortMatch matchBucketSortCols(

// Convert the group by to a map-side group by.
// The operators specified by depth are removed from the tree.
-protected void convertGroupByMapSideSortedGroupBy(GroupByOperator groupByOp, int depth) {
+protected void convertGroupByMapSideSortedGroupBy(
+    HiveConf conf, GroupByOperator groupByOp, int depth) {
+  // In test mode, don't change the query plan; instead, set a query property.
+  pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+  if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+    return;
+  }
+
if (groupByOp.removeChildren(depth)) {
// Use bucketized hive input format - that makes sure that one mapper reads the entire file
groupByOp.setUseBucketizedHiveInputFormat(true);
14 changes: 13 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -28,6 +28,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -113,6 +114,7 @@ public class ParseContext {
private List<Task<? extends Serializable>> rootTasks;

private FetchTask fetchTask;
+private QueryProperties queryProperties;

public ParseContext() {
}
@@ -180,7 +182,8 @@ public ParseContext(
HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks,
Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner,
Map<String, ReadEntity> viewAliasToInput,
-List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
+List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting,
+QueryProperties queryProperties) {
this.conf = conf;
this.qb = qb;
this.ast = ast;
@@ -212,6 +215,7 @@ public ParseContext(
this.viewAliasToInput = viewAliasToInput;
this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
reduceSinkOperatorsAddedByEnforceBucketingSorting;
+this.queryProperties = queryProperties;
}

/**
@@ -623,4 +627,12 @@ public void setOpPartToSkewedPruner(
public Map<String, ReadEntity> getViewAliasToInput() {
return viewAliasToInput;
}
+
+public QueryProperties getQueryProperties() {
+  return queryProperties;
+}
+
+public void setQueryProperties(QueryProperties queryProperties) {
+  this.queryProperties = queryProperties;
+}
}
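Taken together with the GroupByOptimizer change above, this plumbing lets tests observe the optimizer's decision without inspecting the rewritten plan. A hedged end-to-end sketch; the no-arg ParseContext constructor and the new setter stand in for the long constructor the real SemanticAnalyzer uses:

import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.parse.ParseContext;

public class MapGroupByDecisionCheck {
  public static void main(String[] args) {
    ParseContext pctx = new ParseContext();
    pctx.setQueryProperties(new QueryProperties());

    // In test mode, GroupByOptimizer does only this instead of rewriting the plan:
    pctx.getQueryProperties().setHasMapGroupBy(true);

    // A test can then assert the decision directly.
    System.out.println(pctx.getQueryProperties().isHasMapGroupBy());  // true
  }
}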
5 changes: 3 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -352,7 +352,8 @@ public ParseContext getParseContext() {
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
-reduceSinkOperatorsAddedByEnforceBucketingSorting);
+reduceSinkOperatorsAddedByEnforceBucketingSorting,
+queryProperties);
}

@SuppressWarnings("nls")
@@ -8692,7 +8693,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
-reduceSinkOperatorsAddedByEnforceBucketingSorting);
+reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);

// Generate table access stats if required
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
21 changes: 21 additions & 0 deletions ql/src/test/queries/clientpositive/groupby_sort_test_1.q
@@ -0,0 +1,21 @@
set hive.enforce.bucketing = true;
set hive.enforce.sorting = true;
set hive.exec.reducers.max = 10;
set hive.map.groupby.sorted=true;
set hive.map.groupby.sorted.testmode=true;

CREATE TABLE T1(key STRING, val STRING)
CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;

-- perform an insert to make sure there are 2 files
INSERT OVERWRITE TABLE T1 select key, val from T1;

CREATE TABLE outputTbl1(key int, cnt int);

-- The plan should be converted to a map-side group by if the group by key
-- matches the sorted key. However, in test mode, the group by won't be converted.
EXPLAIN
INSERT OVERWRITE TABLE outputTbl1
SELECT key, count(1) FROM T1 GROUP BY key;
127 changes: 127 additions & 0 deletions ql/src/test/results/clientpositive/groupby_sort_test_1.q.out
@@ -0,0 +1,127 @@
PREHOOK: query: CREATE TABLE T1(key STRING, val STRING)
CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
PREHOOK: type: CREATETABLE
POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING)
CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: default@T1
PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1
PREHOOK: type: LOAD
PREHOOK: Output: default@t1
POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1
POSTHOOK: type: LOAD
POSTHOOK: Output: default@t1
PREHOOK: query: -- perform an insert to make sure there are 2 files
INSERT OVERWRITE TABLE T1 select key, val from T1
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
PREHOOK: Output: default@t1
POSTHOOK: query: -- perform an insert to make sure there are 2 files
INSERT OVERWRITE TABLE T1 select key, val from T1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
POSTHOOK: Output: default@t1
POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
PREHOOK: query: CREATE TABLE outputTbl1(key int, cnt int)
PREHOOK: type: CREATETABLE
POSTHOOK: query: CREATE TABLE outputTbl1(key int, cnt int)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: default@outputTbl1
POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
PREHOOK: query: -- The plan should be converted to a map-side group by if the group by key
-- matches the sorted key. However, in test mode, the group by won't be converted.
EXPLAIN
INSERT OVERWRITE TABLE outputTbl1
SELECT key, count(1) FROM T1 GROUP BY key
PREHOOK: type: QUERY
POSTHOOK: query: -- The plan should be converted to a map-side group by if the group by key
-- matches the sorted key. However, in test mode, the group by won't be converted.
EXPLAIN
INSERT OVERWRITE TABLE outputTbl1
SELECT key, count(1) FROM T1 GROUP BY key
POSTHOOK: type: QUERY
POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME outputTbl1))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL key))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1
  Stage-2 depends on stages: Stage-0

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        t1
          TableScan
            alias: t1
            Select Operator
              expressions:
                    expr: key
                    type: string
              outputColumnNames: key
              Group By Operator
                aggregations:
                      expr: count(1)
                bucketGroup: false
                keys:
                      expr: key
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: UDFToInteger(_col0)
                  type: int
                  expr: UDFToInteger(_col1)
                  type: int
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 1
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: default.outputtbl1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.outputtbl1

  Stage: Stage-2
    Stats-Aggr Operator

