[7.13] [ML] increase the default value of xpack.ml.max_open_jobs from 20 to 512 for autoscaling improvements (#72487) (#72550)

* [ML] increase the default value of xpack.ml.max_open_jobs from 20 to 512 for autoscaling improvements (#72487)

This commit increases the default value of xpack.ml.max_open_jobs from 20 to 512. Additionally, it ignores nodes that cannot provide an accurate view into their native memory.

If a node does not have a view into its native memory, we ignore it for assignment.

This effectively fixes a bug with autoscaling. Autoscaling relies on jobs being assigned to nodes that have adequate memory. If that assignment is instead constrained by a low xpack.ml.max_open_jobs limit, scaling decisions are hampered.
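For reference, the new default can be verified on a running cluster through the cluster settings API; this is a minimal sketch rather than part of the change, using the standard `include_defaults` and `flat_settings` query parameters:

----
GET _cluster/settings?include_defaults=true&flat_settings=true
----

Unless the setting has been overridden, the `defaults` section of the response should report `"xpack.ml.max_open_jobs" : "512"`.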
benwtrent committed Apr 30, 2021
1 parent f3a281d commit 26c0df3
Showing 19 changed files with 105 additions and 186 deletions.
@@ -19,7 +19,7 @@ Retrieves usage information for {dfeeds}.

`GET _ml/datafeeds/_stats` +

`GET _ml/datafeeds/_all/_stats`

[[ml-get-datafeed-stats-prereqs]]
== {api-prereq-title}
@@ -144,7 +144,7 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=search-time]

`404` (Missing resources)::
If `allow_no_match` is `false`, this code indicates that there are no
resources that match the request or only partial matches for the request.
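As an illustration of this response code (not part of the commit's diff), a stats request for a wildcard that matches no datafeeds returns `404` when `allow_no_match` is set to `false`; the pattern `foo*` below is purely hypothetical:

----
GET _ml/datafeeds/foo*/_stats?allow_no_match=false
----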

[[ml-get-datafeed-stats-example]]
== {api-examples-title}
@@ -172,7 +172,7 @@ The API returns the following results:
"transport_address": "127.0.0.1:9300",
"attributes": {
"ml.machine_memory": "17179869184",
"ml.max_open_jobs": "20"
"ml.max_open_jobs": "512"
}
},
"assignment_explanation": "",
22 changes: 11 additions & 11 deletions docs/reference/ml/anomaly-detection/apis/get-job-stats.asciidoc
@@ -17,7 +17,7 @@ Retrieves usage information for {anomaly-jobs}.

`GET _ml/anomaly_detectors/_stats` +

`GET _ml/anomaly_detectors/_all/_stats`

[[ml-get-job-stats-prereqs]]
== {api-prereq-title}
@@ -147,13 +147,13 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=sparse-bucket-count]

`deleting`::
(Boolean)
Indicates that the process of deleting the job is in progress but not yet
completed. It is only reported when `true`.

//Begin forecasts_stats
[[forecastsstats]]`forecasts_stats`::
(object) An object that provides statistical information about forecasts
belonging to this job. Some statistics are omitted if no forecasts have been
made.
+
NOTE: Unless there is at least one forecast, `memory_bytes`, `records`,
@@ -163,11 +163,11 @@ NOTE: Unless there is at least one forecast, `memory_bytes`, `records`,
[%collapsible%open]
====
`forecasted_jobs`:::
(long) A value of `0` indicates that forecasts do not exist for this job. A
value of `1` indicates that at least one forecast exists.
`memory_bytes`:::
(object) The `avg`, `min`, `max` and `total` memory usage in bytes for forecasts
related to this job. If there are no forecasts, this property is omitted.
`processing_time_ms`:::
@@ -176,7 +176,7 @@ forecasts related to this job. If there are no forecasts, this property is
omitted.
`records`:::
(object) The `avg`, `min`, `max` and `total` number of `model_forecast` documents
written for forecasts related to this job. If there are no forecasts, this
property is omitted.
@@ -298,7 +298,7 @@ available only for open jobs.
`attributes`:::
(object)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-attributes]
`ephemeral_id`:::
(string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-ephemeral-id]
@@ -370,7 +370,7 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=bucket-time-total]
== {api-response-codes-title}

`404` (Missing resources)::
If `allow_no_match` is `false`, this code indicates that there are no
resources that match the request or only partial matches for the request.

[[ml-get-job-stats-example]]
@@ -463,7 +463,7 @@ The API returns the following results:
"attributes" : {
"ml.machine_memory" : "17179869184",
"xpack.installed" : "true",
"ml.max_open_jobs" : "20"
"ml.max_open_jobs" : "512"
}
},
"assignment_explanation" : "",
@@ -481,4 +481,4 @@ The API returns the following results:
}
]
}
----
2 changes: 1 addition & 1 deletion docs/reference/settings/ml-settings.asciidoc
@@ -118,7 +118,7 @@ allowed, fewer jobs will run on a node. Prior to version 7.1, this setting was a
per-node non-dynamic setting. It became a cluster-wide dynamic setting in
version 7.1. As a result, changes to its value after node startup are used only
after every node in the cluster is running version 7.1 or higher. The minimum
value is `1`; the maximum value is `512`. Defaults to `20`.
value is `1`; the maximum value is `512`. Defaults to `512`.

`xpack.ml.nightly_maintenance_requests_per_second`::
(<<cluster-update-settings,Dynamic>>) The rate at which the nightly maintenance
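Because `xpack.ml.max_open_jobs` is a dynamic cluster-wide setting, as documented in the hunk above, the new default can also be overridden at runtime without restarting nodes. The following is a minimal sketch; the value `256` is an arbitrary illustration, not a recommendation:

----
PUT _cluster/settings
{
  "persistent": {
    "xpack.ml.max_open_jobs": 256
  }
}
----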
@@ -252,7 +252,7 @@ public void testDedicatedMlNode() throws Exception {
PersistentTask<?> task = tasks.getTask(MlTasks.jobTaskId(jobId));

DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "512"));
JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
assertEquals(JobState.OPENED, jobTaskState.getState());
@@ -451,7 +451,7 @@ public void testMlStateAndResultsIndicesNotAvailable() throws Exception {
assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true));
}

public void testCloseUnassignedLazyJobAndDatafeed() throws Exception {
public void testCloseUnassignedLazyJobAndDatafeed() {
internalCluster().ensureAtLeastNumDataNodes(3);
ensureStableCluster(3);

@@ -516,7 +516,7 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec
assertNotNull(task.getExecutorNode());
assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes()));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "512"));

JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
@@ -467,7 +467,14 @@ public Set<DiscoveryNodeRole> getRoles() {
// older nodes will not react to the dynamic changes. Therefore, in such mixed version clusters
// allocation will be based on the value first read at node startup rather than the current value.
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
Setting.intSetting(
"xpack.ml.max_open_jobs",
MAX_MAX_OPEN_JOBS_PER_NODE,
1,
MAX_MAX_OPEN_JOBS_PER_NODE,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
@@ -611,6 +611,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
// NOTE: this will return here if isMemoryTrackerRecentlyRefreshed is false; we don't allow assignment with stale memory
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
@@ -629,7 +630,6 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
Integer.MAX_VALUE,
maxMachineMemoryPercent,
maxNodeMemory,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage
);
auditRequireMemoryIfNecessary(params.getId(), auditor, assignment, jobNodeSelector, isMemoryTrackerRecentlyRefreshed);
@@ -387,7 +387,6 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
node,
maxOpenJobs,
maxMachineMemoryPercent,
true,
useAuto);
if (nodeLoad.getError() != null) {
logger.warn("[{}] failed to gather node load limits, failure [{}]. Returning no scale",
@@ -760,7 +759,6 @@ Optional<NativeMemoryCapacity> calculateFutureAvailableCapacity(PersistentTasksC
node,
maxOpenJobs,
maxMachineMemoryPercent,
true,
useAuto);
if (nodeLoad.getError() != null || nodeLoad.isUseMemory() == false) {
return Optional.empty();
@@ -100,8 +100,7 @@ public JobNodeSelector(ClusterState clusterState,

public Tuple<NativeMemoryCapacity, Long> perceivedCapacityAndMaxFreeMemory(int maxMachineMemoryPercent,
boolean useAutoMemoryPercentage,
int maxOpenJobs,
boolean isMemoryTrackerRecentlyRefreshed) {
int maxOpenJobs) {
List<DiscoveryNode> capableNodes = clusterState.getNodes()
.mastersFirstStream()
.filter(n -> this.nodeFilter.apply(n) == null)
@@ -118,7 +117,6 @@ public Tuple<NativeMemoryCapacity, Long> perceivedCapacityAndMaxFreeMemory(int m
n,
maxOpenJobs,
maxMachineMemoryPercent,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage)
)
.filter(nl -> nl.remainingJobs() > 0)
@@ -132,23 +130,19 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
int maxConcurrentJobAllocations,
int maxMachineMemoryPercent,
long maxNodeSize,
boolean isMemoryTrackerRecentlyRefreshed,
boolean useAutoMemoryPercentage) {
// TODO: remove in 8.0.0
boolean allNodesHaveDynamicMaxWorkers = clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_2_0);

// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
if (isMemoryTrackerRecentlyRefreshed == false) {
logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
jobId);
final Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
if (estimatedMemoryFootprint == null) {
memoryTracker.asyncRefresh();
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
logger.debug(reason);
return new PersistentTasksCustomMetadata.Assignment(null, reason);
}

List<String> reasons = new LinkedList<>();
long maxAvailableCount = Long.MIN_VALUE;
long maxAvailableMemory = Long.MIN_VALUE;
DiscoveryNode minLoadedNodeByCount = null;
DiscoveryNode minLoadedNodeByMemory = null;
for (DiscoveryNode node : clusterState.getNodes()) {

@@ -165,7 +159,6 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
node,
dynamicMaxOpenJobs,
maxMachineMemoryPercent,
allocateByMemory,
useAutoMemoryPercentage
);
if (currentLoad.getError() != null) {
@@ -175,7 +168,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
continue;
}
// Assuming the node is eligible at all, check loading
allocateByMemory = currentLoad.isUseMemory();
boolean canAllocateByMemory = currentLoad.isUseMemory();
int maxNumberOfOpenJobs = currentLoad.getMaxJobs();

if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
@@ -189,8 +182,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
continue;
}

long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
if (availableCount == 0) {
if (currentLoad.remainingJobs() == 0) {
reason = createReason(jobId,
nodeNameAndMlAttributes(node),
"This node is full. Number of opened jobs [{}], {} [{}].",
@@ -202,68 +194,58 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
continue;
}

if (maxAvailableCount < availableCount) {
maxAvailableCount = availableCount;
minLoadedNodeByCount = node;
if (canAllocateByMemory == false) {
reason = createReason(jobId,
nodeNameAndMlAttributes(node),
"This node is not providing accurate information to determine is load by memory.");
logger.trace(reason);
reasons.add(reason);
continue;
}

if (currentLoad.getMaxMlMemory() <= 0) {
reason = createReason(jobId,
nodeNameAndMlAttributes(node),
"This node is indicating that it has no native memory for machine learning.");
logger.trace(reason);
reasons.add(reason);
continue;
}

if (allocateByMemory) {
if (currentLoad.getMaxMlMemory() > 0) {
Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
if (estimatedMemoryFootprint != null) {
// If this will be the first job assigned to the node then it will need to
// load the native code shared libraries, so add the overhead for this
if (currentLoad.getNumAssignedJobs() == 0) {
estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
if (estimatedMemoryFootprint > availableMemory) {
reason = createReason(jobId,
nodeNameAndMlAttributes(node),
"This node has insufficient available memory. Available memory for ML [{} ({})], "
+ "memory required by existing jobs [{} ({})], "
+ "estimated memory required for this job [{} ({})].",
currentLoad.getMaxMlMemory(),
ByteSizeValue.ofBytes(currentLoad.getMaxMlMemory()).toString(),
currentLoad.getAssignedJobMemory(),
ByteSizeValue.ofBytes(currentLoad.getAssignedJobMemory()).toString(),
estimatedMemoryFootprint,
ByteSizeValue.ofBytes(estimatedMemoryFootprint).toString());
logger.trace(reason);
reasons.add(reason);
continue;
}
long requiredMemoryForJob = estimatedMemoryFootprint;
// If this will be the first job assigned to the node then it will need to
// load the native code shared libraries, so add the overhead for this
if (currentLoad.getNumAssignedJobs() == 0) {
requiredMemoryForJob += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
if (requiredMemoryForJob > availableMemory) {
reason = createReason(jobId,
nodeNameAndMlAttributes(node),
"This node has insufficient available memory. Available memory for ML [{} ({})], "
+ "memory required by existing jobs [{} ({})], "
+ "estimated memory required for this job [{} ({})].",
currentLoad.getMaxMlMemory(),
ByteSizeValue.ofBytes(currentLoad.getMaxMlMemory()).toString(),
currentLoad.getAssignedJobMemory(),
ByteSizeValue.ofBytes(currentLoad.getAssignedJobMemory()).toString(),
requiredMemoryForJob,
ByteSizeValue.ofBytes(requiredMemoryForJob).toString());
logger.trace(reason);
reasons.add(reason);
continue;
}

if (maxAvailableMemory < availableMemory) {
maxAvailableMemory = availableMemory;
minLoadedNodeByMemory = node;
}
} else {
// If we cannot get the job memory requirement,
// fall back to simply allocating by job count
allocateByMemory = false;
logger.debug(
() -> new ParameterizedMessage(
"Falling back to allocating job [{}] by job counts because its memory requirement was not available",
jobId));
}
} else {
// If we cannot get the available memory on any machine in
// the cluster, fall back to simply allocating by job count
allocateByMemory = false;
logger.debug(
() -> new ParameterizedMessage(
"Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
jobId,
nodeNameAndMlAttributes(node)));
}
if (maxAvailableMemory < availableMemory) {
maxAvailableMemory = availableMemory;
minLoadedNodeByMemory = node;
}
}

return createAssignment(
allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount,
minLoadedNodeByMemory,
reasons,
allocateByMemory && maxNodeSize > 0L ?
maxNodeSize > 0L ?
NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) :
Long.MAX_VALUE);
}
