Skip to content

Commit

Permalink
[ML] Refactor memory autoscaling decider into its own class (#89470)
Browse files Browse the repository at this point in the history
This commit factors out the code that decides how much memory
is required during an autoscale request into its own decider
class. This prepares the ML autoscaling decider service to
accommodate more deciders for processors and storage in the
future.
  • Loading branch information
dimitris-athanasiou committed Aug 23, 2022
1 parent af4421d commit fb4adda
Show file tree
Hide file tree
Showing 9 changed files with 2,616 additions and 2,414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskParams;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;

import java.util.Collection;
Expand All @@ -42,6 +44,9 @@ class MlAutoscalingContext {
final List<String> waitingAnalyticsJobs;
final List<String> waitingAllocatedModels;

final List<DiscoveryNode> mlNodes;
final PersistentTasksCustomMetadata persistentTasks;

MlAutoscalingContext() {
anomalyDetectionTasks = List.of();
snapshotUpgradeTasks = List.of();
Expand All @@ -52,6 +57,9 @@ class MlAutoscalingContext {
waitingSnapshotUpgrades = List.of();
waitingAnalyticsJobs = List.of();
waitingAllocatedModels = List.of();

mlNodes = List.of();
persistentTasks = null;
}

MlAutoscalingContext(ClusterState clusterState) {
Expand Down Expand Up @@ -79,6 +87,9 @@ class MlAutoscalingContext {
.filter(e -> e.getValue().getAssignmentState().equals(AssignmentState.STARTING) && e.getValue().getNodeRoutingTable().isEmpty())
.map(Map.Entry::getKey)
.toList();

mlNodes = getMlNodes(clusterState);
persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
}

private static Collection<PersistentTasksCustomMetadata.PersistentTask<?>> anomalyDetectionTasks(
Expand Down Expand Up @@ -152,4 +163,8 @@ public List<String> findPartiallyAllocatedModels() {
.map(Map.Entry::getKey)
.toList();
}

/**
 * Collects the machine-learning-capable nodes of the cluster.
 * Master-eligible nodes are ordered first in the returned list.
 */
static List<DiscoveryNode> getMlNodes(final ClusterState clusterState) {
    return clusterState.nodes()
        .mastersFirstStream()
        .filter(node -> MachineLearning.isMlNode(node))
        .toList();
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity;

/**
 * The memory capacity the ML tier requires as computed by the memory autoscaling decider.
 *
 * @param nodeSize the minimum memory a single ML node must have
 * @param tierSize the minimum total memory the entire ML tier must have
 * @param reason   human-readable explanation of how the capacity was decided
 */
public record MlMemoryAutoscalingCapacity(ByteSizeValue nodeSize, ByteSizeValue tierSize, String reason) {

    /** Creates a builder from explicit node and tier sizes. */
    public static Builder builder(ByteSizeValue nodeSize, ByteSizeValue tierSize) {
        return new Builder(nodeSize, tierSize);
    }

    /** Creates a builder from the memory portion of a generic {@link AutoscalingCapacity}. */
    public static Builder from(AutoscalingCapacity autoscalingCapacity) {
        return builder(autoscalingCapacity.node().memory(), autoscalingCapacity.total().memory());
    }

    @Override
    public String toString() {
        return "MlMemoryAutoscalingCapacity{" + "nodeSize=" + nodeSize + ", tierSize=" + tierSize + ", reason='" + reason + '\'' + '}';
    }

    /** Builder allowing the reason to be attached after the sizes are fixed. */
    public static class Builder {

        // Sizes are fixed at construction; only the reason is settable afterwards.
        private final ByteSizeValue nodeSize;
        private final ByteSizeValue tierSize;
        private String reason;

        public Builder(ByteSizeValue nodeSize, ByteSizeValue tierSize) {
            this.nodeSize = nodeSize;
            this.tierSize = tierSize;
        }

        /** Sets the explanation for the computed capacity. Returns {@code this} for chaining. */
        public Builder setReason(String reason) {
            this.reason = reason;
            return this;
        }

        public MlMemoryAutoscalingCapacity build() {
            return new MlMemoryAutoscalingCapacity(nodeSize, tierSize, reason);
        }
    }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;

Expand Down Expand Up @@ -107,7 +108,7 @@ NativeMemoryCapacity merge(final NativeMemoryCapacity nativeMemoryCapacity) {
* @return The minimum node size required for ML nodes and the minimum tier size required for the complete ML
* tier.
*/
public AutoscalingCapacity autoscalingCapacity(
public MlMemoryAutoscalingCapacity.Builder autoscalingCapacity(
int maxMemoryPercent,
boolean useAuto,
long mlNativeMemoryForLargestMlNode,
Expand All @@ -132,10 +133,7 @@ public AutoscalingCapacity autoscalingCapacity(
nodeMlNativeMemoryRequirementExcludingOverhead
);
}
return new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ZERO, null),
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ZERO, null)
);
return MlMemoryAutoscalingCapacity.builder(ByteSizeValue.ZERO, ByteSizeValue.ZERO);
}

if (mlNativeMemoryForLargestMlNode <= NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()) {
Expand Down Expand Up @@ -223,9 +221,9 @@ public AutoscalingCapacity autoscalingCapacity(
}
// The assertion above should hold, but the Math.max below catches the case with inconsistent
// inputs plus any bugs that weren't caught in tests.
return new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes(Math.max(requiredTierSize, requiredNodeSize)), null),
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes(requiredNodeSize), null)
return MlMemoryAutoscalingCapacity.builder(
ByteSizeValue.ofBytes(requiredNodeSize),
ByteSizeValue.ofBytes(Math.max(requiredTierSize, requiredNodeSize))
);
}

Expand Down Expand Up @@ -290,11 +288,27 @@ public static NativeMemoryCapacity currentScale(
Arrays.stream(mlMemory).max().orElse(0L),
// We assume that JVM size is universal, at least, the largest JVM indicates the largest node
machineLearningNodes.stream()
.map(MlAutoscalingDeciderService::getNodeJvmSize)
.map(NativeMemoryCapacity::getNodeJvmSize)
.filter(OptionalLong::isPresent)
.map(OptionalLong::getAsLong)
.max(Long::compare)
.orElse(null)
);
}

/**
 * Reads the maximum JVM size from the node's attributes.
 * Returns an empty {@link OptionalLong} when the attribute is absent or unparseable
 * (which should never happen in practice, as the attribute is set internally).
 */
static OptionalLong getNodeJvmSize(DiscoveryNode node) {
    final String jvmSizeAttr = node.getAttributes().get(MachineLearning.MAX_JVM_SIZE_NODE_ATTR);
    try {
        return OptionalLong.of(Long.parseLong(jvmSizeAttr));
    } catch (NumberFormatException e) {
        assert e == null : "ml.max_jvm_size should parse because we set it internally: invalid value was " + jvmSizeAttr;
        logger.debug(
            "could not parse stored string value [{}] in node attribute [{}]",
            jvmSizeAttr,
            MachineLearning.MAX_JVM_SIZE_NODE_ATTR
        );
        return OptionalLong.empty();
    }
}
}

0 comments on commit fb4adda

Please sign in to comment.