Support multiple outer aggregators of same type and provide more help… #1799

Merged 2 commits on Oct 30, 2015
@@ -30,7 +30,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
@@ -158,14 +160,34 @@ private Sequence<Row> mergeGroupByResults(
}

final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
-final List<AggregatorFactory> aggs = Lists.newArrayList();
+final Set<AggregatorFactory> aggs = Sets.newHashSet();
Contributor

Wouldn't it solve the problem if we did this?

for (AggregatorFactory aggregatorFactory : subquery.getAggregatorSpecs()) {
  aggs.add(aggregatorFactory.getCombiningFactory());
}

Contributor

Also, probably set the post-aggregators too: innerQuery.setPostAggregatorSpecs(subquery.getPostAggregatorSpecs())

Contributor Author

@himanshug Using the combining factories from the subquery works for the aggregators, but setting innerQuery.setPostAggregatorSpecs(subquery.getPostAggregatorSpecs()) has no effect, because that field isn't read when we create the intermediate incremental index from the subquery results. As a result, any outer-query aggregator that operates on a subquery post-aggregator doesn't work correctly.

Contributor

makes sense
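
To illustrate the combining-factory idea discussed in this thread, here is a minimal, self-contained sketch. `MiniAgg`, `COUNT`, and `LONG_SUM` are hypothetical stand-ins invented for this example and are not Druid's `AggregatorFactory` API. The sketch only shows why re-aggregating rows that are already aggregated needs a combining step: a count column has to be summed, not counted again.

```java
import java.util.Arrays;
import java.util.List;

public class CombiningSketch {
  // Hypothetical miniature of an aggregator factory (assumption, not Druid's interface).
  interface MiniAgg {
    long aggregate(List<Long> values); // fold raw values into one result
    MiniAgg combining();               // aggregator to use on partial results
  }

  // Summing partial sums is still a sum, so it is its own combining aggregator.
  static final MiniAgg LONG_SUM = new MiniAgg() {
    public long aggregate(List<Long> values) {
      long total = 0;
      for (long v : values) {
        total += v;
      }
      return total;
    }

    public MiniAgg combining() {
      return this;
    }
  };

  // Counting raw rows; partial counts must be added together, not counted.
  static final MiniAgg COUNT = new MiniAgg() {
    public long aggregate(List<Long> values) {
      return values.size();
    }

    public MiniAgg combining() {
      return LONG_SUM;
    }
  };

  public static void main(String[] args) {
    long partialA = COUNT.aggregate(Arrays.asList(10L, 20L, 30L)); // 3 rows
    long partialB = COUNT.aggregate(Arrays.asList(40L, 50L));      // 2 rows
    List<Long> partials = Arrays.asList(partialA, partialB);

    System.out.println(COUNT.combining().aggregate(partials)); // prints 5
    System.out.println(COUNT.aggregate(partials));             // prints 2 (counts the partials)
  }
}
```

The sketch covers aggregators only. As noted in the reply above, post-aggregators from the subquery are a separate concern because the intermediate index is built from aggregators.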


// Nested group-bys work by first running the inner query and then materializing the results in an incremental
// index which the outer query is then run against. To build the incremental index, we use the fieldNames from
// the aggregators for the outer query to define the column names so that the index will match the query. If
// there are multiple types of aggregators in the outer query referencing the same fieldName, we will try to build
// multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple
// aggregators of the same type referencing the same fieldName (and skip creating identical columns for the
// subsequent ones) and return an error if the aggregator types are different.
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
-aggs.addAll(aggregatorFactory.getRequiredColumns());
for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
Member

can we add some comments to explain the logic here? Outside of the context of this PR it might be hard to understand what this is doing.

Contributor Author

@xvrl I added a comment; hopefully it clears things up. Thanks for the feedback, X.

if (Iterables.any(aggs, new Predicate<AggregatorFactory>() {
@Override
public boolean apply(AggregatorFactory agg) {
return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg);
}
})) {
throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" +
" for '%s'", transferAgg.getName());
}

aggs.add(transferAgg);
}
}

// We need the inner incremental index to have all the columns required by the outer query
final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
-.setAggregatorSpecs(aggs)
+.setAggregatorSpecs(Lists.newArrayList(aggs))
Member

can you add a comment explaining why you need to copy the list?

Contributor Author

It's a conversion of a set to a list to satisfy the method signature, not so much about copying the list.

.setInterval(subquery.getIntervals())
.setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
.build();
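
The rule implemented in this hunk (the same column may be requested several times by the same aggregator type, but not by different types) can be sketched in isolation. The `Column` class and `collect` method below are hypothetical names made up for this illustration; they stand in for the required-column aggregator factories and are not part of Druid. The sketch also uses a `LinkedHashSet` rather than the `HashSet` in the patch, purely so the resulting list has a predictable order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class TransferColumnSketch {
  // Hypothetical stand-in for a required column: a name plus an aggregator type.
  static final class Column {
    final String name;
    final String type;

    Column(String name, String type) {
      this.name = name;
      this.type = type;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Column)) {
        return false;
      }
      Column other = (Column) o;
      return name.equals(other.name) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, type);
    }
  }

  // Skips exact duplicates and rejects a name that is reused with a different type.
  static List<Column> collect(List<Column> required) {
    Set<Column> seen = new LinkedHashSet<>();
    for (Column candidate : required) {
      for (Column existing : seen) {
        if (existing.name.equals(candidate.name) && !existing.equals(candidate)) {
          throw new IllegalArgumentException(
              "Column '" + candidate.name + "' is referenced by more than one aggregator type");
        }
      }
      seen.add(candidate); // no-op when an equal column is already present
    }
    // Set-to-list conversion, analogous to Lists.newArrayList(aggs) in the patch.
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    List<Column> required = new ArrayList<>();
    required.add(new Column("idx", "doubleMax"));
    required.add(new Column("idx", "doubleMax"));
    required.add(new Column("post_agg", "doubleMax"));
    System.out.println(collect(required).size()); // prints 2
  }
}
```

Adding a column such as `new Column("idx", "longSum")` to the same list would make `collect` throw, which mirrors the `IAE` raised in the patch.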
@@ -2436,6 +2436,61 @@ public void testDifferentGroupingSubquery()
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testDifferentGroupingSubqueryMultipleAggregatorsOnSameField()
{
  GroupByQuery subquery = GroupByQuery
      .builder()
      .setDataSource(QueryRunnerTestHelper.dataSource)
      .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
      .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
      .setAggregatorSpecs(
          Arrays.asList(
              QueryRunnerTestHelper.rowsCount,
              new LongSumAggregatorFactory("idx", "index")
          )
      )
      .setPostAggregatorSpecs(
          Lists.<PostAggregator>newArrayList(
              new ArithmeticPostAggregator(
                  "post_agg",
                  "+",
                  Lists.<PostAggregator>newArrayList(
                      new FieldAccessPostAggregator("idx", "idx"),
                      new FieldAccessPostAggregator("idx", "idx")
                  )
              )
          )
      )
      .setGranularity(QueryRunnerTestHelper.dayGran)
      .build();

  GroupByQuery query = GroupByQuery
      .builder()
      .setDataSource(subquery)
      .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
      .setAggregatorSpecs(
          Arrays.<AggregatorFactory>asList(
              new DoubleMaxAggregatorFactory("idx1", "idx"),
              new DoubleMaxAggregatorFactory("idx2", "idx"),
              new DoubleMaxAggregatorFactory("idx3", "post_agg"),
              new DoubleMaxAggregatorFactory("idx4", "post_agg")
          )
      )
      .setGranularity(QueryRunnerTestHelper.dayGran)
      .build();

  List<Row> expectedResults = Arrays.asList(
      GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "idx1", 2900.0, "idx2", 2900.0,
                                                     "idx3", 5800.0, "idx4", 5800.0),
      GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "idx1", 2505.0, "idx2", 2505.0,
                                                     "idx3", 5010.0, "idx4", 5010.0)
  );

  Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
  TestHelper.assertExpectedObjects(expectedResults, results, "");
}
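
The expected rows in this test follow from simple arithmetic: `post_agg` is `idx + idx`, so the maximum of `post_agg` over the inner rows is twice the maximum of `idx` (5800.0 = 2 × 2900.0 and 5010.0 = 2 × 2505.0). The sketch below reproduces that relationship. The per-row `idx` values are hypothetical, chosen only so that their maximum matches the first expected row; the real test data is not shown in this diff.

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedRowSketch {
  // What a max aggregator over "idx" yields for one day's inner rows.
  static double maxIdx(List<Double> innerIdx) {
    double max = Double.NEGATIVE_INFINITY;
    for (double v : innerIdx) {
      max = Math.max(max, v);
    }
    return max;
  }

  // What a max aggregator over "post_agg" yields, where post_agg = idx + idx per row.
  static double maxPostAgg(List<Double> innerIdx) {
    double max = Double.NEGATIVE_INFINITY;
    for (double v : innerIdx) {
      max = Math.max(max, v + v);
    }
    return max;
  }

  public static void main(String[] args) {
    // Hypothetical inner rows for one day; only the maximum (2900.0) comes from the test.
    List<Double> innerIdx = Arrays.asList(135.0, 2900.0, 1200.0);
    System.out.println(maxIdx(innerIdx));     // prints 2900.0
    System.out.println(maxPostAgg(innerIdx)); // prints 5800.0
  }
}
```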


@Test
public void testDifferentGroupingSubqueryWithFilter()