Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ private LogicalNode insertGroupbyNode(PlanContext context, LogicalNode child, St
for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
NamedExpr rawTarget = it.next();
try {
includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
if (evalNode.getType() == EvalType.AGG_FUNCTION) {
aggEvalNames.add(rawTarget.getAlias());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Tuple next() throws IOException {
}
if (first) {
loadChildHashTable();

progress = 0.5f;
first = false;
}
Expand Down Expand Up @@ -141,9 +142,12 @@ public Tuple next() throws IOException {
//--------------------------------------------------------------------------------------

List<List<Tuple>> tupleSlots = new ArrayList<List<Tuple>>();

// processing aggregation with single grouping key
for (int i = 0; i < hashAggregators.length; i++) {
if (!hashAggregators[i].iterator.hasNext()) {
nullCount++;
tupleSlots.add(new ArrayList<Tuple>());
continue;
}
Entry<Tuple, Map<Tuple, FunctionContext[]>> entry = hashAggregators[i].iterator.next();
Expand All @@ -161,7 +165,7 @@ public Tuple next() throws IOException {
// If DistinctGroupbyHashAggregationExec did not have any rows,
// it should return a tuple filled with NullDatum.
if (totalNumRows == 0 && groupbyNodeNum == 0) {
Tuple tuple = new VTuple(hashAggregators.length);
Tuple tuple = new VTuple(outputColumnNum);
for (int i = 0; i < tuple.size(); i++) {
tuple.put(i, DatumFactory.createNullDatum());
}
Expand Down Expand Up @@ -199,9 +203,11 @@ public Tuple next() throws IOException {

*/

// currentAggregatedTuples holds the tuples that share the same group key.
currentAggregatedTuples = new ArrayList<Tuple>();
int listIndex = 0;
while (true) {
// Each item in tuples is a VTuple, so the tuples variable is two-dimensional (tuples[aggregator][datum]).
Tuple[] tuples = new Tuple[hashAggregators.length];
for (int i = 0; i < hashAggregators.length; i++) {
List<Tuple> aggregatedTuples = tupleSlots.get(i);
Expand All @@ -212,7 +218,7 @@ public Tuple next() throws IOException {

//merge
Tuple mergedTuple = new VTuple(outputColumnNum);
int mergeTupleIndex = 0;
int resultColumnIdx = 0;

boolean allNull = true;
for (int i = 0; i < hashAggregators.length; i++) {
Expand All @@ -222,14 +228,22 @@ public Tuple next() throws IOException {

int tupleSize = hashAggregators[i].getTupleSize();
for (int j = 0; j < tupleSize; j++) {
if (resultColumnIdIndexes[mergeTupleIndex] >= 0) {
if (tuples[i] != null) {
mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], tuples[i].get(j));
int mergeTupleIndex = resultColumnIdIndexes[resultColumnIdx];
if (mergeTupleIndex >= 0) {
if (mergeTupleIndex < distinctGroupingKey.size()) {
// Set the group key column.
// Because each hashAggregator has a different number of tuples,
// the group key obtained from an individual hashAggregator is sometimes a null value.
mergedTuple.put(mergeTupleIndex, distinctGroupingKey.get(mergeTupleIndex));
} else {
mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], NullDatum.get());
if (tuples[i] != null) {
mergedTuple.put(mergeTupleIndex, tuples[i].get(j));
} else {
mergedTuple.put(mergeTupleIndex, NullDatum.get());
}
}
}
mergeTupleIndex++;
resultColumnIdx++;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,84 @@ public final void testDistinctAggregationCasebyCase() throws Exception {
schema.addColumn("code", Type.TEXT);
schema.addColumn("qty", Type.INT4);
schema.addColumn("qty2", Type.FLOAT8);
String[] data = new String[]{ "1|a|3|3.0", "1|a|4|4.0", "1|b|5|5.0", "2|a|1|6.0", "2|c|2|7.0", "2|d|3|8.0" };
String[] data = new String[]{"1|a|3|3.0", "1|a|4|4.0", "1|b|5|5.0", "2|a|1|6.0", "2|c|2|7.0", "2|d|3|8.0"};
TajoTestingCluster.createTable("table10", schema, tableOptions, data);

res = executeString("select id, count(distinct code), " +
"avg(qty), min(qty), max(qty), sum(qty), " +
"cast(avg(qty2) as INT8), cast(min(qty2) as INT8), cast(max(qty2) as INT8), cast(sum(qty2) as INT8) " +
"from table10 group by id");
String result = resultSetToString(res);

String expected = "id,?count_4,?avg_5,?min_6,?max_7,?sum_8,?cast_9,?cast_10,?cast_11,?cast_12\n" +
"-------------------------------\n" +
"1,2,4.0,0,5,12,4,0,5,12\n" +
"2,3,2.0,0,3,6,7,0,8,21\n";

assertEquals(expected, resultSetToString(res));

// multiple distinct with expression
res = executeString(
"select count(distinct code) + count(distinct qty) from table10"
);

expected = "?plus_2\n" +
"-------------------------------\n" +
"9\n";

assertEquals(expected, resultSetToString(res));
res.close();

res = executeString(
"select id, count(distinct code) + count(distinct qty) from table10 group by id"
);

expected = "id,?plus_2\n" +
"-------------------------------\n" +
"1,5\n" +
"2,6\n";

assertEquals(expected, resultSetToString(res));
res.close();

executeString("DROP TABLE table10 PURGE").close();
}

@Test
public final void testDistinctAggregationCasebyCase2() throws Exception {
// first distinct is smaller than second distinct.
KeyValueSet tableOptions = new KeyValueSet();
tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");

Schema schema = new Schema();
schema.addColumn("col1", Type.TEXT);
schema.addColumn("col2", Type.TEXT);
schema.addColumn("col3", Type.TEXT);

String[] data = new String[]{
"a|b-1|\\N",
"a|b-2|\\N",
"a|b-2|\\N",
"a|b-3|\\N",
"a|b-3|\\N",
"a|b-3|\\N"
};

TajoTestingCluster.createTable("table10", schema, tableOptions, data);

ResultSet res = executeString(
"select col1 \n" +
",count(distinct col2) as cnt1\n" +
",count(distinct case when col3 is not null then col2 else null end) as cnt2\n" +
"from table10 \n" +
"group by col1"
);
String result = resultSetToString(res);

String expected = "col1,cnt1,cnt2\n" +
"-------------------------------\n" +
"a,3,1\n";

assertEquals(expected, result);

executeString("DROP TABLE table10 PURGE").close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,11 +1457,6 @@ protected int nextKeyBuffer() throws IOException {
currentKeyLength = sin.readInt();
compressedKeyLen = sin.readInt();

// System.out.println(">>>currentRecordLength=" + currentRecordLength +
// ",currentKeyLength=" + currentKeyLength +
// ",compressedKeyLen=" + compressedKeyLen +
// ",decompress=" + decompress);

if (decompress) {
keyTempBuffer.reset();
keyTempBuffer.write(sin, compressedKeyLen);
Expand Down