Skip to content

Commit

Permalink
[7.x][ML] Data frame analytics max_num_threads setting (#59254) (#59308)
Browse files Browse the repository at this point in the history
This adds a setting to data frame analytics jobs called
`max_num_threads`. The setting expects a positive integer.
When set, it specifies the maximum number of threads that may
be used by the analysis. Note that the actual number of threads
used is limited by the number of processors on the node where
the job is assigned. Also, the process may use a couple more threads
for operational functionality that is not the analysis itself.

This setting may also be updated for a stopped job.

More threads may reduce the time it takes to complete the job at the cost
of using more CPU.

Backport of #59254 and #57274
  • Loading branch information
dimitris-athanasiou committed Jul 9, 2020
1 parent 28ef997 commit b224333
Show file tree
Hide file tree
Showing 21 changed files with 260 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static Builder builder() {
static final ParseField CREATE_TIME = new ParseField("create_time");
static final ParseField VERSION = new ParseField("version");
static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads");

private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new);

Expand All @@ -80,6 +81,7 @@ public static Builder builder() {
ValueType.VALUE);
PARSER.declareString(Builder::setVersion, Version::fromString, VERSION);
PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
PARSER.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS);
}

private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException {
Expand All @@ -100,11 +102,13 @@ private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOE
private final Instant createTime;
private final Version version;
private final Boolean allowLazyStart;
private final Integer maxNumThreads;

private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source,
@Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis,
@Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit,
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) {
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart,
@Nullable Integer maxNumThreads) {
this.id = id;
this.description = description;
this.source = source;
Expand All @@ -115,6 +119,7 @@ private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String descripti
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());;
this.version = version;
this.allowLazyStart = allowLazyStart;
this.maxNumThreads = maxNumThreads;
}

public String getId() {
Expand Down Expand Up @@ -157,6 +162,10 @@ public Boolean getAllowLazyStart() {
return allowLazyStart;
}

/**
 * Returns the maximum number of threads configured for the analysis.
 *
 * @return the {@code max_num_threads} value, or {@code null} when none was set
 */
public Integer getMaxNumThreads() {
return maxNumThreads;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -193,6 +202,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (allowLazyStart != null) {
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
}
if (maxNumThreads != null) {
builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
}
builder.endObject();
return builder;
}
Expand All @@ -212,12 +224,14 @@ public boolean equals(Object o) {
&& Objects.equals(modelMemoryLimit, other.modelMemoryLimit)
&& Objects.equals(createTime, other.createTime)
&& Objects.equals(version, other.version)
&& Objects.equals(allowLazyStart, other.allowLazyStart);
&& Objects.equals(allowLazyStart, other.allowLazyStart)
&& Objects.equals(maxNumThreads, other.maxNumThreads);
}

@Override
public int hashCode() {
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart);
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart,
maxNumThreads);
}

@Override
Expand All @@ -237,6 +251,7 @@ public static class Builder {
private Instant createTime;
private Version version;
private Boolean allowLazyStart;
private Integer maxNumThreads;

private Builder() {}

Expand Down Expand Up @@ -290,9 +305,14 @@ public Builder setAllowLazyStart(Boolean allowLazyStart) {
return this;
}

/**
 * Sets the maximum number of threads for the analysis.
 * The value is stored as given; this setter performs no range validation.
 *
 * @param maxNumThreads the {@code max_num_threads} value; may be {@code null} to leave it unset
 * @return this builder, to allow call chaining
 */
public Builder setMaxNumThreads(Integer maxNumThreads) {
this.maxNumThreads = maxNumThreads;
return this;
}

public DataFrameAnalyticsConfig build() {
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime,
version, allowLazyStart);
version, allowLazyStart, maxNumThreads);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,25 @@ public static Builder builder() {
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
VALUE);
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);

PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS);
}

private final String id;
private final String description;
private final ByteSizeValue modelMemoryLimit;
private final Boolean allowLazyStart;
private final Integer maxNumThreads;

private DataFrameAnalyticsConfigUpdate(String id,
@Nullable String description,
@Nullable ByteSizeValue modelMemoryLimit,
@Nullable Boolean allowLazyStart) {
@Nullable Boolean allowLazyStart,
@Nullable Integer maxNumThreads) {
this.id = id;
this.description = description;
this.modelMemoryLimit = modelMemoryLimit;
this.allowLazyStart = allowLazyStart;
this.maxNumThreads = maxNumThreads;
}

public String getId() {
Expand All @@ -85,6 +88,10 @@ public Boolean isAllowLazyStart() {
return allowLazyStart;
}

/**
 * Returns the maximum number of threads carried by this update.
 *
 * @return the {@code max_num_threads} value, or {@code null} when the update does not set it
 */
public Integer getMaxNumThreads() {
return maxNumThreads;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -98,6 +105,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (allowLazyStart != null) {
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
}
if (maxNumThreads != null) {
builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
}
builder.endObject();
return builder;
}
Expand All @@ -117,12 +127,13 @@ public boolean equals(Object other) {
return Objects.equals(this.id, that.id)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
&& Objects.equals(this.allowLazyStart, that.allowLazyStart);
&& Objects.equals(this.allowLazyStart, that.allowLazyStart)
&& Objects.equals(this.maxNumThreads, that.maxNumThreads);
}

@Override
public int hashCode() {
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart);
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
}

public static class Builder {
Expand All @@ -131,6 +142,7 @@ public static class Builder {
private String description;
private ByteSizeValue modelMemoryLimit;
private Boolean allowLazyStart;
private Integer maxNumThreads;

private Builder() {}

Expand Down Expand Up @@ -158,8 +170,13 @@ public Builder setAllowLazyStart(Boolean allowLazyStart) {
return this;
}

/**
 * Sets the maximum number of threads to include in the update.
 * The value is stored as given; this setter performs no range validation.
 *
 * @param maxNumThreads the {@code max_num_threads} value; may be {@code null} to omit it from the update
 * @return this builder, to allow call chaining
 */
public Builder setMaxNumThreads(Integer maxNumThreads) {
this.maxNumThreads = maxNumThreads;
return this;
}

public DataFrameAnalyticsConfigUpdate build() {
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart);
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ public void testPutDataFrameAnalyticsConfig_GivenOutlierDetectionAnalysis() thro
assertThat(createdConfig.getAnalyzedFields(), equalTo(config.getAnalyzedFields()));
assertThat(createdConfig.getModelMemoryLimit(), equalTo(ByteSizeValue.parseBytesSizeValue("1gb", ""))); // default value
assertThat(createdConfig.getDescription(), equalTo("some description"));
assertThat(createdConfig.getMaxNumThreads(), equalTo(1));
}

public void testPutDataFrameAnalyticsConfig_GivenRegression() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3040,6 +3040,7 @@ public void testPutDataFrameAnalytics() throws Exception {
.setAnalyzedFields(analyzedFields) // <5>
.setModelMemoryLimit(new ByteSizeValue(5, ByteSizeUnit.MB)) // <6>
.setDescription("this is an example description") // <7>
.setMaxNumThreads(1) // <8>
.build();
// end::put-data-frame-analytics-config

Expand Down Expand Up @@ -3096,6 +3097,7 @@ public void testUpdateDataFrameAnalytics() throws Exception {
.setId("my-analytics-config") // <1>
.setDescription("new description") // <2>
.setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3>
.setMaxNumThreads(4) // <4>
.build();
// end::update-data-frame-analytics-config-update

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public static DataFrameAnalyticsConfig randomDataFrameAnalyticsConfig() {
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
if (randomBoolean()) {
builder.setMaxNumThreads(randomIntBetween(1, 20));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public static DataFrameAnalyticsConfigUpdate randomDataFrameAnalyticsConfigUpdat
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
if (randomBoolean()) {
builder.setMaxNumThreads(randomIntBetween(1, 20));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include-tagged::{doc-tests-file}[{api}-config]
<5> The fields to be included in / excluded from the analysis
<6> The memory limit for the model created as part of the analysis process
<7> Optionally, a human-readable description
<8> The maximum number of threads to be used by the analysis. Defaults to 1.

[id="{upid}-{api}-query-config"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ include-tagged::{doc-tests-file}[{api}-config-update]
<1> The {dfanalytics-job} ID
<2> The human-readable description
<3> The memory limit for the model created as part of the analysis process
<4> The maximum number of threads to be used by the analysis

[id="{upid}-{api}-query-config"]

Expand Down
11 changes: 10 additions & 1 deletion docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]
`dest`::
(Required, object)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=dest]

`max_num_threads`::
(Optional, integer)
The maximum number of threads to be used by the analysis.
The default value is `1`. Using more threads may decrease the time
necessary to complete the analysis at the cost of using more CPU.
Note that the process may use additional threads for operational
functionality other than the analysis itself.

`model_memory_limit`::
(Optional, string)
Expand Down Expand Up @@ -508,7 +516,8 @@ The API returns the following result:
"model_memory_limit": "1gb",
"create_time" : 1562265491319,
"version" : "7.6.0",
"allow_lazy_start" : false
"allow_lazy_start" : false,
"max_num_threads": 1
}
----
// TESTRESPONSE[s/1562265491319/$body.$_path/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ the `starting` state until sufficient {ml} node capacity is available.
(Optional, string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]

`max_num_threads`::
(Optional, integer)
The maximum number of threads to be used by the analysis.
The default value is `1`. Using more threads may decrease the time
necessary to complete the analysis at the cost of using more CPU.
Note that the process may use additional threads for operational
functionality other than the analysis itself.

`model_memory_limit`::
(Optional, string)
The approximate maximum amount of memory resources that are permitted for
Expand Down

0 comments on commit b224333

Please sign in to comment.