Skip to content

Commit

Permalink
[Backport] SQL: Finalize aggregations for inner queries when necessary. (#6229)
Browse files Browse the repository at this point in the history

* SQL: Finalize aggregations for inner queries when necessary. (#6221)

* SQL: Finalize aggregations for inner queries when necessary.

Fixes #5779.

* Fixed test method name.

* Fix test for 0.12 branch.
  • Loading branch information
gianm authored and fjy committed Aug 26, 2018
1 parent 0b79e76 commit 98234e8
Show file tree
Hide file tree
Showing 20 changed files with 192 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -62,6 +63,7 @@ public SqlAggFunction calciteFunction()
return FUNCTION_INSTANCE;
}

@Nullable
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
Expand All @@ -70,7 +72,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final DruidExpression input = Expressions.toDruidExpression(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package io.druid.sql.calcite.aggregation;

import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.column.ValueType;
import io.druid.sql.calcite.expression.DruidExpression;
import io.druid.sql.calcite.planner.Calcites;

import javax.annotation.Nullable;
import java.util.List;
Expand Down Expand Up @@ -85,7 +85,7 @@ public List<VirtualColumn> getVirtualColumns(final ExprMacroTable macroTable)
/**
 * Returns the name to use for this expression's virtual column, or null when
 * {@code expression.isSimpleExtraction()} is true.
 *
 * NOTE(review): presumably a simple extraction can be read directly and therefore needs no virtual column --
 * confirm against the rest of the class, which is not visible here.
 *
 * @return null for simple extractions; otherwise {@code outputName} joined with the suffix "v" by a colon
 */
@Nullable
public String getVirtualColumnName()
{
// The two return statements below are the before/after versions of the same line in this diff. Both yield
// "<outputName>:v"; the second delegates the formatting to Calcites.makePrefixedName.
return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName);
return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public interface SqlAggregator
* @param project project that should be applied before aggregation; may be null
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
* ignored if you do not want to re-use existing aggregations.
* @param finalizeAggregations true if this query should include explicit finalization for all of its
* aggregators, where required. This is set for subqueries where Druid's native query
* layer does not do this automatically.
*
* @return aggregation, or null if the call cannot be translated
*/
Expand All @@ -64,6 +67,7 @@ Aggregation toDruidAggregation(
String name,
AggregateCall aggregateCall,
Project project,
List<Aggregation> existingAggregations
List<Aggregation> existingAggregations,
boolean finalizeAggregations
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
Expand Down Expand Up @@ -52,6 +52,7 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ApproxCountDistinctSqlAggregator implements SqlAggregator
Expand All @@ -74,7 +75,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
Expand All @@ -92,14 +94,15 @@ public Aggregation toDruidAggregation(

final List<VirtualColumn> virtualColumns = new ArrayList<>();
final AggregatorFactory aggregatorFactory;
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;

if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) {
aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true);
aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true);
} else {
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
if (inputType == null) {
throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name);
throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
}

final DimensionSpec dimensionSpec;
Expand All @@ -108,18 +111,28 @@ public Aggregation toDruidAggregation(
dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn(
StringUtils.format("%s:v", name),
Calcites.makePrefixedName(name, "v"),
inputType,
plannerContext.getExprMacroTable()
);
dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType);
virtualColumns.add(virtualColumn);
}

aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true);
aggregatorFactory = new CardinalityAggregatorFactory(
aggregatorName,
null,
ImmutableList.of(dimensionSpec),
false,
true
);
}

return Aggregation.create(virtualColumns, aggregatorFactory);
return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory),
finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null
);
}

private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
Expand All @@ -32,6 +31,7 @@
import io.druid.sql.calcite.aggregation.Aggregations;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.DruidExpression;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
Expand Down Expand Up @@ -61,7 +61,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
Expand Down Expand Up @@ -102,8 +103,8 @@ public Aggregation toDruidAggregation(
expression = arg.getExpression();
}

final String sumName = StringUtils.format("%s:sum", name);
final String countName = StringUtils.format("%s:count", name);
final String sumName = Calcites.makePrefixedName(name, "sum");
final String countName = Calcites.makePrefixedName(name, "count");
final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory(
sumType,
sumName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
Expand All @@ -87,7 +88,8 @@ public Aggregation toDruidAggregation(
name,
aggregateCall,
project,
existingAggregations
existingAggregations,
finalizeAggregations
);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
Expand Down
11 changes: 8 additions & 3 deletions sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,21 +297,26 @@ public static boolean isIntLiteral(final RexNode rexNode)
return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
}

/**
 * Derives a prefix from {@code basePrefix} that passes the unused-prefix check against {@code strings}.
 *
 * Starts with {@code basePrefix} itself and prepends one underscore per failed check, so the result is always
 * {@code basePrefix} with zero or more leading "_" characters.
 *
 * @param basePrefix preferred prefix; returned unchanged if it already passes the check
 * @param strings    set of existing names the candidate prefix is checked against
 *
 * @return the first candidate prefix that passes the check
 */
public static String findOutputNamePrefix(final String basePrefix, final NavigableSet<String> strings)
public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
{
String prefix = basePrefix;

// The two loop headers are the before/after versions of the same line in this diff (helper renamed and its
// arguments swapped). Either way: keep prepending "_" until the candidate passes the check.
while (!isUsablePrefix(strings, prefix)) {
while (!isUnusedPrefix(prefix, strings)) {
prefix = "_" + prefix;
}

return prefix;
}

/**
 * Reports whether {@code strings} has no element in the range from {@code prefix + "0"} (inclusive) to
 * {@code prefix + ":"} (exclusive).
 *
 * NOTE(review): assuming the set uses natural String ordering (no custom comparator -- confirm with callers),
 * that range is exactly the strings that start with {@code prefix} immediately followed by a digit 0-9.
 *
 * @param prefix  candidate prefix
 * @param strings set of existing names to search
 *
 * @return true if the range described above is empty
 */
private static boolean isUsablePrefix(final NavigableSet<String> strings, final String prefix)
private static boolean isUnusedPrefix(final String prefix, final NavigableSet<String> strings)
{
// ":" is one character after "9"
// so the half-open range [prefix + "0", prefix + ":") covers prefix followed by any of the digits "0" to "9".
final NavigableSet<String> subSet = strings.subSet(prefix + "0", true, prefix + ":", false);
return subSet.isEmpty();
}

/**
 * Builds a name of the form {@code prefix:suffix}.
 *
 * @param prefix text placed before the colon
 * @param suffix text placed after the colon
 *
 * @return {@code prefix} and {@code suffix} joined by a single ":"
 */
public static String makePrefixedName(final String prefix, final String suffix)
{
  final String prefixedName = StringUtils.format("%s:%s", prefix, suffix);
  return prefixedName;
}
}
18 changes: 13 additions & 5 deletions sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public PartialDruidQuery getPartialDruidQuery()
@Override
public Sequence<Object[]> runQuery()
{
final DruidQuery query = toDruidQuery();
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
// is the outermost query and it will actually get run as a native query. Druid's native query layer will
// finalize aggregations for the outermost query even if we don't explicitly ask it to.

final DruidQuery query = toDruidQuery(false);
if (query != null) {
return getQueryMaker().runQuery(query);
} else {
Expand Down Expand Up @@ -116,9 +120,11 @@ public int getQueryCount()

@Nullable
@Override
public DruidQuery toDruidQuery()
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery();
// Must finalize aggregations on subqueries.

final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
if (subQuery == null) {
return null;
}
Expand All @@ -128,7 +134,8 @@ public DruidQuery toDruidQuery()
new QueryDataSource(subQuery.toGroupByQuery()),
sourceRowSignature,
getPlannerContext(),
getCluster().getRexBuilder()
getCluster().getRexBuilder(),
finalizeAggregations
);
}

Expand All @@ -142,7 +149,8 @@ public DruidQuery toDruidQueryForExplaining()
sourceRel.getRowType()
),
getPlannerContext(),
getCluster().getRexBuilder()
getCluster().getRexBuilder(),
false
);
}

Expand Down
Loading

0 comments on commit 98234e8

Please sign in to comment.