Optimize filter for timeseries, search, and select queries #2931

Merged: 4 commits, merged May 9, 2016
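The changes below all apply one pattern to the search, select, and timeseries query tool chests (and tidy the existing group-by and topN versions of it): preMergeQueryDecoration now wraps the base runner so that, before the query is passed on, its dimension filter is replaced by the result of DimFilter.optimize(). Because Druid queries are immutable, each query type also gains a withDimFilter method that returns a copy with the new filter. The sketch below condenses that pattern into one self-contained example; the interfaces are simplified stand-ins for illustration, not the real Druid types.

```java
import java.util.Map;

// Simplified stand-ins that model only what the pattern needs; not the real Druid interfaces.
interface DimFilter
{
  DimFilter optimize(); // returns an equivalent filter that is cheaper to evaluate
}

interface Query<T>
{
  DimFilter getDimFilter();

  Query<T> withDimFilter(DimFilter filter); // immutable copy with the filter swapped out
}

interface QueryRunner<T>
{
  Iterable<T> run(Query<T> query, Map<String, Object> responseContext);
}

// The decoration each tool chest adds in this PR: optimize the filter once,
// then delegate the rewritten query to the underlying runner.
final class FilterOptimizingRunner<T> implements QueryRunner<T>
{
  private final QueryRunner<T> baseRunner;

  FilterOptimizingRunner(QueryRunner<T> baseRunner)
  {
    this.baseRunner = baseRunner;
  }

  @Override
  public Iterable<T> run(Query<T> query, Map<String, Object> responseContext)
  {
    Query<T> rewritten = query;
    if (rewritten.getDimFilter() != null) {
      rewritten = rewritten.withDimFilter(rewritten.getDimFilter().optimize());
    }
    return baseRunner.run(rewritten, responseContext);
  }
}
```

In the real classes the same wrapper appears as an anonymous QueryRunner passed to intervalChunkingQueryRunnerDecorator.decorate(...), as the hunks below show.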
@@ -55,7 +55,7 @@ public interface QueryRunnerFactory<T, QueryType extends Query<T>>
*
* @param queryExecutor ExecutorService to be used for parallel processing
* @param queryRunners Individual QueryRunner objects that produce some results
- * @return a QueryRunner that, when asked, will use the ExecutorService to runt he base QueryRunners
+ * @return a QueryRunner that, when asked, will use the ExecutorService to run the base QueryRunners
*/
public QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);

@@ -424,9 +424,6 @@ public QueryRunner<Row> preMergeQueryDecoration(final QueryRunner<Row> runner)
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
- if (!(query instanceof GroupByQuery)) {
- return runner.run(query, responseContext);
- }
GroupByQuery groupByQuery = (GroupByQuery) query;
if (groupByQuery.getDimFilter() != null){
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
@@ -241,10 +241,24 @@ public SearchHit apply(@Nullable Object input)
}

@Override
- public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(QueryRunner<Result<SearchResultValue>> runner)
+ public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner)
{
return new SearchThresholdAdjustingQueryRunner(
- intervalChunkingQueryRunnerDecorator.decorate(runner, this),
+ intervalChunkingQueryRunnerDecorator.decorate(
+ new QueryRunner<Result<SearchResultValue>>()
+ {
+ @Override
+ public Sequence<Result<SearchResultValue>> run(
+ Query<Result<SearchResultValue>> query, Map<String, Object> responseContext
+ )
+ {
+ SearchQuery searchQuery = (SearchQuery) query;
+ if (searchQuery.getDimensionsFilter() != null) {
+ searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize());
+ }
+ return runner.run(searchQuery, responseContext);
+ }
+ } , this),
config
);
}
@@ -131,6 +131,21 @@ public SearchQuery withOverriddenContext(Map<String, Object> contextOverrides)
);
}

+ public SearchQuery withDimFilter(DimFilter dimFilter)
+ {
+ return new SearchQuery(
+ getDataSource(),
+ dimFilter,
+ granularity,
+ limit,
+ getQuerySegmentSpec(),
+ dimensions,
+ querySpec,
+ sortSpec,
+ getContext()
+ );
+ }
+
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
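Why rewrite the filter before the query is handed to the per-segment runners? The usual payoff of DimFilter.optimize() in Druid is that a filter which is expensive to evaluate as written, typically a large OR of selector filters produced by a lookup or extraction function, can collapse into a single set-membership check on one dimension. The exact rewrites depend on the concrete filter classes; the sketch below only illustrates the idea with simplified stand-in classes, not the real Druid filters.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical filter classes, only to show what an optimize() pass can buy.
interface DimFilter
{
  DimFilter optimize();
}

final class SelectorFilter implements DimFilter
{
  final String dimension;
  final String value;

  SelectorFilter(String dimension, String value)
  {
    this.dimension = dimension;
    this.value = value;
  }

  @Override
  public DimFilter optimize()
  {
    return this; // nothing to simplify on its own
  }
}

final class InFilter implements DimFilter
{
  final String dimension;
  final Set<String> values;

  InFilter(String dimension, Set<String> values)
  {
    this.dimension = dimension;
    this.values = values;
  }

  @Override
  public DimFilter optimize()
  {
    return this;
  }
}

final class OrFilter implements DimFilter
{
  final List<DimFilter> children;

  OrFilter(List<DimFilter> children)
  {
    this.children = children;
  }

  // If every child is a selector on the same dimension, the whole OR collapses into
  // one set-membership check instead of N separate value comparisons per row.
  @Override
  public DimFilter optimize()
  {
    String dimension = null;
    Set<String> values = new HashSet<>();
    for (DimFilter child : children) {
      if (!(child instanceof SelectorFilter)) {
        return this;
      }
      SelectorFilter selector = (SelectorFilter) child;
      if (dimension == null) {
        dimension = selector.dimension;
      } else if (!dimension.equals(selector.dimension)) {
        return this;
      }
      values.add(selector.value);
    }
    return dimension == null ? this : new InFilter(dimension, values);
  }
}
```

Doing this once in the pre-merge decoration means every downstream runner already sees the cheaper, equivalent filter.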
15 changes: 15 additions & 0 deletions processing/src/main/java/io/druid/query/select/SelectQuery.java
@@ -188,6 +188,21 @@ public SelectQuery withPagingSpec(PagingSpec pagingSpec)
);
}

+ public SelectQuery withDimFilter(DimFilter dimFilter)
+ {
+ return new SelectQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ isDescending(),
+ dimFilter,
+ granularity,
+ dimensions,
+ metrics,
+ pagingSpec,
+ getContext()
+ );
+ }
+
@Override
public String toString()
{
@@ -29,8 +29,10 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
+ import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.guava.Comparators;
+ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.granularity.QueryGranularity;
@@ -261,9 +263,23 @@ public Result<SelectResultValue> apply(Object input)
}

@Override
- public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(QueryRunner<Result<SelectResultValue>> runner)
+ public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SelectResultValue>> runner)
{
- return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
+ return intervalChunkingQueryRunnerDecorator.decorate(
+ new QueryRunner<Result<SelectResultValue>>()
+ {
+ @Override
+ public Sequence<Result<SelectResultValue>> run(
+ Query<Result<SelectResultValue>> query, Map<String, Object> responseContext
+ )
+ {
+ SelectQuery selectQuery = (SelectQuery) query;
+ if (selectQuery.getDimensionsFilter() != null) {
+ selectQuery = selectQuery.withDimFilter(selectQuery.getDimensionsFilter().optimize());
+ }
+ return runner.run(selectQuery, responseContext);
+ }
+ }, this);
}

@Override
@@ -152,6 +152,20 @@ public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverride
);
}

+ public TimeseriesQuery withDimFilter(DimFilter dimFilter)
+ {
+ return new TimeseriesQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ isDescending(),
+ dimFilter,
+ granularity,
+ aggregatorSpecs,
+ postAggregatorSpecs,
+ getContext()
+ );
+ }
+
@Override
public String toString()
{
@@ -25,6 +25,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
+ import com.metamx.common.ISE;
+ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.granularity.QueryGranularity;
@@ -210,9 +212,23 @@ public Result<TimeseriesResultValue> apply(@Nullable Object input)
}

@Override
- public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
+ public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner)
{
- return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
+ return intervalChunkingQueryRunnerDecorator.decorate(
+ new QueryRunner<Result<TimeseriesResultValue>>()
+ {
+ @Override
+ public Sequence<Result<TimeseriesResultValue>> run(
+ Query<Result<TimeseriesResultValue>> query, Map<String, Object> responseContext
+ )
+ {
+ TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
+ if (timeseriesQuery.getDimensionsFilter() != null) {
+ timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
+ }
+ return runner.run(timeseriesQuery, responseContext);
+ }
+ }, this);
}

@Override
@@ -418,27 +418,23 @@ public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> query, Map<String, Object> responseContext
)
{
- if (!(query instanceof TopNQuery)) {
- return runner.run(query, responseContext);
+ TopNQuery topNQuery = (TopNQuery) query;
+ if (topNQuery.getDimensionsFilter() != null) {
+ topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
+ }
+ final TopNQuery delegateTopNQuery = topNQuery;
+ if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
+ final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
+ return runner.run(
+ delegateTopNQuery.withDimensionSpec(
+ new DefaultDimensionSpec(
+ dimensionSpec.getDimension(),
+ dimensionSpec.getOutputName()
+ )
+ ), responseContext
+ );
} else {
- TopNQuery topNQuery = (TopNQuery) query;
- if (topNQuery.getDimensionsFilter() != null) {
- topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
- }
- final TopNQuery delegateTopNQuery = topNQuery;
- if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
- final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
- return runner.run(
- delegateTopNQuery.withDimensionSpec(
- new DefaultDimensionSpec(
- dimensionSpec.getDimension(),
- dimensionSpec.getOutputName()
- )
- ), responseContext
- );
- } else {
- return runner.run(delegateTopNQuery, responseContext);
- }
+ return runner.run(delegateTopNQuery, responseContext);
}
}
}
@@ -82,12 +82,15 @@ public static Iterable<Object[]> constructorFeeder() throws IOException
}

private final QueryRunner runner;
+ private final QueryRunner decoratedRunner;

public SearchQueryRunnerTest(
QueryRunner runner
)
{
this.runner = runner;
+ this.decoratedRunner = toolChest.postMergeQueryDecoration(
+ toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)));
}

@Test
@@ -342,34 +345,33 @@ public void testSearchWithExtractionFilter1()
true,
null,
true,
- false
+ true
);

- checkSearchQuery(
- Druids.newSearchQueryBuilder()
- .dataSource(QueryRunnerTestHelper.dataSource)
- .granularity(QueryRunnerTestHelper.allGran)
- .filters(
- new ExtractionDimFilter(
- QueryRunnerTestHelper.qualityDimension,
- automotiveSnowman,
- lookupExtractionFn,
- null
- )
- )
- .intervals(QueryRunnerTestHelper.fullOnInterval)
- .dimensions(
- new ExtractionDimensionSpec(
- QueryRunnerTestHelper.qualityDimension,
- null,
- lookupExtractionFn,
- null
- )
- )
- .query("☃")
- .build(),
- expectedHits
- );
+ SearchQuery query = Druids.newSearchQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .filters(
+ new ExtractionDimFilter(
+ QueryRunnerTestHelper.qualityDimension,
+ automotiveSnowman,
+ lookupExtractionFn,
+ null
+ )
+ )
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .dimensions(
+ new ExtractionDimensionSpec(
+ QueryRunnerTestHelper.qualityDimension,
+ null,
+ lookupExtractionFn,
+ null
+ )
+ )
+ .query("☃")
+ .build();
+
+ checkSearchQuery(query, expectedHits);
}

@Test
@@ -551,6 +553,7 @@ public void testSearchNonExistingDimension()
private void checkSearchQuery(Query searchQuery, List<SearchHit> expectedResults)
{
checkSearchQuery(searchQuery, runner, expectedResults);
+ checkSearchQuery(searchQuery, decoratedRunner, expectedResults);
}

private void checkSearchQuery(Query searchQuery, QueryRunner runner, List<SearchHit> expectedResults)
@@ -559,31 +562,31 @@ private void checkSearchQuery(Query searchQuery, QueryRunner runner, List<Search
runner.run(searchQuery, ImmutableMap.of()),
Lists.<Result<SearchResultValue>>newArrayList()
);
- List<SearchHit> copy = ImmutableList.copyOf(expectedResults);
+ List<SearchHit> copy = Lists.newLinkedList(expectedResults);
for (Result<SearchResultValue> result : results) {
Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), result.getTimestamp());
Assert.assertTrue(result.getValue() instanceof Iterable);

Iterable<SearchHit> resultValues = result.getValue();
for (SearchHit resultValue : resultValues) {
- int index = expectedResults.indexOf(resultValue);
+ int index = copy.indexOf(resultValue);
if (index < 0) {
fail(
- copy, results,
+ expectedResults, results,
"No result found containing " + resultValue.getDimension() + " and " + resultValue.getValue()
);
}
- SearchHit expected = expectedResults.remove(index);
+ SearchHit expected = copy.remove(index);
if (!resultValue.toString().equals(expected.toString())) {
fail(
- copy, results,
+ expectedResults, results,
"Invalid count for " + resultValue + ".. which was expected to be " + expected.getCount()
);
}
}
}
- if (!expectedResults.isEmpty()) {
- fail(copy, results, "Some expected results are not shown: " + expectedResults);
+ if (!copy.isEmpty()) {
+ fail(expectedResults, results, "Some expected results are not shown: " + copy);
}
}

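The test now builds a decoratedRunner by layering the tool chest's pre-merge decoration, result merging, and post-merge decoration around the base runner, and checkSearchQuery runs every query through both runners, so the new filter-optimizing decoration is exercised end to end. The helper below simply restates that composition in one place; the method names are the ones used in the diff, but the helper itself is illustrative and not part of the PR.

```java
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;

// Illustrative helper: layer a tool chest around a base runner the same way
// SearchQueryRunnerTest builds its decoratedRunner.
public final class ToolChestRunners
{
  private ToolChestRunners() {}

  public static <T, QueryType extends Query<T>> QueryRunner<T> fullyDecorated(
      QueryToolChest<T, QueryType> toolChest,
      QueryRunner<T> baseRunner
  )
  {
    return toolChest.postMergeQueryDecoration(
        toolChest.mergeResults(
            toolChest.preMergeQueryDecoration(baseRunner)
        )
    );
  }
}
```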