From f6b55a95e70863edc2a50898e6b9d5d8b3fea3e0 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Mon, 30 Mar 2015 18:24:31 +0900 Subject: [PATCH 1/2] TAJO-1484 Apply on ColPartitionStoreExec --- .../physical/ColPartitionStoreExec.java | 5 ++ .../HashBasedColPartitionStoreExec.java | 59 ++++++++--------- .../SortBasedColPartitionStoreExec.java | 63 +++++++------------ 3 files changed, 52 insertions(+), 75 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 4481569fc8..ecb45d6292 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -174,4 +174,9 @@ public void openAppender(int suffixId) throws IOException { appender.enableStats(); appender.init(); } + + @Override + public void rescan() throws IOException { + // nothing to do + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index e94bc262f5..443cee1394 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -21,6 +21,7 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.Tuple; @@ -37,26 +38,35 @@ * This class is a physical operator to store at column partitioned table. */ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { - private final Map appenderMap = new HashMap(); + + private final ComparableTuple partKey; + private final Map appenderMap = new HashMap(); public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException { super(context, plan, child); + partKey = new ComparableTuple(inSchema, keyIds); } - public void init() throws IOException { - super.init(); - } - - private Appender getAppender(String partition) throws IOException { - Appender appender = appenderMap.get(partition); + private transient final StringBuilder sb = new StringBuilder(); - if (appender == null) { - appender = getNextPartitionAppender(partition); - appenderMap.put(partition, appender); - } else { - appender = appenderMap.get(partition); + private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) throws IOException { + Appender appender = appenderMap.get(partitionKey); + if (appender != null) { + return appender; } + sb.setLength(0); + for (int i = 0; i < keyNum; i++) { + if (i > 0) { + sb.append('/'); + } + sb.append(keyNames[i]).append('='); + Datum datum = tuple.get(keyIds[i]); + sb.append(StringUtils.escapePathName(datum.asChars())); + } + appender = getNextPartitionAppender(sb.toString()); + + appenderMap.put(partitionKey.copy(), appender); return appender; } @@ -66,28 +76,14 @@ private Appender getAppender(String partition) throws IOException { @Override public Tuple next() throws IOException { Tuple tuple; - StringBuilder sb = new StringBuilder(); while(!context.isStopped() && (tuple = child.next()) != null) { - // set subpartition directory name - sb.delete(0, sb.length()); - if (keyIds != null) { - for(int i = 0; i < keyIds.length; i++) { - Datum datum = tuple.get(keyIds[i]); - if(i > 0) - sb.append("/"); - sb.append(keyNames[i]).append("="); - sb.append(StringUtils.escapePathName(datum.asChars())); - } - } - + partKey.set(tuple); // add tuple - Appender appender = getAppender(sb.toString()); - appender.addTuple(tuple); + getAppender(partKey, tuple).addTuple(tuple); } List statSet = new ArrayList(); - for (Map.Entry entry : appenderMap.entrySet()) { - Appender app = entry.getValue(); + for (Appender app : appenderMap.values()) { app.flush(); app.close(); statSet.add(app.getStats()); @@ -99,9 +95,4 @@ public Tuple next() throws IOException { return null; } - - @Override - public void rescan() throws IOException { - // nothing to do - } } \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index ca90b0e78e..ad481babc8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -23,9 +23,9 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; @@ -36,35 +36,24 @@ * ascending or descending order of partition columns. */ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { - private Tuple currentKey; - private Tuple prevKey; + + private ComparableTuple prevKey; public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException { super(context, plan, child); } - public void init() throws IOException { - super.init(); - - currentKey = new VTuple(keyNum); - } - - private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) { - for (int i = 0; i < keyIds.length; i++) { - keyTuple.put(i, inTuple.get(keyIds[i])); - } - } - - private String getSubdirectory(Tuple keyTuple) { - StringBuilder sb = new StringBuilder(); + private transient StringBuilder sb = new StringBuilder(); + private String getSubdirectory(Tuple tuple) { + sb.setLength(0); for(int i = 0; i < keyIds.length; i++) { - Datum datum = keyTuple.get(i); - if(i > 0) { - sb.append("/"); + Datum datum = tuple.get(keyIds[i]); + if (i > 0) { + sb.append('/'); } - sb.append(keyNames[i]).append("="); + sb.append(keyNames[i]).append('='); sb.append(StringUtils.escapePathName(datum.asChars())); } return sb.toString(); @@ -75,22 +64,19 @@ public Tuple next() throws IOException { Tuple tuple; while(!context.isStopped() && (tuple = child.next()) != null) { - fillKeyTuple(tuple, currentKey); - if (prevKey == null) { - appender = getNextPartitionAppender(getSubdirectory(currentKey)); - prevKey = new VTuple(currentKey); - } else { - if (!prevKey.equals(currentKey)) { - appender.close(); - StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); - - appender = getNextPartitionAppender(getSubdirectory(currentKey)); - prevKey = new VTuple(currentKey); - - // reset all states for file rotating - writtenFileNum = 0; - } + appender = getNextPartitionAppender(getSubdirectory(tuple)); + prevKey = new ComparableTuple(inSchema, keyIds); + prevKey.set(tuple); + } else if (!prevKey.equals(tuple)) { + appender.close(); + StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); + + appender = getNextPartitionAppender(getSubdirectory(tuple)); + prevKey.set(tuple); + + // reset all states for file rotating + writtenFileNum = 0; } appender.addTuple(tuple); @@ -117,9 +103,4 @@ public void close() throws IOException { context.setResultStats(aggregatedStats); } } - - @Override - public void rescan() throws IOException { - // nothing to do - } } From 08227b844e35c626b8a796817b4069059670f80e Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 1 Apr 2015 10:02:06 +0900 Subject: [PATCH 2/2] TAJO-1492 Apply on *GroupByExec --- .../planner/physical/AggregationExec.java | 4 + .../DistinctGroupbyFirstAggregationExec.java | 98 ++++++++++--------- .../DistinctGroupbyHashAggregationExec.java | 92 ++++++++--------- .../DistinctGroupbySortAggregationExec.java | 1 - .../DistinctGroupbyThirdAggregationExec.java | 41 +++----- .../planner/physical/HashAggregateExec.java | 37 +++---- .../planner/physical/SortAggregateExec.java | 21 ++-- .../apache/tajo/LocalTajoTestingUtility.java | 12 +-- .../TestGroupByQuery/testGroupBy2.result | 2 +- .../testGroupbyWithJson.result | 2 +- ...tBroadcastMultiColumnPartitionTable.result | 2 +- 11 files changed, 140 insertions(+), 172 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java index 4b53b39501..b80ddb0d7e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.GroupbyNode; @@ -33,6 +34,8 @@ public abstract class AggregationExec extends UnaryPhysicalExec { protected final int aggFunctionsNum; protected final AggregationFunctionCallEval aggFunctions[]; + protected final ComparableTuple groupingKey; + public AggregationExec(final TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), child); @@ -57,6 +60,7 @@ public AggregationExec(final TaskAttemptContext context, GroupbyNode plan, aggFunctions = new AggregationFunctionCallEval[0]; aggFunctionsNum = 0; } + groupingKey = new ComparableTuple(inSchema, groupingKeyIds); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index 94429a0d22..a233931f1a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Int2Datum; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.DistinctGroupbyNode; @@ -141,7 +142,7 @@ public void init() throws IOException { resultTupleLength += nonDistinctHashAggregator.getTupleLength(); } } - distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{}); + distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[distinctAggrList.size()]); } private int currentAggregatorIndex = 0; @@ -171,24 +172,20 @@ public Tuple next() throws IOException { } private void prepareInputData() throws IOException { - Tuple tuple = null; + Tuple tuple; while(!context.isStopped() && (tuple = child.next()) != null) { - Tuple groupingKey = new VTuple(groupingKeyIndexes.length); - for (int i = 0; i < groupingKeyIndexes.length; i++) { - groupingKey.put(i, tuple.get(groupingKeyIndexes[i])); - } for (int i = 0; i < distinctAggregators.length; i++) { - distinctAggregators[i].compute(groupingKey, tuple); + distinctAggregators[i].compute(tuple); } if (nonDistinctHashAggregator != null) { - nonDistinctHashAggregator.compute(groupingKey, tuple); + nonDistinctHashAggregator.compute(tuple); } } for (int i = 0; i < distinctAggregators.length; i++) { distinctAggregators[i].rescan(); } - totalNumRows = distinctAggregators[0].distinctAggrDatas.size(); + totalNumRows = distinctAggregators[0].distinctAggrData.size(); preparedData = true; } @@ -233,13 +230,16 @@ class NonDistinctHashAggregator { private final AggregationFunctionCallEval aggFunctions[]; // GroupingKey -> FunctionContext[] - private Map nonDistinctAggrDatas; + private Map nonDistinctAggrData; private int tupleLength; private Tuple dummyTuple; + + private transient ComparableTuple groupingKey; + private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { - nonDistinctAggrDatas = new HashMap(); + nonDistinctAggrData = new HashMap(); if (groupbyNode.hasAggFunctions()) { aggFunctions = groupbyNode.getAggFunctions(); @@ -259,26 +259,27 @@ private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { dummyTuple.put(i, NullDatum.get()); } tupleLength = aggFunctionsNum; + groupingKey = new ComparableTuple(inSchema, groupingKeyIndexes); } - public void compute(Tuple groupingKeyTuple, Tuple tuple) { - FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple); - if (contexts != null) { - for (int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].merge(contexts[i], tuple); - } - } else { // if the key occurs firstly + public void compute(Tuple tuple) { + groupingKey.set(tuple); + FunctionContext[] contexts = nonDistinctAggrData.get(groupingKey); + if (contexts == null) { + // if the key occurs firstly contexts = new FunctionContext[aggFunctionsNum]; for (int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); - aggFunctions[i].merge(contexts[i], tuple); } - nonDistinctAggrDatas.put(groupingKeyTuple, contexts); + nonDistinctAggrData.put(groupingKey.copy(), contexts); + } + for (int i = 0; i < aggFunctionsNum; i++) { + aggFunctions[i].merge(contexts[i], tuple); } } - public Tuple aggregate(Tuple groupingKey) { - FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKey); + public Tuple aggregate(ComparableTuple groupingKey) { + FunctionContext[] contexts = nonDistinctAggrData.get(groupingKey); if (contexts == null) { return null; } @@ -303,14 +304,17 @@ public Tuple getDummyTuple() { class DistinctHashAggregator { // GroupingKey -> DistinctKey - private Map> distinctAggrDatas; - private Iterator>> iterator = null; + private Map> distinctAggrData; + private Iterator>> iterator; private int nodeSequence; private Int2Datum nodeSequenceDatum; private int[] distinctKeyIndexes; + private transient ComparableTuple groupingKey; + private transient ComparableTuple distinctKey; + private int tupleLength; private Tuple dummyTuple; private boolean aggregatorFinished = false; @@ -344,8 +348,11 @@ public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { this.distinctKeyIndexes[index++] = eachId; } - this.distinctAggrDatas = new HashMap>(); + this.distinctAggrData = new HashMap>(); this.tupleLength = distinctKeyIndexes.length; + + this.groupingKey = new ComparableTuple(inSchema, groupingKeyIndexes); + this.distinctKey = new ComparableTuple(inSchema, distinctKeyIndexes); } public void setNodeSequence(int nodeSequence) { @@ -357,36 +364,35 @@ public int getTupleLength() { return tupleLength; } - public void compute(Tuple groupingKey, Tuple tuple) throws IOException { - Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length); - for (int i = 0; i < distinctKeyIndexes.length; i++) { - distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i])); - } - - Set distinctEntry = distinctAggrDatas.get(groupingKey); + public void compute(Tuple tuple) throws IOException { + groupingKey.set(tuple); + Set distinctEntry = distinctAggrData.get(groupingKey); if (distinctEntry == null) { - distinctEntry = new HashSet(); - distinctAggrDatas.put(groupingKey, distinctEntry); + distinctEntry = new HashSet(); + distinctAggrData.put(groupingKey.copy(), distinctEntry); + } + distinctKey.set(tuple); + if (distinctEntry.add(distinctKey)) { + distinctKey = distinctKey.emptyCopy(); } - distinctEntry.add(distinctKeyTuple); } public void rescan() { - iterator = distinctAggrDatas.entrySet().iterator(); + iterator = distinctAggrData.entrySet().iterator(); currentGroupingTuples = null; groupingKeyChanged = false; aggregatorFinished = false; } public void close() throws IOException { - distinctAggrDatas.clear(); - distinctAggrDatas = null; + distinctAggrData.clear(); + distinctAggrData = null; currentGroupingTuples = null; iterator = null; } - Entry> currentGroupingTuples; - Iterator distinctKeyIterator; + Entry> currentGroupingTuples; + Iterator distinctKeyIterator; boolean groupingKeyChanged = false; public Tuple next() { @@ -420,19 +426,19 @@ public Tuple next() { tuple.put(tupleIndex++, nodeSequenceDatum); // merge grouping key - Tuple groupingKeyTuple = currentGroupingTuples.getKey(); - int groupingKeyLength = groupingKeyTuple.size(); + ComparableTuple groupingKey = currentGroupingTuples.getKey(); + int groupingKeyLength = groupingKey.size(); for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) { - tuple.put(tupleIndex, groupingKeyTuple.get(i)); + tuple.put(tupleIndex, groupingKey.toDatum(i)); } // merge distinctKey for (int i = 0; i < distinctAggregators.length; i++) { if (i == nodeSequence) { - Tuple distinctKeyTuple = distinctKeyIterator.next(); + ComparableTuple distinctKeyTuple = distinctKeyIterator.next(); int distinctKeyLength = distinctKeyTuple.size(); for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) { - tuple.put(tupleIndex, distinctKeyTuple.get(j)); + tuple.put(tupleIndex, distinctKeyTuple.toDatum(j)); } } else { Tuple dummyTuple = distinctAggregators[i].getDummyTuple(); @@ -448,7 +454,7 @@ public Tuple next() { Tuple nonDistinctTuple; if (nodeSequence == 0 && groupingKeyChanged) { groupingKeyChanged = false; - nonDistinctTuple = nonDistinctHashAggregator.aggregate(groupingKeyTuple); + nonDistinctTuple = nonDistinctHashAggregator.aggregate(groupingKey); if (nonDistinctTuple == null) { nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index 0f25d6cbb9..e35ec98f15 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -18,11 +18,13 @@ package org.apache.tajo.engine.planner.physical; +import com.google.common.primitives.Ints; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.function.FunctionContext; @@ -50,6 +52,8 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { private int[] resultColumnIdIndexes; + private ComparableTuple primaryGroupingKey; + public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), subOp); @@ -72,11 +76,8 @@ public void init() throws IOException { distinctGroupingKeyIdList.add(keyIndex); } } - int idx = 0; - distinctGroupingKeyIds = new int[distinctGroupingKeyIdList.size()]; - for (Integer intVal: distinctGroupingKeyIdList) { - distinctGroupingKeyIds[idx++] = intVal; - } + + distinctGroupingKeyIds = Ints.toArray(distinctGroupingKeyIdList); List groupbyNodes = plan.getSubPlans(); groupbyNodeNum = groupbyNodes.size(); @@ -103,6 +104,8 @@ public void init() throws IOException { for(int i = 0; i < resultColumnIds.length; i++) { resultColumnIdIndexes[resultColumnIds[i]] = i; } + + primaryGroupingKey = new ComparableTuple(inSchema, distinctGroupingKeyIds); } List currentAggregatedTuples = null; @@ -125,8 +128,7 @@ public Tuple next() throws IOException { return currentAggregatedTuples.get(currentAggregatedTupleIndex++); } - Tuple distinctGroupingKey = null; - int nullCount = 0; + ComparableTuple distinctGroupingKey = null; //-------------------------------------------------------------------------------------- // Output tuple @@ -148,11 +150,10 @@ public Tuple next() throws IOException { // aggregation with single grouping key for (int i = 0; i < hashAggregators.length; i++) { if (!hashAggregators[i].iterator.hasNext()) { - nullCount++; tupleSlots.add(new ArrayList()); continue; } - Entry> entry = hashAggregators[i].iterator.next(); + Entry> entry = hashAggregators[i].iterator.next(); if (distinctGroupingKey == null) { distinctGroupingKey = entry.getKey(); } @@ -160,7 +161,7 @@ public Tuple next() throws IOException { tupleSlots.add(aggregatedTuples); } - if (nullCount == hashAggregators.length) { + if (distinctGroupingKey == null) { finished = true; progress = 1.0f; @@ -236,7 +237,7 @@ public Tuple next() throws IOException { // set group key tuple // Because each hashAggregator has different number of tuples, // sometimes getting group key from each hashAggregator will be null value. - mergedTuple.put(mergeTupleIndex, distinctGroupingKey.get(mergeTupleIndex)); + mergedTuple.put(mergeTupleIndex, distinctGroupingKey.toDatum(mergeTupleIndex)); } else { if (tuples[i] != null) { mergedTuple.put(mergeTupleIndex, tuples[i].get(j)); @@ -273,10 +274,11 @@ public Tuple next() throws IOException { } private void loadChildHashTable() throws IOException { - Tuple tuple = null; + Tuple tuple; while(!context.isStopped() && (tuple = child.next()) != null) { + primaryGroupingKey.set(tuple); for (int i = 0; i < hashAggregators.length; i++) { - hashAggregators[i].compute(tuple); + hashAggregators[i].compute(tuple, primaryGroupingKey); } } for (int i = 0; i < hashAggregators.length; i++) { @@ -327,18 +329,20 @@ public TableStats getInputStats() { class HashAggregator { // Outer's GroupBy Key -> Each GroupByNode's Key -> FunctionContext - private Map> hashTable; - private Iterator>> iterator = null; + private Map> hashTable; + private Iterator>> iterator; private int groupingKeyIds[]; private final int aggFunctionsNum; private final AggregationFunctionCallEval aggFunctions[]; - int tupleSize; + private int tupleSize; - public HashAggregator(GroupbyNode groupbyNode, Schema schema) throws IOException { + private transient ComparableTuple groupingKey; - hashTable = new HashMap>(10000); + public HashAggregator(GroupbyNode groupbyNode, Schema schema) { + + hashTable = new HashMap>(10000); List distinctGroupingKeyIdSet = new ArrayList(); for (int i = 0; i < distinctGroupingKeyIds.length; i++) { @@ -360,11 +364,7 @@ public HashAggregator(GroupbyNode groupbyNode, Schema schema) throws IOException groupingKeyIdList.add(keyIndex); } } - int index = 0; - groupingKeyIds = new int[groupingKeyIdList.size()]; - for (Integer eachId : groupingKeyIdList) { - groupingKeyIds[index++] = eachId; - } + groupingKeyIds = Ints.toArray(groupingKeyIdList); if (groupbyNode.hasAggFunctions()) { aggFunctions = groupbyNode.getAggFunctions(); @@ -379,40 +379,34 @@ public HashAggregator(GroupbyNode groupbyNode, Schema schema) throws IOException } tupleSize = groupingKeyIds.length + aggFunctionsNum; + + groupingKey = new ComparableTuple(inSchema, groupingKeyIds); } public int getTupleSize() { return tupleSize; } - public void compute(Tuple tuple) throws IOException { - Tuple outerKeyTuple = new VTuple(distinctGroupingKeyIds.length); - for (int i = 0; i < distinctGroupingKeyIds.length; i++) { - outerKeyTuple.put(i, tuple.get(distinctGroupingKeyIds[i])); - } - - Tuple keyTuple = new VTuple(groupingKeyIds.length); - for (int i = 0; i < groupingKeyIds.length; i++) { - keyTuple.put(i, tuple.get(groupingKeyIds[i])); - } + public void compute(Tuple tuple, ComparableTuple primaryGroupingKey) { - Map distinctEntry = hashTable.get(outerKeyTuple); + Map distinctEntry = hashTable.get(primaryGroupingKey); if (distinctEntry == null) { - distinctEntry = new HashMap(); - hashTable.put(outerKeyTuple, distinctEntry); + distinctEntry = new HashMap(); + hashTable.put(primaryGroupingKey.copy(), distinctEntry); } - FunctionContext[] contexts = distinctEntry.get(keyTuple); - if (contexts != null) { - for (int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].merge(contexts[i], tuple); - } - } else { // if the key occurs firstly + + groupingKey.set(tuple); + FunctionContext[] contexts = distinctEntry.get(groupingKey); + if (contexts == null) { + // if the key occurs firstly contexts = new FunctionContext[aggFunctionsNum]; for (int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); - aggFunctions[i].merge(contexts[i], tuple); } - distinctEntry.put(keyTuple, contexts); + distinctEntry.put(groupingKey.copy(), contexts); + } + for (int i = 0; i < aggFunctions.length; i++) { + aggFunctions[i].merge(contexts[i], tuple); } } @@ -420,15 +414,15 @@ public void initFetch() { iterator = hashTable.entrySet().iterator(); } - public List aggregate(Map groupTuples) { + public List aggregate(Map groupTuples) { List aggregatedTuples = new ArrayList(); - for (Entry entry : groupTuples.entrySet()) { + for (Entry entry : groupTuples.entrySet()) { Tuple tuple = new VTuple(groupingKeyIds.length + aggFunctionsNum); - Tuple groupbyKey = entry.getKey(); + ComparableTuple groupbyKey = entry.getKey(); int index = 0; for (; index < groupbyKey.size(); index++) { - tuple.put(index, groupbyKey.get(index)); + tuple.put(index, groupbyKey.toDatum(index)); } FunctionContext[] contexts = entry.getValue(); @@ -440,7 +434,7 @@ public List aggregate(Map groupTuples) { return aggregatedTuples; } - public void close() throws IOException { + public void close() { hashTable.clear(); hashTable = null; iterator = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index 9ff479b5da..15e2479cf9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -22,7 +22,6 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.logical.DistinctGroupbyNode; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java index e71976c9ee..ec6a8de308 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Column; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.function.FunctionContext; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.*; + /** * This class aggregates the output of DistinctGroupbySecondAggregationExec. * @@ -85,7 +87,7 @@ public void init() throws IOException { } resultTupleLength += eachGroupby.getAggFunctions().length; } - aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{}); + aggregators = aggregatorList.toArray(new DistinctFinalAggregator[aggregatorList.size()]); // make output schema mapping index resultTupleIndexes = new int[outSchema.size()]; @@ -97,9 +99,7 @@ public void init() throws IOException { } for (GroupbyNode eachGroupby : groupbyNodes) { Set groupingColumnSet = new HashSet(); - for (Column column: eachGroupby.getGroupingColumns()) { - groupingColumnSet.add(column); - } + Collections.addAll(groupingColumnSet, eachGroupby.getGroupingColumns()); for (Target eachTarget: eachGroupby.getTargets()) { if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) { //aggr function @@ -128,8 +128,8 @@ public void init() throws IOException { } } - Tuple prevKeyTuple = null; - Tuple prevTuple = null; + private transient ComparableTuple keyTuple; + private transient Tuple prevTuple; @Override public Tuple next() throws IOException { @@ -164,7 +164,7 @@ public Tuple next() throws IOException { break; } - Tuple tuple = null; + Tuple tuple; try { tuple = childTuple.clone(); } catch (CloneNotSupportedException e) { @@ -172,18 +172,18 @@ public Tuple next() throws IOException { } int distinctSeq = tuple.get(0).asInt2(); - Tuple keyTuple = getGroupingKeyTuple(tuple); // First tuple - if (prevKeyTuple == null) { - prevKeyTuple = keyTuple; + if (keyTuple == null) { + keyTuple = new ComparableTuple(inSchema, 1, numGroupingColumns + 1); + keyTuple.set(tuple); prevTuple = tuple; aggregators[distinctSeq].merge(tuple); continue; } - if (!prevKeyTuple.equals(keyTuple)) { + if (!keyTuple.equals(tuple)) { // new grouping key for (int i = 0; i < numGroupingColumns; i++) { resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1)); @@ -192,13 +192,12 @@ public Tuple next() throws IOException { eachAggr.terminate(resultTuple); } - prevKeyTuple = keyTuple; + keyTuple.set(tuple); prevTuple = tuple; aggregators[distinctSeq].merge(tuple); break; } else { - prevKeyTuple = keyTuple; prevTuple = tuple; aggregators[distinctSeq].merge(tuple); } @@ -216,28 +215,14 @@ private Tuple makeEmptyTuple() { return resultTuple; } - private Tuple getGroupingKeyTuple(Tuple tuple) { - Tuple keyTuple = new VTuple(numGroupingColumns); - for (int i = 0; i < numGroupingColumns; i++) { - keyTuple.put(i, tuple.get(i + 1)); - } - - return keyTuple; - } - @Override public void rescan() throws IOException { super.rescan(); - prevKeyTuple = null; + keyTuple = null; prevTuple = null; finished = false; } - @Override - public void close() throws IOException { - super.close(); - } - class DistinctFinalAggregator { private FunctionContext[] functionContexts; private AggregationFunctionCallEval[] aggrFunctions; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java index 8ffd5039aa..cc292a2045 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; @@ -35,44 +36,38 @@ */ public class HashAggregateExec extends AggregationExec { private Tuple tuple = null; - private Map hashTable; + private Map hashTable; private boolean computed = false; - private Iterator> iterator = null; + private Iterator> iterator = null; public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException { super(ctx, plan, subOp); - hashTable = new HashMap(100000); + hashTable = new HashMap(100000); this.tuple = new VTuple(plan.getOutSchema().size()); } private void compute() throws IOException { Tuple tuple; - Tuple keyTuple; while(!context.isStopped() && (tuple = child.next()) != null) { - keyTuple = new VTuple(groupingKeyIds.length); - // build one key tuple - for(int i = 0; i < groupingKeyIds.length; i++) { - keyTuple.put(i, tuple.get(groupingKeyIds[i])); - } + groupingKey.set(tuple); - FunctionContext [] contexts = hashTable.get(keyTuple); - if(contexts != null) { - for(int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].merge(contexts[i], tuple); - } - } else { // if the key occurs firstly + FunctionContext [] contexts = hashTable.get(groupingKey); + if (contexts == null) { + // if the key occurs firstly contexts = new FunctionContext[aggFunctionsNum]; for(int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); - aggFunctions[i].merge(contexts[i], tuple); } - hashTable.put(keyTuple, contexts); + hashTable.put(groupingKey.copy(), contexts); + } + for(int i = 0; i < aggFunctions.length; i++) { + aggFunctions[i].merge(contexts[i], tuple); } } // If HashAggregateExec received NullDatum and didn't has any grouping keys, // it should return primitive values for NullLDatum. - if (groupingKeyNum == 0 && aggFunctionsNum > 0 && hashTable.entrySet().size() == 0) { + if (groupingKeyNum == 0 && aggFunctionsNum > 0 && hashTable.isEmpty()) { FunctionContext[] contexts = new FunctionContext[aggFunctionsNum]; for(int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); @@ -92,13 +87,13 @@ public Tuple next() throws IOException { FunctionContext [] contexts; if (iterator.hasNext()) { - Entry entry = iterator.next(); - Tuple keyTuple = entry.getKey(); + Entry entry = iterator.next(); + ComparableTuple keyTuple = entry.getKey(); contexts = entry.getValue(); int tupleIdx = 0; for (; tupleIdx < groupingKeyNum; tupleIdx++) { - tuple.put(tupleIdx, keyTuple.get(tupleIdx)); + tuple.put(tupleIdx, keyTuple.toDatum(tupleIdx)); } for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) { tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx])); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java index 9831d83ed3..c3379f65fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; @@ -42,7 +43,7 @@ * it makes an output tuple. */ public class SortAggregateExec extends AggregationExec { - private Tuple lastKey = null; + private ComparableTuple lastKey = null; private boolean finished = false; private FunctionContext contexts[]; @@ -53,19 +54,13 @@ public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalE @Override public Tuple next() throws IOException { - Tuple currentKey; Tuple tuple = null; Tuple outputTuple = null; while(!context.isStopped() && (tuple = child.next()) != null) { - // get a key tuple - currentKey = new VTuple(groupingKeyIds.length); - for(int i = 0; i < groupingKeyIds.length; i++) { - currentKey.put(i, tuple.get(groupingKeyIds[i])); - } - + groupingKey.set(tuple); /** Aggregation State */ - if (lastKey == null || lastKey.equals(currentKey)) { + if (lastKey == null || lastKey.equals(groupingKey)) { if (lastKey == null) { for(int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); @@ -76,7 +71,7 @@ public Tuple next() throws IOException { aggFunctions[i].merge(contexts[i], tuple); } } - lastKey = currentKey; + lastKey = groupingKey.copy(); } else { // aggregate for (int i = 0; i < aggFunctionsNum; i++) { @@ -90,7 +85,7 @@ public Tuple next() throws IOException { int tupleIdx = 0; for(; tupleIdx < groupingKeyNum; tupleIdx++) { - outputTuple.put(tupleIdx, lastKey.get(tupleIdx)); + outputTuple.put(tupleIdx, lastKey.toDatum(tupleIdx)); } for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) { outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx])); @@ -101,7 +96,7 @@ public Tuple next() throws IOException { aggFunctions[evalIdx].merge(contexts[evalIdx], tuple); } - lastKey = currentKey; + lastKey = groupingKey.copy(); return outputTuple; } } // while loop @@ -114,7 +109,7 @@ public Tuple next() throws IOException { outputTuple = new VTuple(outSchema.size()); int tupleIdx = 0; for(; tupleIdx < groupingKeyNum; tupleIdx++) { - outputTuple.put(tupleIdx, lastKey.get(tupleIdx)); + outputTuple.put(tupleIdx, lastKey.toDatum(tupleIdx)); } for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) { outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx])); diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 5407ff5cd1..8d7fa9ad77 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -56,16 +56,6 @@ public class LocalTajoTestingUtility { private TajoConf conf; private TajoClient client; - private static UserGroupInformation dummyUserInfo; - - static { - try { - dummyUserInfo = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - e.printStackTrace(); - } - } - private static int taskAttemptId; public static TaskAttemptId newTaskAttemptId() { @@ -77,7 +67,7 @@ public static TaskAttemptId newTaskAttemptId(MasterPlan plan) { } public static Session createDummySession() { - return new Session(UUID.randomUUID().toString(), dummyUserInfo.getUserName(), TajoConstants.DEFAULT_DATABASE_NAME); + return new Session(UUID.randomUUID().toString(), "tajo-test", TajoConstants.DEFAULT_DATABASE_NAME); } public static QueryContext createDummyContext(TajoConf conf) { diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result index 6afdd23d57..dcdacff094 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result @@ -1,4 +1,4 @@ unique_key ------------------------------- +3 2 -3 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result index 366b76e8c7..514db4baa3 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result @@ -1,5 +1,5 @@ l_orderkey,total,num ------------------------------- 3,2.5,3 +1,1.0,3 2,2.0,1 -1,1.0,3 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result index 9ef26b46d7..35fca3f95b 100644 --- a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result +++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result @@ -1,5 +1,5 @@ col3 ------------------------------- 01 +10 12 -10 \ No newline at end of file