From df574e5168df6773204f3db16e6896dba3029f7b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Przemys=C5=82aw=20Witek?=
Date: Wed, 14 Aug 2019 08:26:03 +0200
Subject: [PATCH] [7.x] Implement ml/data_frame/analytics/_estimate_memory_usage API endpoint (#45188) (#45510)

---
 ...estimate-memory-usage-dfanalytics.asciidoc |  83 +++++++
 .../ml/df-analytics/apis/index.asciidoc | 4 +-
 .../xpack/core/XPackClientPlugin.java | 2 +
 .../ml/action/EstimateMemoryUsageAction.java | 204 ++++++++++++++++++
 .../dataframe/DataFrameAnalyticsConfig.java | 74 ++++---
 .../xpack/core/ml/job/messages/Messages.java | 2 +-
 ...EstimateMemoryUsageActionRequestTests.java | 55 +++++
 ...stimateMemoryUsageActionResponseTests.java | 47 ++++
 .../DataFrameAnalyticsConfigTests.java | 59 +++--
 ...NativeDataFrameAnalyticsIntegTestCase.java | 8 +-
 .../xpack/ml/MachineLearning.java | 23 +-
 .../TransportEstimateMemoryUsageAction.java | 128 +++++++++++
 .../dataframe/DataFrameAnalyticsManager.java | 2 +-
 .../DataFrameDataExtractorFactory.java | 79 +++++--
 .../extractor/ExtractedFieldsDetector.java | 32 +--
 .../AbstractNativeAnalyticsProcess.java | 54 +++++
 .../dataframe/process/AnalyticsBuilder.java | 25 ++-
 .../dataframe/process/AnalyticsProcess.java | 4 +-
 .../process/AnalyticsProcessFactory.java | 6 +-
 .../process/AnalyticsProcessManager.java | 20 +-
 .../process/AnalyticsResultProcessor.java | 3 +-
 .../MemoryUsageEstimationProcessManager.java | 143 ++++++++++++
 .../process/NativeAnalyticsProcess.java | 38 +---
 .../NativeAnalyticsProcessFactory.java | 9 +-
 .../NativeMemoryUsageEstimationProcess.java | 27 +++
 ...veMemoryUsageEstimationProcessFactory.java | 103 +++++++++
 .../{ => results}/AnalyticsResult.java | 11 +-
 .../results/MemoryUsageEstimationResult.java | 97 +++++++++
 .../dataframe/process/results/RowResults.java | 6 +-
 .../ml/process/AbstractNativeProcess.java | 41 ++--
 .../RestEstimateMemoryUsageAction.java | 37 ++++
 .../DataFrameAnalyticsIndexTests.java | 3 +-
 .../dataframe/SourceDestValidatorTests.java | 24 ++-
 .../ExtractedFieldsDetectorTests.java | 61 ++++--
 .../process/AnalyticsBuilderTests.java | 70 ++++++
 .../AnalyticsResultProcessorTests.java | 4 +-
 ...oryUsageEstimationProcessManagerTests.java | 183 ++++++++++++++
 .../{ => results}/AnalyticsResultTests.java | 4 +-
 .../MemoryUsageEstimationResultTests.java | 51 +++++
 .../process/AbstractNativeProcessTests.java | 155 +++++++++++++
 .../api/ml.estimate_memory_usage.json | 16 ++
 ...rame_analytics_memory_usage_estimation.yml | 75 +++++++
 42 files changed, 1882 insertions(+), 190 deletions(-)
 create mode 100644 docs/reference/ml/df-analytics/apis/estimate-memory-usage-dfanalytics.asciidoc
 create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageAction.java
 create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionRequestTests.java
 create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionResponseTests.java
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java
 rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/{ => results}/AnalyticsResult.java (82%)
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResult.java
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEstimateMemoryUsageAction.java
 create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilderTests.java
 create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java
 rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/{ => results}/AnalyticsResultTests.java (84%)
 create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResultTests.java
 create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java
 create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ml.estimate_memory_usage.json
 create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_memory_usage_estimation.yml

diff --git a/docs/reference/ml/df-analytics/apis/estimate-memory-usage-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/estimate-memory-usage-dfanalytics.asciidoc
new file mode 100644
index 0000000000000..9f1f77052d647
--- /dev/null
+++ b/docs/reference/ml/df-analytics/apis/estimate-memory-usage-dfanalytics.asciidoc
@@ -0,0 +1,83 @@
+[role="xpack"]
+[testenv="platinum"]
+[[estimate-memory-usage-dfanalytics]]
+=== Estimate memory usage API
+
+[subs="attributes"]
+++++
+Estimate memory usage for {dfanalytics-jobs}
+++++
+
+Estimates memory usage for the given {dataframe-analytics-config}.
+
+experimental[]
+
+[[ml-estimate-memory-usage-dfanalytics-request]]
+==== {api-request-title}
+
+`POST _ml/data_frame/analytics/_estimate_memory_usage`
+
+[[ml-estimate-memory-usage-dfanalytics-prereq]]
+==== {api-prereq-title}
+
+* You must have `monitor_ml` privilege to use this API. For more
+information, see {stack-ov}/security-privileges.html[Security privileges] and
+{stack-ov}/built-in-roles.html[Built-in roles].
+
+[[ml-estimate-memory-usage-dfanalytics-desc]]
+==== {api-description-title}
+
+This API estimates memory usage for the given {dataframe-analytics-config} before the {dfanalytics-job} is even created.
+
+It serves as advice on how to set `model_memory_limit` when creating the {dfanalytics-job}.
+
+[[ml-estimate-memory-usage-dfanalytics-request-body]]
+==== {api-request-body-title}
+
+`data_frame_analytics_config`::
+  (Required, object) Intended configuration of {dfanalytics-job}. For more information, see
+  <>.
+  Note that `id` and `dest` don't need to be provided in the context of this API.
+
+[[ml-estimate-memory-usage-dfanalytics-results]]
+==== {api-response-body-title}
+
+`expected_memory_usage_with_one_partition`::
+  (string) Estimated memory usage under the assumption that the whole {dfanalytics} should happen in memory
+  (i.e. without overflowing to disk).
+
+`expected_memory_usage_with_max_partitions`::
+  (string) Estimated memory usage under the assumption that overflowing to disk is allowed during {dfanalytics}.
+  `expected_memory_usage_with_max_partitions` is usually smaller than `expected_memory_usage_with_one_partition`
+  as using disk allows limiting the amount of main memory needed to perform {dfanalytics}.
+
+[[ml-estimate-memory-usage-dfanalytics-example]]
+==== {api-examples-title}
+
+[source,js]
+--------------------------------------------------
+POST _ml/data_frame/analytics/_estimate_memory_usage
+{
+  "data_frame_analytics_config": {
+    "source": {
+      "index": "logdata"
+    },
+    "analysis": {
+      "outlier_detection": {}
+    }
+  }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[skip:TBD]
+
+The API returns the following results:
+
+[source,js]
+----
+{
+  "expected_memory_usage_with_one_partition": "128MB",
+  "expected_memory_usage_with_max_partitions": "32MB"
+}
+----
+// TESTRESPONSE
\ No newline at end of file
diff --git a/docs/reference/ml/df-analytics/apis/index.asciidoc b/docs/reference/ml/df-analytics/apis/index.asciidoc
index 416e11f146b70..30e909f3ffad6 100644
--- a/docs/reference/ml/df-analytics/apis/index.asciidoc
+++ b/docs/reference/ml/df-analytics/apis/index.asciidoc
@@ -12,6 +12,7 @@ You can use the following APIs to perform {ml} {dfanalytics} activities.
 * <>
 * <>
 * <>
+* <>
 
 See also <>.
 
@@ -21,10 +22,11 @@ include::put-dfanalytics.asciidoc[]
 include::delete-dfanalytics.asciidoc[]
 //EVALUATE
 include::evaluate-dfanalytics.asciidoc[]
+//ESTIMATE_MEMORY_USAGE
+include::estimate-memory-usage-dfanalytics.asciidoc[]
 //GET
 include::get-dfanalytics.asciidoc[]
 include::get-dfanalytics-stats.asciidoc[]
 //SET/START/STOP
 include::start-dfanalytics.asciidoc[]
 include::stop-dfanalytics.asciidoc[]
-
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
index 6711bd96cb7d6..b2c3ae776e005 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
@@ -96,6 +96,7 @@
 import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction;
 import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
 import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
 import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction;
@@ -347,6 +348,7 @@ public List> getClientActions() {
         StartDataFrameAnalyticsAction.INSTANCE,
         StopDataFrameAnalyticsAction.INSTANCE,
         EvaluateDataFrameAction.INSTANCE,
+        EstimateMemoryUsageAction.INSTANCE,
         // security
         ClearRealmCacheAction.INSTANCE,
         ClearRolesCacheAction.INSTANCE,
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageAction.java
new file mode 100644
index 0000000000000..62a8220d1a535
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageAction.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements.
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class EstimateMemoryUsageAction extends ActionType { + + public static final EstimateMemoryUsageAction INSTANCE = new EstimateMemoryUsageAction(); + public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/estimate_memory_usage"; + + private EstimateMemoryUsageAction() { + super(NAME, EstimateMemoryUsageAction.Response::new); + } + + public static class Request extends ActionRequest implements ToXContentObject { + + private static final ParseField DATA_FRAME_ANALYTICS_CONFIG = new ParseField("data_frame_analytics_config"); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + NAME, + args -> { + DataFrameAnalyticsConfig.Builder configBuilder = (DataFrameAnalyticsConfig.Builder) args[0]; + DataFrameAnalyticsConfig config = configBuilder.buildForMemoryEstimation(); + return new EstimateMemoryUsageAction.Request(config); + }); + + static { + PARSER.declareObject(constructorArg(), DataFrameAnalyticsConfig.STRICT_PARSER, DATA_FRAME_ANALYTICS_CONFIG); + } + + public static EstimateMemoryUsageAction.Request parseRequest(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private final DataFrameAnalyticsConfig config; + + public Request(DataFrameAnalyticsConfig config) { + this.config = ExceptionsHelper.requireNonNull(config, DATA_FRAME_ANALYTICS_CONFIG); + } + + public Request(StreamInput in) throws IOException { + super(in); + this.config = new DataFrameAnalyticsConfig(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public DataFrameAnalyticsConfig getConfig() { + return config; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + config.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DATA_FRAME_ANALYTICS_CONFIG.getPreferredName(), config); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + Request that = (Request) other; + 
return Objects.equals(config, that.config); + } + + @Override + public int hashCode() { + return Objects.hash(config); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + public static final ParseField TYPE = new ParseField("memory_usage_estimation_result"); + + public static final ParseField EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION = + new ParseField("expected_memory_usage_with_one_partition"); + public static final ParseField EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS = + new ParseField("expected_memory_usage_with_max_partitions"); + + static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + TYPE.getPreferredName(), + args -> new Response((ByteSizeValue) args[0], (ByteSizeValue) args[1])); + + static { + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION.getPreferredName()), + EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION, + ObjectParser.ValueType.VALUE); + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS.getPreferredName()), + EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS, + ObjectParser.ValueType.VALUE); + } + + private final ByteSizeValue expectedMemoryUsageWithOnePartition; + private final ByteSizeValue expectedMemoryUsageWithMaxPartitions; + + public Response(@Nullable ByteSizeValue expectedMemoryUsageWithOnePartition, + @Nullable ByteSizeValue expectedMemoryUsageWithMaxPartitions) { + this.expectedMemoryUsageWithOnePartition = expectedMemoryUsageWithOnePartition; + this.expectedMemoryUsageWithMaxPartitions = expectedMemoryUsageWithMaxPartitions; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.expectedMemoryUsageWithOnePartition = in.readOptionalWriteable(ByteSizeValue::new); + this.expectedMemoryUsageWithMaxPartitions = in.readOptionalWriteable(ByteSizeValue::new); + } + + public ByteSizeValue getExpectedMemoryUsageWithOnePartition() { + return expectedMemoryUsageWithOnePartition; + } + + public ByteSizeValue getExpectedMemoryUsageWithMaxPartitions() { + return expectedMemoryUsageWithMaxPartitions; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(expectedMemoryUsageWithOnePartition); + out.writeOptionalWriteable(expectedMemoryUsageWithMaxPartitions); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (expectedMemoryUsageWithOnePartition != null) { + builder.field( + EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION.getPreferredName(), expectedMemoryUsageWithOnePartition.getStringRep()); + } + if (expectedMemoryUsageWithMaxPartitions != null) { + builder.field( + EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS.getPreferredName(), expectedMemoryUsageWithMaxPartitions.getStringRep()); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + Response that = (Response) other; + return Objects.equals(expectedMemoryUsageWithOnePartition, that.expectedMemoryUsageWithOnePartition) + && Objects.equals(expectedMemoryUsageWithMaxPartitions, that.expectedMemoryUsageWithMaxPartitions); + } + + @Override + public int hashCode() { + return Objects.hash(expectedMemoryUsageWithOnePartition, 
expectedMemoryUsageWithMaxPartitions); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index efb76a8963850..f194d108ad0b8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -57,7 +57,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { public static final ObjectParser STRICT_PARSER = createParser(false); public static final ObjectParser LENIENT_PARSER = createParser(true); - public static ObjectParser createParser(boolean ignoreUnknownFields) { + private static ObjectParser createParser(boolean ignoreUnknownFields) { ObjectParser parser = new ObjectParser<>(TYPE, ignoreUnknownFields, Builder::new); parser.declareString((c, s) -> {}, CONFIG_TYPE); @@ -281,14 +281,6 @@ public static class Builder { public Builder() {} - public Builder(String id) { - setId(id); - } - - public Builder(ByteSizeValue maxModelMemoryLimit) { - this.maxModelMemoryLimit = maxModelMemoryLimit; - } - public Builder(DataFrameAnalyticsConfig config) { this(config, null); } @@ -343,30 +335,10 @@ public Builder setHeaders(Map headers) { } public Builder setModelMemoryLimit(ByteSizeValue modelMemoryLimit) { - if (modelMemoryLimit != null && modelMemoryLimit.compareTo(MIN_MODEL_MEMORY_LIMIT) < 0) { - throw new IllegalArgumentException("[" + MODEL_MEMORY_LIMIT.getPreferredName() - + "] must be at least [" + MIN_MODEL_MEMORY_LIMIT.getStringRep() + "]"); - } this.modelMemoryLimit = modelMemoryLimit; return this; } - private void applyMaxModelMemoryLimit() { - - boolean maxModelMemoryIsSet = maxModelMemoryLimit != null && maxModelMemoryLimit.getMb() > 0; - - if (modelMemoryLimit == null) { - // Default is silently capped if higher than limit - if (maxModelMemoryIsSet && DEFAULT_MODEL_MEMORY_LIMIT.compareTo(maxModelMemoryLimit) > 0) { - modelMemoryLimit = maxModelMemoryLimit; - } - } else if (maxModelMemoryIsSet && modelMemoryLimit.compareTo(maxModelMemoryLimit) > 0) { - // Explicit setting higher than limit is an error - throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_GREATER_THAN_MAX, - modelMemoryLimit, maxModelMemoryLimit)); - } - } - public Builder setCreateTime(Instant createTime) { this.createTime = createTime; return this; @@ -377,9 +349,53 @@ public Builder setVersion(Version version) { return this; } + /** + * Builds {@link DataFrameAnalyticsConfig} object. + */ public DataFrameAnalyticsConfig build() { applyMaxModelMemoryLimit(); return new DataFrameAnalyticsConfig(id, source, dest, analysis, headers, modelMemoryLimit, analyzedFields, createTime, version); } + + /** + * Builds {@link DataFrameAnalyticsConfig} object for the purpose of performing memory estimation. + * Some fields (i.e. "id", "dest") may not be present, therefore we overwrite them here to make {@link DataFrameAnalyticsConfig}'s + * constructor validations happy. + */ + public DataFrameAnalyticsConfig buildForMemoryEstimation() { + return new DataFrameAnalyticsConfig( + id != null ? id : "dummy", + source, + dest != null ? 
dest : new DataFrameAnalyticsDest("dummy", null), + analysis, + headers, + modelMemoryLimit, + analyzedFields, + createTime, + version); + } + + private void applyMaxModelMemoryLimit() { + boolean maxModelMemoryIsSet = maxModelMemoryLimit != null && maxModelMemoryLimit.getMb() > 0; + + if (modelMemoryLimit != null) { + if (modelMemoryLimit.compareTo(MIN_MODEL_MEMORY_LIMIT) < 0) { + // Explicit setting lower than minimum is an error + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimit)); + } + if (maxModelMemoryIsSet && modelMemoryLimit.compareTo(maxModelMemoryLimit) > 0) { + // Explicit setting higher than limit is an error + throw ExceptionsHelper.badRequestException( + Messages.getMessage( + Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_GREATER_THAN_MAX, modelMemoryLimit, maxModelMemoryLimit)); + } + } else { + // Default is silently capped if higher than limit + if (maxModelMemoryIsSet && DEFAULT_MODEL_MEMORY_LIMIT.compareTo(maxModelMemoryLimit) > 0) { + modelMemoryLimit = maxModelMemoryLimit; + } + } + } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index dfb95d2adac33..f5e66fed8a882 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -122,7 +122,7 @@ public final class Messages { "Invalid detector rule: scope field ''{0}'' is invalid; select from {1}"; public static final String JOB_CONFIG_FIELDNAME_INCOMPATIBLE_FUNCTION = "field_name cannot be used with function ''{0}''"; public static final String JOB_CONFIG_FIELD_VALUE_TOO_LOW = "{0} cannot be less than {1,number}. Value = {2,number}"; - public static final String JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW = "model_memory_limit must be at least 1 MiB. Value = {0,number}"; + public static final String JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW = "model_memory_limit must be at least 1 MiB. Value = {0}"; public static final String JOB_CONFIG_MODEL_MEMORY_LIMIT_GREATER_THAN_MAX = "model_memory_limit [{0}] must be less than the value of the " + MachineLearningField.MAX_MODEL_MEMORY_LIMIT.getKey() + diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionRequestTests.java new file mode 100644 index 0000000000000..3aa4fd9327dd9 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionRequestTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction.Request; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class EstimateMemoryUsageActionRequestTests extends AbstractSerializingTestCase { + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + List namedWriteables = new ArrayList<>(); + namedWriteables.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedWriteables()); + namedWriteables.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedWriteables()); + return new NamedWriteableRegistry(namedWriteables); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + List namedXContent = new ArrayList<>(); + namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers()); + namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents()); + return new NamedXContentRegistry(namedXContent); + } + + @Override + protected Request createTestInstance() { + return new Request(DataFrameAnalyticsConfigTests.createRandom("dummy")); + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request doParseInstance(XContentParser parser) { + return Request.parseRequest(parser); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionResponseTests.java new file mode 100644 index 0000000000000..e6b9f4a99a25d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/EstimateMemoryUsageActionResponseTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction.Response; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class EstimateMemoryUsageActionResponseTests extends AbstractSerializingTestCase { + + @Override + protected Response createTestInstance() { + return new Response( + randomBoolean() ? new ByteSizeValue(randomNonNegativeLong()) : null, + randomBoolean() ? 
new ByteSizeValue(randomNonNegativeLong()) : null); + } + + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response doParseInstance(XContentParser parser) { + return Response.PARSER.apply(parser, null); + } + + public void testConstructor_NullValues() { + Response response = new Response(null, null); + assertThat(response.getExpectedMemoryUsageWithOnePartition(), nullValue()); + assertThat(response.getExpectedMemoryUsageWithMaxPartitions(), nullValue()); + } + + public void testConstructor() { + Response response = new Response(new ByteSizeValue(2048), new ByteSizeValue(1024)); + assertThat(response.getExpectedMemoryUsageWithOnePartition(), equalTo(new ByteSizeValue(2048))); + assertThat(response.getExpectedMemoryUsageWithMaxPartitions(), equalTo(new ByteSizeValue(1024))); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index 518950b675c19..a8275425b5c1c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -43,7 +43,6 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; @@ -227,18 +226,18 @@ public void testInvalidModelMemoryLimits() { DataFrameAnalyticsConfig.Builder builder = new DataFrameAnalyticsConfig.Builder(); // All these are different ways of specifying a limit that is lower than the minimum - assertTooSmall(expectThrows(IllegalArgumentException.class, - () -> builder.setModelMemoryLimit(new ByteSizeValue(1048575, ByteSizeUnit.BYTES)))); - assertTooSmall(expectThrows(IllegalArgumentException.class, - () -> builder.setModelMemoryLimit(new ByteSizeValue(0, ByteSizeUnit.BYTES)))); - assertTooSmall(expectThrows(IllegalArgumentException.class, - () -> builder.setModelMemoryLimit(new ByteSizeValue(-1, ByteSizeUnit.BYTES)))); - assertTooSmall(expectThrows(IllegalArgumentException.class, - () -> builder.setModelMemoryLimit(new ByteSizeValue(1023, ByteSizeUnit.KB)))); - assertTooSmall(expectThrows(IllegalArgumentException.class, - () -> builder.setModelMemoryLimit(new ByteSizeValue(0, ByteSizeUnit.KB)))); - assertTooSmall(expectThrows(IllegalArgumentException.class, - () -> builder.setModelMemoryLimit(new ByteSizeValue(0, ByteSizeUnit.MB)))); + assertTooSmall(expectThrows(ElasticsearchStatusException.class, + () -> builder.setModelMemoryLimit(new ByteSizeValue(1048575, ByteSizeUnit.BYTES)).build())); + assertTooSmall(expectThrows(ElasticsearchStatusException.class, + () -> builder.setModelMemoryLimit(new ByteSizeValue(0, ByteSizeUnit.BYTES)).build())); + assertTooSmall(expectThrows(ElasticsearchStatusException.class, + () -> builder.setModelMemoryLimit(new ByteSizeValue(-1, ByteSizeUnit.BYTES)).build())); + assertTooSmall(expectThrows(ElasticsearchStatusException.class, + () -> builder.setModelMemoryLimit(new ByteSizeValue(1023, ByteSizeUnit.KB)).build())); + assertTooSmall(expectThrows(ElasticsearchStatusException.class, + () -> builder.setModelMemoryLimit(new ByteSizeValue(0, ByteSizeUnit.KB)).build())); + 
assertTooSmall(expectThrows(ElasticsearchStatusException.class, + () -> builder.setModelMemoryLimit(new ByteSizeValue(0, ByteSizeUnit.MB)).build())); } public void testNoMemoryCapping() { @@ -276,6 +275,36 @@ public void testExplicitModelMemoryLimitTooHigh() { assertThat(e.getMessage(), containsString("must be less than the value of the xpack.ml.max_model_memory_limit setting")); } + public void testBuildForMemoryEstimation() { + DataFrameAnalyticsConfig.Builder builder = createRandomBuilder("foo"); + + DataFrameAnalyticsConfig config = builder.buildForMemoryEstimation(); + + assertThat(config, equalTo(builder.build())); + } + + public void testBuildForMemoryEstimation_MissingId() { + DataFrameAnalyticsConfig.Builder builder = new DataFrameAnalyticsConfig.Builder() + .setAnalysis(OutlierDetectionTests.createRandom()) + .setSource(DataFrameAnalyticsSourceTests.createRandom()) + .setDest(DataFrameAnalyticsDestTests.createRandom()); + + DataFrameAnalyticsConfig config = builder.buildForMemoryEstimation(); + + assertThat(config.getId(), equalTo("dummy")); + } + + public void testBuildForMemoryEstimation_MissingDest() { + DataFrameAnalyticsConfig.Builder builder = new DataFrameAnalyticsConfig.Builder() + .setId("foo") + .setAnalysis(OutlierDetectionTests.createRandom()) + .setSource(DataFrameAnalyticsSourceTests.createRandom()); + + DataFrameAnalyticsConfig config = builder.buildForMemoryEstimation(); + + assertThat(config.getDest().getIndex(), equalTo("dummy")); + } + public void testPreventCreateTimeInjection() throws IOException { String json = "{" + " \"create_time\" : 123456789 }," @@ -306,7 +335,7 @@ public void testPreventVersionInjection() throws IOException { } } - public void assertTooSmall(IllegalArgumentException e) { - assertThat(e.getMessage(), is("[model_memory_limit] must be at least [1mb]")); + private static void assertTooSmall(ElasticsearchStatusException e) { + assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1 MiB.")); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 520f7a30ece4f..24045c1549151 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -105,8 +105,9 @@ protected static String createJsonRecord(Map keyValueMap) throws } protected static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex, - @Nullable String resultsField) { - DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id); + @Nullable String resultsField) { + DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(); + configBuilder.setId(id); configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null)); configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField)); configBuilder.setAnalysis(new OutlierDetection()); @@ -122,7 +123,8 @@ protected void assertState(String id, DataFrameAnalyticsState state) { protected static DataFrameAnalyticsConfig buildRegressionAnalytics(String id, String[] sourceIndex, String destIndex, @Nullable String 
resultsField, String dependentVariable) { - DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id); + DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(); + configBuilder.setId(id); configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null)); configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField)); configBuilder.setAnalysis(new Regression(dependentVariable)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index a56317193a79f..93968015f2dd7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -72,6 +72,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction; @@ -136,6 +137,7 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction; import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.TransportEstimateMemoryUsageAction; import org.elasticsearch.xpack.ml.action.TransportEvaluateDataFrameAction; import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction; import org.elasticsearch.xpack.ml.action.TransportFindFileStructureAction; @@ -190,6 +192,10 @@ import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessFactory; import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager; +import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; +import org.elasticsearch.xpack.ml.dataframe.process.MemoryUsageEstimationProcessManager; +import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; +import org.elasticsearch.xpack.ml.dataframe.process.NativeMemoryUsageEstimationProcessFactory; import org.elasticsearch.xpack.ml.dataframe.process.NativeAnalyticsProcessFactory; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManagerHolder; @@ -235,6 +241,7 @@ import org.elasticsearch.xpack.ml.rest.datafeeds.RestStopDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestUpdateDatafeedAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestDeleteDataFrameAnalyticsAction; +import org.elasticsearch.xpack.ml.rest.dataframe.RestEstimateMemoryUsageAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestEvaluateDataFrameAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsStatsAction; @@ -487,7 +494,8 @@ public Collection createComponents(Client client, ClusterService cluster AutodetectProcessFactory autodetectProcessFactory; NormalizerProcessFactory normalizerProcessFactory; - AnalyticsProcessFactory analyticsProcessFactory; + 
AnalyticsProcessFactory analyticsProcessFactory; + AnalyticsProcessFactory memoryEstimationProcessFactory; if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) { try { NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment); @@ -503,6 +511,8 @@ public Collection createComponents(Client client, ClusterService cluster clusterService); normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService); analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService); + memoryEstimationProcessFactory = + new NativeMemoryUsageEstimationProcessFactory(environment, nativeController, clusterService); } catch (IOException e) { // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so // only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be @@ -519,6 +529,7 @@ public Collection createComponents(Client client, ClusterService cluster // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService, onProcessCrash) -> null; + memoryEstimationProcessFactory = (jobId, analyticsProcessConfig, executorService, onProcessCrash) -> null; } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); @@ -542,6 +553,9 @@ public Collection createComponents(Client client, ClusterService cluster // Data frame analytics components AnalyticsProcessManager analyticsProcessManager = new AnalyticsProcessManager(client, threadPool, analyticsProcessFactory); + MemoryUsageEstimationProcessManager memoryEstimationProcessManager = + new MemoryUsageEstimationProcessManager( + threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client); assert client instanceof NodeClient; DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client, @@ -579,6 +593,7 @@ public Collection createComponents(Client client, ClusterService cluster new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService), memoryTracker, analyticsProcessManager, + memoryEstimationProcessManager, dataFrameAnalyticsConfigProvider, nativeStorageProvider ); @@ -676,7 +691,8 @@ public List getRestHandlers(Settings settings, RestController restC new RestDeleteDataFrameAnalyticsAction(restController), new RestStartDataFrameAnalyticsAction(restController), new RestStopDataFrameAnalyticsAction(restController), - new RestEvaluateDataFrameAction(restController) + new RestEvaluateDataFrameAction(restController), + new RestEstimateMemoryUsageAction(restController) ); } @@ -742,7 +758,8 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(DeleteDataFrameAnalyticsAction.INSTANCE, TransportDeleteDataFrameAnalyticsAction.class), new ActionHandler<>(StartDataFrameAnalyticsAction.INSTANCE, TransportStartDataFrameAnalyticsAction.class), new ActionHandler<>(StopDataFrameAnalyticsAction.INSTANCE, 
TransportStopDataFrameAnalyticsAction.class), - new ActionHandler<>(EvaluateDataFrameAction.INSTANCE, TransportEvaluateDataFrameAction.class) + new ActionHandler<>(EvaluateDataFrameAction.INSTANCE, TransportEvaluateDataFrameAction.class), + new ActionHandler<>(EstimateMemoryUsageAction.INSTANCE, TransportEstimateMemoryUsageAction.class) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java new file mode 100644 index 0000000000000..4ff9e084c1e99 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; +import org.elasticsearch.xpack.ml.dataframe.process.MemoryUsageEstimationProcessManager; + +import java.util.Objects; +import java.util.Optional; + +/** + * Estimates memory usage for the given data frame analytics spec. + * Redirects to a different node if the current node is *not* an ML node. + */ +public class TransportEstimateMemoryUsageAction + extends HandledTransportAction { + + private final TransportService transportService; + private final ClusterService clusterService; + private final NodeClient client; + private final MemoryUsageEstimationProcessManager processManager; + + @Inject + public TransportEstimateMemoryUsageAction(TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + NodeClient client, + MemoryUsageEstimationProcessManager processManager) { + super(EstimateMemoryUsageAction.NAME, transportService, actionFilters, EstimateMemoryUsageAction.Request::new); + this.transportService = transportService; + this.clusterService = Objects.requireNonNull(clusterService); + this.client = Objects.requireNonNull(client); + this.processManager = Objects.requireNonNull(processManager); + } + + @Override + protected void doExecute(Task task, + EstimateMemoryUsageAction.Request request, + ActionListener listener) { + DiscoveryNode localNode = clusterService.localNode(); + if (MachineLearning.isMlNode(localNode)) { + doEstimateMemoryUsage(createTaskIdForMemoryEstimation(task), request, listener); + } else { + redirectToMlNode(request, listener); + } + } + + /** + * Creates unique task id for the memory estimation process. This id is useful when logging. 
+ */ + private static String createTaskIdForMemoryEstimation(Task task) { + return "memory_usage_estimation_" + task.getId(); + } + + /** + * Performs memory usage estimation. + * Memory usage estimation spawns an ML C++ process which is only available on ML nodes. That's why this method can only be called on + * the ML node. + */ + private void doEstimateMemoryUsage(String taskId, + EstimateMemoryUsageAction.Request request, + ActionListener listener) { + DataFrameDataExtractorFactory.createForSourceIndices( + client, + taskId, + request.getConfig(), + ActionListener.wrap( + dataExtractorFactory -> { + processManager.runJobAsync( + taskId, + request.getConfig(), + dataExtractorFactory, + ActionListener.wrap( + result -> listener.onResponse( + new EstimateMemoryUsageAction.Response( + result.getExpectedMemoryUsageWithOnePartition(), result.getExpectedMemoryUsageWithMaxPartitions())), + listener::onFailure + ) + ); + }, + listener::onFailure + ) + ); + } + + /** + * Finds the first available ML node in the cluster and redirects the request to this node. + */ + private void redirectToMlNode(EstimateMemoryUsageAction.Request request, + ActionListener listener) { + Optional node = findMlNode(clusterService.state()); + if (node.isPresent()) { + transportService.sendRequest( + node.get(), actionName, request, new ActionListenerResponseHandler<>(listener, EstimateMemoryUsageAction.Response::new)); + } else { + listener.onFailure(ExceptionsHelper.badRequestException("No ML node to run on")); + } + } + + /** + * Finds the first available ML node in the cluster state. + */ + private static Optional findMlNode(ClusterState clusterState) { + for (DiscoveryNode node : clusterState.getNodes()) { + if (MachineLearning.isMlNode(node)) { + return Optional.of(node); + } + } + return Optional.empty(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 31eab37fbf6e9..7206376334a36 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -210,7 +210,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi // TODO This could fail with errors. In that case we get stuck with the copied index. // We could delete the index in case of failure or we could try building the factory before reindexing // to catch the error early on. 
- DataFrameDataExtractorFactory.create(client, config, isTaskRestarting, dataExtractorFactoryListener); + DataFrameDataExtractorFactory.createForDestinationIndex(client, config, isTaskRestarting, dataExtractorFactoryListener); } public void stop(DataFrameAnalyticsTask task) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index acdf527b84b0a..2e7139bca2c1f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -37,15 +38,15 @@ public class DataFrameDataExtractorFactory { private final Client client; private final String analyticsId; - private final String index; + private final List indices; private final ExtractedFields extractedFields; private final Map headers; - private DataFrameDataExtractorFactory(Client client, String analyticsId, String index, ExtractedFields extractedFields, + private DataFrameDataExtractorFactory(Client client, String analyticsId, List indices, ExtractedFields extractedFields, Map headers) { this.client = Objects.requireNonNull(client); this.analyticsId = Objects.requireNonNull(analyticsId); - this.index = Objects.requireNonNull(index); + this.indices = Objects.requireNonNull(indices); this.extractedFields = Objects.requireNonNull(extractedFields); this.headers = headers; } @@ -54,7 +55,7 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) { DataFrameDataExtractorContext context = new DataFrameDataExtractorContext( analyticsId, extractedFields, - Arrays.asList(index), + indices, allExtractedFieldsExistQuery(), 1000, headers, @@ -71,6 +72,34 @@ private QueryBuilder allExtractedFieldsExistQuery() { return query; } + /** + * Validate and create a new extractor factory + * + * The source index must exist and contain at least 1 compatible field or validations will fail. 
+ * + * @param client ES Client used to make calls against the cluster + * @param config The config from which to create the extractor factory + * @param listener The listener to notify on creation or failure + */ + public static void createForSourceIndices(Client client, + String taskId, + DataFrameAnalyticsConfig config, + ActionListener listener) { + validateIndexAndExtractFields( + client, + config.getSource().getIndex(), + config, + null, + false, + ActionListener.wrap( + extractedFields -> listener.onResponse( + new DataFrameDataExtractorFactory( + client, taskId, Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders())), + listener::onFailure + ) + ); + } + /** * Validate and create a new extractor factory * @@ -81,15 +110,23 @@ private QueryBuilder allExtractedFieldsExistQuery() { * @param isTaskRestarting Whether the task is restarting * @param listener The listener to notify on creation or failure */ - public static void create(Client client, - DataFrameAnalyticsConfig config, - boolean isTaskRestarting, - ActionListener listener) { - validateIndexAndExtractFields(client, new String[] {config.getDest().getIndex()}, config, isTaskRestarting, - ActionListener.wrap(extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory( - client, config.getId(), config.getDest().getIndex(), extractedFields, config.getHeaders())), + public static void createForDestinationIndex(Client client, + DataFrameAnalyticsConfig config, + boolean isTaskRestarting, + ActionListener listener) { + validateIndexAndExtractFields( + client, + new String[] {config.getDest().getIndex()}, + config, + config.getDest().getResultsField(), + isTaskRestarting, + ActionListener.wrap( + extractedFields -> listener.onResponse( + new DataFrameDataExtractorFactory( + client, config.getId(), Arrays.asList(config.getDest().getIndex()), extractedFields, config.getHeaders())), listener::onFailure - )); + ) + ); } /** @@ -102,26 +139,36 @@ public static void create(Client client, public static void validateConfigAndSourceIndex(Client client, DataFrameAnalyticsConfig config, ActionListener listener) { - validateIndexAndExtractFields(client, config.getSource().getIndex(), config, false, ActionListener.wrap( + validateIndexAndExtractFields( + client, + config.getSource().getIndex(), + config, + config.getDest().getResultsField(), + false, + ActionListener.wrap( fields -> { config.getSource().getParsedQuery(); // validate query is acceptable listener.onResponse(config); }, listener::onFailure - )); + ) + ); } private static void validateIndexAndExtractFields(Client client, String[] index, DataFrameAnalyticsConfig config, + String resultsField, boolean isTaskRestarting, ActionListener listener) { AtomicInteger docValueFieldsLimitHolder = new AtomicInteger(); // Step 3. 
Extract fields (if possible) and notify listener ActionListener fieldCapabilitiesHandler = ActionListener.wrap( - fieldCapabilitiesResponse -> listener.onResponse(new ExtractedFieldsDetector(index, config, isTaskRestarting, - docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse).detect()), + fieldCapabilitiesResponse -> listener.onResponse( + new ExtractedFieldsDetector( + index, config, resultsField, isTaskRestarting, docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse) + .detect()), listener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java index 3ff8c8a492377..017b7070fcda2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java @@ -60,14 +60,16 @@ public class ExtractedFieldsDetector { private final String[] index; private final DataFrameAnalyticsConfig config; + private final String resultsField; private final boolean isTaskRestarting; private final int docValueFieldsLimit; private final FieldCapabilitiesResponse fieldCapabilitiesResponse; - ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit, - FieldCapabilitiesResponse fieldCapabilitiesResponse) { + ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, String resultsField, boolean isTaskRestarting, + int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse) { this.index = Objects.requireNonNull(index); this.config = Objects.requireNonNull(config); + this.resultsField = resultsField; this.isTaskRestarting = isTaskRestarting; this.docValueFieldsLimit = docValueFieldsLimit; this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse); @@ -76,12 +78,7 @@ public class ExtractedFieldsDetector { public ExtractedFields detect() { Set fields = new HashSet<>(fieldCapabilitiesResponse.get().keySet()); fields.removeAll(IGNORE_FIELDS); - - checkResultsFieldIsNotPresent(); - - // Ignore fields under the results object - fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + ".")); - + removeFieldsUnderResultsField(fields); includeAndExcludeFields(fields); removeFieldsWithIncompatibleTypes(fields); checkRequiredFieldsArePresent(fields); @@ -105,17 +102,28 @@ public ExtractedFields detect() { return extractedFields; } + private void removeFieldsUnderResultsField(Set fields) { + if (resultsField == null) { + return; + } + checkResultsFieldIsNotPresent(); + // Ignore fields under the results object + fields.removeIf(field -> field.startsWith(resultsField + ".")); + } + private void checkResultsFieldIsNotPresent() { // If the task is restarting we do not mind the index containing the results field, we will overwrite all docs if (isTaskRestarting) { return; } - Map indexToFieldCaps = fieldCapabilitiesResponse.getField(config.getDest().getResultsField()); + Map indexToFieldCaps = fieldCapabilitiesResponse.getField(resultsField); if (indexToFieldCaps != null && indexToFieldCaps.isEmpty() == false) { - throw ExceptionsHelper.badRequestException("A field that matches the {}.{} [{}] already exists;" + - " please set a different {}", DataFrameAnalyticsConfig.DEST.getPreferredName(), - 
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), config.getDest().getResultsField(), + throw ExceptionsHelper.badRequestException( + "A field that matches the {}.{} [{}] already exists; please set a different {}", + DataFrameAnalyticsConfig.DEST.getPreferredName(), + DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), + resultsField, DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java new file mode 100644 index 0000000000000..55481de160b97 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.process; + +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; +import org.elasticsearch.xpack.ml.process.ProcessResultsParser; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +abstract class AbstractNativeAnalyticsProcess extends AbstractNativeProcess implements AnalyticsProcess { + + private final String name; + private final ProcessResultsParser resultsParser; + + protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser resultParser, String jobId, + InputStream logStream, OutputStream processInStream, + InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, + List filesToDelete, Consumer onProcessCrash) { + super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); + this.name = Objects.requireNonNull(name); + this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser)); + } + + @Override + public String getName() { + return name; + } + + @Override + public void persistState() { + // Nothing to persist + } + + @Override + public void writeEndOfDataMessage() throws IOException { + new AnalyticsControlMessageWriter(recordWriter(), numberOfFields()).writeEndOfData(); + } + + @Override + public Iterator readAnalyticsResults() { + return resultsParser.parseResults(processOutStream()); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java index 4d58a132bab5b..065767ba4e736 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java @@ -9,7 +9,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.process.NativeController; import 
org.elasticsearch.xpack.ml.process.ProcessPipes; @@ -21,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Supplier; public class AnalyticsBuilder { @@ -29,38 +29,49 @@ public class AnalyticsBuilder { private static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput"; private static final String CONFIG_ARG = "--config="; + private static final String MEMORY_USAGE_ESTIMATION_ONLY_ARG = "--memoryUsageEstimationOnly"; - private final Environment env; + private final Supplier tempDirPathSupplier; private final NativeController nativeController; private final ProcessPipes processPipes; private final AnalyticsProcessConfig config; private final List filesToDelete; + private boolean performMemoryUsageEstimationOnly; - public AnalyticsBuilder(Environment env, NativeController nativeController, ProcessPipes processPipes, AnalyticsProcessConfig config, - List filesToDelete) { - this.env = Objects.requireNonNull(env); + public AnalyticsBuilder(Supplier tempDirPathSupplier, NativeController nativeController, + ProcessPipes processPipes, AnalyticsProcessConfig config, List filesToDelete) { + this.tempDirPathSupplier = Objects.requireNonNull(tempDirPathSupplier); this.nativeController = Objects.requireNonNull(nativeController); this.processPipes = Objects.requireNonNull(processPipes); this.config = Objects.requireNonNull(config); this.filesToDelete = Objects.requireNonNull(filesToDelete); } + public AnalyticsBuilder performMemoryUsageEstimationOnly() { + this.performMemoryUsageEstimationOnly = true; + return this; + } + public void build() throws IOException { List command = buildAnalyticsCommand(); processPipes.addArgs(command); nativeController.startProcess(command); } - List buildAnalyticsCommand() throws IOException { + private List buildAnalyticsCommand() throws IOException { List command = new ArrayList<>(); command.add(ANALYTICS_PATH); command.add(LENGTH_ENCODED_INPUT_ARG); addConfigFile(command); + if (performMemoryUsageEstimationOnly) { + command.add(MEMORY_USAGE_ESTIMATION_ONLY_ARG); + } return command; } private void addConfigFile(List command) throws IOException { - Path configFile = Files.createTempFile(env.tmpFile(), "analysis", ".conf"); + Path tempDir = tempDirPathSupplier.get(); + Path configFile = Files.createTempFile(tempDir, "analysis", ".conf"); filesToDelete.add(configFile); try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(configFile),StandardCharsets.UTF_8); XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java index c5e361c3e1215..6a2ea283b4440 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java @@ -10,7 +10,7 @@ import java.io.IOException; import java.util.Iterator; -public interface AnalyticsProcess extends NativeProcess { +public interface AnalyticsProcess extends NativeProcess { /** * Writes a control message that informs the process @@ -22,7 +22,7 @@ public interface AnalyticsProcess extends NativeProcess { /** * @return stream of data frame analytics results. 
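As the surrounding hunks read after this change, AnalyticsProcess is parameterized over the type of result it emits, so the same interface can back both the full analytics process and the memory usage estimation process. A minimal sketch of the resulting shape, assuming the type parameter is named ProcessResult (the name and the javadoc wording here are assumptions, not text from the patch):

    import java.io.IOException;
    import java.util.Iterator;
    import org.elasticsearch.xpack.ml.process.NativeProcess;

    public interface AnalyticsProcess<ProcessResult> extends NativeProcess {

        /** Writes the control message telling the native process that all input has been sent. */
        void writeEndOfDataMessage() throws IOException;

        /** @return a stream of results of the process-specific type. */
        Iterator<ProcessResult> readAnalyticsResults();

        // remaining members unchanged
    }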
*/ - Iterator readAnalyticsResults(); + Iterator readAnalyticsResults(); /** * Read anything left in the stream before diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java index 1df29b88ba4e3..e72d1ad51a51f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java @@ -8,7 +8,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -public interface AnalyticsProcessFactory { +public interface AnalyticsProcessFactory { /** * Create an implementation of {@link AnalyticsProcess} @@ -19,6 +19,6 @@ public interface AnalyticsProcessFactory { * @param onProcessCrash Callback to execute if the process stops unexpectedly * @return The process */ - AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService, - Consumer onProcessCrash); + AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, + ExecutorService executorService, Consumer onProcessCrash); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index f04ba577be413..e9ef10e848eb4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; +import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import java.io.IOException; import java.util.List; @@ -39,10 +40,12 @@ public class AnalyticsProcessManager { private final Client client; private final ThreadPool threadPool; - private final AnalyticsProcessFactory processFactory; + private final AnalyticsProcessFactory processFactory; private final ConcurrentMap processContextByAllocation = new ConcurrentHashMap<>(); - public AnalyticsProcessManager(Client client, ThreadPool threadPool, AnalyticsProcessFactory analyticsProcessFactory) { + public AnalyticsProcessManager(Client client, + ThreadPool threadPool, + AnalyticsProcessFactory analyticsProcessFactory) { this.client = Objects.requireNonNull(client); this.threadPool = Objects.requireNonNull(threadPool); this.processFactory = Objects.requireNonNull(analyticsProcessFactory); @@ -83,7 +86,8 @@ private void processResults(ProcessContext processContext) { } private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, - AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, Consumer finishHandler) { + AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, + Consumer finishHandler) { try { writeHeaderRecord(dataExtractor, process); @@ -118,7 +122,7 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c } } - private 
void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { + private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { // The extra fields are for the doc hash and the control field (should be an empty string) String[] record = new String[dataExtractor.getFieldNames().size() + 2]; // The value of the control field should be an empty string for data frame rows @@ -139,7 +143,7 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces } } - private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { + private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { List fieldNames = dataExtractor.getFieldNames(); // We add 2 extra fields, both named dot: @@ -155,9 +159,9 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr process.writeRecord(headerRecord); } - private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) { + private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) { ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); - AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig, + AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig, executorService, onProcessCrash(task)); if (process.isProcessAlive() == false) { throw ExceptionsHelper.serverError("Failed to start data frame analytics process"); @@ -215,7 +219,7 @@ public void stop(DataFrameAnalyticsTask task) { class ProcessContext { private final String id; - private volatile AnalyticsProcess process; + private volatile AnalyticsProcess process; private volatile DataFrameDataExtractor dataExtractor; private volatile AnalyticsResultProcessor resultProcessor; private final AtomicInteger progressPercent = new AtomicInteger(0); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index 11c451e9c3932..fd5f43e8426e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; import java.util.Iterator; @@ -53,7 +54,7 @@ public void awaitForCompletion() { } } - public void process(AnalyticsProcess process) { + public void process(AnalyticsProcess process) { // TODO When java 9 features can be used, we will not need the local variable here try (DataFrameRowsJoiner resultsJoiner = dataFrameRowsJoiner) { Iterator iterator = process.readAnalyticsResults(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java new file mode 100644 index 0000000000000..17595db791ee3 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.process; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; +import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; +import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +public class MemoryUsageEstimationProcessManager { + + private static final Logger LOGGER = LogManager.getLogger(MemoryUsageEstimationProcessManager.class); + + private final ExecutorService executorServiceForJob; + private final ExecutorService executorServiceForProcess; + private final AnalyticsProcessFactory processFactory; + + public MemoryUsageEstimationProcessManager(ExecutorService executorServiceForJob, + ExecutorService executorServiceForProcess, + AnalyticsProcessFactory processFactory) { + this.executorServiceForJob = Objects.requireNonNull(executorServiceForJob); + this.executorServiceForProcess = Objects.requireNonNull(executorServiceForProcess); + this.processFactory = Objects.requireNonNull(processFactory); + } + + public void runJobAsync(String jobId, + DataFrameAnalyticsConfig config, + DataFrameDataExtractorFactory dataExtractorFactory, + ActionListener listener) { + executorServiceForJob.execute(() -> { + try { + MemoryUsageEstimationResult result = runJob(jobId, config, dataExtractorFactory); + listener.onResponse(result); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + private MemoryUsageEstimationResult runJob(String jobId, + DataFrameAnalyticsConfig config, + DataFrameDataExtractorFactory dataExtractorFactory) { + DataFrameDataExtractor dataExtractor = dataExtractorFactory.newExtractor(false); + DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); + Set categoricalFields = dataExtractor.getCategoricalFields(); + if (dataSummary.rows == 0) { + return new MemoryUsageEstimationResult(ByteSizeValue.ZERO, ByteSizeValue.ZERO); + } + AnalyticsProcessConfig processConfig = + new AnalyticsProcessConfig( + dataSummary.rows, + dataSummary.cols, + DataFrameAnalyticsConfig.MIN_MODEL_MEMORY_LIMIT, + 1, + "", + categoricalFields, + config.getAnalysis()); + ProcessHolder processHolder = new ProcessHolder(); + AnalyticsProcess process = + processFactory.createAnalyticsProcess( + jobId, + processConfig, + executorServiceForProcess, + onProcessCrash(jobId, processHolder)); + 
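For orientation, a hedged sketch of how a caller drives the manager defined in this file: construct it with two executors and a process factory, then hand runJobAsync a config, an extractor factory, and a listener. The real call site is presumably the transport action this patch adds; the method below is written as if it sat next to runJobAsync so it can reuse LOGGER, and all caller-side names are assumptions.

    // Hedged usage sketch; method and variable names are assumptions.
    static void estimateMemoryUsageExample(MemoryUsageEstimationProcessManager manager,
                                           DataFrameAnalyticsConfig config,
                                           DataFrameDataExtractorFactory extractorFactory) {
        manager.runJobAsync(
            config.getId(),                 // the job id is only used for pipe names and log lines
            config,
            extractorFactory,
            ActionListener.wrap(
                result -> LOGGER.info("[{}] estimated memory usage: one partition [{}], max partitions [{}]",
                    config.getId(),
                    result.getExpectedMemoryUsageWithOnePartition(),
                    result.getExpectedMemoryUsageWithMaxPartitions()),
                e -> LOGGER.error(new ParameterizedMessage("[{}] memory usage estimation failed", config.getId()), e)));
    }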
processHolder.process = process; + if (process.isProcessAlive() == false) { + String errorMsg = new ParameterizedMessage("[{}] Error while starting process", jobId).getFormattedMessage(); + throw ExceptionsHelper.serverError(errorMsg); + } + try { + return readResult(jobId, process); + } catch (Exception e) { + String errorMsg = + new ParameterizedMessage("[{}] Error while processing result [{}]", jobId, e.getMessage()).getFormattedMessage(); + throw ExceptionsHelper.serverError(errorMsg, e); + } finally { + process.consumeAndCloseOutputStream(); + try { + LOGGER.info("[{}] Closing process", jobId); + process.close(); + LOGGER.info("[{}] Closed process", jobId); + } catch (Exception e) { + String errorMsg = + new ParameterizedMessage("[{}] Error while closing process [{}]", jobId, e.getMessage()).getFormattedMessage(); + throw ExceptionsHelper.serverError(errorMsg, e); + } + } + } + + private static class ProcessHolder { + volatile AnalyticsProcess process; + } + + private static Consumer onProcessCrash(String jobId, ProcessHolder processHolder) { + return reason -> { + AnalyticsProcess process = processHolder.process; + if (process == null) { + LOGGER.error(new ParameterizedMessage("[{}] Process does not exist", jobId)); + return; + } + try { + process.kill(); + } catch (IOException e) { + LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", jobId), e); + } + }; + } + + /** + * Extracts {@link MemoryUsageEstimationResult} from process' output. + */ + private static MemoryUsageEstimationResult readResult(String jobId, AnalyticsProcess process) { + Iterator iterator = process.readAnalyticsResults(); + if (iterator.hasNext() == false) { + String errorMsg = + new ParameterizedMessage("[{}] Memory usage estimation process returned no results", jobId).getFormattedMessage(); + throw ExceptionsHelper.serverError(errorMsg); + } + MemoryUsageEstimationResult result = iterator.next(); + if (iterator.hasNext()) { + String errorMsg = + new ParameterizedMessage("[{}] Memory usage estimation process returned more than one result", jobId).getFormattedMessage(); + throw ExceptionsHelper.serverError(errorMsg); + } + return result; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java index e42a9c1bdc012..abff4c863c3af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java @@ -5,46 +5,22 @@ */ package org.elasticsearch.xpack.ml.dataframe.process; -import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; -import org.elasticsearch.xpack.ml.process.ProcessResultsParser; +import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; -import java.util.Iterator; import java.util.List; import java.util.function.Consumer; -public class NativeAnalyticsProcess extends AbstractNativeProcess implements AnalyticsProcess { +public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess { private static final String NAME = "analytics"; - private final ProcessResultsParser resultsParser = new ProcessResultsParser<>(AnalyticsResult.PARSER); - - protected NativeAnalyticsProcess(String jobId, InputStream logStream, 
OutputStream processInStream, InputStream processOutStream, - OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - Consumer onProcessCrash) { - super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); - } - - @Override - public String getName() { - return NAME; - } - - @Override - public void persistState() { - // Nothing to persist - } - - @Override - public void writeEndOfDataMessage() throws IOException { - new AnalyticsControlMessageWriter(recordWriter(), numberOfFields()).writeEndOfData(); - } - - @Override - public Iterator readAnalyticsResults() { - return resultsParser.parseResults(processOutStream()); + protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, + InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, + List filesToDelete, Consumer onProcessCrash) { + super(NAME, AnalyticsResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, + filesToDelete, onProcessCrash); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 2ac9eb301e743..c41510019ba17 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -14,6 +14,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -27,7 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { +public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class); @@ -50,7 +51,7 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) { } @Override - public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, + public NativeAnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, @@ -80,8 +81,8 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessCon private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List filesToDelete, ProcessPipes processPipes) { - AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(env, nativeController, processPipes, analyticsProcessConfig, - filesToDelete); + AnalyticsBuilder analyticsBuilder = + new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete); try { analyticsBuilder.build(); processPipes.connectStreams(processConnectTimeout); diff 
--git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java new file mode 100644 index 0000000000000..55c9ec7dbbd71 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.process; + +import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.function.Consumer; + +public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsProcess { + + private static final String NAME = "memory_usage_estimation"; + + protected NativeMemoryUsageEstimationProcess(String jobId, InputStream logStream, + OutputStream processInStream, InputStream processOutStream, + OutputStream processRestoreStream, int numberOfFields, List filesToDelete, + Consumer onProcessCrash) { + super(NAME, MemoryUsageEstimationResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, + numberOfFields, filesToDelete, onProcessCrash); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java new file mode 100644 index 0000000000000..3c573701f361e --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.dataframe.process; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.Environment; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; +import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProcessFactory { + + private static final Logger LOGGER = LogManager.getLogger(NativeMemoryUsageEstimationProcessFactory.class); + + private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); + + private final Environment env; + private final NativeController nativeController; + private volatile Duration processConnectTimeout; + + public NativeMemoryUsageEstimationProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { + this.env = Objects.requireNonNull(env); + this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer( + MachineLearning.PROCESS_CONNECT_TIMEOUT, this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); + } + + @Override + public NativeMemoryUsageEstimationProcess createAnalyticsProcess( + String jobId, + AnalyticsProcessConfig analyticsProcessConfig, + ExecutorService executorService, + Consumer onProcessCrash) { + List filesToDelete = new ArrayList<>(); + ProcessPipes processPipes = new ProcessPipes( + env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, true, false, false, true, false, false); + + createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes); + + NativeMemoryUsageEstimationProcess process = new NativeMemoryUsageEstimationProcess( + jobId, + processPipes.getLogStream().get(), + // Memory estimation process does not use the input pipe, hence null. 
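The factory itself is created once and handed to the estimation manager. A hedged wiring sketch follows; the actual wiring is done in MachineLearning.java, which this patch also changes, so the executor choices and the variable names (environment, nativeController, clusterService, threadPool) are assumptions.

    // Hedged wiring sketch; executor choices and surrounding variables are assumptions.
    NativeMemoryUsageEstimationProcessFactory memoryEstimationProcessFactory =
        new NativeMemoryUsageEstimationProcessFactory(environment, nativeController, clusterService);

    MemoryUsageEstimationProcessManager memoryEstimationProcessManager =
        new MemoryUsageEstimationProcessManager(
            threadPool.generic(),
            threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME),
            memoryEstimationProcessFactory);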
+ null, + processPipes.getProcessOutStream().get(), + null, + 0, + filesToDelete, + onProcessCrash); + + try { + process.start(executorService); + return process; + } catch (EsRejectedExecutionException e) { + try { + IOUtils.close(process); + } catch (IOException ioe) { + LOGGER.error("Can't close data frame analytics memory usage estimation process", ioe); + } + throw e; + } + } + + private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List filesToDelete, + ProcessPipes processPipes) { + AnalyticsBuilder analyticsBuilder = + new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete) + .performMemoryUsageEstimationOnly(); + try { + analyticsBuilder.build(); + processPipes.connectStreams(processConnectTimeout); + } catch (IOException e) { + String msg = "Failed to launch data frame analytics memory usage estimation process for job " + jobId; + LOGGER.error(msg); + throw ExceptionsHelper.serverError(msg, e); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResult.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java similarity index 82% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResult.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java index ced64ab04a280..8118c3645f130 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResult.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java @@ -3,29 +3,30 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. 
*/ -package org.elasticsearch.xpack.ml.dataframe.process; +package org.elasticsearch.xpack.ml.dataframe.process.results; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + public class AnalyticsResult implements ToXContentObject { public static final ParseField TYPE = new ParseField("analytics_result"); public static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent"); - static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(), + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(), a -> new AnalyticsResult((RowResults) a[0], (Integer) a[1])); static { - PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), RowResults.PARSER, RowResults.TYPE); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), PROGRESS_PERCENT); + PARSER.declareObject(optionalConstructorArg(), RowResults.PARSER, RowResults.TYPE); + PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT); } private final RowResults rowResults; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResult.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResult.java new file mode 100644 index 0000000000000..03fcb3a52ca42 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResult.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.dataframe.process.results; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class MemoryUsageEstimationResult implements ToXContentObject { + + public static final ParseField TYPE = new ParseField("memory_usage_estimation_result"); + + public static final ParseField EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION = new ParseField("expected_memory_usage_with_one_partition"); + public static final ParseField EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS = new ParseField("expected_memory_usage_with_max_partitions"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + TYPE.getPreferredName(), + true, + args -> new MemoryUsageEstimationResult((ByteSizeValue) args[0], (ByteSizeValue) args[1])); + + static { + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION.getPreferredName()), + EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION, + ObjectParser.ValueType.VALUE); + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS.getPreferredName()), + EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS, + ObjectParser.ValueType.VALUE); + } + + private final ByteSizeValue expectedMemoryUsageWithOnePartition; + private final ByteSizeValue expectedMemoryUsageWithMaxPartitions; + + public MemoryUsageEstimationResult(@Nullable ByteSizeValue expectedMemoryUsageWithOnePartition, + @Nullable ByteSizeValue expectedMemoryUsageWithMaxPartitions) { + this.expectedMemoryUsageWithOnePartition = expectedMemoryUsageWithOnePartition; + this.expectedMemoryUsageWithMaxPartitions = expectedMemoryUsageWithMaxPartitions; + } + + public ByteSizeValue getExpectedMemoryUsageWithOnePartition() { + return expectedMemoryUsageWithOnePartition; + } + + public ByteSizeValue getExpectedMemoryUsageWithMaxPartitions() { + return expectedMemoryUsageWithMaxPartitions; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (expectedMemoryUsageWithOnePartition != null) { + builder.field( + EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION.getPreferredName(), expectedMemoryUsageWithOnePartition.getStringRep()); + } + if (expectedMemoryUsageWithMaxPartitions != null) { + builder.field( + EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS.getPreferredName(), expectedMemoryUsageWithMaxPartitions.getStringRep()); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + MemoryUsageEstimationResult that = (MemoryUsageEstimationResult) other; + return Objects.equals(expectedMemoryUsageWithOnePartition, that.expectedMemoryUsageWithOnePartition) + && Objects.equals(expectedMemoryUsageWithMaxPartitions, that.expectedMemoryUsageWithMaxPartitions); + } + + @Override + public int 
hashCode() { + return Objects.hash(expectedMemoryUsageWithOnePartition, expectedMemoryUsageWithMaxPartitions); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/RowResults.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/RowResults.java index f32e13703212d..509464456d9df 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/RowResults.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/RowResults.java @@ -14,6 +14,8 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + public class RowResults implements ToXContentObject { public static final ParseField TYPE = new ParseField("row_results"); @@ -25,8 +27,8 @@ public class RowResults implements ToXContentObject { a -> new RowResults((Integer) a[0], (Map) a[1])); static { - PARSER.declareInt(ConstructingObjectParser.constructorArg(), CHECKSUM); - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, context) -> p.map(), RESULTS); + PARSER.declareInt(constructorArg(), CHECKSUM); + PARSER.declareObject(constructorArg(), (p, context) -> p.map(), RESULTS); } private final int checksum; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 60673467ba0e4..e0117109691e8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -62,7 +62,7 @@ protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStrea Consumer onProcessCrash) { this.jobId = jobId; cppLogHandler = new CppLogMessageHandler(jobId, logStream); - this.processInStream = new BufferedOutputStream(processInStream); + this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null; this.processOutStream = processOutStream; this.processRestoreStream = processRestoreStream; this.recordWriter = new LengthEncodedWriter(this.processInStream); @@ -87,19 +87,32 @@ public void start(ExecutorService executorService) { LOGGER.error(new ParameterizedMessage("[{}] Error tailing {} process logs", jobId, getName()), e); } } finally { - if (processCloseInitiated == false && processKilled == false) { - // The log message doesn't say "crashed", as the process could have been killed - // by a user or other process (e.g. the Linux OOM killer) - - String errors = cppLogHandler.getErrors(); - String fullError = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", jobId, getName(), errors); - LOGGER.error(fullError); - onProcessCrash.accept(fullError); - } + detectCrash(); } }); } + /** + * Try detecting whether the process crashed i.e. stopped prematurely without any known reason. + */ + private void detectCrash() { + if (processCloseInitiated || processKilled) { + // Do not detect crash when the process is being closed or killed. + return; + } + if (processInStream == null) { + // Do not detect crash when the process has been closed automatically. + // This is possible when the process does not have input pipe to hang on and closes right after writing its output. 
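The early return above covers native processes that are started without an input pipe at all: they can legitimately exit as soon as they have written their output, so that exit must not be reported as a crash. The memory usage estimation process created earlier in this patch is exactly that case; for reference, its construction (mirroring NativeMemoryUsageEstimationProcessFactory above, with illustrative comments added) looks like this:

    NativeMemoryUsageEstimationProcess process = new NativeMemoryUsageEstimationProcess(
        jobId,
        processPipes.getLogStream().get(),
        null,                                       // no processInStream: nothing is ever written to the process
        processPipes.getProcessOutStream().get(),
        null,                                       // no restore stream: there is no model state to restore
        0,                                          // no data fields are written either
        filesToDelete,
        onProcessCrash);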
+ return; + } + // The log message doesn't say "crashed", as the process could have been killed + // by a user or other process (e.g. the Linux OOM killer) + String errors = cppLogHandler.getErrors(); + String fullError = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", jobId, getName(), errors); + LOGGER.error(fullError); + onProcessCrash.accept(fullError); + } + /** * Starts a process that may persist its state * @param executorService the executor service to run on @@ -147,7 +160,9 @@ public void close() throws IOException { try { processCloseInitiated = true; // closing its input causes the process to exit - processInStream.close(); + if (processInStream != null) { + processInStream.close(); + } // wait for the process to exit by waiting for end-of-file on the named pipe connected // to the state processor - it may take a long time for all the model state to be // indexed @@ -192,7 +207,9 @@ public void kill() throws IOException { LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName()); } finally { try { - processInStream.close(); + if (processInStream != null) { + processInStream.close(); + } } catch (IOException e) { // Ignore it - we're shutting down and the method itself has logged a warning } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEstimateMemoryUsageAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEstimateMemoryUsageAction.java new file mode 100644 index 0000000000000..cf426adbb9886 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEstimateMemoryUsageAction.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.rest.dataframe; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.io.IOException; + +public class RestEstimateMemoryUsageAction extends BaseRestHandler { + + public RestEstimateMemoryUsageAction(RestController controller) { + controller.registerHandler( + RestRequest.Method.POST, + MachineLearning.BASE_PATH + "data_frame/analytics/_estimate_memory_usage", this); + } + + @Override + public String getName() { + return "ml_estimate_memory_usage_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + EstimateMemoryUsageAction.Request request = + EstimateMemoryUsageAction.Request.parseRequest(restRequest.contentOrSourceParamParser()); + return channel -> client.execute(EstimateMemoryUsageAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java index 7079a3295bd17..8d85869e97967 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java @@ -58,7 +58,8 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase { private static final String[] SOURCE_INDEX = new String[] {"source-index"}; private static final String DEST_INDEX = "dest-index"; private static final DataFrameAnalyticsConfig ANALYTICS_CONFIG = - new DataFrameAnalyticsConfig.Builder(ANALYTICS_ID) + new DataFrameAnalyticsConfig.Builder() + .setId(ANALYTICS_ID) .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null)) .setAnalysis(new OutlierDetection()) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java index d48d079314aa6..7df1af62449e4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java @@ -63,7 +63,8 @@ public class SourceDestValidatorTests extends ESTestCase { } public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("source-1")) .setDest(new DataFrameAnalyticsDest("dest", null)) .setAnalysis(new OutlierDetection()) @@ -74,7 +75,8 @@ public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { } public void testCheck_GivenMissingConcreteSourceIndex() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("missing")) .setDest(new 
DataFrameAnalyticsDest("dest", null)) .setAnalysis(new OutlierDetection()) @@ -88,7 +90,8 @@ public void testCheck_GivenMissingConcreteSourceIndex() { } public void testCheck_GivenMissingWildcardSourceIndex() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("missing*")) .setDest(new DataFrameAnalyticsDest("dest", null)) .setAnalysis(new OutlierDetection()) @@ -102,7 +105,8 @@ public void testCheck_GivenMissingWildcardSourceIndex() { } public void testCheck_GivenDestIndexSameAsSourceIndex() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("source-1")) .setDest(new DataFrameAnalyticsDest("source-1", null)) .setAnalysis(new OutlierDetection()) @@ -116,7 +120,8 @@ public void testCheck_GivenDestIndexSameAsSourceIndex() { } public void testCheck_GivenDestIndexMatchesSourceIndex() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("source-*")) .setDest(new DataFrameAnalyticsDest(SOURCE_2, null)) .setAnalysis(new OutlierDetection()) @@ -130,7 +135,8 @@ public void testCheck_GivenDestIndexMatchesSourceIndex() { } public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("source-1,source-*")) .setDest(new DataFrameAnalyticsDest(SOURCE_2, null)) .setAnalysis(new OutlierDetection()) @@ -144,7 +150,8 @@ public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { } public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource(SOURCE_1)) .setDest(new DataFrameAnalyticsDest("dest-alias", null)) .setAnalysis(new OutlierDetection()) @@ -159,7 +166,8 @@ public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { } public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() { - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("test") .setSource(createSource("source-1")) .setDest(new DataFrameAnalyticsDest("source-1-alias", null)) .setAnalysis(new OutlierDetection()) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index 5f781538bec24..6d51923f68c75 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -43,7 +43,7 @@ public void testDetect_GivenFloatField() { .addAggregatableField("some_float", "float").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, 
fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -58,7 +58,7 @@ public void testDetect_GivenNumericFieldWithMultipleTypes() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -72,7 +72,7 @@ public void testDetect_GivenNonNumericField() { .addAggregatableField("some_keyword", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); @@ -83,7 +83,7 @@ public void testDetect_GivenOutlierDetectionAndFieldWithNumericAndNonNumericType .addAggregatableField("indecisive_field", "float", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); @@ -97,7 +97,7 @@ public void testDetect_GivenOutlierDetectionAndMultipleFields() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -117,7 +117,7 @@ public void testDetect_GivenRegressionAndMultipleFields() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -136,7 +136,7 @@ public void testDetect_GivenRegressionAndRequiredFieldMissing() { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("required fields [foo] are missing")); @@ -147,7 +147,7 @@ public void testDetect_GivenIgnoredField() { .addAggregatableField("_id", "float").build(); ExtractedFieldsDetector 
         extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]"));
@@ -169,7 +169,7 @@ public void testDetect_ShouldSortFieldsAlphabetically() {
         FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -186,7 +186,7 @@ public void testDetectedExtractedFields_GivenIncludeWithMissingField() {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected"));
@@ -201,7 +201,7 @@ public void testDetectedExtractedFields_GivenExcludeAllValidFields() {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
         assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]"));
     }
@@ -217,7 +217,7 @@ public void testDetectedExtractedFields_GivenInclusionsAndExclusions() {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -234,7 +234,7 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsField() {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
@@ -250,7 +250,7 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsFieldAndTaskIsR
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 100, fieldCapabilities);
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -258,6 +258,23 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsFieldAndTaskIsR
         assertThat(extractedFieldNames, equalTo(Arrays.asList("my_field1", "your_field2")));
     }
 
+    public void testDetectedExtractedFields_NullResultsField() {
+        FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
+            .addAggregatableField(RESULTS_FIELD, "float")
+            .addAggregatableField("my_field1", "float")
+            .addAggregatableField("your_field2", "float")
+            .addAggregatableField("your_keyword", "keyword")
+            .build();
+
+        ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
+            SOURCE_INDEX, buildOutlierDetectionConfig(), null, false, 100, fieldCapabilities);
+        ExtractedFields extractedFields = extractedFieldsDetector.detect();
+
+        List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
+            .collect(Collectors.toList());
+        assertThat(extractedFieldNames, equalTo(Arrays.asList(RESULTS_FIELD, "my_field1", "your_field2")));
+    }
+
     public void testDetectedExtractedFields_GivenLessFieldsThanDocValuesLimit() {
         FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
             .addAggregatableField("field_1", "float")
@@ -267,7 +284,7 @@ public void testDetectedExtractedFields_GivenLessFieldsThanDocValuesLimit() {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 4, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 4, fieldCapabilities);
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -286,7 +303,7 @@ public void testDetectedExtractedFields_GivenEqualFieldsToDocValuesLimit() {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 3, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 3, fieldCapabilities);
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -305,7 +322,7 @@ public void testDetectedExtractedFields_GivenMoreFieldsThanDocValuesLimit() {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 2, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 2, fieldCapabilities);
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -320,9 +337,10 @@ private static DataFrameAnalyticsConfig buildOutlierDetectionConfig() {
     }
 
     private static DataFrameAnalyticsConfig buildOutlierDetectionConfig(FetchSourceContext analyzedFields) {
-        return new DataFrameAnalyticsConfig.Builder("foo")
+        return new DataFrameAnalyticsConfig.Builder()
+            .setId("foo")
             .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
-            .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null))
+            .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
             .setAnalyzedFields(analyzedFields)
             .setAnalysis(new OutlierDetection())
             .build();
@@ -333,9 +351,10 @@ private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVa
     }
 
     private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, FetchSourceContext analyzedFields) {
-        return new DataFrameAnalyticsConfig.Builder("foo")
+        return new DataFrameAnalyticsConfig.Builder()
+            .setId("foo")
             .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
-            .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null))
+            .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
             .setAnalyzedFields(analyzedFields)
             .setAnalysis(new Regression(dependentVariable))
             .build();
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilderTests.java
new file mode 100644
index 0000000000000..186a52fb5d307
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilderTests.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.dataframe.process;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class AnalyticsBuilderTests extends ESTestCase {
+
+    private NativeController nativeController;
+    private ProcessPipes processPipes;
+    private AnalyticsProcessConfig config;
+    private List<Path> filesToDelete;
+    private ArgumentCaptor<List<String>> commandCaptor;
+    private AnalyticsBuilder analyticsBuilder;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUpMocks() {
+        nativeController = mock(NativeController.class);
+        processPipes = mock(ProcessPipes.class);
+        config = mock(AnalyticsProcessConfig.class);
+        filesToDelete = new ArrayList<>();
+        commandCaptor = ArgumentCaptor.forClass((Class) List.class);
+
+        analyticsBuilder = new AnalyticsBuilder(LuceneTestCase::createTempDir, nativeController, processPipes, config, filesToDelete);
+    }
+
+    public void testBuild_Analytics() throws Exception {
+        analyticsBuilder.build();
+        assertThat(filesToDelete, hasSize(1));
+
+        verify(nativeController).startProcess(commandCaptor.capture());
+        verifyNoMoreInteractions(nativeController);
+
+        List<String> command = commandCaptor.getValue();
+        assertThat(command, not(hasItem("--memoryUsageEstimationOnly")));
+    }
+
+    public void testBuild_MemoryUsageEstimation() throws Exception {
+        analyticsBuilder
+            .performMemoryUsageEstimationOnly()
+            .build();
+        assertThat(filesToDelete, hasSize(1));
+
+        verify(nativeController).startProcess(commandCaptor.capture());
+        verifyNoMoreInteractions(nativeController);
+
+        List<String> command = commandCaptor.getValue();
+        assertThat(command, hasItem("--memoryUsageEstimationOnly"));
+    }
+}
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java
index 4032f2d65bf34..097437ce8a40f 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java
@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.dataframe.process;
 
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
 import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
 import org.junit.Before;
 import org.mockito.InOrder;
@@ -25,12 +26,13 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
 
     private static final String JOB_ID = "analytics-result-processor-tests";
 
-    private AnalyticsProcess process;
+    private AnalyticsProcess<AnalyticsResult> process;
     private DataFrameRowsJoiner dataFrameRowsJoiner;
     private int progressPercent;
 
     @Before
+    @SuppressWarnings("unchecked")
    public void setUpMocks() {
         process = mock(AnalyticsProcess.class);
         dataFrameRowsJoiner = mock(DataFrameRowsJoiner.class);
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java
new file mode 100644
index 0000000000000..82532ca430f42
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.dataframe.process;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
+import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
+import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
+
+    private static final String TASK_ID = "mem_est_123";
+    private static final String CONFIG_ID = "dummy";
+    private static final int NUM_ROWS = 100;
+    private static final int NUM_COLS = 4;
+    private static final MemoryUsageEstimationResult PROCESS_RESULT_ZERO =
+        new MemoryUsageEstimationResult(ByteSizeValue.ZERO, ByteSizeValue.ZERO);
+    private static final MemoryUsageEstimationResult PROCESS_RESULT =
+        new MemoryUsageEstimationResult(ByteSizeValue.parseBytesSizeValue("20kB", ""), ByteSizeValue.parseBytesSizeValue("10kB", ""));
+
+    private ExecutorService executorServiceForJob;
+    private ExecutorService executorServiceForProcess;
+    private AnalyticsProcess<MemoryUsageEstimationResult> process;
+    private AnalyticsProcessFactory<MemoryUsageEstimationResult> processFactory;
+    private DataFrameDataExtractor dataExtractor;
+    private DataFrameDataExtractorFactory dataExtractorFactory;
+    private DataFrameAnalyticsConfig dataFrameAnalyticsConfig;
+    private ActionListener<MemoryUsageEstimationResult> listener;
+    private ArgumentCaptor<MemoryUsageEstimationResult> resultCaptor;
+    private ArgumentCaptor<Exception> exceptionCaptor;
+    private MemoryUsageEstimationProcessManager processManager;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUpMocks() {
+        executorServiceForJob = EsExecutors.newDirectExecutorService();
+        executorServiceForProcess = mock(ExecutorService.class);
+        process = mock(AnalyticsProcess.class);
+        when(process.isProcessAlive()).thenReturn(true);
+        when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator());
+        processFactory = mock(AnalyticsProcessFactory.class);
+        when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process);
+        dataExtractor = mock(DataFrameDataExtractor.class);
+        when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(NUM_ROWS, NUM_COLS));
+        dataExtractorFactory = mock(DataFrameDataExtractorFactory.class);
+        when(dataExtractorFactory.newExtractor(anyBoolean())).thenReturn(dataExtractor);
+        dataFrameAnalyticsConfig = DataFrameAnalyticsConfigTests.createRandom(CONFIG_ID);
+        listener = mock(ActionListener.class);
+        resultCaptor = ArgumentCaptor.forClass(MemoryUsageEstimationResult.class);
+        exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
+
+        processManager = new MemoryUsageEstimationProcessManager(executorServiceForJob, executorServiceForProcess, processFactory);
+    }
+
+    public void testRunJob_EmptyDataFrame() {
+        when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(0, NUM_COLS));
+
+        processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
+
+        verify(listener).onResponse(resultCaptor.capture());
+        MemoryUsageEstimationResult result = resultCaptor.getValue();
+        assertThat(result, equalTo(PROCESS_RESULT_ZERO));
+
+        verifyNoMoreInteractions(process, listener);
+    }
+
+    public void testRunJob_ProcessNotAlive() {
+        when(process.isProcessAlive()).thenReturn(false);
+
+        processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
+
+        verify(listener).onFailure(exceptionCaptor.capture());
+        ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
+        assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        assertThat(exception.getMessage(), containsString(TASK_ID));
+        assertThat(exception.getMessage(), containsString("Error while starting process"));
+
+        verify(process).isProcessAlive();
+        verifyNoMoreInteractions(process, listener);
+    }
+
+    public void testRunJob_NoResults() throws Exception {
+        when(process.readAnalyticsResults()).thenReturn(Arrays.asList().iterator());
+
+        processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
+
+        verify(listener).onFailure(exceptionCaptor.capture());
+        ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
+        assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        assertThat(exception.getMessage(), containsString(TASK_ID));
+        assertThat(exception.getMessage(), containsString("no results"));
+
+        InOrder inOrder = inOrder(process);
+        inOrder.verify(process).isProcessAlive();
+        inOrder.verify(process).readAnalyticsResults();
+        inOrder.verify(process).consumeAndCloseOutputStream();
+        inOrder.verify(process).close();
+        verifyNoMoreInteractions(process, listener);
+    }
+
+    public void testRunJob_MultipleResults() throws Exception {
+        when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT, PROCESS_RESULT).iterator());
+
+        processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
+
+        verify(listener).onFailure(exceptionCaptor.capture());
+        ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
+        assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        assertThat(exception.getMessage(), containsString(TASK_ID));
+        assertThat(exception.getMessage(), containsString("more than one result"));
+
+        InOrder inOrder = inOrder(process);
+        inOrder.verify(process).isProcessAlive();
+        inOrder.verify(process).readAnalyticsResults();
+        inOrder.verify(process).consumeAndCloseOutputStream();
+        inOrder.verify(process).close();
+        verifyNoMoreInteractions(process, listener);
+
+    }
+
+    public void testRunJob_FailsOnClose() throws Exception {
+        doThrow(ExceptionsHelper.serverError("some LOG(ERROR) lines coming from cpp process")).when(process).close();
+
+        processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
+
+        verify(listener).onFailure(exceptionCaptor.capture());
+        ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
+        assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        assertThat(exception.getMessage(), containsString(TASK_ID));
+        assertThat(exception.getMessage(), containsString("Error while closing process"));
+
+        InOrder inOrder = inOrder(process);
+        inOrder.verify(process).isProcessAlive();
+        inOrder.verify(process).readAnalyticsResults();
+        inOrder.verify(process).consumeAndCloseOutputStream();
+        inOrder.verify(process).close();
+        verifyNoMoreInteractions(process, listener);
+    }
+
+    public void testRunJob_Ok() throws Exception {
+        processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
+
+        verify(listener).onResponse(resultCaptor.capture());
+        MemoryUsageEstimationResult result = resultCaptor.getValue();
+        assertThat(result, equalTo(PROCESS_RESULT));
+
+        InOrder inOrder = inOrder(process);
+        inOrder.verify(process).isProcessAlive();
+        inOrder.verify(process).readAnalyticsResults();
+        inOrder.verify(process).consumeAndCloseOutputStream();
+        inOrder.verify(process).close();
+        verifyNoMoreInteractions(process, listener);
+    }
+}
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java
similarity index 84%
rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultTests.java
rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java
index 22c03d47682e8..13ef2ac502494 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java
@@ -3,12 +3,10 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.dataframe.process;
+package org.elasticsearch.xpack.ml.dataframe.process.results;
 
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractXContentTestCase;
-import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
-import org.elasticsearch.xpack.ml.dataframe.process.results.RowResultsTests;
 
 import java.io.IOException;
 
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResultTests.java
new file mode 100644
index 0000000000000..735606b35ea0d
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/MemoryUsageEstimationResultTests.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.dataframe.process.results;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class MemoryUsageEstimationResultTests extends AbstractXContentTestCase<MemoryUsageEstimationResult> {
+
+    public static MemoryUsageEstimationResult createRandomResult() {
+        return new MemoryUsageEstimationResult(
+            randomBoolean() ? new ByteSizeValue(randomNonNegativeLong()) : null,
+            randomBoolean() ? new ByteSizeValue(randomNonNegativeLong()) : null);
+    }
+
+    @Override
+    protected MemoryUsageEstimationResult createTestInstance() {
+        return createRandomResult();
+    }
+
+    @Override
+    protected MemoryUsageEstimationResult doParseInstance(XContentParser parser) throws IOException {
+        return MemoryUsageEstimationResult.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    public void testConstructor_NullValues() {
+        MemoryUsageEstimationResult result = new MemoryUsageEstimationResult(null, null);
+        assertThat(result.getExpectedMemoryUsageWithOnePartition(), nullValue());
+        assertThat(result.getExpectedMemoryUsageWithMaxPartitions(), nullValue());
+    }
+
+    public void testConstructor() {
+        MemoryUsageEstimationResult result = new MemoryUsageEstimationResult(new ByteSizeValue(2048), new ByteSizeValue(1024));
+        assertThat(result.getExpectedMemoryUsageWithOnePartition(), equalTo(new ByteSizeValue(2048)));
+        assertThat(result.getExpectedMemoryUsageWithMaxPartitions(), equalTo(new ByteSizeValue(1024)));
+    }
+}
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java
new file mode 100644
index 0000000000000..525a9d7183da0
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class AbstractNativeProcessTests extends ESTestCase {
+
+    private InputStream logStream;
+    private OutputStream inputStream;
+    private InputStream outputStream;
+    private OutputStream restoreStream;
+    private Consumer<String> onProcessCrash;
+    private ExecutorService executorService;
+    private CountDownLatch wait = new CountDownLatch(1);
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void initialize() throws IOException {
+        logStream = mock(InputStream.class);
+        // This answer blocks the thread on the executor service.
+        // In order to unblock it, the test needs to call wait.countDown().
+        when(logStream.read(new byte[1024])).thenAnswer(
+            invocationOnMock -> {
+                wait.await();
+                return -1;
+            });
+        inputStream = mock(OutputStream.class);
+        outputStream = mock(InputStream.class);
+        when(outputStream.read(new byte[512])).thenReturn(-1);
+        restoreStream = mock(OutputStream.class);
+        onProcessCrash = mock(Consumer.class);
+        executorService = EsExecutors.newFixed("test", 1, 1, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
+    }
+
+    @After
+    public void terminateExecutorService() {
+        ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
+        verifyNoMoreInteractions(onProcessCrash);
+    }
+
+    public void testStart_DoNotDetectCrashWhenNoInputPipeProvided() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(null)) {
+            process.start(executorService);
+            wait.countDown();
+        }
+    }
+
+    public void testStart_DoNotDetectCrashWhenProcessIsBeingClosed() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
+            process.start(executorService);
+            process.close();
+            wait.countDown();
+        }
+    }
+
+    public void testStart_DoNotDetectCrashWhenProcessIsBeingKilled() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
+            process.start(executorService);
+            process.kill();
+            wait.countDown();
+        }
+    }
+
+    public void testStart_DetectCrashWhenInputPipeExists() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
+            process.start(executorService);
+            wait.countDown();
+            ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
+
+            verify(onProcessCrash).accept("[foo] test process stopped unexpectedly: ");
+        }
+    }
+
+    public void testWriteRecord() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
+            process.writeRecord(new String[] {"a", "b", "c"});
+            process.flushStream();
+
+            verify(inputStream).write(any(), anyInt(), anyInt());
+        }
+    }
+
+    public void testWriteRecord_FailWhenNoInputPipeProvided() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(null)) {
+            expectThrows(NullPointerException.class, () -> process.writeRecord(new String[] {"a", "b", "c"}));
+        }
+    }
+
+    public void testFlush() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
+            process.flushStream();
+
+            verify(inputStream).flush();
+        }
+    }
+
+    public void testFlush_FailWhenNoInputPipeProvided() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(null)) {
+            expectThrows(NullPointerException.class, () -> process.flushStream());
+        }
+    }
+
+    public void testIsReady() throws Exception {
+        try (AbstractNativeProcess process = new TestNativeProcess(null)) {
+            assertThat(process.isReady(), is(false));
+            process.setReady();
+            assertThat(process.isReady(), is(true));
+        }
+    }
+
+    /**
+     * Mock-based implementation of {@link AbstractNativeProcess}.
+     */
+    private class TestNativeProcess extends AbstractNativeProcess {
+
+        TestNativeProcess(OutputStream inputStream) {
+            super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash);
+        }
+
+        @Override
+        public String getName() {
+            return "test";
+        }
+
+        @Override
+        public void persistState() throws IOException {
+        }
+    }
+}
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.estimate_memory_usage.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.estimate_memory_usage.json
new file mode 100644
index 0000000000000..d9ab86533d943
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.estimate_memory_usage.json
@@ -0,0 +1,16 @@
+{
+  "ml.estimate_memory_usage": {
+    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/estimate-memory-usage-dfanalytics.html",
+    "stability": "experimental",
+    "methods": [ "POST" ],
+    "url": {
+      "path": "/_ml/data_frame/analytics/_estimate_memory_usage",
+      "paths": [ "/_ml/data_frame/analytics/_estimate_memory_usage" ],
+      "parts": {}
+    },
+    "body": {
+      "description" : "Memory usage estimation definition",
+      "required" : true
+    }
+  }
+}
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_memory_usage_estimation.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_memory_usage_estimation.yml
new file mode 100644
index 0000000000000..0b84f31cfb22d
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_memory_usage_estimation.yml
@@ -0,0 +1,75 @@
+---
+setup:
+  - do:
+      indices.create:
+        index: index-source
+        body:
+          mappings:
+            properties:
+              x:
+                type: float
+              y:
+                type: float
+
+---
+"Test memory usage estimation for empty data frame":
+
+  - do:
+      ml.estimate_memory_usage:
+        body:
+          data_frame_analytics_config:
+            source: { index: "index-source" }
+            analysis: { outlier_detection: {} }
+  - match: { expected_memory_usage_with_one_partition: "0" }
+  - match: { expected_memory_usage_with_max_partitions: "0" }
+
+---
+"Test memory usage estimation for non-empty data frame":
+
+  - do:
+      index:
+        index: index-source
+        refresh: true
+        body: { x: 1, y: 10 }
+  - match: { result: "created" }
+
+  - do:
+      ml.estimate_memory_usage:
+        body:
+          data_frame_analytics_config:
+            source: { index: "index-source" }
+            analysis: { outlier_detection: {} }
+  - match: { expected_memory_usage_with_one_partition: "3kb" }
+  - match: { expected_memory_usage_with_max_partitions: "3kb" }
+
+  - do:
+      index:
+        index: index-source
+        refresh: true
+        body: { x: 2, y: 20 }
+  - match: { result: "created" }
+
+  - do:
+      ml.estimate_memory_usage:
+        body:
+          data_frame_analytics_config:
+            source: { index: "index-source" }
+            analysis: { outlier_detection: {} }
+  - match: { expected_memory_usage_with_one_partition: "4kb" }
+  - match: { expected_memory_usage_with_max_partitions: "4kb" }
+
+  - do:
+      index:
+        index: index-source
+        refresh: true
+        body: { x: 3, y: 30 }
+  - match: { result: "created" }
+
+  - do:
+      ml.estimate_memory_usage:
+        body:
+          data_frame_analytics_config:
+            source: { index: "index-source" }
+            analysis: { outlier_detection: {} }
+  - match: { expected_memory_usage_with_one_partition: "6kb" }
+  - match: { expected_memory_usage_with_max_partitions: "5kb" }