[ML] Add processor autoscaling decider (#89645)
This adds a processor decider to the ML autoscaling decider service.
This first implementation is simple and naive: it computes the required node
processor capacity as the maximum trained model deployment `threads_per_allocation`,
and the required tier processor capacity as the sum of the processors required
by all trained model deployments.
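As a rough, standalone illustration of that formula (a minimal sketch with made-up deployment sizes and a hypothetical helper record, not the decider code itself):

public class NaiveProcessorCapacitySketch {

    // Stand-in for a trained model deployment's sizing parameters (hypothetical, for illustration only).
    record Deployment(int numberOfAllocations, int threadsPerAllocation) {}

    public static void main(String[] args) {
        var deployments = java.util.List.of(new Deployment(2, 4), new Deployment(3, 2));

        // Node capacity: the largest threads_per_allocation of any deployment.
        int nodeProcessors = deployments.stream().mapToInt(Deployment::threadsPerAllocation).max().orElse(0);

        // Tier capacity: the total processors needed by all allocations of all deployments.
        int tierProcessors = deployments.stream().mapToInt(d -> d.numberOfAllocations() * d.threadsPerAllocation()).sum();

        System.out.println("node=" + nodeProcessors + ", tier=" + tierProcessors); // prints node=4, tier=14
    }
}

The actual decider additionally adjusts the tier count when there are multiple ML availability zones, as shown in MlProcessorAutoscalingDecider below.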
dimitris-athanasiou committed Sep 7, 2022
1 parent 56edb88 commit 407dc18
Showing 9 changed files with 718 additions and 34 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/89645.yaml
@@ -0,0 +1,5 @@
pr: 89645
summary: Add processor autoscaling decider
area: Machine Learning
type: enhancement
issues: []
@@ -42,6 +42,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
@@ -767,7 +768,7 @@ public Settings additionalSettings() {
Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes())
);
addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Integer.toString(getAllocatedProcessors()));
addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Double.toString(getAllocatedProcessors().count()));
// This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName);
} else {
@@ -785,8 +786,8 @@ private void addMlNodeAttribute(Settings.Builder additionalSettings, String attr
}
}

private int getAllocatedProcessors() {
return EsExecutors.allocatedProcessors(settings);
private Processors getAllocatedProcessors() {
return EsExecutors.nodeProcessors(settings);
}

private void disallowMlNodeAttributes(String... mlNodeAttributes) {
@@ -1448,7 +1449,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings unused) {
ScalingExecutorBuilder pytorchComms = new ScalingExecutorBuilder(
NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME,
3,
getAllocatedProcessors() * 3,
getAllocatedProcessors().roundUp() * 3,
TimeValue.timeValueMinutes(1),
false,
"xpack.ml.native_inference_comms_thread_pool"
@@ -39,6 +39,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L

private final ScaleTimer scaleTimer;
private final MlMemoryAutoscalingDecider memoryDecider;
private final MlProcessorAutoscalingDecider processorDecider;

private volatile boolean isMaster;

@@ -66,6 +67,7 @@ public MlAutoscalingDeciderService(
nodeLoadDetector,
scaleTimer
);
this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper);
clusterService.addLocalNodeMasterListener(this);
}

@@ -91,12 +93,21 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
.setCurrentMlCapacity(
new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.tierSize(), null),
new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.nodeSize(), null)
new AutoscalingCapacity.AutoscalingResources(
null,
currentMemoryCapacity.tierSize(),
currentProcessorCapacity.tierProcessors()
),
new AutoscalingCapacity.AutoscalingResources(
null,
currentMemoryCapacity.nodeSize(),
currentProcessorCapacity.nodeProcessors()
)
)
)
.setPassedConfiguration(configuration);
@@ -109,12 +120,15 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
}

MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
reasonBuilder.setSimpleReason(memoryCapacity.reason());
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
reasonBuilder.setSimpleReason(
String.format(Locale.ROOT, "[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
);

return new AutoscalingDeciderResult(
new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), null),
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), null)
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), processorCapacity.tierProcessors()),
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), processorCapacity.nodeProcessors())
),
reasonBuilder.build()
);
@@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
@@ -185,8 +186,6 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci

final List<String> partiallyAllocatedModels = mlContext.findPartiallyAllocatedModels();

// TODO for autoscaling by memory, we only care about if the model is allocated to at least one node (see above)
// We should do this check in our autoscaling by processor count service, which will be a separate decider for readability's sake
if (mlContext.waitingAnalyticsJobs.isEmpty() == false
|| mlContext.waitingSnapshotUpgrades.isEmpty() == false
|| mlContext.waitingAnomalyJobs.isEmpty() == false
@@ -257,7 +256,8 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
if (capacity == null) {
return null;
}
// TODO we should remove this when we can auto-scale (down and up) via a new CPU auto-scaling decider
// We should keep this check here as well as in the processor decider while cloud is not
// reacting to processor autoscaling.
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
return null;
@@ -822,7 +822,8 @@ static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAss
int totalMlProcessors = mlNodes.stream().mapToInt(node -> {
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
try {
return Integer.parseInt(allocatedProcessorsString);
double allocatedProcessorsAsDouble = Double.parseDouble(allocatedProcessorsString);
return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble).roundUp() : 0;
} catch (NumberFormatException e) {
assert e == null
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.common.unit.Processors;

public record MlProcessorAutoscalingCapacity(Processors nodeProcessors, Processors tierProcessors, String reason) {

public static Builder builder(Processors nodeProcessors, Processors tierProcessors) {
return new Builder(nodeProcessors, tierProcessors);
}

@Override
public String toString() {
return "MlProcessorAutoscalingCapacity{"
+ "nodeProcessors="
+ nodeProcessors
+ ", tierProcessors="
+ tierProcessors
+ ", reason='"
+ reason
+ '\''
+ '}';
}

public static class Builder {

private Processors nodeProcessors;
private Processors tierProcessors;
private String reason;

public Builder(Processors nodeProcessors, Processors tierProcessors) {
this.nodeProcessors = nodeProcessors;
this.tierProcessors = tierProcessors;
}

public Builder setReason(String reason) {
this.reason = reason;
return this;
}

MlProcessorAutoscalingCapacity build() {
return new MlProcessorAutoscalingCapacity(nodeProcessors, tierProcessors, reason);
}
}
}
@@ -0,0 +1,167 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;

import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static java.time.Instant.ofEpochMilli;
import static org.elasticsearch.common.xcontent.XContentElasticsearchExtension.DEFAULT_FORMATTER;
import static org.elasticsearch.core.Strings.format;

class MlProcessorAutoscalingDecider {

private static final Logger logger = LogManager.getLogger(MlProcessorAutoscalingDecider.class);

private final ScaleTimer scaleTimer;
private final NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper;

MlProcessorAutoscalingDecider(ScaleTimer scaleTimer, NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper) {
this.scaleTimer = Objects.requireNonNull(scaleTimer);
this.nodeAvailabilityZoneMapper = Objects.requireNonNull(nodeAvailabilityZoneMapper);
}

public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());

if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {
logger.debug(() -> "Computing required capacity as there are partially allocated deployments");
scaleTimer.resetScaleDownCoolDown();
return computeRequiredCapacity(trainedModelAssignmentMetadata).setReason(
"requesting scale up as there are unsatisfied deployments"
).build();
}

final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);

final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

if (requiredCapacity.tierProcessors().roundUp() == currentCapacity.tierProcessors().roundUp()) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("passing currently perceived capacity as it is fully used")
.build();
}

if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
trainedModelAssignmentMetadata.modelAssignments().values(),
mlContext.mlNodes
)) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
.build();
}

long msLeftToScale = scaleTimer.markDownScaleAndGetMillisLeftFromDelay(configuration);
if (msLeftToScale <= 0) {
return MlProcessorAutoscalingCapacity.builder(requiredCapacity.nodeProcessors(), requiredCapacity.tierProcessors())
.setReason("requesting scale down as tier and/or node size could be smaller")
.build();
}

TimeValue downScaleDelay = MlAutoscalingDeciderService.DOWN_SCALE_DELAY.get(configuration);
logger.debug(
() -> format(
"not scaling down as the current scale down delay [%s] is not satisfied."
+ " The last time scale down was detected [%s]. Calculated scaled down capacity [%s] ",
downScaleDelay.getStringRep(),
DEFAULT_FORMATTER.format(ofEpochMilli(scaleTimer.downScaleDetectedMillis())),
requiredCapacity
)
);
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason(
String.format(
Locale.ROOT,
"Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s] "
+ "last detected scale down event [%s]. Will request scale down in approximately [%s]",
downScaleDelay.getStringRep(),
XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(scaleTimer.downScaleDetectedMillis())),
TimeValue.timeValueMillis(msLeftToScale).getStringRep()
)
)
.build();
}

private boolean hasUnsatisfiedDeployments(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata, List<DiscoveryNode> mlNodes) {
final Set<String> mlNodeIds = mlNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
return trainedModelAssignmentMetadata.modelAssignments()
.values()
.stream()
.anyMatch(deployment -> deployment.isSatisfied(mlNodeIds) == false);
}

private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata) {
int maxThreadsPerAllocation = 0;
int processorCount = 0;
for (TrainedModelAssignment assignment : trainedModelAssignmentMetadata.modelAssignments().values()) {
int threadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation();
maxThreadsPerAllocation = Math.max(maxThreadsPerAllocation, threadsPerAllocation);
processorCount += assignment.getTaskParams().getNumberOfAllocations() * threadsPerAllocation;
}

final int numMlAvailabilityZones = nodeAvailabilityZoneMapper.getNumMlAvailabilityZones().orElse(1);
if (numMlAvailabilityZones > 1) {
// We assume the cloud provides the tier processors we request in each availability zone.
// Thus we need to divide the total required processor count by the number of ML availability zones.
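// For example (hypothetical numbers), a requirement of 10 processors across 3 ML availability zones
// becomes (10 - 1) / 3 + 1 = 4 processors requested per zone.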
processorCount = (processorCount - 1) / numMlAvailabilityZones + 1;
}
processorCount = Math.max(processorCount, maxThreadsPerAllocation);

return MlProcessorAutoscalingCapacity.builder(
maxThreadsPerAllocation > 0 ? Processors.of(Double.valueOf(maxThreadsPerAllocation)) : Processors.ZERO,
processorCount > 0 ? Processors.of(Double.valueOf(processorCount)) : Processors.ZERO
);
}

MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
Processors maxNodeProcessors = Processors.ZERO;
Processors tierProcessors = Processors.ZERO;
for (DiscoveryNode node : mlNodes) {
Processors nodeProcessors = getProcessors(node);
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
maxNodeProcessors = nodeProcessors;
}
tierProcessors = tierProcessors.plus(nodeProcessors);
}
return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build();
}

private Processors getProcessors(DiscoveryNode node) {
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
if (allocatedProcessorsString == null) {
return Processors.ZERO;
}
try {
double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
} catch (NumberFormatException e) {
assert e == null
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
+ " should parse because we set it internally: invalid value was ["
+ allocatedProcessorsString
+ "]";
return Processors.ZERO;
}
}
}
