[ML] add new setting xpack.ml.use_auto_machine_memory_percent for auto calculating native memory percentage allowed for machine learning jobs (#63887)

When running ML, it is sometimes best to automatically adjust the
memory allotted for machine learning based on the node size
and how much of that memory is given to the JVM.

This commit adds a new static setting, xpack.ml.use_auto_machine_memory_percent, to
enable this dynamic calculation. The existing xpack.ml.max_machine_memory_percent
setting remains as a fallback in case the limit cannot be determined
automatically due to lack of information.

Closes #63795
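
The calculation itself lives in the new NativeMemoryCalculator class, which the hunks below call into but do not include. Below is a minimal sketch of what an allowedBytesForMl overload could look like, assuming (per the docs change in this commit) that the automatic mode subtracts the JVM size and roughly 256MB of overhead from the node's total memory and otherwise falls back to the percentage-based limit. The class name, the parse helper, and the exact formula are illustrative, not the committed implementation.

```java
import java.util.Map;
import java.util.OptionalLong;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

// Illustrative only: the committed NativeMemoryCalculator may differ in constants and edge cases.
final class NativeMemoryCalculatorSketch {

    // Assumed overhead reserved outside the JVM on dedicated ML nodes (from the docs text in this commit).
    private static final long OS_OVERHEAD = ByteSizeValue.ofMb(256).getBytes();

    static OptionalLong allowedBytesForMl(DiscoveryNode node, int maxMachineMemoryPercent, boolean useAuto) {
        Map<String, String> attrs = node.getAttributes();
        OptionalLong machineMemory = parse(attrs.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR));
        if (machineMemory.isEmpty()) {
            // Without the machine memory attribute we cannot size ML at all.
            return OptionalLong.empty();
        }
        if (useAuto) {
            OptionalLong jvmSize = parse(attrs.get(MachineLearning.MAX_JVM_SIZE_NODE_ATTR));
            if (jvmSize.isPresent()) {
                long auto = machineMemory.getAsLong() - jvmSize.getAsLong() - OS_OVERHEAD;
                if (auto > 0) {
                    return OptionalLong.of(auto);
                }
            }
            // JVM size unknown (e.g. an older node) or a non-positive result: fall back to the percentage.
        }
        return OptionalLong.of(machineMemory.getAsLong() * maxMachineMemoryPercent / 100);
    }

    private static OptionalLong parse(String value) {
        if (value == null) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(value));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
```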
benwtrent committed Oct 21, 2020
1 parent a00c7a2 commit 165e063
Showing 17 changed files with 362 additions and 60 deletions.
16 changes: 16 additions & 0 deletions docs/reference/settings/ml-settings.asciidoc
@@ -153,6 +153,22 @@ connect within the time period specified by this setting then the process is
assumed to have failed. Defaults to `10s`. The minimum value for this setting is
`5s`.

xpack.ml.use_auto_machine_memory_percent::
(<<static-cluster-setting,Static>>) If this setting is `true`, the
`xpack.ml.max_machine_memory_percent` setting is ignored. Instead, the maximum
percentage of the machine's memory that can be used for running {ml} analytics
processes is calculated automatically and takes into account the total node size
and the size of the JVM on the node. The default value is `false`. If this
setting differs between nodes, the value on the current master node is heeded.
+
TIP: If you do not have dedicated {ml} nodes (that is to say, the node has
multiple roles), do not enable this setting. Its calculations assume that {ml}
analytics are the main purpose of the node.
+
IMPORTANT: The calculation assumes that dedicated {ml} nodes have at least
`256MB` memory reserved outside of the JVM. If you have tiny {ml}
nodes in your cluster, you shouldn't use this setting.
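
For a rough sense of scale (illustrative numbers, not part of the committed docs, assuming the JVM-plus-256MB subtraction described above): on a hypothetical dedicated {ml} node with 16GB of RAM and an 8GB JVM heap, the automatic calculation would leave roughly 16GB - 8GB - 256MB ≈ 7.75GB for {ml} processes, compared with 16GB × 30% = 4.8GB under the default `xpack.ml.max_machine_memory_percent` of 30.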

[discrete]
[[model-inference-circuit-breaker]]
==== {ml-cap} circuit breaker settings
@@ -25,6 +25,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

public class TooManyJobsIT extends BaseMlIntegTestCase {

@@ -213,7 +214,7 @@ private void startMlCluster(int numNodes, int maxNumberOfWorkersPerNode) throws

private long calculateMaxMlMemory() {
Settings settings = internalCluster().getInstance(Settings.class);
return Long.parseLong(internalCluster().getInstance(TransportService.class).getLocalNode().getAttributes()
.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR)) * MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings) / 100;
return NativeMemoryCalculator.allowedBytesForMl(internalCluster().getInstance(TransportService.class).getLocalNode(), settings)
.orElse(0L);
}
}
@@ -405,6 +405,7 @@ public Set<DiscoveryNodeRole> getRoles() {
private static final String PRE_V7_ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
/**
@@ -421,6 +422,22 @@ public Set<DiscoveryNodeRole> getRoles() {
// can handle.
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
/**
* This boolean value indicates if `max_machine_memory_percent` should be ignored and an automatic calculation used instead.
*
* This calculation takes into account total node size and the size of the JVM on that node.
*
* If the calculation fails, we fall back to `max_machine_memory_percent`.
*
* This setting is NOT dynamic. This allows the cluster administrator to set it on startup without worry of it
* being edited accidentally later.
* Consequently, this setting could differ between nodes. But we only ever pay attention to the value
* that is set on the current master, as master nodes are responsible for persistent task assignments.
*/
public static final Setting<Boolean> USE_AUTO_MACHINE_MEMORY_PERCENT = Setting.boolSetting(
"xpack.ml.use_auto_machine_memory_percent",
false,
Property.NodeScope);
public static final Setting<Integer> MAX_LAZY_ML_NODES =
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);

@@ -508,14 +525,16 @@ public List<Setting<?>> getSettings() {
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
USE_AUTO_MACHINE_MEMORY_PERCENT
);
}

public Settings additionalSettings() {
String mlEnabledNodeAttrName = "node.attr." + PRE_V7_ML_ENABLED_NODE_ATTR;
String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR;
String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;
String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR;

if (enabled == false) {
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName);
@@ -531,6 +550,7 @@ public Settings additionalSettings() {
String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings)));
addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
disallowMlNodeAttributes(mlEnabledNodeAttrName);
} else {
@@ -14,6 +14,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.tasks.Task;
@@ -28,12 +29,14 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.MlControllerHolder;
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;

public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.Request, MlInfoAction.Response> {
@@ -100,7 +103,7 @@ private ByteSizeValue defaultModelMemoryLimit() {
ByteSizeValue defaultLimit = ByteSizeValue.ofMb(AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB);
ByteSizeValue maxModelMemoryLimit = clusterService.getClusterSettings().get(MachineLearningField.MAX_MODEL_MEMORY_LIMIT);
if (maxModelMemoryLimit != null && maxModelMemoryLimit.getBytes() > 0
&& maxModelMemoryLimit.getBytes() < defaultLimit.getBytes()) {
&& maxModelMemoryLimit.getBytes() < defaultLimit.getBytes()) {
return maxModelMemoryLimit;
}
return defaultLimit;
@@ -112,24 +115,16 @@ private Map<String, Object> datafeedsDefaults() {
return anomalyDetectorsDefaults;
}

static ByteSizeValue calculateEffectiveMaxModelMemoryLimit(int maxMachineMemoryPercent, DiscoveryNodes nodes) {
static ByteSizeValue calculateEffectiveMaxModelMemoryLimit(ClusterSettings clusterSettings, DiscoveryNodes nodes) {

long maxMlMemory = -1;

for (DiscoveryNode node : nodes) {

Map<String, String> nodeAttributes = node.getAttributes();
String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
if (machineMemoryStr == null) {
continue;
}
long machineMemory;
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
OptionalLong limit = NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings);
if (limit.isEmpty()) {
continue;
}
maxMlMemory = Math.max(maxMlMemory, machineMemory * maxMachineMemoryPercent / 100);
maxMlMemory = Math.max(maxMlMemory, limit.getAsLong());
}

if (maxMlMemory <= 0) {
@@ -146,7 +141,8 @@ static ByteSizeValue calculateEffectiveMaxModelMemoryLimit(int maxMachineMemoryP
private Map<String, Object> limits() {
Map<String, Object> limits = new HashMap<>();
ByteSizeValue effectiveMaxModelMemoryLimit = calculateEffectiveMaxModelMemoryLimit(
clusterService.getClusterSettings().get(MachineLearning.MAX_MACHINE_MEMORY_PERCENT), clusterService.state().getNodes());
clusterService.getClusterSettings(),
clusterService.state().getNodes());
ByteSizeValue maxModelMemoryLimit = clusterService.getClusterSettings().get(MachineLearningField.MAX_MODEL_MEMORY_LIMIT);
if (maxModelMemoryLimit != null && maxModelMemoryLimit.getBytes() > 0) {
limits.put("max_model_memory_limit", maxModelMemoryLimit.getStringRep());
@@ -74,6 +74,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;

/*
This class extends from TransportMasterNodeAction for cluster state observing purposes.
@@ -357,6 +358,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
private final Client client;
private final IndexNameExpressionResolver expressionResolver;
private final JobResultsProvider jobResultsProvider;
private final boolean useAutoMemoryPercentage;

private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
@@ -377,6 +379,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
clusterService.getClusterSettings()
@@ -424,7 +427,11 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(OpenJobAction.JobP
JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, jobId, MlTasks.JOB_TASK_NAME, memoryTracker,
job.allowLazyOpen() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, job));
return jobNodeSelector.selectNode(
maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
maxOpenJobs,
maxConcurrentJobAllocations,
maxMachineMemoryPercent,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage);
}

@Override
@@ -523,6 +530,7 @@ void setMaxLazyMLNodes(int maxLazyMLNodes) {
void setMaxOpenJobs(int maxOpenJobs) {
this.maxOpenJobs = maxOpenJobs;
}

}

public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher {
@@ -95,6 +95,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;

/**
* Starts the persistent task for running data frame analytics.
@@ -599,6 +600,7 @@ public static class TaskExecutor extends PersistentTasksExecutor<TaskParams> {
private final MlMemoryTracker memoryTracker;
private final IndexNameExpressionResolver resolver;
private final IndexTemplateConfig inferenceIndexTemplate;
private final boolean useAutoMemoryPercentage;

private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
@@ -619,6 +621,7 @@ public TaskExecutor(Settings settings, Client client, ClusterService clusterServ
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
@@ -683,7 +686,13 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
node -> nodeFilter(node, params));
// Pass an effectively infinite value for max concurrent opening jobs, because data frame analytics jobs do
// not have an "opening" state so would never be rejected for causing too many jobs in the "opening" state
return jobNodeSelector.selectNode(maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
return jobNodeSelector.selectNode(
maxOpenJobs,
Integer.MAX_VALUE,
maxMachineMemoryPercent,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage
);
}

@Override
@@ -757,5 +766,6 @@ void setMaxLazyMLNodes(int maxLazyMLNodes) {
void setMaxOpenJobs(int maxOpenJobs) {
this.maxOpenJobs = maxOpenJobs;
}

}
}
@@ -77,7 +77,8 @@ public JobNodeSelector(ClusterState clusterState, String jobId, String taskName,
}

public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJobs, int maxConcurrentJobAllocations,
int maxMachineMemoryPercent, boolean isMemoryTrackerRecentlyRefreshed) {
int maxMachineMemoryPercent, boolean isMemoryTrackerRecentlyRefreshed,
boolean useAutoMemoryPercentage) {
// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
@@ -107,7 +108,8 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
node,
dynamicMaxOpenJobs,
maxMachineMemoryPercent,
allocateByMemory
allocateByMemory,
useAutoMemoryPercentage
);
if (currentLoad.getError() != null) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
@@ -20,11 +20,13 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;


public class NodeLoadDetector {
@@ -45,7 +47,8 @@ public NodeLoad detectNodeLoad(ClusterState clusterState,
DiscoveryNode node,
int dynamicMaxOpenJobs,
int maxMachineMemoryPercent,
boolean isMemoryTrackerRecentlyRefreshed) {
boolean isMemoryTrackerRecentlyRefreshed,
boolean useAutoMachineMemoryCalculation) {
PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
Map<String, String> nodeAttributes = node.getAttributes();
List<String> errors = new ArrayList<>();
@@ -60,16 +63,17 @@ public NodeLoad detectNodeLoad(ClusterState clusterState,
maxNumberOfOpenJobs = -1;
}
}
String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
long machineMemory = -1;
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long");
OptionalLong maxMlMemory = NativeMemoryCalculator.allowedBytesForMl(node,
maxMachineMemoryPercent,
useAutoMachineMemoryCalculation);
if (maxMlMemory.isEmpty()) {
errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR
+ " attribute ["
+ nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR)
+ "] is not a long");
}
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;

NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory, maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed);
NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory.orElse(-1L), maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed);
if (errors.isEmpty() == false) {
nodeLoad.error = Strings.collectionToCommaDelimitedString(errors);
return nodeLoad;
@@ -102,6 +102,7 @@ private void setReassignmentRecheckInterval(TimeValue recheckInterval) {
public void onMaster() {
isMaster = true;
logger.trace("ML memory tracker on master");
asyncRefresh();
}

@Override
