Skip to content

Commit

Permalink
Allow terms query in _rollup_search (#30973)
Browse files Browse the repository at this point in the history
This change adds the `terms` query to the list of accepted queries
for the _rollup_search endpoint.
  • Loading branch information
jimczi committed Jun 5, 2018
1 parent 14c4088 commit 7f850bb
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -66,6 +67,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -271,91 +273,38 @@ static QueryBuilder rewriteQuery(QueryBuilder builder, Set<RollupJobCaps> jobCap
rewriteQuery(((BoostingQueryBuilder)builder).positiveQuery(), jobCaps));
} else if (builder.getWriteableName().equals(DisMaxQueryBuilder.NAME)) {
DisMaxQueryBuilder rewritten = new DisMaxQueryBuilder();
((DisMaxQueryBuilder)builder).innerQueries().forEach(query -> rewritten.add(rewriteQuery(query, jobCaps)));
((DisMaxQueryBuilder) builder).innerQueries().forEach(query -> rewritten.add(rewriteQuery(query, jobCaps)));
return rewritten;
} else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME) || builder.getWriteableName().equals(TermQueryBuilder.NAME)) {

String fieldName = builder.getWriteableName().equals(RangeQueryBuilder.NAME)
? ((RangeQueryBuilder)builder).fieldName()
: ((TermQueryBuilder)builder).fieldName();

List<String> incorrectTimeZones = new ArrayList<>();
List<String> rewrittenFieldName = jobCaps.stream()
// We only care about job caps that have the query's target field
.filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
.map(caps -> {
RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
return fieldCaps.getAggs().stream()
// For now, we only allow filtering on grouping fields
.filter(agg -> {
String type = (String)agg.get(RollupField.AGG);

// If the cap is for a date_histo, and the query is a range, the timezones need to match
if (type.equals(DateHistogramAggregationBuilder.NAME) && builder instanceof RangeQueryBuilder) {
String timeZone = ((RangeQueryBuilder)builder).timeZone();

// Many range queries don't include the timezone because the default is UTC, but the query
// builder will return null so we need to set it here
if (timeZone == null) {
timeZone = DateTimeZone.UTC.toString();
}
boolean matchingTZ = ((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()))
.equalsIgnoreCase(timeZone);
if (matchingTZ == false) {
incorrectTimeZones.add((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()));
}
return matchingTZ;
}
// Otherwise just make sure it's one of the three groups
return type.equals(TermsAggregationBuilder.NAME)
|| type.equals(DateHistogramAggregationBuilder.NAME)
|| type.equals(HistogramAggregationBuilder.NAME);
})
// Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
.map(agg -> {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
} else {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
}
})
.collect(Collectors.toList());
})
.distinct()
.collect(ArrayList::new, List::addAll, List::addAll);

if (rewrittenFieldName.isEmpty()) {
if (incorrectTimeZones.isEmpty()) {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builder.getWriteableName()
+ "] query is not available in selected rollup indices, cannot query.");
} else {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builder.getWriteableName()
+ "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
+ incorrectTimeZones);
}
} else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME)) {
RangeQueryBuilder range = (RangeQueryBuilder) builder;
String fieldName = range.fieldName();
// Many range queries don't include the timezone because the default is UTC, but the query
// builder will return null so we need to set it here
String timeZone = range.timeZone() == null ? DateTimeZone.UTC.toString() : range.timeZone();

String rewrittenFieldName = rewriteFieldName(jobCaps, RangeQueryBuilder.NAME, fieldName, timeZone);
RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName)
.from(range.from())
.to(range.to())
.includeLower(range.includeLower())
.includeUpper(range.includeUpper());
if (range.timeZone() != null) {
rewritten.timeZone(range.timeZone());
}

if (rewrittenFieldName.size() > 1) {
throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldName, ",") + "].");
if (range.format() != null) {
rewritten.format(range.format());
}

//Note: instanceof here to make casting checks happier
if (builder instanceof RangeQueryBuilder) {
RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName.get(0));
RangeQueryBuilder original = (RangeQueryBuilder)builder;
rewritten.from(original.from());
rewritten.to(original.to());
if (original.timeZone() != null) {
rewritten.timeZone(original.timeZone());
}
rewritten.includeLower(original.includeLower());
rewritten.includeUpper(original.includeUpper());
return rewritten;
} else {
return new TermQueryBuilder(rewrittenFieldName.get(0), ((TermQueryBuilder)builder).value());
}

return rewritten;
} else if (builder.getWriteableName().equals(TermQueryBuilder.NAME)) {
TermQueryBuilder term = (TermQueryBuilder) builder;
String fieldName = term.fieldName();
String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
return new TermQueryBuilder(rewrittenFieldName, term.value());
} else if (builder.getWriteableName().equals(TermsQueryBuilder.NAME)) {
TermsQueryBuilder terms = (TermsQueryBuilder) builder;
String fieldName = terms.fieldName();
String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
return new TermsQueryBuilder(rewrittenFieldName, terms.values());
} else if (builder.getWriteableName().equals(MatchAllQueryBuilder.NAME)) {
// no-op
return builder;
Expand All @@ -364,6 +313,64 @@ static QueryBuilder rewriteQuery(QueryBuilder builder, Set<RollupJobCaps> jobCap
}
}

private static String rewriteFieldName(Set<RollupJobCaps> jobCaps,
String builderName,
String fieldName,
String timeZone) {
List<String> incompatibleTimeZones = timeZone == null ? Collections.emptyList() : new ArrayList<>();
List<String> rewrittenFieldNames = jobCaps.stream()
// We only care about job caps that have the query's target field
.filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
.map(caps -> {
RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
return fieldCaps.getAggs().stream()
// For now, we only allow filtering on grouping fields
.filter(agg -> {
String type = (String)agg.get(RollupField.AGG);

// If the cap is for a date_histo, and the query is a range, the timezones need to match
if (type.equals(DateHistogramAggregationBuilder.NAME) && timeZone != null) {
boolean matchingTZ = ((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()))
.equalsIgnoreCase(timeZone);
if (matchingTZ == false) {
incompatibleTimeZones.add((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()));
}
return matchingTZ;
}
// Otherwise just make sure it's one of the three groups
return type.equals(TermsAggregationBuilder.NAME)
|| type.equals(DateHistogramAggregationBuilder.NAME)
|| type.equals(HistogramAggregationBuilder.NAME);
})
// Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
.map(agg -> {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
} else {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
}
})
.collect(Collectors.toList());
})
.distinct()
.collect(ArrayList::new, List::addAll, List::addAll);
if (rewrittenFieldNames.isEmpty()) {
if (incompatibleTimeZones.isEmpty()) {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ "] query is not available in selected rollup indices, cannot query.");
} else {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
+ incompatibleTimeZones);
}
} else if (rewrittenFieldNames.size() > 1) {
throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldNames, ",") + "].");
} else {
return rewrittenFieldNames.get(0);
}
}

static RollupSearchContext separateIndices(String[] indices, ImmutableOpenMap<String, IndexMetaData> indexMetaData) {

if (indices.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.elasticsearch.index.query.DisMaxQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -61,6 +63,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -153,7 +156,7 @@ public void testRangeWrongTZ() {
"compatible. Options include: [UTC]"));
}

public void testTerms() {
public void testTermQuery() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
group.setTerms(ConfigTestHelpers.getTerms().setFields(Collections.singletonList("foo")).build());
Expand All @@ -166,6 +169,23 @@ public void testTerms() {
assertThat(((TermQueryBuilder)rewritten).fieldName(), equalTo("foo.terms.value"));
}

public void testTermsQuery() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
group.setTerms(ConfigTestHelpers.getTerms().setFields(Collections.singletonList("foo")).build());
job.setGroupConfig(group.build());
RollupJobCaps cap = new RollupJobCaps(job.build());
Set<RollupJobCaps> caps = new HashSet<>();
caps.add(cap);
QueryBuilder original = new TermsQueryBuilder("foo", Arrays.asList("bar", "baz"));
QueryBuilder rewritten =
TransportRollupSearchAction.rewriteQuery(original, caps);
assertThat(rewritten, instanceOf(TermsQueryBuilder.class));
assertNotSame(rewritten, original);
assertThat(((TermsQueryBuilder)rewritten).fieldName(), equalTo("foo.terms.value"));
assertThat(((TermsQueryBuilder)rewritten).values(), equalTo(Arrays.asList("bar", "baz")));
}

public void testCompounds() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
Expand Down

0 comments on commit 7f850bb

Please sign in to comment.