Skip to content

Commit

Permalink
[ML] Addressing bug streaming DatafeedConfig aggs from (<= 6.5.4) -> 6.7.0 (elastic#40610)
Browse files Browse the repository at this point in the history

* Addressing stream failure and adding tests to catch such in the future

* Add aggs to full cluster restart tests

* Test BWC for datafeeds with and without aggs

The wire serialisation is different for null/non-null
aggs, so it's worth testing both cases.
  • Loading branch information
benwtrent committed Mar 29, 2019
1 parent cbe7d33 commit acb2f5c
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 27 deletions.
Expand Up @@ -73,7 +73,8 @@ static AggProvider fromStream(StreamInput in) throws IOException {
} else if (in.getVersion().onOrAfter(Version.V_6_6_0)) { // Has the bug, but supports lazy objects
return new AggProvider(in.readMap(), null, null);
} else { // only supports eagerly parsed objects
return AggProvider.fromParsedAggs(in.readOptionalWriteable(AggregatorFactories.Builder::new));
// Upstream, we have read the bool already and know for sure that we have parsed aggs in the stream
return AggProvider.fromParsedAggs(new AggregatorFactories.Builder(in));
}
}

Expand Down Expand Up @@ -111,7 +112,8 @@ public void writeTo(StreamOutput out) throws IOException {
// actually are aggregations defined
throw new ElasticsearchException("Unsupported operation: parsed aggregations are null");
}
out.writeOptionalWriteable(parsedAggs);
// Upstream we already verified that this calling object is not null, no need to write a second boolean to the stream
parsedAggs.writeTo(out);
}
}

Expand Down
Expand Up @@ -212,6 +212,7 @@ public DatafeedConfig(StreamInput in) throws IOException {
}
// each of these writables are version aware
this.queryProvider = QueryProvider.fromStream(in);
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);

if (in.readBoolean()) {
Expand Down Expand Up @@ -420,6 +421,7 @@ public void writeTo(StreamOutput out) throws IOException {

// Each of these writables are version aware
queryProvider.writeTo(out); // never null
// This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
out.writeOptionalWriteable(aggProvider);

if (scriptFields != null) {
Expand Down
Expand Up @@ -13,6 +13,10 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
Expand Down Expand Up @@ -125,6 +129,7 @@ private void oldClusterTests() throws IOException {
dfBuilder.setDelayedDataCheckConfig(null);
}
dfBuilder.setIndices(Collections.singletonList("airline-data"));
addAggregations(dfBuilder);

Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID);
putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build()));
Expand Down Expand Up @@ -258,4 +263,11 @@ private void assertJobNotPresent(String jobId, List<Map<String, Object>> jobs) {
.filter(id -> id.equals(jobId)).findFirst();
assertFalse(config.isPresent());
}

private void addAggregations(DatafeedConfig.Builder dfBuilder) {
TermsAggregationBuilder airline = AggregationBuilders.terms("airline");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time").subAggregation(airline);
dfBuilder.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
}
}
3 changes: 2 additions & 1 deletion x-pack/qa/rolling-upgrade/build.gradle
Expand Up @@ -275,7 +275,8 @@ subprojects {
systemProperty 'tests.rest.blacklist', [
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster'
].join(',')
finalizedBy "${baseName}#oldClusterTestCluster#node1.stop"
}
Expand Down
@@ -1,24 +1,44 @@
---
"Test old cluster datafeed":
"Test old cluster datafeed without aggs":
- do:
ml.get_datafeeds:
datafeed_id: old-cluster-datafeed
- match: { datafeeds.0.datafeed_id: "old-cluster-datafeed"}
datafeed_id: old-cluster-datafeed-without-aggs
- match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-without-aggs"}
- length: { datafeeds.0.indices: 1 }
- gte: { datafeeds.0.scroll_size: 2000 }
- match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless }
- is_false: datafeeds.0.aggregations

- do:
ml.get_datafeed_stats:
datafeed_id: old-cluster-datafeed
datafeed_id: old-cluster-datafeed-without-aggs
- match: { datafeeds.0.state: "stopped"}
- is_false: datafeeds.0.node

---
"Put job and datafeed in mixed cluster":
"Test old cluster datafeed with aggs":
- do:
ml.get_datafeeds:
datafeed_id: old-cluster-datafeed-with-aggs
- match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-with-aggs"}
- length: { datafeeds.0.indices: 1 }
- gte: { datafeeds.0.scroll_size: 2000 }
- is_false: datafeeds.0.script_fields
- match: { datafeeds.0.aggregations.buckets.date_histogram.field: time }
- match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time }

- do:
ml.get_datafeed_stats:
datafeed_id: old-cluster-datafeed-with-aggs
- match: { datafeeds.0.state: "stopped"}
- is_false: datafeeds.0.node

---
"Put job and datafeed without aggs in mixed cluster":

- do:
ml.put_job:
job_id: mixed-cluster-datafeed-job
job_id: mixed-cluster-datafeed-job-without-aggs
body: >
{
"description":"Cluster upgrade",
Expand All @@ -37,16 +57,91 @@
- do:
ml.put_datafeed:
datafeed_id: mixed-cluster-datafeed
datafeed_id: mixed-cluster-datafeed-without-aggs
body: >
{
"job_id":"mixed-cluster-datafeed-job",
"job_id":"mixed-cluster-datafeed-job-without-aggs",
"indices":["airline-data"],
"scroll_size": 2000
"scroll_size": 2000,
"script_fields": {
"double_responsetime": {
"script": {
"lang": "painless",
"source": "doc['responsetime'].value * 2"
}
}
}
}
- do:
ml.get_datafeed_stats:
datafeed_id: mixed-cluster-datafeed-without-aggs
- match: { datafeeds.0.state: stopped}
- is_false: datafeeds.0.node

---
"Put job and datafeed with aggs in mixed cluster":

- do:
ml.put_job:
job_id: mixed-cluster-datafeed-job-with-aggs
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"summary_count_field_name": "doc_count",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time"
}
}
- do:
ml.put_datafeed:
datafeed_id: mixed-cluster-datafeed-with-aggs
body: >
{
"job_id":"mixed-cluster-datafeed-job-with-aggs",
"indices":["airline-data"],
"types":["response"],
"scroll_size": 2000,
"aggregations": {
"buckets": {
"date_histogram": {
"field": "time",
"interval": "30s",
"time_zone": "UTC"
},
"aggregations": {
"time": {
"max": {"field": "time"}
},
"airline": {
"terms": {
"field": "airline",
"size": 100
},
"aggregations": {
"responsetime": {
"avg": {
"field": "responsetime"
}
}
}
}
}
}
}
}
- do:
ml.get_datafeed_stats:
datafeed_id: mixed-cluster-datafeed
datafeed_id: mixed-cluster-datafeed-with-aggs
- match: { datafeeds.0.state: stopped}
- is_false: datafeeds.0.node
@@ -1,9 +1,9 @@
---
"Put job and datafeed in old cluster":
"Put job and datafeed without aggs in old cluster":

- do:
ml.put_job:
job_id: old-cluster-datafeed-job
job_id: old-cluster-datafeed-job-without-aggs
body: >
{
"description":"Cluster upgrade",
Expand All @@ -19,20 +19,95 @@
"time_field":"time"
}
}
- match: { job_id: old-cluster-datafeed-job }
- match: { job_id: old-cluster-datafeed-job-without-aggs }

- do:
ml.put_datafeed:
datafeed_id: old-cluster-datafeed
datafeed_id: old-cluster-datafeed-without-aggs
body: >
{
"job_id":"old-cluster-datafeed-job",
"job_id":"old-cluster-datafeed-job-without-aggs",
"indices":["airline-data"],
"scroll_size": 2000
"scroll_size": 2000,
"script_fields": {
"double_responsetime": {
"script": {
"lang": "painless",
"source": "doc['responsetime'].value * 2"
}
}
}
}
- do:
ml.get_datafeed_stats:
datafeed_id: old-cluster-datafeed-without-aggs
- match: { datafeeds.0.state: stopped}
- is_false: datafeeds.0.node

---
"Put job and datafeed with aggs in old cluster":

- do:
ml.put_job:
job_id: old-cluster-datafeed-job-with-aggs
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"summary_count_field_name": "doc_count",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time"
}
}
- match: { job_id: old-cluster-datafeed-job-with-aggs }

- do:
ml.put_datafeed:
datafeed_id: old-cluster-datafeed-with-aggs
body: >
{
"job_id":"old-cluster-datafeed-job-with-aggs",
"indices":["airline-data"],
"scroll_size": 2000,
"aggregations": {
"buckets": {
"date_histogram": {
"field": "time",
"interval": "30s",
"time_zone": "UTC"
},
"aggregations": {
"time": {
"max": {"field": "time"}
},
"airline": {
"terms": {
"field": "airline",
"size": 100
},
"aggregations": {
"responsetime": {
"avg": {
"field": "responsetime"
}
}
}
}
}
}
}
}
- do:
ml.get_datafeed_stats:
datafeed_id: old-cluster-datafeed
datafeed_id: old-cluster-datafeed-with-aggs
- match: { datafeeds.0.state: stopped}
- is_false: datafeeds.0.node

0 comments on commit acb2f5c

Please sign in to comment.