From acb2f5c9d1a839d43644ce2e4759cbc5c9af9960 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 29 Mar 2019 15:30:32 -0500 Subject: [PATCH] [ML] Addressing bug streaming DatafeedConfig aggs from (<= 6.5.4) -> 6.7.0 (#40610) * Addressing stream failure and adding tests to catch such in the future * Add aggs to full cluster restart tests * Test BWC for datafeeds with and without aggs The wire serialisation is different for null/non-null aggs, so it's worth testing both cases. --- .../xpack/core/ml/datafeed/AggProvider.java | 6 +- .../core/ml/datafeed/DatafeedConfig.java | 2 + .../MlMigrationFullClusterRestartIT.java | 12 ++ x-pack/qa/rolling-upgrade/build.gradle | 3 +- .../mixed_cluster/40_ml_datafeed_crud.yml | 115 ++++++++++++++++-- .../test/old_cluster/40_ml_datafeed_crud.yml | 89 ++++++++++++-- .../upgraded_cluster/40_ml_datafeed_crud.yml | 51 ++++++-- 7 files changed, 251 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java index 7982cffb01de5..8585e4122e673 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java @@ -73,7 +73,8 @@ static AggProvider fromStream(StreamInput in) throws IOException { } else if (in.getVersion().onOrAfter(Version.V_6_6_0)) { // Has the bug, but supports lazy objects return new AggProvider(in.readMap(), null, null); } else { // only supports eagerly parsed objects - return AggProvider.fromParsedAggs(in.readOptionalWriteable(AggregatorFactories.Builder::new)); + // Upstream, we have read the bool already and know for sure that we have parsed aggs in the stream + return AggProvider.fromParsedAggs(new AggregatorFactories.Builder(in)); } } @@ -111,7 +112,8 @@ public void writeTo(StreamOutput out) throws IOException { // actually are aggregations defined throw new ElasticsearchException("Unsupported operation: parsed aggregations are null"); } - out.writeOptionalWriteable(parsedAggs); + // Upstream we already verified that this calling object is not null, no need to write a second boolean to the stream + parsedAggs.writeTo(out); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 3cd071f61aaee..810d97df34636 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -212,6 +212,7 @@ public DatafeedConfig(StreamInput in) throws IOException { } // each of these writables are version aware this.queryProvider = QueryProvider.fromStream(in); + // This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream); if (in.readBoolean()) { @@ -420,6 +421,7 @@ public void writeTo(StreamOutput out) throws IOException { // Each of these writables are version aware queryProvider.writeTo(out); // never null + // This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method out.writeOptionalWriteable(aggProvider); if (scriptFields != null) { diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java index e8bce60ae0bbb..b881af65420aa 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java @@ -13,6 +13,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -125,6 +129,7 @@ private void oldClusterTests() throws IOException { dfBuilder.setDelayedDataCheckConfig(null); } dfBuilder.setIndices(Collections.singletonList("airline-data")); + addAggregations(dfBuilder); Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build())); @@ -258,4 +263,11 @@ private void assertJobNotPresent(String jobId, List> jobs) { .filter(id -> id.equals(jobId)).findFirst(); assertFalse(config.isPresent()); } + + private void addAggregations(DatafeedConfig.Builder dfBuilder) { + TermsAggregationBuilder airline = AggregationBuilders.terms("airline"); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time").subAggregation(airline); + dfBuilder.setParsedAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time"))); + } } diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index f97845d972ee3..0c1890b1b93cb 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -275,7 +275,8 @@ subprojects { systemProperty 'tests.rest.blacklist', [ 'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade', 'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data', - 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster', + 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster', + 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster' ].join(',') finalizedBy "${baseName}#oldClusterTestCluster#node1.stop" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/40_ml_datafeed_crud.yml index b9ae06499d112..c4165fb21178d 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/40_ml_datafeed_crud.yml @@ -1,24 +1,44 @@ --- -"Test old cluster datafeed": +"Test old cluster datafeed without aggs": - do: ml.get_datafeeds: - datafeed_id: old-cluster-datafeed - - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed"} + datafeed_id: old-cluster-datafeed-without-aggs + - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-without-aggs"} - length: { datafeeds.0.indices: 1 } - gte: { datafeeds.0.scroll_size: 2000 } + - match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless } + - is_false: datafeeds.0.aggregations - do: ml.get_datafeed_stats: - datafeed_id: old-cluster-datafeed + datafeed_id: old-cluster-datafeed-without-aggs - match: { datafeeds.0.state: "stopped"} - is_false: datafeeds.0.node --- -"Put job and datafeed in mixed cluster": +"Test old cluster datafeed with aggs": + - do: + ml.get_datafeeds: + datafeed_id: old-cluster-datafeed-with-aggs + - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-with-aggs"} + - length: { datafeeds.0.indices: 1 } + - gte: { datafeeds.0.scroll_size: 2000 } + - is_false: datafeeds.0.script_fields + - match: { datafeeds.0.aggregations.buckets.date_histogram.field: time } + - match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time } + + - do: + ml.get_datafeed_stats: + datafeed_id: old-cluster-datafeed-with-aggs + - match: { datafeeds.0.state: "stopped"} + - is_false: datafeeds.0.node + +--- +"Put job and datafeed without aggs in mixed cluster": - do: ml.put_job: - job_id: mixed-cluster-datafeed-job + job_id: mixed-cluster-datafeed-job-without-aggs body: > { "description":"Cluster upgrade", @@ -37,16 +57,91 @@ - do: ml.put_datafeed: - datafeed_id: mixed-cluster-datafeed + datafeed_id: mixed-cluster-datafeed-without-aggs body: > { - "job_id":"mixed-cluster-datafeed-job", + "job_id":"mixed-cluster-datafeed-job-without-aggs", "indices":["airline-data"], - "scroll_size": 2000 + "scroll_size": 2000, + "script_fields": { + "double_responsetime": { + "script": { + "lang": "painless", + "source": "doc['responsetime'].value * 2" + } + } + } + } + + - do: + ml.get_datafeed_stats: + datafeed_id: mixed-cluster-datafeed-without-aggs + - match: { datafeeds.0.state: stopped} + - is_false: datafeeds.0.node + +--- +"Put job and datafeed with aggs in mixed cluster": + + - do: + ml.put_job: + job_id: mixed-cluster-datafeed-job-with-aggs + body: > + { + "description":"Cluster upgrade", + "analysis_config" : { + "bucket_span": "60s", + "summary_count_field_name": "doc_count", + "detectors" :[{"function":"count"}] + }, + "analysis_limits" : { + "model_memory_limit": "50mb" + }, + "data_description" : { + "format":"xcontent", + "time_field":"time" + } + } + + - do: + ml.put_datafeed: + datafeed_id: mixed-cluster-datafeed-with-aggs + body: > + { + "job_id":"mixed-cluster-datafeed-job-with-aggs", + "indices":["airline-data"], + "types":["response"], + "scroll_size": 2000, + "aggregations": { + "buckets": { + "date_histogram": { + "field": "time", + "interval": "30s", + "time_zone": "UTC" + }, + "aggregations": { + "time": { + "max": {"field": "time"} + }, + "airline": { + "terms": { + "field": "airline", + "size": 100 + }, + "aggregations": { + "responsetime": { + "avg": { + "field": "responsetime" + } + } + } + } + } + } + } } - do: ml.get_datafeed_stats: - datafeed_id: mixed-cluster-datafeed + datafeed_id: mixed-cluster-datafeed-with-aggs - match: { datafeeds.0.state: stopped} - is_false: datafeeds.0.node diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml index d7dba6b3b2d3a..597540d36c4ec 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml @@ -1,9 +1,9 @@ --- -"Put job and datafeed in old cluster": +"Put job and datafeed without aggs in old cluster": - do: ml.put_job: - job_id: old-cluster-datafeed-job + job_id: old-cluster-datafeed-job-without-aggs body: > { "description":"Cluster upgrade", @@ -19,20 +19,95 @@ "time_field":"time" } } - - match: { job_id: old-cluster-datafeed-job } + - match: { job_id: old-cluster-datafeed-job-without-aggs } - do: ml.put_datafeed: - datafeed_id: old-cluster-datafeed + datafeed_id: old-cluster-datafeed-without-aggs body: > { - "job_id":"old-cluster-datafeed-job", + "job_id":"old-cluster-datafeed-job-without-aggs", "indices":["airline-data"], - "scroll_size": 2000 + "scroll_size": 2000, + "script_fields": { + "double_responsetime": { + "script": { + "lang": "painless", + "source": "doc['responsetime'].value * 2" + } + } + } + } + + - do: + ml.get_datafeed_stats: + datafeed_id: old-cluster-datafeed-without-aggs + - match: { datafeeds.0.state: stopped} + - is_false: datafeeds.0.node + +--- +"Put job and datafeed with aggs in old cluster": + + - do: + ml.put_job: + job_id: old-cluster-datafeed-job-with-aggs + body: > + { + "description":"Cluster upgrade", + "analysis_config" : { + "bucket_span": "60s", + "summary_count_field_name": "doc_count", + "detectors" :[{"function":"count"}] + }, + "analysis_limits" : { + "model_memory_limit": "50mb" + }, + "data_description" : { + "format":"xcontent", + "time_field":"time" + } + } + - match: { job_id: old-cluster-datafeed-job-with-aggs } + + - do: + ml.put_datafeed: + datafeed_id: old-cluster-datafeed-with-aggs + body: > + { + "job_id":"old-cluster-datafeed-job-with-aggs", + "indices":["airline-data"], + "scroll_size": 2000, + "aggregations": { + "buckets": { + "date_histogram": { + "field": "time", + "interval": "30s", + "time_zone": "UTC" + }, + "aggregations": { + "time": { + "max": {"field": "time"} + }, + "airline": { + "terms": { + "field": "airline", + "size": 100 + }, + "aggregations": { + "responsetime": { + "avg": { + "field": "responsetime" + } + } + } + } + } + } + } } - do: ml.get_datafeed_stats: - datafeed_id: old-cluster-datafeed + datafeed_id: old-cluster-datafeed-with-aggs - match: { datafeeds.0.state: stopped} - is_false: datafeeds.0.node diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml index 928fb3a066c28..a3d03b5becc17 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml @@ -14,32 +14,69 @@ setup: properties: time: type: date +--- +"Test old and mixed cluster datafeeds without aggs": + - do: + ml.get_datafeeds: + datafeed_id: old-cluster-datafeed-without-aggs + - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-without-aggs"} + - length: { datafeeds.0.indices: 1 } + - gte: { datafeeds.0.scroll_size: 2000 } + - match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless } + - is_false: datafeeds.0.aggregations + + - do: + ml.get_datafeed_stats: + datafeed_id: old-cluster-datafeed-without-aggs + - match: { datafeeds.0.state: "stopped"} + - is_false: datafeeds.0.node + + - do: + ml.get_datafeeds: + datafeed_id: mixed-cluster-datafeed-without-aggs + - match: { datafeeds.0.datafeed_id: "mixed-cluster-datafeed-without-aggs"} + - length: { datafeeds.0.indices: 1 } + - gte: { datafeeds.0.scroll_size: 2000 } + - match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless } + - is_false: datafeeds.0.aggregations + + - do: + ml.get_datafeed_stats: + datafeed_id: mixed-cluster-datafeed-without-aggs + - match: { datafeeds.0.state: "stopped"} + - is_false: datafeeds.0.node --- -"Test old and mixed cluster datafeeds": +"Test old and mixed cluster datafeeds with aggs": - do: ml.get_datafeeds: - datafeed_id: old-cluster-datafeed - - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed"} + datafeed_id: old-cluster-datafeed-with-aggs + - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-with-aggs"} - length: { datafeeds.0.indices: 1 } - gte: { datafeeds.0.scroll_size: 2000 } + - is_false: datafeeds.0.script_fields + - match: { datafeeds.0.aggregations.buckets.date_histogram.field: time } + - match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time } - do: ml.get_datafeed_stats: - datafeed_id: old-cluster-datafeed + datafeed_id: old-cluster-datafeed-with-aggs - match: { datafeeds.0.state: "stopped"} - is_false: datafeeds.0.node - do: ml.get_datafeeds: - datafeed_id: mixed-cluster-datafeed - - match: { datafeeds.0.datafeed_id: "mixed-cluster-datafeed"} + datafeed_id: mixed-cluster-datafeed-with-aggs + - match: { datafeeds.0.datafeed_id: "mixed-cluster-datafeed-with-aggs"} - length: { datafeeds.0.indices: 1 } - gte: { datafeeds.0.scroll_size: 2000 } + - is_false: datafeeds.0.script_fields + - match: { datafeeds.0.aggregations.buckets.date_histogram.field: time } + - match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time } - do: ml.get_datafeed_stats: - datafeed_id: mixed-cluster-datafeed + datafeed_id: mixed-cluster-datafeed-with-aggs - match: { datafeeds.0.state: "stopped"} - is_false: datafeeds.0.node