diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 8b6c0538f5d6..6417bd3d891a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -62,9 +62,11 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.DimFilter; +import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.DateTime; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -173,14 +175,30 @@ private Sequence mergeGroupByResults( final GroupByQuery outerQuery = new GroupByQuery.Builder(query) .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) .build(); - IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult); + final IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult); + //Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which + //is ensured by QuerySegmentSpec. + //GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval + //and concatenate the results. return new ResourceClosingSequence<>( outerQuery.applyLimit( - engine.process( - outerQuery, - new IncrementalIndexStorageAdapter( - index + Sequences.concat( + Sequences.map( + Sequences.simple(outerQuery.getIntervals()), + new Function>() + { + @Override + public Sequence apply(Interval interval) + { + return engine.process( + outerQuery.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(ImmutableList.of(interval)) + ), + new IncrementalIndexStorageAdapter(index) + ); + } + } ) ) ), diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index bacd6a1a31d0..d52e35e2f6b1 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2398,6 +2398,72 @@ public void testIdenticalSubquery() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testSubqueryWithMultipleIntervalsInOuterQuery() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }")) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + new Interval("2011-04-01T00:00:00.000Z/2011-04-01T23:58:00.000Z"), + new Interval("2011-04-02T00:00:00.000Z/2011-04-03T00:00:00.000Z") + ) + ) + ) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("alias", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("rows", "rows"), + new LongSumAggregatorFactory("idx", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); + + // Subqueries are handled by the ToolChest + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testDifferentGroupingSubquery() {