From c10dfc8423c502c8825f47b79ec5375d32ae4ec0 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 13 Apr 2020 16:36:48 -0700 Subject: [PATCH 1/4] fixes for inline subqueries when multi-value dimension is present --- .../epinephelinae/GroupByQueryEngineV2.java | 2 +- .../druid/query/topn/TopNQueryEngine.java | 2 +- .../server/ClientQuerySegmentWalkerTest.java | 131 +++++++++++++++++- 3 files changed, 132 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 823f5f250a82..d01ae11fd5a1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -333,7 +333,7 @@ public static boolean isAllSingleValueDims( // Now check column capabilities. final ColumnCapabilities columnCapabilities = capabilitiesFunction.apply(dimension.getDimension()); - return columnCapabilities == null || !columnCapabilities.hasMultipleValues(); + return columnCapabilities != null && !columnCapabilities.hasMultipleValues(); }); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index dfe180971baa..3b247f5c6bdc 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -132,7 +132,7 @@ private TopNMapFn getMapFn( topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query); } else if (selector.isHasExtractionFn()) { topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); - } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING + } else if (columnCapabilities == null || 
!(columnCapabilities.getType() == ValueType.STRING && columnCapabilities.isDictionaryEncoded())) { // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings. topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index c98c7768decd..f224b5f580e4 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -56,7 +56,11 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryHelper; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.segment.InlineSegmentWrangler; import org.apache.druid.segment.MapSegmentWrangler; import org.apache.druid.segment.ReferenceCountingSegment; @@ -107,6 +111,7 @@ public class ClientQuerySegmentWalkerTest private static final String FOO = "foo"; private static final String BAR = "bar"; + private static final String MULTI = "multi"; private static final Interval INTERVAL = Intervals.of("2000/P1Y"); private static final String VERSION = "A"; @@ -140,6 +145,20 @@ public class ClientQuerySegmentWalkerTest .build() ); + private static final InlineDataSource MULTI_VALUE_INLINE = InlineDataSource.fromIterable( + ImmutableList.builder() + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "b"), 1}) + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "c"), 2}) + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("b"), 3}) + 
.add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("c"), 4}) + .build(), + RowSignature.builder() + .addTimeColumn() + .add("s", ValueType.STRING) + .add("n", ValueType.LONG) + .build() + ); + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -399,6 +418,115 @@ public void testJoinOnGroupByOnTable() Assert.assertEquals(2, scheduler.getTotalReleased().get()); } + @Test + public void testGroupByOnScanMultiValue() + { + ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI) + .columns("s", "n") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.ETERNITY) + ) + ) + .legacy(false) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final GroupByQuery query = + GroupByQuery.builder() + .setDataSource(new QueryDataSource(subquery)) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s")) + .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n")) + .build(); + + testQuery( + query, + // GroupBy handles its own subqueries; only the inner one will go to the cluster. 
+ ImmutableList.of( + ExpectedQuery.cluster(subquery), + ExpectedQuery.local( + query.withDataSource( + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{ImmutableList.of("a", "b"), 1}, + new Object[]{ImmutableList.of("a", "c"), 2}, + new Object[]{ImmutableList.of("b"), 3}, + new Object[]{ImmutableList.of("c"), 4} + ), + RowSignature.builder().add("s", null).add("n", null).build() + ) + ) + ) + ), + ImmutableList.of( + new Object[]{"a", 3L}, + new Object[]{"b", 4L}, + new Object[]{"c", 6L} + ) + ); + + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); + } + + @Test + public void testTopNScanMultiValue() + { + ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI) + .columns("s", "n") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.ETERNITY) + ) + ) + .legacy(false) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final TopNQuery query = + new TopNQueryBuilder().dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .dimension(DefaultDimensionSpec.of("s")) + .metric("sum_n") + .threshold(100) + .aggregators(new LongSumAggregatorFactory("sum_n", "n")) + .build(); + + testQuery( + query, + // TopN handles its own subqueries; only the inner one will go to the cluster. 
+ ImmutableList.of( + ExpectedQuery.cluster(subquery), + ExpectedQuery.local( + query.withDataSource( + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{ImmutableList.of("a", "b"), 1}, + new Object[]{ImmutableList.of("a", "c"), 2}, + new Object[]{ImmutableList.of("b"), 3}, + new Object[]{ImmutableList.of("c"), 4} + ), + RowSignature.builder().add("s", null).add("n", null).build() + ) + ) + ) + ), + ImmutableList.of( + new Object[]{Intervals.ETERNITY.getStartMillis(), "c", 6L}, + new Object[]{Intervals.ETERNITY.getStartMillis(), "b", 4L}, + new Object[]{Intervals.ETERNITY.getStartMillis(), "a", 3L} + ) + ); + + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); + } + @Test public void testJoinOnTableErrorCantInlineTable() { @@ -522,7 +650,8 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable Date: Mon, 13 Apr 2020 18:40:19 -0700 Subject: [PATCH 2/4] fix test --- .../druid/query/groupby/GroupByQueryRunnerTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 1b8519d68b49..f2c62e7a9859 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -4551,6 +4551,8 @@ public void testGroupByWithRegEx() @Test public void testGroupByWithNonexistentDimension() { + // column capabilities are null so is treated as potentially multi-value + cannotVectorize(); GroupByQuery.Builder builder = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setInterval("2011-04-02/2011-04-04") @@ -8079,6 +8081,8 @@ public void 
testGroupByWithExtractionDimFilterWhenSearchValueNotInTheMap() @Test public void testGroupByWithExtractionDimFilterKeyisNull() { + // column capabilities are null so is treated as potentially multi-value + cannotVectorize(); Map extractionMap = new HashMap<>(); @@ -8345,6 +8349,8 @@ public void testGroupByWithExtractionDimFilterOptimazitionManyToOne() @Test public void testGroupByWithExtractionDimFilterNullDims() { + // column capabilities are null so is treated as potentially multi-value + cannotVectorize(); Map extractionMap = new HashMap<>(); extractionMap.put("", "EMPTY"); @@ -8473,6 +8479,8 @@ public void testBySegmentResultsWithAllFiltersWithExtractionFns() @Test public void testGroupByWithAllFiltersOnNullDimsWithExtractionFns() { + // column capabilities are null so is treated as potentially multi-value + cannotVectorize(); Map extractionMap = new HashMap<>(); extractionMap.put("", "EMPTY"); extractionMap.put(null, "EMPTY"); From ce05094e62d588db41b8f3d3dc8ff4b536c6524c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 13 Apr 2020 21:27:54 -0700 Subject: [PATCH 3/4] allow missing capabilities for vectorized group by queries to be treated as single dims since it means that column doesnt exist --- .../epinephelinae/GroupByQueryEngineV2.java | 16 ++++++++++------ .../vector/VectorGroupByEngine.java | 2 +- .../query/groupby/GroupByQueryRunnerTest.java | 8 -------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index d01ae11fd5a1..28399e0dbd35 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -227,7 +227,7 @@ public GroupByEngineIterator make() processingBuffer, fudgeTimestamp, dims, - 
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions()), + isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false), cardinalityForArrayAggregation ); } else { @@ -238,7 +238,7 @@ public GroupByEngineIterator make() processingBuffer, fudgeTimestamp, dims, - isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions()) + isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false) ); } } @@ -313,12 +313,15 @@ public static int getCardinalityForArrayAggregation( } /** - * Checks whether all "dimensions" are either single-valued or nonexistent (which is just as good as single-valued, - * since their selectors will show up as full of nulls). + * Checks whether all "dimensions" are either single-valued, or if allowed, nonexistent. Since non-existent column + * selectors will show up as full of nulls they are effectively single valued, however they can also be null during + * broker merge, for example with an 'inline' datasource subquery. 'missingMeansNonexistent' is sort of a hack to let + * the vectorized engine, which only operates on actual segments, to still work in this case for non-existent columns. */ public static boolean isAllSingleValueDims( final Function capabilitiesFunction, - final List dimensions + final List dimensions, + final boolean missingMeansNonexistent ) { return dimensions @@ -333,7 +336,8 @@ public static boolean isAllSingleValueDims( // Now check column capabilities. 
final ColumnCapabilities columnCapabilities = capabilitiesFunction.apply(dimension.getDimension()); - return columnCapabilities != null && !columnCapabilities.hasMultipleValues(); + return (columnCapabilities != null && !columnCapabilities.hasMultipleValues()) || + (missingMeansNonexistent && columnCapabilities == null); }); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index f8a3a34f167f..3fa85040cea4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -83,7 +83,7 @@ public static boolean canVectorize( // This situation should sort itself out pretty well once this engine supports multi-valued columns. Then we // won't have to worry about having this all-single-value-dims check here. 
- return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions()) + return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions(), true) && query.getDimensions().stream().allMatch(DimensionSpec::canVectorize) && query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize) && adapter.canVectorize(filter, query.getVirtualColumns(), false); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index f2c62e7a9859..1b8519d68b49 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -4551,8 +4551,6 @@ public void testGroupByWithRegEx() @Test public void testGroupByWithNonexistentDimension() { - // column capabilities are null so is treated as potentially multi-value - cannotVectorize(); GroupByQuery.Builder builder = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setInterval("2011-04-02/2011-04-04") @@ -8081,8 +8079,6 @@ public void testGroupByWithExtractionDimFilterWhenSearchValueNotInTheMap() @Test public void testGroupByWithExtractionDimFilterKeyisNull() { - // column capabilities are null so is treated as potentially multi-value - cannotVectorize(); Map extractionMap = new HashMap<>(); @@ -8349,8 +8345,6 @@ public void testGroupByWithExtractionDimFilterOptimazitionManyToOne() @Test public void testGroupByWithExtractionDimFilterNullDims() { - // column capabilities are null so is treated as potentially multi-value - cannotVectorize(); Map extractionMap = new HashMap<>(); extractionMap.put("", "EMPTY"); @@ -8479,8 +8473,6 @@ public void testBySegmentResultsWithAllFiltersWithExtractionFns() @Test public void testGroupByWithAllFiltersOnNullDimsWithExtractionFns() { - // column capabilities are 
null so is treated as potentially multi-value - cannotVectorize(); Map extractionMap = new HashMap<>(); extractionMap.put("", "EMPTY"); extractionMap.put(null, "EMPTY"); From 238d32685b7c6777288c6c5ded6fcacf93374c0a Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 21 Apr 2020 13:53:14 -0700 Subject: [PATCH 4/4] add comment --- .../main/java/org/apache/druid/query/topn/TopNQueryEngine.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 3b247f5c6bdc..1d1bccdd7b1b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -134,7 +134,8 @@ private TopNMapFn getMapFn( topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING && columnCapabilities.isDictionaryEncoded())) { - // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings. + // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know + // which can happen for 'inline' data sources when this is run on the broker topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) { // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be