Skip to content

Commit

Permalink
[ML] Add cluster settings to override ML autoscaling (#102413)
Browse files Browse the repository at this point in the history
Add the concept of an autoscaling "dummy" entity, composed of the xpack.ml.dummy_entity_memory and xpack.ml.dummy_entity_processors settings. These are intended to be used in extreme cases to trigger autoscaling events.
  • Loading branch information
edsavage authored and timgrein committed Nov 30, 2023
1 parent 4b7e7d4 commit 89416a2
Show file tree
Hide file tree
Showing 3 changed files with 551 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,23 @@ public void loadExtensions(ExtensionLoader loader) {
Property.NodeScope
);

// The next two settings currently only have an effect in serverless. They can be set as overrides to
// trigger a scale up of the ML tier so that it could accommodate the dummy entity in addition to
// whatever the standard autoscaling formula thinks is necessary.
// Memory requirement of the dummy autoscaling entity. Defaults to zero, i.e. no effect.
// Dynamic, so it can be changed on a live cluster without a restart.
public static final Setting<ByteSizeValue> DUMMY_ENTITY_MEMORY = Setting.memorySizeSetting(
"xpack.ml.dummy_entity_memory",
ByteSizeValue.ZERO,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
// Processor requirement of the dummy autoscaling entity. Defaults to zero (no effect);
// the second 0 is the minimum allowed value, so negative values are rejected at parse time.
public static final Setting<Integer> DUMMY_ENTITY_PROCESSORS = Setting.intSetting(
"xpack.ml.dummy_entity_processors",
0,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT = Setting.timeSetting(
"xpack.ml.process_connect_timeout",
TimeValue.timeValueSeconds(10),
Expand Down Expand Up @@ -782,7 +799,9 @@ public List<Setting<?>> getSettings() {
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
MachineLearningField.USE_AUTO_MACHINE_MEMORY_PERCENT,
MAX_ML_NODE_SIZE,
DELAYED_DATA_CHECK_FREQ
DELAYED_DATA_CHECK_FREQ,
DUMMY_ENTITY_MEMORY,
DUMMY_ENTITY_PROCESSORS
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.core.Tuple.tuple;
import static org.elasticsearch.xpack.ml.MachineLearning.DUMMY_ENTITY_MEMORY;
import static org.elasticsearch.xpack.ml.MachineLearning.DUMMY_ENTITY_PROCESSORS;
import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;

Expand All @@ -58,7 +60,13 @@ static MlJobRequirements of(long memory, int processors, int jobs) {
static MlJobRequirements of(long memory, int processors) {
return new MlJobRequirements(memory, processors, 1);
}
};
}

/**
 * Memory and processor requirements of the autoscaling "dummy" entity, built from the
 * xpack.ml.dummy_entity_memory and xpack.ml.dummy_entity_processors settings. A value of
 * zero for both fields means the dummy entity is disabled.
 */
record MlDummyAutoscalingEntity(long memory, int processors) {
    // Static factory mirroring MlJobRequirements.of(...) for consistency.
    static MlDummyAutoscalingEntity of(long memory, int processors) {
        return new MlDummyAutoscalingEntity(memory, processors);
    }
}

// Utility class holding only static methods — private constructor prevents instantiation.
private MlAutoscalingResourceTracker() {}

Expand All @@ -85,6 +93,12 @@ public static void getMlAutoscalingStats(
.roundDown()
: 0;

MlDummyAutoscalingEntity mlDummyAutoscalingEntity = new MlDummyAutoscalingEntity(
// Treat a ByteSizeValue of -1 as 0, since 0 is the default dummy entity size
Math.max(0L, DUMMY_ENTITY_MEMORY.get(settings).getBytes()),
DUMMY_ENTITY_PROCESSORS.get(settings)
);

// Todo: MAX_LOW_PRIORITY_MODELS_PER_NODE not checked yet
int maxOpenJobsPerNode = MAX_OPEN_JOBS_PER_NODE.get(settings);

Expand All @@ -95,6 +109,7 @@ public static void getMlAutoscalingStats(
modelMemoryAvailableFirstNode,
processorsAvailableFirstNode,
maxOpenJobsPerNode,
mlDummyAutoscalingEntity,
listener
);
}
Expand All @@ -106,6 +121,7 @@ static void getMemoryAndProcessors(
long perNodeAvailableModelMemoryInBytes,
int perNodeAvailableProcessors,
int maxOpenJobsPerNode,
MlDummyAutoscalingEntity dummyAutoscalingEntity,
ActionListener<MlAutoscalingStats> listener
) {
Map<String, List<MlJobRequirements>> perNodeModelMemoryInBytes = new HashMap<>();
Expand Down Expand Up @@ -262,6 +278,23 @@ static void getMemoryAndProcessors(
minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));
}

// dummy autoscaling entity
if (dummyEntityFitsOnLeastLoadedNode(
perNodeModelMemoryInBytes,
perNodeAvailableModelMemoryInBytes,
perNodeAvailableProcessors,
dummyAutoscalingEntity
) == false) {
logger.info(
"Scaling up due to dummy entity: dummyEntityMemory: [{}], dummyEntityProcessors: [{}]",
dummyAutoscalingEntity.memory,
dummyAutoscalingEntity.processors
);

modelMemoryBytesSum += dummyAutoscalingEntity.memory;
processorsSum += dummyAutoscalingEntity.processors;
}

// check for downscaling
long removeNodeMemoryInBytes = 0;

Expand All @@ -282,7 +315,8 @@ static void getMemoryAndProcessors(
perNodeModelMemoryInBytes,
perNodeAvailableModelMemoryInBytes,
perNodeAvailableProcessors,
maxOpenJobsPerNode
maxOpenJobsPerNode,
dummyAutoscalingEntity
))) {
removeNodeMemoryInBytes = perNodeMemoryInBytes;
}
Expand All @@ -304,6 +338,73 @@ static void getMemoryAndProcessors(
);
}

/**
 * Check whether the dummy autoscaling entity could be placed on the least loaded node.
 *
 * The least loaded node is chosen by summed model memory only (processors are not
 * considered when picking the node). The entity "fits" if adding both its memory and its
 * processor requirements to that node's totals stays within the per-node capacity.
 *
 * Note: this method is a pure query — it does not mutate any totals. On a {@code false}
 * result the caller is expected to react, e.g. by adding the dummy entity's requirements
 * to the scale-up sums, as for a single already-assigned trained model job.
 *
 * @param perNodeJobRequirements per-node lists of job requirements
 * @param perNodeMemoryInBytes total model memory available on every node
 * @param perNodeProcessors total processors available on every node
 * @param dummyAutoscalingEntity "dummy" entity requirements used to potentially trigger a scaling event
 * @return true if the dummy entity can be accommodated (or is empty), false if not
 */
static boolean dummyEntityFitsOnLeastLoadedNode(
    Map<String, List<MlJobRequirements>> perNodeJobRequirements,
    long perNodeMemoryInBytes,
    int perNodeProcessors,
    MlDummyAutoscalingEntity dummyAutoscalingEntity
) {
    // An empty dummy entity (the default) trivially fits — nothing to accommodate.
    if (dummyAutoscalingEntity.processors == 0 && dummyAutoscalingEntity.memory == 0L) {
        return true;
    }

    // No ML nodes at all: there is nowhere to place the entity.
    if (perNodeJobRequirements.isEmpty()) {
        return false;
    }

    // Sum each node's job requirements, then take the node with the smallest memory total.
    // Note: "least loaded" is judged on memory _only_, not on processors.
    Optional<MlJobRequirements> leastLoadedNodeRequirements = perNodeJobRequirements.values()
        .stream()
        .map(
            value -> value.stream()
                .reduce(
                    MlJobRequirements.of(0L, 0, 0),
                    (subtotal, element) -> MlJobRequirements.of(
                        subtotal.memory + element.memory,
                        subtotal.processors + element.processors,
                        subtotal.jobs + element.jobs
                    )
                )
        )
        .min(Comparator.comparingLong(value -> value.memory));

    // The map is non-empty, so min() must have produced a value.
    assert leastLoadedNodeRequirements.isPresent();
    MlJobRequirements leastLoaded = leastLoadedNodeRequirements.get();
    assert leastLoaded.memory >= 0L;
    assert leastLoaded.processors >= 0;

    // The entity fits only if both its memory and its processor demands can be added to
    // the least loaded node without exceeding the per-node capacity.
    return leastLoaded.memory + dummyAutoscalingEntity.memory <= perNodeMemoryInBytes
        && leastLoaded.processors + dummyAutoscalingEntity.processors <= perNodeProcessors;
}

/**
* Return some autoscaling stats that tell the autoscaler not to change anything, but without making it think an error has occurred.
*/
Expand Down Expand Up @@ -340,7 +441,8 @@ static boolean checkIfOneNodeCouldBeRemoved(
Map<String, List<MlJobRequirements>> perNodeJobRequirements,
long perNodeMemoryInBytes,
int perNodeProcessors,
int maxOpenJobsPerNode
int maxOpenJobsPerNode,
MlDummyAutoscalingEntity dummyAutoscalingEntity
) {
if (perNodeJobRequirements.size() <= 1) {
return false;
Expand Down Expand Up @@ -378,6 +480,10 @@ static boolean checkIfOneNodeCouldBeRemoved(

String candidateNode = leastLoadedNodeAndMemoryUsage.get().getKey();
List<MlJobRequirements> candidateJobRequirements = perNodeJobRequirements.get(candidateNode);
if (dummyAutoscalingEntity.memory > 0L || dummyAutoscalingEntity.processors > 0) {
candidateJobRequirements = new ArrayList<>(candidateJobRequirements);
candidateJobRequirements.add(MlJobRequirements.of(dummyAutoscalingEntity.memory, dummyAutoscalingEntity.processors));
}
perNodeMlJobRequirementSum.remove(candidateNode);

// if all jobs fit on other nodes, we can scale down one node
Expand Down

0 comments on commit 89416a2

Please sign in to comment.