Permalink
Browse files

SQL support for nested groupBys. (#3806)

* SQL support for nested groupBys.

Allows, for example, doing exact count distinct by writing:

  SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)

Contrast with approximate count distinct, which is:

  SELECT COUNT(DISTINCT col) FROM druid.foo

* Add deeply-nested groupBy docs, tests, and maxQueryCount config.

* Extract magic constants into statics.

* Rework rules to put preconditions in the "matches" method.
  • Loading branch information...
1 parent 7662061 commit e86859b228a809cb70001097ba315f345cc70edb @gianm gianm committed with jon-wei Jan 12, 2017
Showing with 2,118 additions and 1,002 deletions.
  1. +2 −2 benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
  2. +1 −0 docs/content/configuration/broker.md
  3. +6 −0 docs/content/querying/groupbyquery.md
  4. +38 −4 docs/content/querying/sql.md
  5. +43 −18 processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
  6. +4 −2 sql/src/main/java/io/druid/sql/calcite/DruidSchema.java
  7. +19 −23 sql/src/main/java/io/druid/sql/calcite/expression/Expressions.java
  8. +9 −14 sql/src/main/java/io/druid/sql/calcite/expression/RowExtraction.java
  9. +7 −9 sql/src/main/java/io/druid/sql/calcite/filtration/ConvertBoundsToSelectors.java
  10. +7 −9 sql/src/main/java/io/druid/sql/calcite/filtration/ConvertSelectorsToIns.java
  11. +7 −7 sql/src/main/java/io/druid/sql/calcite/filtration/Filtration.java
  12. +13 −1 sql/src/main/java/io/druid/sql/calcite/planner/PlannerConfig.java
  13. +3 −5 sql/src/main/java/io/druid/sql/calcite/planner/Rules.java
  14. +174 −0 sql/src/main/java/io/druid/sql/calcite/rel/DruidNestedGroupBy.java
  15. +237 −60 sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java
  16. +59 −7 sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
  17. +35 −3 sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
  18. +102 −81 sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
  19. +84 −133 sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java
  20. +1 −2 sql/src/main/java/io/druid/sql/calcite/rule/DruidFilterRule.java
  21. +0 −125 sql/src/main/java/io/druid/sql/calcite/rule/DruidSelectProjectionRule.java
  22. +0 −74 sql/src/main/java/io/druid/sql/calcite/rule/DruidSelectSortRule.java
  23. +18 −5 sql/src/main/java/io/druid/sql/calcite/rule/DruidSemiJoinRule.java
  24. +258 −122 sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
  25. +186 −0 sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java
  26. +16 −85 sql/src/main/java/io/druid/sql/calcite/table/DruidTable.java
  27. +0 −76 sql/src/main/java/io/druid/sql/calcite/table/DruidTables.java
  28. +211 −0 sql/src/main/java/io/druid/sql/calcite/table/RowSignature.java
  29. +446 −71 sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
  30. +25 −1 sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
  31. +107 −63 sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -45,6 +45,7 @@
import io.druid.segment.serde.ComplexMetrics;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerConfig;
+import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@@ -133,9 +134,8 @@ public void setup() throws Exception
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
- walker,
+ new QueryMaker(walker, plannerConfig),
new TableDataSource("foo"),
- plannerConfig,
ImmutableMap.of(
"__time", ValueType.LONG,
"dimSequential", ValueType.STRING,
@@ -101,6 +101,7 @@ The broker's [SQL planner](../querying/sql.html) can be configured through the f
|Property|Description|Default|
|--------|-----------|-------|
+|`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8|
|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
@@ -156,6 +156,12 @@ indexing mechanism, and runs the outer query on these materialized results. "v2"
inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both
strategies perform the outer query on the broker in a single-threaded fashion.
+Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
+With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
+merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
+that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
+safe and do not suffer from this issue.
+
#### Server configuration
When using the "v1" strategy, the following runtime properties apply:
@@ -77,6 +77,20 @@ If `druid.sql.planner.useFallback` is enabled, full SQL is possible on metadata
recommended in production since it can generate unscalable query plans. The JDBC driver allows accessing
table and column metadata through `connection.getMetaData()` even if useFallback is off.
+### Approximate queries
+
+The following SQL queries and features may be executed using approximate algorithms:
+
+- `COUNT(DISTINCT col)` aggregations use [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf), a
+fast approximate distinct counting algorithm. If you need exact distinct counts, you can instead use
+`SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)`, which will use a slower and more resource-intensive exact
+algorithm.
+- TopN-style queries with a single grouping column, like
+`SELECT col1, SUM(col2) FROM druid.foo GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed
+as [TopN queries](topnquery.html), which use an approximate algorithm. To disable this behavior, and use exact
+algorithms for topN-style queries, set
+[druid.sql.planner.useApproximateTopN](../configuration/broker.html#sql-planner-configuration) to "false".
+
### Time functions
Druid's SQL language supports a number of time operations, including:
@@ -85,12 +99,33 @@ Druid's SQL language supports a number of time operations, including:
- `EXTRACT(<granularity> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM druid.foo GROUP BY EXTRACT(HOUR FROM __time)`
- Comparisons to `TIMESTAMP '<time string>'` for time filters, like `SELECT COUNT(*) FROM druid.foo WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2001-01-01 00:00:00'`
-### Semi-joins
+### Subqueries
+
+Druid's SQL layer supports many types of subqueries, including the ones listed below.
+
+#### Nested groupBy
+
+Subqueries involving `FROM (SELECT ... GROUP BY ...)` may be executed as
+[nested groupBys](groupbyquery.html#nested-groupbys). For example, the following query can be used to perform an
+exact distinct count using a nested groupBy.
+
+```sql
+SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)
+```
+
+Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
+With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
+merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
+that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
+safe and do not suffer from this issue. If you like, you can forbid deeper nesting by setting
+`druid.sql.planner.maxQueryCount = 2`.
+
+#### Semi-joins
-Semi-joins involving `IN (SELECT ...)`, like the following, are planned with a special process.
+Semi-join subqueries involving `WHERE ... IN (SELECT ...)`, like the following, are executed with a special process.
```sql
-SELECT x, count(*)
+SELECT x, COUNT(*)
FROM druid.foo
WHERE x IN (SELECT x FROM druid.bar WHERE y = 'baz')
GROUP BY x
@@ -121,7 +156,6 @@ Additionally, some Druid features are not supported by the SQL language. Some un
- [Multi-value dimensions](multi-value-dimensions.html).
- [Query-time lookups](lookups.html).
-- [Nested groupBy queries](groupbyquery.html#nested-groupbys).
- Extensions, including [approximate histograms](../development/extensions-core/approximate-histograms.html) and
[DataSketches](../development/extensions-core/datasketches-aggregators.html).
@@ -138,6 +138,35 @@
@RunWith(Parameterized.class)
public class GroupByQueryRunnerTest
{
+ public static final ObjectMapper DEFAULT_MAPPER = new DefaultObjectMapper(new SmileFactory());
+ public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
+ {
+ @Override
+ public String getFormatString()
+ {
+ return null;
+ }
+
+ @Override
+ public int intermediateComputeSizeBytes()
+ {
+ return 10 * 1024 * 1024;
+ }
+
+ @Override
+ public int getNumMergeBuffers()
+ {
+ // There are some tests that need to allocate two buffers (simulating two levels of merging)
+ return 2;
+ }
+
+ @Override
+ public int getNumThreads()
+ {
+ return 2;
+ }
+ };
+
private final QueryRunner<Row> runner;
private GroupByQueryRunnerFactory factory;
private GroupByQueryConfig config;
@@ -253,14 +282,23 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config
)
{
- return makeQueryRunnerFactory(new DefaultObjectMapper(new SmileFactory()), config);
+ return makeQueryRunnerFactory(DEFAULT_MAPPER, config, DEFAULT_PROCESSING_CONFIG);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config
)
{
+ return makeQueryRunnerFactory(mapper, config, DEFAULT_PROCESSING_CONFIG);
+ }
+
+ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+ final ObjectMapper mapper,
+ final GroupByQueryConfig config,
+ final DruidProcessingConfig processingConfig
+ )
+ {
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
@@ -269,7 +307,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
@Override
public ByteBuffer get()
{
- return ByteBuffer.allocateDirect(10 * 1024 * 1024);
+ return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
}
}
);
@@ -279,10 +317,10 @@ public ByteBuffer get()
@Override
public ByteBuffer get()
{
- return ByteBuffer.allocateDirect(10 * 1024 * 1024);
+ return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
}
},
- 2 // There are some tests that need to allocate two buffers (simulating two levels of merging)
+ processingConfig.getNumMergeBuffers()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
@@ -293,20 +331,7 @@ public ByteBuffer get()
bufferPool
),
new GroupByStrategyV2(
- new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return null;
- }
-
- @Override
- public int getNumThreads()
- {
- return 2;
- }
- },
+ processingConfig,
configSupplier,
bufferPool,
mergeBufferPool,
@@ -50,6 +50,7 @@
import io.druid.segment.column.ValueType;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.sql.calcite.planner.PlannerConfig;
+import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.timeline.DataSegment;
import org.apache.calcite.linq4j.tree.Expression;
@@ -77,6 +78,7 @@
private final TimelineServerView serverView;
private final PlannerConfig config;
private final ExecutorService cacheExec;
+ private final QueryMaker queryMaker;
private final ConcurrentMap<String, Table> tables;
// For awaitInitialization.
@@ -102,6 +104,7 @@ public DruidSchema(
this.serverView = Preconditions.checkNotNull(serverView, "serverView");
this.config = Preconditions.checkNotNull(config, "config");
this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
+ this.queryMaker = new QueryMaker(walker, config);
this.tables = Maps.newConcurrentMap();
}
@@ -349,9 +352,8 @@ private DruidTable computeTable(final String dataSource)
}
return new DruidTable(
- walker,
+ queryMaker,
new TableDataSource(dataSource),
- config,
columnValueTypes
);
}
@@ -47,7 +47,7 @@
import io.druid.segment.column.Column;
import io.druid.sql.calcite.aggregation.PostAggregatorFactory;
import io.druid.sql.calcite.filtration.Filtration;
-import io.druid.sql.calcite.table.DruidTable;
+import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexCall;
@@ -112,23 +112,23 @@ private Expressions()
}
/**
- * Translate a field access, possibly through a projection, to an underlying Druid table.
+ * Translate a field access, possibly through a projection, to an underlying Druid dataSource.
*
- * @param druidTable underlying Druid table
- * @param project projection, or null
- * @param fieldNumber number of the field to access
+ * @param rowSignature row signature of underlying Druid dataSource
+ * @param project projection, or null
+ * @param fieldNumber number of the field to access
*
* @return row expression
*/
public static RexNode fromFieldAccess(
- final DruidTable druidTable,
+ final RowSignature rowSignature,
final Project project,
final int fieldNumber
)
{
if (project == null) {
// I don't think the factory impl matters here.
- return RexInputRef.of(fieldNumber, druidTable.getRowType(new JavaTypeFactoryImpl()));
+ return RexInputRef.of(fieldNumber, rowSignature.getRelDataType(new JavaTypeFactoryImpl()));
} else {
return project.getChildExps().get(fieldNumber);
}
@@ -332,13 +332,11 @@ public static String toMathExpression(
/**
* Translates "condition" to a Druid filter, or returns null if we cannot translate the condition.
*
- * @param druidTable Druid table, if the rows come from a table scan; null otherwise
- * @param rowOrder order of columns in the rows to be filtered
- * @param expression Calcite row expression
+ * @param rowSignature row signature of the dataSource to be filtered
+ * @param expression Calcite row expression
*/
public static DimFilter toFilter(
- final DruidTable druidTable,
- final List<String> rowOrder,
+ final RowSignature rowSignature,
final RexNode expression
)
{
@@ -347,7 +345,7 @@ public static DimFilter toFilter(
|| expression.getKind() == SqlKind.NOT) {
final List<DimFilter> filters = Lists.newArrayList();
for (final RexNode rexNode : ((RexCall) expression).getOperands()) {
- final DimFilter nextFilter = toFilter(druidTable, rowOrder, rexNode);
+ final DimFilter nextFilter = toFilter(rowSignature, rexNode);
if (nextFilter == null) {
return null;
}
@@ -364,21 +362,19 @@ public static DimFilter toFilter(
}
} else {
// Handle filter conditions on everything else.
- return toLeafFilter(druidTable, rowOrder, expression);
+ return toLeafFilter(rowSignature, expression);
}
}
/**
* Translates "condition" to a Druid filter, assuming it does not contain any boolean expressions. Returns null
* if we cannot translate the condition.
*
- * @param druidTable Druid table, if the rows come from a table scan; null otherwise
- * @param rowOrder order of columns in the rows to be filtered
- * @param expression Calcite row expression
+ * @param rowSignature row signature of the dataSource to be filtered
+ * @param expression Calcite row expression
*/
private static DimFilter toLeafFilter(
- final DruidTable druidTable,
- final List<String> rowOrder,
+ final RowSignature rowSignature,
final RexNode expression
)
{
@@ -392,8 +388,8 @@ private static DimFilter toLeafFilter(
if (kind == SqlKind.LIKE) {
final List<RexNode> operands = ((RexCall) expression).getOperands();
- final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowOrder, operands.get(0));
- if (rex == null || !rex.isFilterable(druidTable)) {
+ final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowSignature.getRowOrder(), operands.get(0));
+ if (rex == null || !rex.isFilterable(rowSignature)) {
return null;
}
return new LikeDimFilter(
@@ -428,8 +424,8 @@ private static DimFilter toLeafFilter(
}
// lhs must be translatable to a RowExtraction to be filterable
- final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowOrder, lhs);
- if (rex == null || !rex.isFilterable(druidTable)) {
+ final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowSignature.getRowOrder(), lhs);
+ if (rex == null || !rex.isFilterable(rowSignature)) {
return null;
}
Oops, something went wrong.

0 comments on commit e86859b

Please sign in to comment.