HIVE-2329 Not using map aggregation, fails to execute group-by after cluster-by with same key (Navis via namit)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1212551 13f79535-47bb-0310-9956-ffa450edef68
Namit Jain committed Dec 9, 2011
1 parent 183ddc2 commit 5a3c7ab
Showing 3 changed files with 73 additions and 39 deletions.
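The change teaches the ReduceSinkDeDuplication optimizer to leave a ReduceSinkOperator untouched when its only child is a GroupByOperator. With map-side aggregation disabled (hive.map.aggr=false), the group-by is evaluated entirely on the reducer and depends on its own shuffle stage, so merging that reduce sink into the one produced by a preceding cluster-by on the same key broke the query. The failing pattern, mirroring the new test case at the end of this commit:

set hive.map.aggr=false;
-- the inner CLUSTER BY and the outer GROUP BY share key_int1, so the
-- optimizer previously treated the two reduce sinks as mergeable
select Q1.key_int1, sum(Q1.key_int1)
from (select * from t1 cluster by key_int1) Q1
group by Q1.key_int1;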
ReduceSinkDeDuplication.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.ForwardOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -64,11 +65,11 @@
public class ReduceSinkDeDuplication implements Transform{

protected ParseContext pGraphContext;

@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
pGraphContext = pctx;

// walk the operator tree and merge redundant child/parent reduce sinks
ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);

@@ -88,7 +89,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
ogw.startWalking(topNodes, null);
return pGraphContext;
}

class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
ParseContext pctx;
List<ReduceSinkOperator> rejectedRSList;
@@ -97,11 +98,11 @@ public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
rejectedRSList = new ArrayList<ReduceSinkOperator>();
this.pctx = pctx;
}

public boolean contains (ReduceSinkOperator rsOp) {
return rejectedRSList.contains(rsOp);
}

public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
if (!rejectedRSList.contains(rsOp)) {
rejectedRSList.add(rsOp);
@@ -116,10 +117,10 @@ public void setPctx(ParseContext pctx) {
this.pctx = pctx;
}
}


static class ReduceSinkDeduplicateProcFactory {


public static NodeProcessor getReducerReducerProc() {
return new ReducerReducerProc();
@@ -140,19 +141,25 @@ public Object process(Node nd, Stack<Node> stack,
return null;
}
}

static class ReducerReducerProc implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx, Object... nodeOutputs)
throws SemanticException {
ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;

if(ctx.contains(childReduceSink)) {
return null;
}


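// HIVE-2329: a reduce sink that feeds straight into a group-by must keep
// its own shuffle stage (with hive.map.aggr=false the GroupByOperator
// runs wholly on the reducer), so reject it for deduplication.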
List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();
if (childOp != null && childOp.size() == 1 && childOp.get(0) instanceof GroupByOperator) {
ctx.addRejectedReduceSinkOperator(childReduceSink);
return null;
}

ParseContext pGraphContext = ctx.getPctx();
HashMap<String, String> childColumnMapping = getPartitionAndKeyColumnMapping(childReduceSink);
ReduceSinkOperator parentRS = null;
@@ -171,7 +178,7 @@ public Object process(Node nd, Stack<Node> stack,
} else {
stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
}

boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
if (!succeed) {
return null;
@@ -180,7 +187,7 @@ public Object process(Node nd, Stack<Node> stack,
if (!succeed) {
return null;
}

boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
if (!same) {
return null;
@@ -193,18 +200,18 @@ private void replaceReduceSinkWithSelectOperator(
ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
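// At this point the child reduce sink has been proven redundant: replace
// it (and a following ExtractOperator, if present) with an identity
// SelectOperator so rows pass through a single shuffle.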
List<Operator<? extends Serializable>> parentOp = childReduceSink.getParentOperators();
List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();

Operator<? extends Serializable> oldParent = childReduceSink;

if (childOp != null && childOp.size() == 1
&& ((childOp.get(0)) instanceof ExtractOperator)) {
oldParent = childOp.get(0);
childOp = childOp.get(0).getChildOperators();
}

Operator<? extends Serializable> input = parentOp.get(0);
input.getChildOperators().clear();

RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();

ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
@@ -239,9 +246,9 @@ private void replaceReduceSinkWithSelectOperator(
for (Operator<? extends Serializable> ch : childOp) {
ch.replaceParent(oldParent, sel);
}

}

private Operator<? extends Serializable> putOpInsertMap(
Operator<? extends Serializable> op, RowResolver rr, ParseContext pGraphContext) {
OpParseContext ctx = new OpParseContext(rr);
@@ -253,24 +260,24 @@ private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
ReduceSinkOperator parentRS,
HashMap<String, String> childColumnMapping,
HashMap<String, String> parentColumnMapping) {

ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();

boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
childPartitionCols, parentPartitionCols);
if (!ret) {
return false;
}

ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
ret = compareExprNodes(childColumnMapping, parentColumnMapping,
childReduceKeyCols, parentReduceKeyCols);
if (!ret) {
return false;
}

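// The order string holds one '+' (ascending) or '-' (descending) per
// key column; if only one reduce sink specifies an order, the child's
// order can be adopted by the parent.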
String childRSOrder = childReduceSink.getConf().getOrder();
String parentRSOrder = parentRS.getConf().getOrder();
boolean moveChildRSOrderToParent = false;
@@ -285,45 +292,45 @@ private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
moveChildRSOrderToParent = true;
}
}

int childNumReducers = childReduceSink.getConf().getNumReducers();
int parentNumReducers = parentRS.getConf().getNumReducers();
boolean moveChildReducerNumToParent = false;
//move child reduce sink's number reducers to the parent reduce sink operator.
if (childNumReducers != parentNumReducers) {
if (childNumReducers == -1) {
//do nothing.
} else if (parentNumReducers == -1) {
//set childNumReducers in the parent reduce sink operator.
moveChildReducerNumToParent = true;
} else {
return false;
}
}

if(moveChildRSOrderToParent) {
parentRS.getConf().setOrder(childRSOrder);
}

if(moveChildReducerNumToParent) {
parentRS.getConf().setNumReducers(childNumReducers);
}

return true;
}

private boolean compareExprNodes(HashMap<String, String> childColumnMapping,
HashMap<String, String> parentColumnMapping,
ArrayList<ExprNodeDesc> childColExprs,
ArrayList<ExprNodeDesc> parentColExprs) {

boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;

if (childEmpty) { //both empty
return true;
}

//child not empty here
if (parentEmpty) { // child not empty, but parent empty
return false;
@@ -383,13 +390,13 @@ private boolean backTrackColumnNames(
}
}
}

return true;
}

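// Maps each partition and key column of the reduce sink by name; the
// mapping is then rewritten by backTrackColumnNames as it walks up the
// operator chain toward the parent reduce sink.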
private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
HashMap<String, String> columnMapping = new HashMap<String, String> ();
ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
if(partitionCols != null) {
@@ -419,7 +426,7 @@ private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childRe
// this potentially is a join operator
return null;
}

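// Only row-preserving operators may sit between the two reduce sinks;
// anything else (e.g. a potential join, or an untrusted script) makes
// the pair non-mergeable.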
boolean allowed = false;
if ((start instanceof SelectOperator)
|| (start instanceof FilterOperator)
@@ -429,17 +436,17 @@ private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childRe
|| (start instanceof ReduceSinkOperator)) {
allowed = true;
}

if (!allowed) {
return null;
}

if ((start instanceof ScriptOperator)
&& !HiveConf.getBoolVar(pGraphContext.getConf(),
HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
return null;
}

start = start.getParentOperators().get(0);
if(start instanceof ReduceSinkOperator) {
return (ReduceSinkOperator)start;
@@ -448,6 +455,6 @@ private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childRe
return null;
}
}

}
}
New test query file:
@@ -0,0 +1,6 @@
create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string);

set hive.map.aggr=false;
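-- with map aggregation off, the GROUP BY needs its own reduce stage, so
-- ReduceSinkDeDuplication must not merge its reduce sink into the one
-- generated by CLUSTER BY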
select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1;

drop table t1;
New expected-output file:
@@ -0,0 +1,21 @@
PREHOOK: query: create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string)
PREHOOK: type: CREATETABLE
POSTHOOK: query: create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: default@t1
PREHOOK: query: select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
PREHOOK: Output: file:/tmp/navis/hive_2011-11-27_23-31-58_414_7329222711647021730/-mr-10000
POSTHOOK: query: select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
POSTHOOK: Output: file:/tmp/navis/hive_2011-11-27_23-31-58_414_7329222711647021730/-mr-10000
PREHOOK: query: drop table t1
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@t1
PREHOOK: Output: default@t1
POSTHOOK: query: drop table t1
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@t1
POSTHOOK: Output: default@t1
