Skip to content

Commit

Permalink
[ML] Add lazy parsing for DatafeedConfig:Aggs,Query (#36117)
Browse files Browse the repository at this point in the history
* Lazily parsing aggs and query in DatafeedConfigs

* Adding parser tests

* Fixing exception types && unnecessary checked exceptions

* Adding semi aggregation parser

* Adding tests, fixing up semi-parser

* Reverting semi-parsing

* Moving agg validations

* Making bad configs throw badRequestException
  • Loading branch information
benwtrent committed Dec 4, 2018
1 parent 89c3eaa commit ff6c194
Show file tree
Hide file tree
Showing 20 changed files with 295 additions and 79 deletions.

Large diffs are not rendered by default.

Expand Up @@ -303,10 +303,11 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map<String, String> h
builder.setTypes(types);
}
if (query != null) {
builder.setQuery(query);
builder.setParsedQuery(query);
}
if (aggregations != null) {
builder.setAggregations(aggregations);
DatafeedConfig.validateAggregations(aggregations);
builder.setParsedAggregations(aggregations);
}
if (scriptFields != null) {
builder.setScriptFields(scriptFields);
Expand Down Expand Up @@ -379,9 +380,9 @@ boolean isNoop(DatafeedConfig datafeed) {
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
&& (types == null || Objects.equals(types, datafeed.getTypes()))
&& (query == null || Objects.equals(query, datafeed.getQuery()))
&& (query == null || Objects.equals(query, datafeed.getParsedQuery()))
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations()))
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
Expand Down
Expand Up @@ -26,6 +26,8 @@ public final class Messages {
"delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}";
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}";

public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
Expand Down
Expand Up @@ -61,6 +61,9 @@ public static XContentObjectTransformer<QueryBuilder> queryBuilderTransformer()
}

public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
if (stringObjectMap == null) {
return null;
}
LoggingDeprecationAccumulationHandler deprecationLogger = new LoggingDeprecationAccumulationHandler();
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(stringObjectMap);
XContentParser parser = XContentType.JSON
Expand All @@ -74,6 +77,9 @@ public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
}

public Map<String, Object> toMap(T object) throws IOException {
if (object == null) {
return null;
}
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = object.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
return XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
Expand Down
Expand Up @@ -67,7 +67,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
builder.setParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
boolean addScriptFields = randomBoolean();
if (addScriptFields) {
Expand All @@ -91,7 +91,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets")
.interval(aggHistogramInterval).subAggregation(maxTime).field("time"));
builder.setAggregations(aggs);
builder.setParsedAggregations(aggs);
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
Expand Down Expand Up @@ -155,6 +155,43 @@ protected DatafeedConfig doParseInstance(XContentParser parser) {
" \"scroll_size\": 1234\n" +
"}";

/**
 * Datafeed config JSON whose {@code query} is a {@code match} clause that carries a
 * {@code type} entry alongside {@code query}. Serves as a fixture for a query written
 * in an outdated format (see the inline note below).
 */
private static final String ANACHRONISTIC_QUERY_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
//query:match:type stopped being supported in 6.x
" \"query\": {\"match\" : {\"query\":\"fieldName\", \"type\": \"phrase\"}},\n" +
" \"scroll_size\": 1234\n" +
"}";

/**
 * Datafeed config JSON whose {@code aggregations} contain a {@code date_histogram} named
 * "buckets" with two sub-aggregations: a {@code max} on "time" and a {@code terms}
 * aggregation on "airline" that specifies {@code size: 0}. Serves as a fixture for
 * aggregations written in an outdated format (see the inline note on the size line).
 */
private static final String ANACHRONISTIC_AGG_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
" \"aggregations\": {\n" +
" \"buckets\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"time\",\n" +
" \"interval\": \"360s\",\n" +
" \"time_zone\": \"UTC\"\n" +
" },\n" +
" \"aggregations\": {\n" +
" \"time\": {\n" +
" \"max\": {\"field\": \"time\"}\n" +
" },\n" +
" \"airline\": {\n" +
" \"terms\": {\n" +
" \"field\": \"airline\",\n" +
" \"size\": 0\n" + //size: 0 stopped being supported in 6.x
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}";

public void testFutureConfigParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
Expand All @@ -163,6 +200,44 @@ public void testFutureConfigParse() throws IOException {
assertEquals("[6:5] [datafeed_config] unknown field [tomorrows_technology_today], parser not found", e.getMessage());
}

/**
 * Parses {@code ANACHRONISTIC_QUERY_DATAFEED} with both the lenient and the strict parser
 * and checks where, and with which exception type and message, the outdated query is rejected.
 *
 * @throws IOException declared because the parsers are created and closed via try-with-resources
 */
public void testPastQueryConfigParse() throws IOException {
// Lenient parser: apply(...).build() runs outside expectThrows, so it must succeed here.
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) {

DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
// The failure is only expected once the parsed query is actually requested.
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery());
assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getMessage());
}

// Strict parser: a fresh parser over the same JSON is expected to fail during apply(...).build().
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) {

XContentParseException e = expectThrows(XContentParseException.class,
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
assertEquals("[6:25] [datafeed_config] failed to parse field [query]", e.getMessage());
}
}

/**
 * Parses {@code ANACHRONISTIC_AGG_DATAFEED} with both the lenient and the strict parser
 * and checks where, and with which exception type and message, the outdated aggregations
 * are rejected.
 *
 * @throws IOException declared because the parsers are created and closed via try-with-resources
 */
public void testPastAggConfigParse() throws IOException {
// Lenient parser: apply(...) runs outside expectThrows, so producing the builder must succeed.
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {

DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null);
// The failure is expected from build(), reported with the datafeed id in the message.
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build());
assertEquals(
"Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]",
e.getMessage());
}

// Strict parser: a fresh parser over the same JSON is expected to fail during apply(...).build().
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {

XContentParseException e = expectThrows(XContentParseException.class,
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
assertEquals("[8:25] [datafeed_config] failed to parse field [aggregations]", e.getMessage());
}
}

public void testFutureMetadataParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
Expand Down Expand Up @@ -274,7 +349,7 @@ public void testBuild_GivenScriptFieldsAndAggregations() {
datafeed.setTypes(Collections.singletonList("my_type"));
datafeed.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField(randomAlphaOfLength(10),
mockScript(randomAlphaOfLength(10)), randomBoolean())));
datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
datafeed.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));

ElasticsearchException e = expectThrows(ElasticsearchException.class, datafeed::build);

Expand All @@ -295,7 +370,7 @@ public void testHasAggregations_NonEmpty() {
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(300000).subAggregation(maxTime).field("time")));
DatafeedConfig datafeedConfig = builder.build();

Expand All @@ -306,7 +381,7 @@ public void testBuild_GivenEmptyAggregations() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder());
builder.setParsedAggregations(new AggregatorFactories.Builder());

ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);

Expand All @@ -318,13 +393,13 @@ public void testBuild_GivenHistogramWithDefaultInterval() {
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time").subAggregation(maxTime).field("time"))
);

ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);

assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]"));
}

public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
Expand All @@ -341,7 +416,7 @@ public void testBuild_GivenDateHistogramWithDefaultInterval() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram((String) null));

assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0"));
}

public void testBuild_GivenValidDateHistogram() {
Expand Down Expand Up @@ -402,9 +477,8 @@ public void testValidateAggregations_GivenMulitpleHistogramAggs() {
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);

DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms));
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms)));

assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
}
Expand Down Expand Up @@ -520,7 +594,9 @@ private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram));
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram);
DatafeedConfig.validateAggregations(aggs);
builder.setParsedAggregations(aggs);
return builder.build();
}

Expand Down Expand Up @@ -556,11 +632,11 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept
break;
case 6:
BoolQueryBuilder query = new BoolQueryBuilder();
if (instance.getQuery() != null) {
query.must(instance.getQuery());
if (instance.getParsedQuery() != null) {
query.must(instance.getParsedQuery());
}
query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
builder.setQuery(query);
builder.setParsedQuery(query);
break;
case 7:
if (instance.hasAggregations()) {
Expand All @@ -571,7 +647,7 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept
aggBuilder
.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000))
.subAggregation(new MaxAggregationBuilder(timeField).field(timeField)));
builder.setAggregations(aggBuilder);
builder.setParsedAggregations(aggBuilder);
if (instance.getScriptFields().isEmpty() == false) {
builder.setScriptFields(Collections.emptyList());
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ public void testApply_givenFullUpdateNoAggregations() {
assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_2")));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.getParsedQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.hasAggregations(), is(false));
assertThat(updatedDatafeed.getScriptFields(),
equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))));
Expand All @@ -192,7 +192,7 @@ public void testApply_givenAggregations() {

assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1")));
assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1")));
assertThat(updatedDatafeed.getAggregations(),
assertThat(updatedDatafeed.getParsedAggregations(),
equalTo(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))));
}
Expand Down
Expand Up @@ -153,13 +153,13 @@ public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList(index));
datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
datafeedConfigBuilder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time")
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.field("time")
.interval(TimeValue.timeValueMinutes(5).millis())));
datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setParsedQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));

Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.security.SecurityContext;
Expand Down Expand Up @@ -154,6 +155,7 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ
private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
ActionListener<PutDatafeedAction.Response> listener) {

DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations());
clusterService.submitStateUpdateTask(
"put-datafeed-" + request.getDatafeed().getId(),
new AckedClusterStateUpdateTask<PutDatafeedAction.Response>(request, listener) {
Expand Down
Expand Up @@ -91,6 +91,7 @@ static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCu
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
DatafeedJobValidator.validate(datafeed, job);
DatafeedConfig.validateAggregations(datafeed.getParsedAggregations());
JobState jobState = MlTasks.getJobState(datafeed.getJobId(), tasks);
if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() +
Expand Down
Expand Up @@ -44,7 +44,7 @@ public static DelayedDataDetector buildDetector(Job job, DatafeedConfig datafeed
window,
job.getId(),
job.getDataDescription().getTimeField(),
datafeedConfig.getQuery(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getIndices().toArray(new String[0]),
client);
} else {
Expand Down

0 comments on commit ff6c194

Please sign in to comment.