diff --git a/CHANGES b/CHANGES index eb90021553..391b716b65 100644 --- a/CHANGES +++ b/CHANGES @@ -161,21 +161,6 @@ Release 0.9.0 - unreleased BUG FIXES - TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa) - - TAJO-1065: The \admin -cluster argument doesn't run as expected. - (Jongyoung Park via hyunsik) - - TAJO-1072: CLI gets stuck when wrong host/port is provided. - (Jihun Kang via hyunsik) - - TAJO-1081: Non-forwarded (simple) query shows wrong rows. (hyunsik) - - TAJO-981: Help command (\?) in tsql takes too long time. (YeonSu Han via - jaehwa) - - TAJO-962: Column reference used in LIMIT clause incurs NPE. - TAJO-1074: Query calculates wrong progress before subquery init. (jinho) TAJO-1025: Network disconnection during query processing can cause @@ -457,12 +442,6 @@ Release 0.9.0 - unreleased TASKS - TAJO-668: Add datetime function documentation. (Jongyoung Park via hyunsik) - - TAJO-1077: Add Derby configuration documentation. (hyunsik) - - TAJO-1068: Add SQL Query documentation. (hyunsik) - TAJO-1078: Update contributor list. (hyunsik) TAJO-1070: BSTIndexScanExec should not seek a negative offset. (jinho) @@ -500,10 +479,6 @@ Release 0.9.0 - unreleased SUB TASKS - TAJO-1096: Update download source documentation (Mai Hai Thanh via jaehwa) - - TAJO-1062: Update TSQL documentation. (jaehwa) - TAJO-1061: TAJO-1061: Update build documentation. (jaehwa) TAJO-1060: Apply updated hadoop versions to README and BUILDING files. diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index cc875b2c1a..1229849819 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -98,6 +98,8 @@ public enum SessionVars implements ConfigKey { TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME, "shuffle output size for partition table write (mb)", DEFAULT), + GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT), + // for physical Executors EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT), HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT), diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index f9f5e4a0ba..66d3030f43 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -316,6 +316,8 @@ public static enum ConfVars implements ConfigKey { $DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256), $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256), + $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true), + // for physical Executors $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes", diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java index ab18aa915e..32165199c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java +++ 
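The new knob registered above can be set cluster-wide or per session. A sketch of both, assuming the standard tajo-site.xml property mechanism and tsql's \set command (the value shown is illustrative; the default is true):

    <!-- tajo-site.xml -->
    <property>
      <name>tajo.dist-query.groupby.multi-level-aggr</name>
      <value>false</value>
    </property>

    -- tsql, per session
    default> \set GROUPBY_MULTI_LEVEL_ENABLED false
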
b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
@@ -30,7 +30,10 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneable {

   @Expose protected AggFunction instance;
-  @Expose boolean firstPhase = false;
+  @Expose boolean intermediatePhase = false;
+  @Expose boolean finalPhase = true;
+  @Expose String alias;
+
   private Tuple params;

   protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs) {
@@ -58,7 +61,8 @@ public void merge(FunctionContext context, Schema schema, Tuple tuple) {
       }
     }

-    if (firstPhase) {
+    if (!intermediatePhase && !finalPhase) {
+      // first phase: aggregate raw input values
       instance.eval(context, params);
     } else {
       instance.merge(context, params);
@@ -71,7 +75,7 @@ public Datum eval(Schema schema, Tuple tuple) {
   }

   public Datum terminate(FunctionContext context) {
-    if (firstPhase) {
+    if (!finalPhase) {
       return instance.getPartialResult(context);
     } else {
       return instance.terminate(context);
@@ -80,18 +84,40 @@ public Datum terminate(FunctionContext context) {

   @Override
   public DataType getValueType() {
-    if (firstPhase) {
+    if (!finalPhase) {
       return instance.getPartialResultType();
     } else {
       return funcDesc.getReturnType();
     }
   }

+  public void setAlias(String alias) { this.alias = alias; }
+
+  public String getAlias() { return this.alias; }
+
   public Object clone() throws CloneNotSupportedException {
-    return super.clone();
+    AggregationFunctionCallEval clone = (AggregationFunctionCallEval)super.clone();
+
+    clone.finalPhase = finalPhase;
+    clone.intermediatePhase = intermediatePhase;
+    clone.alias = alias;
+    clone.instance = (AggFunction)instance.clone();
+
+    return clone;
   }

   public void setFirstPhase() {
-    this.firstPhase = true;
+    this.finalPhase = false;
+    this.intermediatePhase = false;
+  }
+
+  public void setFinalPhase() {
+    this.finalPhase = true;
+    this.intermediatePhase = false;
+  }
+
+  public void setIntermediatePhase() {
+    this.finalPhase = false;
+    this.intermediatePhase = true;
   }
 }
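The two flags encode three phases: first (consume raw input, emit a partial result via getPartialResult()), intermediate (merge partials, emit another partial), and final (merge partials, emit the terminal value via terminate()). A minimal, self-contained sketch of that contract for SUM — the class and method names below are illustrative only, not part of this patch:

    // Illustrative sketch of the three-phase aggregation contract for SUM.
    class SumPhases {
      // first phase: consume raw input values and emit a partial result
      static long firstPhase(long[] rawValues) {
        long partial = 0;
        for (long v : rawValues) {
          partial += v;
        }
        return partial;
      }

      // intermediate phase: merge partial results and emit another partial
      static long intermediatePhase(long[] partials) {
        long merged = 0;
        for (long p : partials) {
          merged += p;
        }
        return merged;
      }

      // final phase: merge partials and emit the terminal value
      static long finalPhase(long[] partials) {
        return intermediatePhase(partials); // for SUM the terminal value equals the merged partial
      }

      public static void main(String[] args) {
        long p1 = firstPhase(new long[]{1, 2});  // task 1
        long p2 = firstPhase(new long[]{3});     // task 2
        long mid = intermediatePhase(new long[]{p1, p2});
        System.out.println(finalPhase(new long[]{mid})); // prints 6
      }
    }
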
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2730202444..6b1c65cd16 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -43,6 +43,7 @@
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageConstants;
@@ -56,6 +57,7 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
@@ -1047,17 +1049,61 @@ public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context,
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
     if (property != null) {
-      DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
-      if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
-        return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+      if (property.getDistinct().getIsMultipleAggregation()) {
+        MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+
+        if (stage == MultipleAggregationStage.FIRST_STAGE) {
+          return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp);
+        } else if (stage == MultipleAggregationStage.SECOND_STAGE) {
+          return new DistinctGroupbySecondAggregationExec(context, distinctNode,
+              createSortExecForDistinctGroupby(context, distinctNode, subOp, 2));
+        } else {
+          return new DistinctGroupbyThirdAggregationExec(context, distinctNode,
+              createSortExecForDistinctGroupby(context, distinctNode, subOp, 3));
+        }
       } else {
-        return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+        DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
+        if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+          return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+        } else {
+          return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+        }
       }
     } else {
       return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
     }
   }

+  private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
+                                                    DistinctGroupbyNode distinctNode,
+                                                    PhysicalExec subOp,
+                                                    int phase) throws IOException {
+    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    // Sort key order by phase:
+    //   2nd phase: distinct-seq, group-by columns, distinct1 keys, distinct2 keys, ...
+    //   3rd phase: group-by columns, distinct-seq, distinct1 keys, distinct2 keys, ...
+    List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
+    if (phase == 2) {
+      sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+    }
+    for (Column eachColumn: distinctNode.getGroupingColumns()) {
+      sortSpecs.add(new SortSpec(eachColumn));
+    }
+    if (phase == 3) {
+      sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+    }
+    for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) {
+      for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+        sortSpecs.add(new SortSpec(eachColumn));
+      }
+    }
+    sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{}));
+    sortNode.setInSchema(distinctNode.getInSchema());
+    sortNode.setOutSchema(distinctNode.getInSchema());
+    ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, subOp);
+
+    return sortExec;
+  }
+
   private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx, DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
     return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
@@ -1145,7 +1191,7 @@ public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,

   }

-  private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
+  public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
     if (enforcer == null) {
       return null;
     }
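For orientation, with the lineitem query used in the executor javadocs below, and assuming ?distinctseq is the first target of the DistinctGroupbyNode and the distinct key blocks follow getGroupByNodes() order, the sort keys built here would be:

    phase 2: ?distinctseq, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey, l_orderkey
    phase 3: l_linenumber, l_returnflag, l_linestatus, l_shipdate, ?distinctseq, l_partkey, l_orderkey
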
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 031569eae3..e2d77444a0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -24,6 +24,7 @@
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
@@ -135,13 +136,25 @@ public void enforceHashAggregation(int pid) {

   public void enforceDistinctAggregation(int pid,
                                          DistinctAggregationAlgorithm algorithm,
                                          List<SortSpecArray> sortSpecArrays) {
+    enforceDistinctAggregation(pid, false, null, algorithm, sortSpecArrays);
+  }
+
+  public void enforceDistinctAggregation(int pid,
+                                         boolean isMultipleAggregation,
+                                         MultipleAggregationStage stage,
+                                         DistinctAggregationAlgorithm algorithm,
+                                         List<SortSpecArray> sortSpecArrays) {
     EnforceProperty.Builder builder = newProperty();
     DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder();
     enforce.setPid(pid);
+    enforce.setIsMultipleAggregation(isMultipleAggregation);
     enforce.setAlgorithm(algorithm);
     if (sortSpecArrays != null) {
       enforce.addAllSortSpecArrays(sortSpecArrays);
     }
+    if (stage != null) {
+      enforce.setMultipleAggregationStage(stage);
+    }

     builder.setType(EnforceType.DISTINCT_GROUP_BY);
     builder.setDistinct(enforce.build());
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 432589b3a5..01e02d7286 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -658,47 +658,6 @@ private RewrittenFunctions rewriteAggFunctionsForDistinctAggregation(GlobalPlanC
     return rewritten;
   }

-  public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
-                                                         DistinctGroupbyNode firstPhaseGroupBy,
-                                                         DistinctGroupbyNode secondPhaseGroupBy) {
-    DataChannel lastDataChannel = null;
-
-    // It pushes down the first phase group-by operator into all child blocks.
-    //
-    // (second phase)    G (currentBlock)
-    //                  /|\
-    //                 / | \
-    // (first phase)  G  G  G (child block)
-
-    // They are already connected one another.
-    // So, we don't need to connect them again.
-    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
-      if (firstPhaseGroupBy.isEmptyGrouping()) {
-        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
-      } else {
-        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
-      }
-      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
-      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
-
-      // Why must firstPhaseGroupby be copied?
-      //
-      // A groupby in each execution block can have different child.
-      // It affects groupby's input schema.
-      DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
-      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
-      childBlock.setPlan(firstPhaseGroupbyCopy);
-
-      // just keep the last data channel.
-      lastDataChannel = dataChannel;
-    }
-
-    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
-    secondPhaseGroupBy.setChild(scanNode);
-    lastBlock.setPlan(secondPhaseGroupBy);
-    return lastBlock;
-  }
-
   /**
    * If there is at least one distinct aggregation function, a query works as if the query is rewritten as follows:
    *
@@ -824,8 +783,20 @@ private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock la

     ExecutionBlock currentBlock;

     if (groupbyNode.isDistinct()) { // if there is at least one distinct aggregation function
-      DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
-      return builder.buildPlan(context, lastBlock, groupbyNode);
+      boolean multiLevelEnabled = context.getPlan().getContext().getBool(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED);
+      DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+
+      // The multi-level (three-stage) plan does not handle a UNION child yet,
+      // so fall back to the two-phase plan in that case.
+      if (multiLevelEnabled && PlannerUtil.findTopNode(groupbyNode, NodeType.UNION) == null) {
+        return builder.buildMultiLevelPlan(context, lastBlock, groupbyNode);
+      } else {
+        return builder.buildPlan(context, lastBlock, groupbyNode);
+      }
     } else {
       GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
@@ -968,6 +939,7 @@ public static GroupbyNode createFirstPhaseGroupBy(LogicalPlan plan, GroupbyNode
       firstPhaseEvals[i].setFirstPhase();
       firstPhaseEvalNames[i] = plan.generateUniqueColumnName(firstPhaseEvals[i]);
       FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+      secondPhaseEvals[i].setFinalPhase();
       secondPhaseEvals[i].setArgs(new EvalNode[] {param});
     }
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 8727b84eef..cbe2d7ea45 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -24,6 +24,7 @@
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -36,11 +37,14 @@
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
@@ -56,6 +60,255 @@ public DistinctGroupbyBuilder(GlobalPlanner globalPlanner) {
     this.globalPlanner = globalPlanner;
   }

+  public ExecutionBlock buildMultiLevelPlan(GlobalPlanContext context,
+                                            ExecutionBlock latestExecBlock,
+                                            LogicalNode currentNode) throws PlanningException {
+    try {
+      GroupbyNode groupbyNode = (GroupbyNode) currentNode;
+
+      LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+      DistinctGroupbyNode baseDistinctNode =
+          buildMultiLevelBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
+      baseDistinctNode.setGroupbyPlan(groupbyNode);
+
+      // Set the complete list of aggregation functions.
+      AggregationFunctionCallEval[] aggFunctions =
+          new AggregationFunctionCallEval[groupbyNode.getAggFunctions().length];
+
+      for (int i = 0; i < aggFunctions.length; i++) {
+        aggFunctions[i] = (AggregationFunctionCallEval) groupbyNode.getAggFunctions()[i].clone();
+        aggFunctions[i].setFirstPhase();
+        // If there is no grouping column, we cannot resolve a column alias here.
+        // In that case the alias must be looked up in the GroupbyNode's output schema.
+        if (groupbyNode.getGroupingColumns().length == 0
+            && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size()) {
+          aggFunctions[i].setAlias(groupbyNode.getOutSchema().getColumn(i).getQualifiedName());
+        }
+      }
+
+      if (groupbyNode.getGroupingColumns().length == 0
+          && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size()) {
+        groupbyNode.setAggFunctions(aggFunctions);
+      }
+
+      baseDistinctNode.setAggFunctions(aggFunctions);
+
+      // Create the first-, second-, and third-stage nodes from the base node.
+      DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+      DistinctGroupbyNode secondStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+      DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+
+      // Rewrite the second- and third-stage non-distinct aggregations to take
+      // their arguments from field references instead of the original expressions.
+      GroupbyNode lastGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size() - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setIntermediatePhase();
+          aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+      lastGroupbyNode = thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size() - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setFirstPhase();
+          aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+
+      // Set the input and output schemas for each DistinctGroupbyNode.
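+      // An illustrative sketch of how the three stages connect (assuming the
+      // lineitem example used in the executor javadocs below):
+      //
+      //   1st stage (DistinctGroupbyFirstAggregationExec)
+      //     out: ?distinctseq, group-by columns, distinct keys, partial non-distinct aggregates
+      //      |   hash shuffle by ?distinctseq + group-by columns + distinct keys (32 partitions)
+      //   2nd stage (DistinctGroupbySecondAggregationExec), input schema == output schema
+      //      |   hash shuffle by the group-by columns (one partition if there is no group-by column)
+      //   3rd stage (DistinctGroupbyThirdAggregationExec)
+      //     out: the query's final output schema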
+ secondStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema()); + secondStageDistinctNode.setOutSchema(firstStageDistinctNode.getOutSchema()); + thirdStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema()); + thirdStageDistinctNode.setOutSchema(groupbyNode.getOutSchema()); + + // Set latestExecBlock's plan with firstDistinctNode + latestExecBlock.setPlan(firstStageDistinctNode); + + // Make SecondStage ExecutionBlock + ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock(); + + // Make ThirdStage ExecutionBlock + ExecutionBlock thirdStageBlock = context.getPlan().newExecutionBlock(); + + // Set Enforcer + setMultiStageAggregationEnforcer(latestExecBlock, firstStageDistinctNode, secondStageBlock, + secondStageDistinctNode, thirdStageBlock, thirdStageDistinctNode); + + //Create data channel FirstStage to SecondStage + DataChannel firstChannel = new DataChannel(latestExecBlock, secondStageBlock, HASH_SHUFFLE, 32); + + firstChannel.setShuffleKeys(firstStageDistinctNode.getFirstStageShuffleKeyColumns()); + firstChannel.setSchema(firstStageDistinctNode.getOutSchema()); + firstChannel.setStoreType(globalPlanner.getStoreType()); + + ScanNode scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), firstChannel); + secondStageDistinctNode.setChild(scanNode); + + secondStageBlock.setPlan(secondStageDistinctNode); + + context.getPlan().addConnect(firstChannel); + + DataChannel secondChannel; + //Create data channel SecondStage to ThirdStage + if (groupbyNode.isEmptyGrouping()) { + secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, 1); + secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns()); + } else { + secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, 32); + secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns()); + } + secondChannel.setSchema(secondStageDistinctNode.getOutSchema()); + secondChannel.setStoreType(globalPlanner.getStoreType()); + + scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), secondChannel); + thirdStageDistinctNode.setChild(scanNode); + + thirdStageBlock.setPlan(thirdStageDistinctNode); + + context.getPlan().addConnect(secondChannel); + + if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) { + buildDistinctGroupbyAndUnionPlan( + context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode); + } + + return thirdStageBlock; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new PlanningException(e); + } + } + + private DistinctGroupbyNode buildMultiLevelBaseDistinctGroupByNode(GlobalPlanContext context, + ExecutionBlock latestExecBlock, + GroupbyNode groupbyNode) { + LogicalPlan plan = context.getPlan().getLogicalPlan(); + + /* + Making DistinctGroupbyNode from GroupByNode + select col1, count(distinct col2), count(distinct col3), sum(col4) from ... 
group by col1
+      => DistinctGroupbyNode
+         Distinct Seq
+         grouping key = col1
+         Sub GroupbyNodes
+          - GroupByNode1: grouping(col2), expr(count distinct col2)
+          - GroupByNode2: grouping(col3), expr(count distinct col3)
+          - GroupByNode3: expr(sum col4)
+    */
+    List<Column> originalGroupingColumns = Arrays.asList(groupbyNode.getGroupingColumns());
+
+    List<GroupbyNode> childGroupbyNodes = new ArrayList<GroupbyNode>();
+
+    List<AggregationFunctionCallEval> otherAggregationFunctionCallEvals = new ArrayList<AggregationFunctionCallEval>();
+    List<Target> otherAggregationFunctionTargets = new ArrayList<Target>();
+
+    // Maps each distinct column set to the GroupbyNode being built for it.
+    Map<String, DistinctGroupbyNodeBuildInfo> distinctNodeBuildInfos = new HashMap<String, DistinctGroupbyNodeBuildInfo>();
+    AggregationFunctionCallEval[] aggFunctions = groupbyNode.getAggFunctions();
+    for (int aggIdx = 0; aggIdx < aggFunctions.length; aggIdx++) {
+      AggregationFunctionCallEval aggFunction = aggFunctions[aggIdx];
+      aggFunction.setFirstPhase();
+      Target originAggFunctionTarget = groupbyNode.getTargets()[originalGroupingColumns.size() + aggIdx];
+      Target aggFunctionTarget =
+          new Target(new FieldEval(originAggFunctionTarget.getEvalTree().getName(), aggFunction.getValueType()));
+
+      if (aggFunction.isDistinct()) {
+        // Create or reuse a GroupbyNode for each distinct expression.
+        LinkedHashSet<Column> groupbyUniqColumns = EvalTreeUtil.findUniqueColumns(aggFunction);
+        String groupbyMapKey = EvalTreeUtil.columnsToStr(groupbyUniqColumns);
+        DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(groupbyMapKey);
+        if (buildInfo == null) {
+          GroupbyNode distinctGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+          buildInfo = new DistinctGroupbyNodeBuildInfo(distinctGroupbyNode);
+          distinctNodeBuildInfos.put(groupbyMapKey, buildInfo);
+
+          // Grouping columns are the GROUP BY clause's columns plus the distinct columns.
+          List<Column> groupingColumns = new ArrayList<Column>();
+          for (Column eachGroupingColumn: groupbyUniqColumns) {
+            if (!groupingColumns.contains(eachGroupingColumn)) {
+              groupingColumns.add(eachGroupingColumn);
+            }
+          }
+          distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new Column[]{}));
+        }
+        buildInfo.addAggFunction(aggFunction);
+        buildInfo.addAggFunctionTarget(aggFunctionTarget);
+      } else {
+        otherAggregationFunctionCallEvals.add(aggFunction);
+        otherAggregationFunctionTargets.add(aggFunctionTarget);
+      }
+    }
+
+    List<Target> baseGroupByTargets = new ArrayList<Target>();
+    baseGroupByTargets.add(new Target(new FieldEval(new Column("?distinctseq", Type.INT2))));
+    for (Column column : originalGroupingColumns) {
+      baseGroupByTargets.add(new Target(new FieldEval(column)));
+    }
+
+    // Add a child GroupbyNode for each distinct column set.
+    for (String eachKey: distinctNodeBuildInfos.keySet()) {
+      DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(eachKey);
+      GroupbyNode eachGroupbyNode = buildInfo.getGroupbyNode();
+      List<AggregationFunctionCallEval> groupbyAggFunctions = buildInfo.getAggFunctions();
+      String [] firstPhaseEvalNames = new String[groupbyAggFunctions.size()];
+      int index = 0;
+      for (AggregationFunctionCallEval eachCallEval: groupbyAggFunctions) {
+        firstPhaseEvalNames[index++] = eachCallEval.getName();
+      }
+
+      Target[] targets = new Target[eachGroupbyNode.getGroupingColumns().length + groupbyAggFunctions.size()];
+      int targetIdx = 0;
+
+      for (Column column : eachGroupbyNode.getGroupingColumns()) {
+        Target target = new Target(new FieldEval(column));
+        targets[targetIdx++] = target;
+        baseGroupByTargets.add(target);
+      }
+      for (Target eachAggFunctionTarget: buildInfo.getAggFunctionTargets()) {
+        targets[targetIdx++] = eachAggFunctionTarget;
+      }
+      eachGroupbyNode.setTargets(targets);
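+      // For the sample query in the comment above, this loop would produce, e.g.:
+      //   GroupByNode1: grouping = {col2}, targets = [col2, count(distinct col2)]
+      //   GroupByNode2: grouping = {col3}, targets = [col3, count(distinct col3)]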
+      eachGroupbyNode.setAggFunctions(groupbyAggFunctions.toArray(new AggregationFunctionCallEval[]{}));
+      eachGroupbyNode.setDistinct(true);
+      eachGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(eachGroupbyNode);
+    }
+
+    // Merge the remaining non-distinct aggregation functions into a single GroupbyNode.
+    if (!otherAggregationFunctionCallEvals.isEmpty()) {
+      // The final output tuple order of this aggregation is GROUP_BY_COL1, COL2, ..., AGG_VALUE, SUM_VALUE, ...
+      GroupbyNode otherGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+
+      Target[] targets = new Target[otherAggregationFunctionTargets.size()];
+      int targetIdx = 0;
+      for (Target eachTarget : otherAggregationFunctionTargets) {
+        targets[targetIdx++] = eachTarget;
+        baseGroupByTargets.add(eachTarget);
+      }
+
+      otherGroupbyNode.setTargets(targets);
+      otherGroupbyNode.setGroupingColumns(new Column[]{});
+      otherGroupbyNode.setAggFunctions(otherAggregationFunctionCallEvals.toArray(new AggregationFunctionCallEval[]{}));
+      otherGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(otherGroupbyNode);
+    }
+
+    DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
+    baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{}));
+    baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseDistinctNode.setInSchema(groupbyNode.getInSchema());
+    baseDistinctNode.setChild(groupbyNode.getChild());
+
+    baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+
+    return baseDistinctNode;
+  }

   public ExecutionBlock buildPlan(GlobalPlanContext context,
                                   ExecutionBlock latestExecBlock,
@@ -66,7 +319,7 @@ public ExecutionBlock buildPlan(GlobalPlanContext context,
     DistinctGroupbyNode baseDistinctNode = buildBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);

     // Create the first- and second-stage nodes from the base node.
-    DistinctGroupbyNode[] distinctNodes = createMultiPhaseDistinctNode(plan, groupbyNode, baseDistinctNode);
+    DistinctGroupbyNode[] distinctNodes = createTwoPhaseDistinctNode(plan, groupbyNode, baseDistinctNode);
     DistinctGroupbyNode firstStageDistinctNode = distinctNodes[0];
     DistinctGroupbyNode secondStageDistinctNode = distinctNodes[1];
@@ -100,7 +353,7 @@ public ExecutionBlock buildPlan(GlobalPlanContext context,
     context.getPlan().addConnect(channel);

     if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
-      globalPlanner.buildDistinctGroupbyAndUnionPlan(
+      buildDistinctGroupbyAndUnionPlan(
           context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode);
     }
@@ -162,6 +415,7 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou
       buildInfo.addAggFunction(aggFunction);
       buildInfo.addAggFunctionTarget(aggFunctionTarget);
     } else {
+      aggFunction.setFinalPhase();
       otherAggregationFunctionCallEvals.add(aggFunction);
       otherAggregationFunctionTargets.add(aggFunctionTarget);
     }
@@ -224,7 +478,7 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou
grou return baseDistinctNode; } - public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan, + public DistinctGroupbyNode[] createTwoPhaseDistinctNode(LogicalPlan plan, GroupbyNode originGroupbyNode, DistinctGroupbyNode baseDistinctNode) { /* @@ -456,6 +710,75 @@ private void setDistinctAggregationEnforcer( } + private void setMultiStageAggregationEnforcer( + ExecutionBlock firstStageBlock, DistinctGroupbyNode firstStageDistinctNode, + ExecutionBlock secondStageBlock, DistinctGroupbyNode secondStageDistinctNode, + ExecutionBlock thirdStageBlock, DistinctGroupbyNode thirdStageDistinctNode) { + firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(), + true, MultipleAggregationStage.FIRST_STAGE, + DistinctAggregationAlgorithm.HASH_AGGREGATION, null); + + secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(), + true, MultipleAggregationStage.SECOND_STAGE, + DistinctAggregationAlgorithm.HASH_AGGREGATION, null); + + List sortSpecArrays = new ArrayList(); + int index = 0; + for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) { + List sortSpecs = new ArrayList(); + for (Column column: groupbyNode.getGroupingColumns()) { + sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build()); + } + sortSpecArrays.add( SortSpecArray.newBuilder() + .setPid(thirdStageDistinctNode.getGroupByNodes().get(index).getPID()) + .addAllSortSpecs(sortSpecs).build()); + } + thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(), + true, MultipleAggregationStage.THRID_STAGE, + DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays); + } + + private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, + DistinctGroupbyNode firstPhaseGroupBy, + DistinctGroupbyNode secondPhaseGroupBy) { + DataChannel lastDataChannel = null; + + // It pushes down the first phase group-by operator into all child blocks. + // + // (second phase) G (currentBlock) + // /|\ + // / / | \ + // (first phase) G G G G (child block) + + // They are already connected one another. + // So, we don't need to connect them again. + for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) { + if (firstPhaseGroupBy.isEmptyGrouping()) { + dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1); + } else { + dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32); + } + dataChannel.setSchema(firstPhaseGroupBy.getOutSchema()); + ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId()); + + // Why must firstPhaseGroupby be copied? + // + // A groupby in each execution block can have different child. + // It affects groupby's input schema. + DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy); + firstPhaseGroupbyCopy.setChild(childBlock.getPlan()); + childBlock.setPlan(firstPhaseGroupbyCopy); + + // just keep the last data channel. 
+      lastDataChannel = dataChannel;
+    }
+
+    ScanNode scanNode = GlobalPlanner.buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+    secondPhaseGroupBy.setChild(scanNode);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
   static class DistinctGroupbyNodeBuildInfo {
     private GroupbyNode groupbyNode;
     private List<AggregationFunctionCallEval> aggFunctions = new ArrayList<AggregationFunctionCallEval>();
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
index b1e4bc38e4..47e8933976 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
@@ -27,9 +27,14 @@
 import org.apache.tajo.util.TUtil;

 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable {
+  @Expose
+  private GroupbyNode groupbyPlan;
+
   @Expose
   private List<GroupbyNode> groupByNodes;
@@ -42,6 +47,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
   @Expose
   private int[] resultColumnIds;

+  /** Aggregation Functions */
+  @Expose private AggregationFunctionCallEval [] aggrFunctions;
+
   public DistinctGroupbyNode(int pid) {
     super(pid, NodeType.DISTINCT_GROUP_BY);
   }
@@ -59,7 +67,11 @@ public void setTargets(Target[] targets) {

   @Override
   public Target[] getTargets() {
-    return new Target[0];
+    if (hasTargets()) {
+      return targets;
+    } else {
+      return new Target[0];
+    }
   }

   public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
@@ -86,6 +98,18 @@ public void setResultColumnIds(int[] resultColumnIds) {
     this.resultColumnIds = resultColumnIds;
   }

+  public AggregationFunctionCallEval [] getAggFunctions() {
+    return this.aggrFunctions;
+  }
+
+  public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+    this.aggrFunctions = evals;
+  }
+
+  public void setGroupbyPlan(GroupbyNode groupbyPlan) { this.groupbyPlan = groupbyPlan; }
+
+  public GroupbyNode getGroupbyPlan() { return this.groupbyPlan; }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone();
@@ -113,6 +137,9 @@ public Object clone() throws CloneNotSupportedException {
       }
     }

+    if (groupbyPlan != null) {
+      cloneNode.groupbyPlan = (GroupbyNode)groupbyPlan.clone();
+    }
     return cloneNode;
   }
@@ -200,4 +227,27 @@ public PlanString getPlanString() {

     return planStr;
   }
+
+  public Column[] getFirstStageShuffleKeyColumns() {
+    List<Column> shuffleKeyColumns = new ArrayList<Column>();
+    shuffleKeyColumns.add(getOutSchema().getColumn(0));   // the ?distinctseq column
+    if (groupingColumns != null) {
+      for (Column eachColumn: groupingColumns) {
+        if (!shuffleKeyColumns.contains(eachColumn)) {
+          shuffleKeyColumns.add(eachColumn);
+        }
+      }
+    }
+    for (GroupbyNode eachGroupbyNode: groupByNodes) {
+      if (eachGroupbyNode.getGroupingColumns() != null && eachGroupbyNode.getGroupingColumns().length > 0) {
+        for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+          if (!shuffleKeyColumns.contains(eachColumn)) {
+            shuffleKeyColumns.add(eachColumn);
+          }
+        }
+      }
+    }
+
+    return shuffleKeyColumns.toArray(new Column[]{});
+  }
 }
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
new file mode 100644
index
0000000000..7201ed411d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Int2Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * This operator expands each input row into several output rows, one for each distinct-aggregation
+ * GroupbyNode, and also carries the partial results of the non-distinct aggregation functions.
+ *
+ * For example, there is a query as follows:
+ *   select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ *          count(distinct l_partkey), sum(l_orderkey)
+ *   from lineitem
+ *   group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ * If you execute the above query on Tajo, the FileScanner produces the following tuples from the raw data:
+ *
+ * -----------------------------------------------------------------------------
+ * l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_orderkey, l_partkey
+ * -----------------------------------------------------------------------------
+ * 1, N, O, 1996-03-13, 1, 1
+ * 2, N, O, 1996-04-12, 1, 1
+ * 1, N, O, 1997-01-28, 2, 2
+ * 1, R, F, 1994-02-02, 3, 2
+ * 2, R, F, 1993-11-09, 3, 3
+ *
+ * The scanner then pushes these tuples to this operator as its input.
+ * This operator then produces output rows as follows:
+ *
+ * -------------------------------------------------------------------------------------------------------------------
+ * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct,
+ *     l_orderkey for distinct, l_orderkey for nondistinct
+ * -------------------------------------------------------------------------------------------------------------------
+ * 0, 2, R, F, 1993-11-09, 3, NULL, 3
+ * 0, 2, N, O, 1996-04-12, 1, NULL, 1
+ * 0, 1, N, O, 1997-01-28, 2, NULL, 2
+ * 0, 1, R, F, 1994-02-02, 2, NULL, 3
+ * 0, 1, N, O, 1996-03-13, 1, NULL, 1
+ * 1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ * 1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ * 1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ * 1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ * 1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ * For reference, NodeSequence is the sequence number of a distinct GroupbyNode. In this case there are
+ * two such GroupbyNodes, one for lineitem.l_partkey and one for lineitem.l_orderkey: the NodeSequence of
+ * lineitem.l_partkey is zero and that of lineitem.l_orderkey is one. As the output above shows, columns
+ * that do not belong to the current GroupbyNode are filled with NullDatum by the inner aggregator.
+ *
+ * In addition, the non-distinct aggregation columns carry real values only in rows whose NodeSequence is zero.
+ */
+public class DistinctGroupbyFirstAggregationExec extends PhysicalExec {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbyFirstAggregationExec.class);
+
+  private DistinctGroupbyNode plan;
+  private boolean finished = false;
+  private boolean preparedData = false;
+  private PhysicalExec child;
+
+  private long totalNumRows;
+  private int fetchedRows;
+  private float progress;
+
+  private int[] groupingKeyIndexes;
+  private NonDistinctHashAggregator nonDistinctHashAggregator;
+  private DistinctHashAggregator[] distinctAggregators;
+
+  private int resultTupleLength;
+
+  public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+    this.child = subOp;
+    this.plan = plan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    child.init();
+
+    // finding grouping column index
+    Column[] groupingColumns = plan.getGroupingColumns();
+    groupingKeyIndexes = new int[groupingColumns.length];
+
+    int index = 0;
+    for (Column col: groupingColumns) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      groupingKeyIndexes[index++] = keyIndex;
+    }
+    resultTupleLength = groupingKeyIndexes.length + 1;   // +1 for the sequence datum that identifies the DistinctNode
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>();
+    int distinctSeq = 0;
+    for (GroupbyNode eachGroupby: groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby);
+        aggregator.setNodeSequence(distinctSeq++);
+        distinctAggrList.add(aggregator);
+        resultTupleLength += aggregator.getTupleLength();
+      } else {
+        nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby);
+        resultTupleLength += nonDistinctHashAggregator.getTupleLength();
+      }
+    }
+    distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{});
+  }
+
+  private int currentAggregatorIndex = 0;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (!preparedData) {
+      prepareInputData();
+    }
+
+    int prevIndex = currentAggregatorIndex;
+    while (!context.isStopped()) {
+      DistinctHashAggregator aggregator = distinctAggregators[currentAggregatorIndex];
+      Tuple result = aggregator.next();
+      if (result != null) {
+        return result;
+      }
+      currentAggregatorIndex++;
+      currentAggregatorIndex = currentAggregatorIndex % distinctAggregators.length;
+      if (currentAggregatorIndex == prevIndex) {
+        finished = true;
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  private void prepareInputData() throws IOException {
+    Tuple tuple = null;
+    while(!context.isStopped() && (tuple = child.next()) != null) {
+      Tuple groupingKey = new VTuple(groupingKeyIndexes.length);
+      for (int i = 0; i < groupingKeyIndexes.length; i++) {
+        groupingKey.put(i, tuple.get(groupingKeyIndexes[i]));
+      }
+      for (int i = 0; i < distinctAggregators.length; i++) {
+        distinctAggregators[i].compute(groupingKey, tuple);
+      }
+      if (nonDistinctHashAggregator != null) {
+        nonDistinctHashAggregator.compute(groupingKey, tuple);
+      }
+    }
+    for (int i = 0; i < distinctAggregators.length; i++) {
+      distinctAggregators[i].rescan();
+    }
+
+    totalNumRows = distinctAggregators[0].distinctAggrDatas.size();
+    preparedData = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    child.close();
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (child != null) {
+      return child.getInputStats();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (finished) {
+      return progress;
+    } else {
+      if (totalNumRows > 0) {
+        return progress + ((float)fetchedRows / (float)totalNumRows) * 0.5f;
+      } else {
+        return progress;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() {
+    finished = false;
+    currentAggregatorIndex = 0;
+    for (int i = 0; i < distinctAggregators.length; i++) {
+      distinctAggregators[i].rescan();
+    }
+  }
+
+  class NonDistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+    private int aggFunctionsNum;
+    private final AggregationFunctionCallEval aggFunctions[];
+
+    // grouping key -> FunctionContext[]
+    private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas;
+    private int tupleLength;
+
+    private Tuple dummyTuple;
+
+    private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+
+      nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>();
+
+      if (groupbyNode.hasAggFunctions()) {
+        aggFunctions = groupbyNode.getAggFunctions();
+        aggFunctionsNum = aggFunctions.length;
+        for (AggregationFunctionCallEval eachFunction: aggFunctions) {
+          eachFunction.setFirstPhase();
+        }
+      } else {
+        aggFunctions = new AggregationFunctionCallEval[0];
+        aggFunctionsNum = 0;
+      }
+
+      dummyTuple = new VTuple(aggFunctionsNum);
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        dummyTuple.put(i, NullDatum.get());
+      }
+
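+      // one output column per non-distinct aggregation function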
+      tupleLength = aggFunctionsNum;
+    }
+
+    public void compute(Tuple groupingKeyTuple, Tuple tuple) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple);
+      if (contexts != null) {
+        for (int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+      } else { // the key occurs for the first time
+        contexts = new FunctionContext[aggFunctionsNum];
+        for (int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+        nonDistinctAggrDatas.put(groupingKeyTuple, contexts);
+      }
+    }
+
+    public Tuple aggregate(Tuple groupingKey) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKey);
+      if (contexts == null) {
+        return null;
+      }
+      Tuple tuple = new VTuple(aggFunctionsNum);
+
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        tuple.put(i, aggFunctions[i].terminate(contexts[i]));
+      }
+
+      return tuple;
+    }
+
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+
+  class DistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+
+    // grouping key -> set of distinct key tuples
+    private Map<Tuple, Set<Tuple>> distinctAggrDatas;
+    private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null;
+
+    private int nodeSequence;
+    private Int2Datum nodeSequenceDatum;
+
+    private int[] distinctKeyIndexes;
+
+    private int tupleLength;
+    private Tuple dummyTuple;
+    private boolean aggregatorFinished = false;
+
+    public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+
+      Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+      for (Integer eachIndex: groupingKeyIndexes) {
+        groupingKeyIndexSet.add(eachIndex);
+      }
+
+      List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>();
+      Column[] groupingColumns = groupbyNode.getGroupingColumns();
+      for (int idx = 0; idx < groupingColumns.length; idx++) {
+        Column col = groupingColumns[idx];
+        int keyIndex;
+        if (col.hasQualifier()) {
+          keyIndex = inSchema.getColumnId(col.getQualifiedName());
+        } else {
+          keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+        }
+        if (!groupingKeyIndexSet.contains(keyIndex)) {
+          distinctGroupingKeyIndexSet.add(keyIndex);
+        }
+      }
+      int index = 0;
+      this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()];
+      this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size());
+      for (Integer eachId : distinctGroupingKeyIndexSet) {
+        this.dummyTuple.put(index, NullDatum.get());
+        this.distinctKeyIndexes[index++] = eachId;
+      }
+
+      this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>();
+      this.tupleLength = distinctKeyIndexes.length;
+    }
+
+    public void setNodeSequence(int nodeSequence) {
+      this.nodeSequence = nodeSequence;
+      this.nodeSequenceDatum = new Int2Datum((short)nodeSequence);
+    }
+
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    public void compute(Tuple groupingKey, Tuple tuple) throws IOException {
+      Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length);
+      for (int i = 0; i < distinctKeyIndexes.length; i++) {
+        distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i]));
+      }
+
+      Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey);
+      if (distinctEntry == null) {
+        distinctEntry = new HashSet<Tuple>();
+        distinctAggrDatas.put(groupingKey, distinctEntry);
+      }
+      distinctEntry.add(distinctKeyTuple);
+    }
+
+    public void rescan() {
+      iterator = distinctAggrDatas.entrySet().iterator();
+      currentGroupingTuples = null;
+      groupingKeyChanged = false;
+      aggregatorFinished = false;
+    }
+
+    public void close() throws IOException {
+      distinctAggrDatas.clear();
+      distinctAggrDatas = null;
+      currentGroupingTuples = null;
+      iterator = null;
+    }
+
+    Entry<Tuple, Set<Tuple>> currentGroupingTuples;
+    Iterator<Tuple> distinctKeyIterator;
+    boolean groupingKeyChanged = false;
+
+    public Tuple next() {
+      if (aggregatorFinished) {
+        return null;
+      }
+      if (currentGroupingTuples == null) {
+        // first
+        if (!iterator.hasNext()) {
+          // Empty case
+          aggregatorFinished = true;
+          return null;
+        }
+        currentGroupingTuples = iterator.next();
+        groupingKeyChanged = true;
+        distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+      }
+      if (!distinctKeyIterator.hasNext()) {
+        if (!iterator.hasNext()) {
+          aggregatorFinished = true;
+          return null;
+        }
+        currentGroupingTuples = iterator.next();
+        groupingKeyChanged = true;
+        distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+      }
+      // Output layout: node sequence, grouping keys, 1st distinct keys, 2nd distinct keys, ...
+      // The n-th distinct key block holds real data only when n equals this.nodeSequence;
+      // otherwise it is filled with NullDatum.
+      Tuple tuple = new VTuple(resultTupleLength);
+      int tupleIndex = 0;
+      tuple.put(tupleIndex++, nodeSequenceDatum);
+
+      // merge grouping key
+      Tuple groupingKeyTuple = currentGroupingTuples.getKey();
+      int groupingKeyLength = groupingKeyTuple.size();
+      for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) {
+        tuple.put(tupleIndex, groupingKeyTuple.get(i));
+      }
+
+      // merge distinctKey
+      for (int i = 0; i < distinctAggregators.length; i++) {
+        if (i == nodeSequence) {
+          Tuple distinctKeyTuple = distinctKeyIterator.next();
+          int distinctKeyLength = distinctKeyTuple.size();
+          for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) {
+            tuple.put(tupleIndex, distinctKeyTuple.get(j));
+          }
+        } else {
+          Tuple dummyTuple = distinctAggregators[i].getDummyTuple();
+          int dummyTupleSize = dummyTuple.size();
+          for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) {
+            tuple.put(tupleIndex, dummyTuple.get(j));
+          }
+        }
+      }
+
+      // merge non distinct aggregation tuple
+      if (nonDistinctHashAggregator != null) {
+        Tuple nonDistinctTuple;
+        if (nodeSequence == 0 && groupingKeyChanged) {
+          groupingKeyChanged = false;
+          nonDistinctTuple = nonDistinctHashAggregator.aggregate(groupingKeyTuple);
+          if (nonDistinctTuple == null) {
+            nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+          }
+        } else {
+          nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+        }
+        int tupleSize = nonDistinctTuple.size();
+        for (int j = 0; j < tupleSize; j++, tupleIndex++) {
+          tuple.put(tupleIndex, nonDistinctTuple.get(j));
+        }
+      }
+      return tuple;
+    }
+
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
new file mode 100644
index 0000000000..bc8885f964
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This operator adjusts the shuffle columns between DistinctGroupbyFirstAggregationExec and
+ * DistinctGroupbyThirdAggregationExec. Data are shuffled by both the grouping columns and the
+ * distinct key columns, so more DistinctGroupbyThirdAggregationExec instances run in parallel
+ * than under the previous two-phase distinct group-by algorithm; this higher parallelism is what
+ * improves the performance of count-distinct queries.
+ *
+ * For example, there is a query as follows:
+ *   select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ *          count(distinct l_partkey), sum(l_orderkey)
+ *   from lineitem
+ *   group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ * In this case, the execution plan sets the shuffle keys around this operator as follows:
+ *   Incoming: 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4),
+ *     default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), default.lineitem.l_shipdate (TEXT),
+ *     default.lineitem.l_partkey (INT4), default.lineitem.l_orderkey (INT4), num=32)
+ *
+ *   Outgoing: 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4),
+ *     default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT),
+ *     default.lineitem.l_shipdate (TEXT), num=32)
+ *
+ * For reference, the input data (the output of DistinctGroupbyFirstAggregationExec) looks as follows:
+ *
+ * -------------------------------------------------------------------------------------------------------------------
+ * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct,
+ *     l_orderkey for distinct, l_orderkey for nondistinct
+ * -------------------------------------------------------------------------------------------------------------------
+ * 0, 2, R, F, 1993-11-09, 3, NULL, 3
+ * 0, 2, N, O, 1996-04-12, 1, NULL, 1
+ * 0, 1, N, O, 1997-01-28, 2, NULL, 2
+ * 0, 1, R, F, 1994-02-02, 2, NULL, 3
+ * 0, 1, N, O, 1996-03-13, 1, NULL, 1
+ * 1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ * 1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ * 1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ * 1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ * 1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ */
+public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbySecondAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  private int numGroupingColumns;
+  private int[][] distinctKeyIndexes;
+  private FunctionContext[] nonDistinctAggrContexts;
+  private AggregationFunctionCallEval[] nonDistinctAggrFunctions;
+  private int nonDistinctAggrTupleStartIndex = -1;
+
+  public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    // Find the column indexes of the distinct group-by keys.
+    Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+    for (Column col: plan.getGroupingColumns()) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      groupingKeyIndexSet.add(keyIndex);
+    }
+
+    int numDistinct = 0;
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        numDistinct++;
+      } else {
+        nonDistinctAggrFunctions = eachGroupby.getAggFunctions();
+        if (nonDistinctAggrFunctions != null) {
+          for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) {
+            eachFunction.setIntermediatePhase();
+          }
+          nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length];
+        }
+      }
+    }
+
+    int index = 0;
+    distinctKeyIndexes = new int[numDistinct][];
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        List<Integer> distinctGroupingKeyIndex = new ArrayList<Integer>();
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        for (int idx = 0; idx < distinctGroupingColumns.length; idx++) {
+          Column col = distinctGroupingColumns[idx];
+          int keyIndex;
+          if (col.hasQualifier()) {
+            keyIndex = inSchema.getColumnId(col.getQualifiedName());
+          } else {
+            keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+          }
+          if (!groupingKeyIndexSet.contains(keyIndex)) {
+            distinctGroupingKeyIndex.add(keyIndex);
+          }
+        }
+        int i = 0;
+        distinctKeyIndexes[index] = new int[distinctGroupingKeyIndex.size()];
+        for (int eachIdx : distinctGroupingKeyIndex) {
+          distinctKeyIndexes[index][i++] = eachIdx;
+        }
+        index++;
+      }
+    }
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrTupleStartIndex = inSchema.size() - nonDistinctAggrFunctions.length;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+  int prevSeq = -1;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple result = null;
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          return null;
+        }
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+        break;
+      }
+
+      Tuple tuple = null;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getKeyTuple(distinctSeq, tuple);
+
+      if (prevKeyTuple == null) {
+        // First
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          initNonDistinctAggrContext();
+          mergeNonDistinctAggr(tuple);
+        }
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+        continue;
+
} + + if (!prevKeyTuple.equals(keyTuple)) { + // new grouping key + if (prevSeq == 0 && nonDistinctAggrFunctions != null) { + terminatedNonDistinctAggr(prevTuple); + } + result = prevTuple; + + prevKeyTuple = keyTuple; + prevTuple = tuple; + prevSeq = distinctSeq; + + if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { + initNonDistinctAggrContext(); + mergeNonDistinctAggr(tuple); + } + break; + } else { + prevKeyTuple = keyTuple; + prevTuple = tuple; + prevSeq = distinctSeq; + if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { + mergeNonDistinctAggr(tuple); + } + } + } + + return result; + } + + private void initNonDistinctAggrContext() { + if (nonDistinctAggrFunctions != null) { + nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; + for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { + nonDistinctAggrContexts[i] = nonDistinctAggrFunctions[i].newContext(); + } + } + } + + private void mergeNonDistinctAggr(Tuple tuple) { + if (nonDistinctAggrFunctions == null) { + return; + } + for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { + nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], inSchema, tuple); + } + } + + private void terminatedNonDistinctAggr(Tuple tuple) { + if (nonDistinctAggrFunctions == null) { + return; + } + for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { + tuple.put(nonDistinctAggrTupleStartIndex + i, nonDistinctAggrFunctions[i].terminate(nonDistinctAggrContexts[i])); + } + } + + private Tuple getKeyTuple(int distinctSeq, Tuple tuple) { + int[] columnIndexes = distinctKeyIndexes[distinctSeq]; + + Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1); + keyTuple.put(0, tuple.get(0)); + for (int i = 0; i < numGroupingColumns; i++) { + keyTuple.put(i + 1, tuple.get(i + 1)); + } + for (int i = 0; i < columnIndexes.length; i++) { + keyTuple.put(i + 1 + numGroupingColumns, tuple.get(columnIndexes[i])); + } + + return keyTuple; + } + + @Override + public void rescan() throws IOException { + super.rescan(); + prevKeyTuple = null; + prevTuple = null; + finished = false; + } + + @Override + public void close() throws IOException { + super.close(); + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java new file mode 100644 index 0000000000..239dabf5d9 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
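Note on the executor above: DistinctGroupbySecondAggregationExec relies on its child SortExec delivering rows ordered by (distinct sequence number, grouping key, distinct key), so duplicate elimination reduces to comparing each row's key against the previous row's key, and the non-distinct aggregates are merged only while the sequence number is 0 so each input row is counted exactly once. The following standalone sketch shows the same one-pass pattern over a sorted stream; it uses hypothetical String keys rather than the VTuple keys built by getKeyTuple().

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// One-pass duplicate elimination over a pre-sorted stream: emit a value only
// when its key differs from the previous key.
public class SortedDedup {
  public static void main(String[] args) {
    List<String> sorted = Arrays.asList("a", "a", "b", "b", "b", "c");
    Iterator<String> it = sorted.iterator();
    String prev = null;
    while (it.hasNext()) {
      String key = it.next();
      if (prev == null || !prev.equals(key)) {
        System.out.println(key);  // first occurrence of a new key
      }
      prev = key;
    }
  }
}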
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
new file mode 100644
index 0000000000..239dabf5d9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This class aggregates the output of DistinctGroupbySecondAggregationExec.
+ */
+public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  private DistinctFinalAggregator[] aggregators;
+  private DistinctFinalAggregator nonDistinctAggr;
+
+  private int resultTupleLength;
+  private int numGroupingColumns;
+
+  private int[] resultTupleIndexes;
+
+  public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+    resultTupleLength = numGroupingColumns;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>();
+    int inTupleIndex = 1 + numGroupingColumns;
+    int outTupleIndex = numGroupingColumns;
+    int distinctSeq = 0;
+
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby));
+        distinctSeq++;
+
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        inTupleIndex += distinctGroupingColumns.length;
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      } else {
+        nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby);
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      }
+      resultTupleLength += eachGroupby.getAggFunctions().length;
+    }
+    aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
+
+    // Build the mapping from internal result-tuple positions to output schema positions.
+    resultTupleIndexes = new int[outSchema.size()];
+    Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, Integer>();
+    int resultTupleIndex = 0;
+    for (Column eachColumn: plan.getGroupingColumns()) {
+      groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
+      resultTupleIndex++;
+    }
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      Set<Column> groupingColumnSet = new HashSet<Column>();
+      for (Column column: eachGroupby.getGroupingColumns()) {
+        groupingColumnSet.add(column);
+      }
+      for (Target eachTarget: eachGroupby.getTargets()) {
+        if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
+          // aggregation function column
+          groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex);
+          resultTupleIndex++;
+        }
+      }
+    }
+
+    int index = 0;
+    for (Column eachOutputColumn: outSchema.getColumns()) {
+      // If the column is an avg() aggregation, the output schema's column type is float,
+      // but the column registered in groupbyResultTupleIndex still has the intermediate
+      // (protobuf) type, so match columns by qualified name rather than by Column equality.
+      int matchedIndex = -1;
+      for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) {
+        if (eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName())) {
+          matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
+          break;
+        }
+      }
+      if (matchedIndex < 0) {
+        throw new IOException("Can't find proper output column mapping: " + eachOutputColumn);
+      }
+      resultTupleIndexes[matchedIndex] = index++;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple resultTuple = new VTuple(resultTupleLength);
+
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      // Last tuple
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          if (numGroupingColumns == 0) {
+            // No grouping columns: return a single tuple of empty aggregates.
+            return makeEmptyTuple();
+          } else {
+            return null;
+          }
+        }
+
+        for (int i = 0; i < numGroupingColumns; i++) {
+          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+        }
+        for (DistinctFinalAggregator eachAggr: aggregators) {
+          eachAggr.terminate(resultTuple);
+        }
+        break;
+      }
+
+      Tuple tuple = null;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getGroupingKeyTuple(tuple);
+
+      // First tuple
+      if (prevKeyTuple == null) {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+
+        aggregators[distinctSeq].merge(tuple);
+        continue;
+      }
+
+      if (!prevKeyTuple.equals(keyTuple)) {
+        // A new grouping key begins; emit the previous group.
+        for (int i = 0; i < numGroupingColumns; i++) {
+          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+        }
+        for (DistinctFinalAggregator eachAggr: aggregators) {
+          eachAggr.terminate(resultTuple);
+        }
+
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+
+        aggregators[distinctSeq].merge(tuple);
+        break;
+      } else {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        aggregators[distinctSeq].merge(tuple);
+      }
+    }
+
+    return resultTuple;
+  }
+
+  private Tuple makeEmptyTuple() {
+    Tuple resultTuple = new VTuple(resultTupleLength);
+    for (DistinctFinalAggregator eachAggr: aggregators) {
+      eachAggr.terminateEmpty(resultTuple);
+    }
+
+    return resultTuple;
+  }
+
+  private Tuple getGroupingKeyTuple(Tuple tuple) {
+    Tuple keyTuple = new VTuple(numGroupingColumns);
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i, tuple.get(i + 1));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    finished = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  class DistinctFinalAggregator {
+    private FunctionContext[] functionContexts;
+    private AggregationFunctionCallEval[] aggrFunctions;
+    private int seq;
+    private int inTupleIndex;
+    private int outTupleIndex;
+
+    public DistinctFinalAggregator(int seq, int inTupleIndex, int outTupleIndex, GroupbyNode groupbyNode) {
+      this.seq = seq;
+      this.inTupleIndex = inTupleIndex;
+      this.outTupleIndex = outTupleIndex;
+
+      aggrFunctions = groupbyNode.getAggFunctions();
+      if (aggrFunctions != null) {
+        for (AggregationFunctionCallEval eachFunction: aggrFunctions) {
+          eachFunction.setFinalPhase();
+        }
+      }
+      newFunctionContext();
+    }
+
+    private void newFunctionContext() {
+      functionContexts = new FunctionContext[aggrFunctions.length];
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        functionContexts[i] = aggrFunctions[i].newContext();
+      }
+    }
+
+    public void merge(Tuple tuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        aggrFunctions[i].merge(functionContexts[i], inSchema, tuple);
+      }
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        if (!tuple.get(nonDistinctAggr.inTupleIndex).isNull()) {
+          nonDistinctAggr.merge(tuple);
+        }
+      }
+    }
+
+    public void terminate(Tuple resultTuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      newFunctionContext();
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminate(resultTuple);
+      }
+    }
+
+    public void terminateEmpty(Tuple resultTuple) {
+      newFunctionContext();
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminateEmpty(resultTuple);
+      }
+    }
+  }
+}
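The second- and third-stage executors drive each aggregate function through distinct phases: the first stage consumes raw rows and emits partial results, intermediate stages merge partials, and only the final stage terminates. A simplified model of that lifecycle using a count aggregate; the classes here are hypothetical stand-ins, not Tajo's AggFunction API.

// Three-phase aggregate lifecycle: eval() on raw rows in the first stage,
// merge() of partial results in later stages, terminate() only at the end.
public class ThreePhaseCount {
  static class Context { long count; }

  static void eval(Context ctx, Object row) { ctx.count++; }           // first stage
  static long partial(Context ctx) { return ctx.count; }               // value that gets shuffled
  static void merge(Context ctx, long partialCount) { ctx.count += partialCount; } // later stages
  static long terminate(Context ctx) { return ctx.count; }             // final stage only

  public static void main(String[] args) {
    // Two first-stage tasks each see a slice of the rows.
    Context task1 = new Context(), task2 = new Context();
    eval(task1, "r1"); eval(task1, "r2");
    eval(task2, "r3");

    // The final stage merges the shuffled partials and terminates once.
    Context finalCtx = new Context();
    merge(finalCtx, partial(task1));
    merge(finalCtx, partial(task2));
    System.out.println(terminate(finalCtx)); // 3
  }
}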
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 4deddee587..598054c624 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -33,10 +33,8 @@
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
-import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -45,6 +43,8 @@
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -799,13 +799,30 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon
     }
     int groupingColumns = 0;
-    GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
-    if (groupby != null) {
-      groupingColumns = groupby.getGroupingColumns().length;
-    } else {
-      DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
-      if (dGroupby != null) {
-        groupingColumns = dGroupby.getGroupingColumns().length;
+    LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(),
+        new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
+    if (groupbyNodes != null && groupbyNodes.length > 0) {
+      LogicalNode bottomNode = groupbyNodes[0];
+      if (bottomNode.getType() == NodeType.GROUP_BY) {
+        groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length;
+      } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+        DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+        if (distinctNode == null) {
+          LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+          distinctNode = (DistinctGroupbyNode)bottomNode;
+        }
+        groupingColumns = distinctNode.getGroupingColumns().length;
+
+        Enforcer enforcer = execBlock.getEnforcer();
+        EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+        if (property != null) {
+          if (property.getDistinct().getIsMultipleAggregation()) {
+            MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+            if (stage != MultipleAggregationStage.THIRD_STAGE) {
+              groupingColumns = distinctNode.getOutSchema().size();
+            }
+          }
+        }
       }
     }
     // get a proper number of tasks
@@ -1145,7 +1162,8 @@ public static SubQuery setShuffleOutputNumForTwoPhase(SubQuery subQuery, final i
     // set the partition number for group by and sort
     if (channel.getShuffleType() == HASH_SHUFFLE) {
-      if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
+      if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
+          execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
         keys = channel.getShuffleKeys();
       }
     } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
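In Repartitioner, any stage before THIRD_STAGE still emits one row per distinct value, so the scheduler treats the whole output schema as the grouping key instead of the (possibly empty) user grouping columns; this keeps those stages from collapsing onto too few tasks. The generic sketch below shows why a wider hash key can spread rows across more partitions; it is not Tajo's partitioner, which hashes tuple keys rather than arrays.

import java.util.Arrays;

// Hash partitioning: the more columns take part in the key, the more
// opportunity rows have to land on different tasks.
public class HashPartition {
  static int partition(Object[] keyColumns, int numTasks) {
    return Math.floorMod(Arrays.hashCode(keyColumns), numTasks);
  }

  public static void main(String[] args) {
    // Narrow key: identical grouping values always collide on one task.
    System.out.println(partition(new Object[]{"2014-01-01"}, 4));
    System.out.println(partition(new Object[]{"2014-01-01"}, 4));
    // Wider key (grouping column plus distinct column): rows can spread out.
    System.out.println(partition(new Object[]{"2014-01-01", "key1"}, 4));
    System.out.println(partition(new Object[]{"2014-01-01", "key2"}, 4));
  }
}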
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 919ac9b36c..1f348ff229 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,13 +43,18 @@
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.plan.proto.PlanProto;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -810,9 +815,30 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann
     if (grpNode.getType() == NodeType.GROUP_BY) {
       hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
     } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
-      hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
+      // Find the DistinctGroupbyNode for the current stage.
+      DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+      if (distinctNode == null) {
+        LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+        distinctNode = (DistinctGroupbyNode)grpNode;
+      }
+      hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+
+      Enforcer enforcer = subQuery.getBlock().getEnforcer();
+      if (enforcer == null) {
+        LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null.");
+      }
+      EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+      if (property != null) {
+        if (property.getDistinct().getIsMultipleAggregation()) {
+          MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+          if (stage != MultipleAggregationStage.THIRD_STAGE) {
+            hasGroupColumns = true;
+          }
+        }
+      }
     }
     if (!hasGroupColumns) {
+      LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
       return 1;
     } else {
       long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index bde245918b..2760301448 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -315,6 +315,12 @@ message DistinctGroupbyEnforcer {
     SORT_AGGREGATION = 1;
   }
+  enum MultipleAggregationStage {
+    FIRST_STAGE = 0;
+    SECOND_STAGE = 1;
+    THIRD_STAGE = 3;
+  }
+
   message SortSpecArray {
     required int32 pid = 1;
     repeated SortSpecProto sortSpecs = 2;
   }
@@ -322,6 +328,8 @@
     required int32 pid = 1;
     required DistinctAggregationAlgorithm algorithm = 2;
     repeated SortSpecArray sortSpecArrays = 3;
+    required bool isMultipleAggregation = 4 [default = false];
+    optional MultipleAggregationStage multipleAggregationStage = 5;
   }
 
   message EnforcerProto {
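Both planner call sites read the new enforcer fields the same way: when isMultipleAggregation is set and the stage is anything before THIRD_STAGE, the execution block must be scheduled as if it had grouping columns, because the per-distinct duplicate rows have not been collapsed yet. The shared decision is sketched below with a local enum that mirrors the proto definition; the real code reads TajoWorkerProtocol's generated classes.

public class StagePolicy {
  enum Stage { FIRST_STAGE, SECOND_STAGE, THIRD_STAGE }

  // Mirrors the checks in Repartitioner and SubQuery: only the third stage
  // has fully collapsed distinct values, so earlier stages keep group keys.
  static boolean forcesGroupColumns(boolean isMultipleAggregation, Stage stage) {
    return isMultipleAggregation && stage != Stage.THIRD_STAGE;
  }

  public static void main(String[] args) {
    System.out.println(forcesGroupColumns(true, Stage.FIRST_STAGE));  // true
    System.out.println(forcesGroupColumns(true, Stage.SECOND_STAGE)); // true
    System.out.println(forcesGroupColumns(true, Stage.THIRD_STAGE));  // false
    System.out.println(forcesGroupColumns(false, Stage.THIRD_STAGE)); // false
  }
}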
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index fccec26d1a..8b9f9f7e9c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -20,10 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -34,9 +31,14 @@
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
+import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.ResultSet;
 import java.util.*;
@@ -44,11 +46,33 @@
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
 public class TestGroupByQuery extends QueryTestCaseBase {
   private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class);
 
-  public TestGroupByQuery() throws Exception {
+  public TestGroupByQuery(String groupByOption) throws Exception {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
+
+    Map<String, String> variables = new HashMap<String, String>();
+    if (groupByOption.equals("MultiLevel")) {
+      variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "true");
+    } else {
+      variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "false");
+    }
+    client.updateSessionVariables(variables);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    client.unsetSessionVariables(TUtil.newList(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname()));
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {"MultiLevel"},
+        {"No-MultiLevel"},
+    });
   }
 
   @Test
@@ -284,6 +308,24 @@ public final void testDistinctAggregation7() throws Exception {
     cleanupQuery(res);
   }
 
+  @Test
+  public final void testDistinctAggregation8() throws Exception {
+    /*
+    select
+      sum(distinct l_orderkey),
+      l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+      count(distinct l_partkey),
+      sum(l_orderkey)
+    from
+      lineitem
+    group by
+      l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+    */
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
   @Test
   public final void testDistinctAggregationWithHaving1() throws Exception {
     // select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem
@@ -343,6 +385,14 @@ public final void testDistinctAggregationCasebyCase() throws Exception {
     assertResultSet(res, "testDistinctAggregation_case8.result");
     res.close();
 
+    res = executeFile("testDistinctAggregation_case9.sql");
+    assertResultSet(res, "testDistinctAggregation_case9.result");
+    res.close();
+
+    res = executeFile("testDistinctAggregation_case10.sql");
+    assertResultSet(res, "testDistinctAggregation_case10.result");
+    res.close();
+
     // case9
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
new file mode 100644
index 0000000000..0553d06150
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
@@ -0,0 +1,9 @@
+select
+    sum(distinct l_orderkey),
+    l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+    count(distinct l_partkey),
+    sum(l_orderkey)
+from
+    lineitem
+group by
+    l_linenumber, l_returnflag, l_linestatus, l_shipdate;
\ No newline at end of file
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
new file mode 100644
index 0000000000..6ab7c25362
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
@@ -0,0 +1,5 @@
+select sum(cnt1), sum(sum2)
+from (
+  select o_orderdate, count(distinct o_orderpriority), count(distinct o_orderkey) cnt1, sum(o_totalprice) sum2
+  from orders group by o_orderdate
+) a
\ No newline at end of file
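The parameterized test above runs the whole suite twice, once per setting of the new session variable, so the multi-level plan and the previous plan are exercised against the same expected results. Outside the test harness a client can toggle the optimization the same way; a minimal sketch assuming an already-connected TajoClient:

import java.util.HashMap;
import java.util.Map;

import org.apache.tajo.SessionVars;
import org.apache.tajo.client.TajoClient;

public class ToggleMultiLevelGroupby {
  // Disables multi-level distinct aggregation for this session only; the
  // cluster-wide default ($GROUPBY_MULTI_LEVEL_ENABLED) stays untouched.
  public static void disable(TajoClient client) throws Exception {
    Map<String, String> variables = new HashMap<String, String>();
    variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "false");
    client.updateSessionVariables(variables);
  }
}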
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
new file mode 100644
index 0000000000..62655999c4
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
@@ -0,0 +1,11 @@
+select
+    lineitem.l_orderkey as l_orderkey,
+    count(distinct lineitem.l_partkey) as cnt1,
+    sum(lineitem.l_quantity + lineitem.l_linenumber)/count(distinct lineitem.l_suppkey) as value2,
+    lineitem.l_partkey as l_partkey,
+    avg(lineitem.l_quantity) as avg1,
+    count(distinct lineitem.l_suppkey) as cnt2
+from
+    lineitem
+group by
+    lineitem.l_orderkey, lineitem.l_partkey
\ No newline at end of file
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
new file mode 100644
index 0000000000..519390dbe1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
@@ -0,0 +1,7 @@
+?sum,l_linenumber,l_returnflag,l_linestatus,l_shipdate,?count_1,?sum_2
+-------------------------------
+1,1,N,O,1996-03-13,1,1
+2,1,N,O,1997-01-28,1,2
+3,1,R,F,1994-02-02,1,3
+1,2,N,O,1996-04-12,1,1
+3,2,R,F,1993-11-09,1,3
\ No newline at end of file
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
new file mode 100644
index 0000000000..2035d9f333
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
@@ -0,0 +1,3 @@
+?sum,?sum_1
+-------------------------------
+3,414440.9
\ No newline at end of file
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
new file mode 100644
index 0000000000..506eea017f
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
@@ -0,0 +1,6 @@
+l_orderkey,cnt1,value2,l_partkey,avg1,cnt2
+-------------------------------
+1,1,28.0,1,26.5,2
+2,1,39.0,2,38.0,1
+3,1,46.0,2,45.0,1
+3,1,51.0,3,49.0,1
\ No newline at end of file
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index e6b12b1fcb..25f1ae786c 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -25,6 +25,7 @@ Available Session Variables:
 \set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb)
 \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
 \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
+\set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled
 \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
 \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
 \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 51388a47c9..084c105d2b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -42,10 +42,6 @@ public class TupleComparator implements Comparator, ProtoObject