Skip to content

Commit

Permalink
HIVE-26671: Incorrect results with Top N Key optimization (Stephen Ca…
Browse files Browse the repository at this point in the history
…rlin, reviewed by Krisztian Kasa)
  • Loading branch information
scarlin-cloudera authored Oct 30, 2022
1 parent 16ce755 commit 856d807
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
return null;
}

// HIVE-26671: We do not want to create a TopNKey processor when the reduce sink
// operator contains a count distinct. This would result in a topnkey operator
// with an extra group in its sort order. The TopNKey Pushdown Processor could then
// push down this operator and it would be incorrect since the count distinct adds
// a group that is only temporarily used for calculating a value.
if (reduceSinkDesc.hasADistinctColumnIndex()) {
return null;
}

// Check whether there already is a top n key operator
Operator<? extends OperatorDesc> parentOperator = reduceSinkOperator.getParentOperators().get(0);
if (parentOperator instanceof TopNKeyOperator) {
Expand Down
13 changes: 13 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
Expand Down Expand Up @@ -430,6 +431,18 @@ public void setDistinctColumnIndices(
this.distinctColumnIndices = distinctColumnIndices;
}

public boolean hasADistinctColumnIndex() {
if (this.distinctColumnIndices == null) {
return false;
}
for (List<Integer> distinctColumnIndex : this.distinctColumnIndices) {
if (CollectionUtils.isNotEmpty(distinctColumnIndex)) {
return true;
}
}
return false;
}

@Explain(displayName = "outputname", explainLevels = { Level.USER })
public String getOutputName() {
return outputName;
Expand Down
8 changes: 8 additions & 0 deletions ql/src/test/queries/clientpositive/topnkey.q
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ SELECT a, b FROM t_test GROUP BY a, b ORDER BY a, b LIMIT 3;
SET hive.optimize.topnkey=false;
SELECT a, b FROM t_test GROUP BY a, b ORDER BY a, b LIMIT 3;

SET hive.optimize.topnkey=true;
EXPLAIN
SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3;
SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3;

SET hive.optimize.topnkey=false;
SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3;

DROP TABLE t_test;
116 changes: 116 additions & 0 deletions ql/src/test/results/clientpositive/llap/topnkey.q.out
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,122 @@ POSTHOOK: Input: default@t_test
5 1
5 2
6 2
PREHOOK: query: EXPLAIN
SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
PREHOOK: type: QUERY
PREHOOK: Input: default@t_test
#### A masked pattern was here ####
POSTHOOK: query: EXPLAIN
SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t_test
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: t_test
Statistics: Num rows: 8 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE
Top N Key Operator
sort order: +
keys: a (type: int)
null sort order: z
Statistics: Num rows: 8 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE
top n: 3
Select Operator
expressions: a (type: int), b (type: int), c (type: int)
outputColumnNames: a, b, c
Statistics: Num rows: 8 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count(DISTINCT b), min(c)
keys: a (type: int), b (type: int)
minReductionHashAggr: 0.4
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 4 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: int), _col1 (type: int)
null sort order: zz
sort order: ++
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 4 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
TopN Hash Memory Usage: 0.1
value expressions: _col3 (type: int)
Execution mode: llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0), min(VALUE._col1)
keys: KEY._col0 (type: int)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: int)
null sort order: z
sort order: +
Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint), _col2 (type: int)
Reducer 3
Execution mode: llap
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: bigint), VALUE._col1 (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 3
Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: 3
Processor Tree:
ListSink

PREHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
PREHOOK: type: QUERY
PREHOOK: Input: default@t_test
#### A masked pattern was here ####
POSTHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t_test
#### A masked pattern was here ####
5 2 2
6 1 1
7 1 4
PREHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
PREHOOK: type: QUERY
PREHOOK: Input: default@t_test
#### A masked pattern was here ####
POSTHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t_test
#### A masked pattern was here ####
5 2 2
6 1 1
7 1 4
PREHOOK: query: DROP TABLE t_test
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@t_test
Expand Down

0 comments on commit 856d807

Please sign in to comment.