From 5dda14a9d3e2216c1c80ae2b3654a78b852df269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Tue, 20 May 2014 00:47:45 +0900 Subject: [PATCH] TAJO-833 --- .../apache/tajo/engine/eval/FieldEval.java | 6 ++- .../builder/DistinctGroupbyBuilder.java | 22 ++++++-- .../planner/physical/AggregationExec.java | 6 ++- .../DistinctGroupbyHashAggregationExec.java | 51 +++++++++++++++---- .../DistinctGroupbySortAggregationExec.java | 7 --- .../planner/physical/SortAggregateExec.java | 1 - .../tajo/engine/query/TestGroupByQuery.java | 4 ++ .../testDistinctAggregation_case8.sql | 10 ++++ .../testDistinctAggregation_case8.result | 6 +++ .../apache/tajo/storage/TupleComparator.java | 20 +++++++- 10 files changed, 106 insertions(+), 27 deletions(-) create mode 100644 tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case8.sql create mode 100644 tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case8.result diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/FieldEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/FieldEval.java index ea2b031ebf..6799c04154 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/FieldEval.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/FieldEval.java @@ -42,7 +42,11 @@ public FieldEval(Column column) { @Override public Datum eval(Schema schema, Tuple tuple) { if (fieldId == -1) { - fieldId = schema.getColumnId(column.getQualifiedName()); + if (column.hasQualifier()) { + fieldId = schema.getColumnId(column.getQualifiedName()); + } else { + fieldId = schema.getColumnIdByName(column.getSimpleName()); + } if (fieldId == -1) { throw new IllegalStateException("No Such Column Reference: " + column + ", schema: " + schema); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 1ccd9dcf1e..8727b84eef 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -263,7 +263,11 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou int[] secondStageColumnIds = new int[secondStageDistinctNode.getOutSchema().size()]; int columnIdIndex = 0; for (Column column: secondStageDistinctNode.getGroupingColumns()) { - secondStageColumnIds[originOutputSchema.getColumnId(column.getQualifiedName())] = columnIdIndex; + if (column.hasQualifier()) { + secondStageColumnIds[originOutputSchema.getColumnId(column.getQualifiedName())] = columnIdIndex; + } else { + secondStageColumnIds[originOutputSchema.getColumnIdByName(column.getSimpleName())] = columnIdIndex; + } columnIdIndex++; } @@ -312,8 +316,12 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou int targetIdx = originGroupColumns.size() + uniqueDistinctColumn.size() + aggFuncIdx; Target aggFuncTarget = oldTargets[targetIdx]; secondGroupbyTargets.add(aggFuncTarget); - int outputColumnId = originOutputSchema.getColumnId(aggFuncTarget.getNamedColumn().getQualifiedName()); - secondStageColumnIds[outputColumnId] = columnIdIndex; + Column column = aggFuncTarget.getNamedColumn(); + if (column.hasQualifier()) { + secondStageColumnIds[originOutputSchema.getColumnId(column.getQualifiedName())] = columnIdIndex; + } else { + secondStageColumnIds[originOutputSchema.getColumnIdByName(column.getSimpleName())] = columnIdIndex; + } columnIdIndex++; } secondStageGroupbyNode.setTargets(secondGroupbyTargets.toArray(new Target[]{})); @@ -336,8 +344,12 @@ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... grou secondStageAggFunction.setArgs(new EvalNode[] {firstEval}); Target secondTarget = secondStageGroupbyNode.getTargets()[secondStageGroupbyNode.getGroupingColumns().length + aggFuncIdx]; - int outputColumnId = originOutputSchema.getColumnId(secondTarget.getNamedColumn().getQualifiedName()); - secondStageColumnIds[outputColumnId] = columnIdIndex; + Column column = secondTarget.getNamedColumn(); + if (column.hasQualifier()) { + secondStageColumnIds[originOutputSchema.getColumnId(column.getQualifiedName())] = columnIdIndex; + } else { + secondStageColumnIds[originOutputSchema.getColumnIdByName(column.getSimpleName())] = columnIdIndex; + } columnIdIndex++; aggFuncIdx++; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java index 208973e93f..2a671e6306 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java @@ -49,7 +49,11 @@ public AggregationExec(final TaskAttemptContext context, GroupbyNode plan, Column col; for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) { col = keyColumns[idx]; - groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName()); + if (col.hasQualifier()) { + groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName()); + } else { + groupingKeyIds[idx] = inSchema.getColumnIdByName(col.getSimpleName()); + } } if (plan.hasAggFunctions()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index 6458f47f58..33cc242085 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -40,7 +40,6 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { private HashAggregator[] hashAggregators; private PhysicalExec child; - private int distinctGroupingKeyNum; private int distinctGroupingKeyIds[]; private boolean first = true; private int groupbyNodeNum; @@ -58,14 +57,22 @@ public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGr this.child = subOp; this.child.init(); - distinctGroupingKeyNum = plan.getGroupingColumns().length; - distinctGroupingKeyIds = new int[distinctGroupingKeyNum]; - - Column[] keyColumns = plan.getGroupingColumns(); - Column col; - for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) { - col = keyColumns[idx]; - distinctGroupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName()); + List distinctGroupingKeyIdList = new ArrayList(); + for (Column col: plan.getGroupingColumns()) { + int keyIndex; + if (col.hasQualifier()) { + keyIndex = inSchema.getColumnId(col.getQualifiedName()); + } else { + keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + } + if (!distinctGroupingKeyIdList.contains(keyIndex)) { + distinctGroupingKeyIdList.add(keyIndex); + } + } + int idx = 0; + distinctGroupingKeyIds = new int[distinctGroupingKeyIdList.size()]; + for (Integer intVal: distinctGroupingKeyIdList) { + distinctGroupingKeyIds[idx++] = intVal.intValue(); } List groupbyNodes = plan.getGroupByNodes(); @@ -179,6 +186,23 @@ public Tuple next() throws IOException { if (tuples[i] != null) { mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], tuples[i].get(j)); } else { + /* + Output Tuple Index: 0(l_orderkey), 1(l_partkey), 2(default.lineitem.l_suppkey), 5(default.lineitem.l_partkey), 8(sum) + select + lineitem.l_orderkey as l_orderkey, + lineitem.l_partkey as l_partkey, + count(distinct lineitem.l_partkey) as cnt1, + count(distinct lineitem.l_suppkey) as cnt2, + sum(lineitem.l_quantity) as sum1 + from + lineitem + group by + lineitem.l_orderkey, lineitem.l_partkey + + l_orderkey l_partkey default.lineitem.l_suppkey l_orderkey l_partkey default.lineitem.l_partkey l_orderkey l_partkey sum + 1 1 7311 1 1 1 1 1 53.0 + 1 1 7706 + */ mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], NullDatum.get()); } } @@ -296,9 +320,14 @@ public HashAggregator(GroupbyNode groupbyNode) throws IOException { List groupingKeyIdList = new ArrayList(distinctGroupingKeyIdSet); Column[] keyColumns = groupbyNode.getGroupingColumns(); Column col; - for (int idx = 0; idx < groupbyNode.getGroupingColumns().length; idx++) { + for (int idx = 0; idx < keyColumns.length; idx++) { col = keyColumns[idx]; - int keyIndex = inSchema.getColumnId(col.getQualifiedName()); + int keyIndex; + if (col.hasQualifier()) { + keyIndex = inSchema.getColumnId(col.getQualifiedName()); + } else { + keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + } if (!distinctGroupingKeyIdSet.contains(keyIndex)) { groupingKeyIdList.add(keyIndex); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index c8457ace45..fd79725da3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -18,7 +18,6 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode; import org.apache.tajo.engine.planner.logical.GroupbyNode; @@ -34,8 +33,6 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { private boolean finished = false; - private int distinctGroupingKeyNum; - private Tuple[] currentTuples; private int outColumnNum; private int groupbyNodeNum; @@ -49,9 +46,6 @@ public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, Dist this.aggregateExecs = aggregateExecs; this.groupbyNodeNum = plan.getGroupByNodes().size(); - final Column[] keyColumns = plan.getGroupingColumns(); - distinctGroupingKeyNum = keyColumns.length; - currentTuples = new Tuple[groupbyNodeNum]; outColumnNum = outSchema.size(); @@ -116,7 +110,6 @@ public Tuple next() throws IOException { mergeTupleIndex++; } } - return mergedTuple; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java index 4c4227f066..9a415d1d6c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java @@ -57,7 +57,6 @@ public Tuple next() throws IOException { Tuple outputTuple = null; while(!context.isStopped() && (tuple = child.next()) != null) { - // get a key tuple currentKey = new VTuple(groupingKeyIds.length); for(int i = 0; i < groupingKeyIds.length; i++) { diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index 91993a1b3b..1263bbecfe 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -249,6 +249,10 @@ public final void testDistinctAggregationCasebyCase() throws Exception { res = executeFile("testDistinctAggregation_case7.sql"); assertResultSet(res, "testDistinctAggregation_case7.result"); res.close(); + + res = executeFile("testDistinctAggregation_case8.sql"); + assertResultSet(res, "testDistinctAggregation_case8.result"); + res.close(); } @Test diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case8.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case8.sql new file mode 100644 index 0000000000..ed8e363c1f --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case8.sql @@ -0,0 +1,10 @@ +select + lineitem.l_orderkey as l_orderkey, + lineitem.l_partkey as l_partkey, + count(distinct lineitem.l_partkey) as cnt1, + count(distinct lineitem.l_suppkey) as cnt2, + sum(lineitem.l_quantity) as sum1 +from + lineitem +group by + lineitem.l_orderkey, lineitem.l_partkey \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case8.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case8.result new file mode 100644 index 0000000000..e2348966ee --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case8.result @@ -0,0 +1,6 @@ +l_orderkey,l_partkey,cnt1,cnt2,sum1 +------------------------------- +1,1,1,2,53.0 +2,2,1,1,38.0 +3,2,1,1,45.0 +3,3,1,1,49.0 \ No newline at end of file diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java index 30f281058c..51388a47c9 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java @@ -58,7 +58,11 @@ public TupleComparator(Schema schema, SortSpec[] sortKeys) { this.asc = new boolean[sortKeys.length]; this.nullFirsts = new boolean[sortKeys.length]; for (int i = 0; i < sortKeys.length; i++) { - this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); + if (sortKeys[i].getSortKey().hasQualifier()) { + this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); + } else { + this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); + } this.asc[i] = sortKeys[i].isAscending(); this.nullFirsts[i]= sortKeys[i].isNullFirst(); @@ -160,4 +164,18 @@ public TupleComparatorProto getProto() { return builder.build(); } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + String prefix = ""; + for (int i = 0; i < sortKeyIds.length; i++) { + sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i]) + .append(",Asc=").append(asc[i]) + .append(",NullFirst=").append(nullFirsts[i]); + prefix = " ,"; + } + return sb.toString(); + } } \ No newline at end of file