From f4e0a76820be2b230d5c672a619c5173a258c11c Mon Sep 17 00:00:00 2001 From: dclim Date: Thu, 1 Oct 2015 15:15:12 -0600 Subject: [PATCH 1/2] Support multiple outer aggregators of same type and provide more helpful exception when the same inner aggregator is referenced by multiple types of outer aggregators --- .../groupby/GroupByQueryQueryToolChest.java | 21 ++++++- .../query/groupby/GroupByQueryRunnerTest.java | 55 +++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 8b6c0538f5d6..208cce4ac093 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -30,7 +30,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; @@ -158,14 +160,27 @@ private Sequence mergeGroupByResults( } final Sequence subqueryResult = mergeGroupByResults(subquery, runner, context); - final List aggs = Lists.newArrayList(); + final Set aggs = Sets.newHashSet(); + for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { - aggs.addAll(aggregatorFactory.getRequiredColumns()); + for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) { + if (Iterables.any(aggs, new Predicate() { + @Override + public boolean apply(AggregatorFactory agg) { + return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg); + } + })) { + throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" + + " for '%s'", transferAgg.getName()); + } + + aggs.add(transferAgg); + } } // We need the inner incremental index to have all the columns required by the outer query final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery) - .setAggregatorSpecs(aggs) + .setAggregatorSpecs(Lists.newArrayList(aggs)) .setInterval(subquery.getIntervals()) .setPostAggregatorSpecs(Lists.newArrayList()) .build(); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index bacd6a1a31d0..12ecb5420fe5 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2436,6 +2436,61 @@ public void testDifferentGroupingSubquery() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testDifferentGroupingSubqueryMultipleAggregatorsOnSameField() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setPostAggregatorSpecs( + Lists.newArrayList( + new ArithmeticPostAggregator( + "post_agg", + "+", + Lists.newArrayList( + new FieldAccessPostAggregator("idx", "idx"), + new FieldAccessPostAggregator("idx", "idx") + ) + ) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs( + Arrays.asList( + new DoubleMaxAggregatorFactory("idx1", "idx"), + new DoubleMaxAggregatorFactory("idx2", "idx"), + new DoubleMaxAggregatorFactory("idx3", "post_agg"), + new DoubleMaxAggregatorFactory("idx4", "post_agg") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "idx1", 2900.0, "idx2", 2900.0, + "idx3", 5800.0, "idx4", 5800.0), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "idx1", 2505.0, "idx2", 2505.0, + "idx3", 5010.0, "idx4", 5010.0) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testDifferentGroupingSubqueryWithFilter() From 46ecdfa7573f7bf9af6ee18d5473fe6d0fb201ca Mon Sep 17 00:00:00 2001 From: dclim Date: Thu, 15 Oct 2015 16:04:06 -0600 Subject: [PATCH 2/2] add comment explaining logic --- .../io/druid/query/groupby/GroupByQueryQueryToolChest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 208cce4ac093..31b7ec91f534 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -162,6 +162,13 @@ private Sequence mergeGroupByResults( final Sequence subqueryResult = mergeGroupByResults(subquery, runner, context); final Set aggs = Sets.newHashSet(); + // Nested group-bys work by first running the inner query and then materializing the results in an incremental + // index which the outer query is then run against. To build the incremental index, we use the fieldNames from + // the aggregators for the outer query to define the column names so that the index will match the query. If + // there are multiple types of aggregators in the outer query referencing the same fieldName, we will try to build + // multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple + // aggregators of the same type referencing the same fieldName (and skip creating identical columns for the + // subsequent ones) and return an error if the aggregator types are different. for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) { if (Iterables.any(aggs, new Predicate() {