[ML] Autoscaling for machine learning (#59309)
This provides an autoscaling service integration for machine learning. 

The underlying logic is fairly straightforward, with a few caveats:
- When deciding whether to scale up or down, ML automatically translates between node size and the memory that the node will potentially provide for ML once the scaling plan is implemented (see the sketch after this list).
- If our knowledge of job sizes is out of date, we make a best-effort check for scaling up. If that cannot be determined from the current view of job memory, we attempt a refresh and return a no_scale event.
- If the auto memory percent calculation is being used, we treat all JVM sizes on the nodes as the same.
- For scale down, we keep the time of the last scale-down calculation in memory only, so if the master node changes in between, the scale-down delay is reset.
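As a rough illustration of the first caveat, here is a minimal sketch of the node-size-to-ML-memory translation. The class and method names are hypothetical, and the fixed-percentage model ignores JVM overhead and the auto memory percent mode, so this is not the actual MlAutoscalingDeciderService logic:

// Illustrative sketch only; names and the fixed-percentage model are assumptions,
// not the actual MlAutoscalingDeciderService implementation.
final class NodeSizeTranslation {

    // Memory a node of the given size could make available to ML jobs, assuming the
    // static xpack.ml.max_machine_memory_percent setting (default 30) applies.
    static long mlMemoryFromNodeSize(long nodeSizeBytes, int maxMachineMemoryPercent) {
        return nodeSizeBytes * maxMachineMemoryPercent / 100;
    }

    // The inverse: the node size a scaling plan would need in order to provide the
    // requested amount of ML job memory.
    static long nodeSizeFromMlMemory(long requiredMlBytes, int maxMachineMemoryPercent) {
        return requiredMlBytes * 100 / maxMachineMemoryPercent;
    }
}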
benwtrent committed Nov 17, 2020
1 parent 0b69d91 commit ebd0996
Showing 25 changed files with 2,062 additions and 178 deletions.
@@ -48,6 +48,10 @@ public static String jobTaskId(String jobId) {
return JOB_TASK_ID_PREFIX + jobId;
}

public static String jobId(String jobTaskId) {
return jobTaskId.substring(JOB_TASK_ID_PREFIX.length());
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
@@ -67,6 +71,10 @@ public static String dataFrameAnalyticsTaskId(String id) {
return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id;
}

public static String dataFrameAnalyticsId(String taskId) {
return taskId.substring(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length());
}

@Nullable
public static PersistentTasksCustomMetadata.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetadata tasks) {
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));
3 changes: 2 additions & 1 deletion x-pack/plugin/ml/build.gradle
@@ -5,7 +5,7 @@ esplugin {
description 'Elasticsearch Expanded Pack Plugin - Machine Learning'
classname 'org.elasticsearch.xpack.ml.MachineLearning'
hasNativeController true
extendedPlugins = ['x-pack-core', 'lang-painless']
extendedPlugins = ['x-pack-autoscaling', 'lang-painless']
}


@@ -50,6 +50,7 @@ dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts')
testImplementation project(path: xpackModule('ilm'), configuration: 'default')
compileOnly project(path: xpackModule('autoscaling'), configuration: 'default')
testImplementation project(path: xpackModule('data-streams'), configuration: 'default')
// This should not be here
testImplementation project(path: xpackModule('security'), configuration: 'testArtifacts')
@@ -68,6 +68,8 @@
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
@@ -224,6 +226,8 @@
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
@@ -428,7 +432,7 @@ public Set<DiscoveryNodeRole> getRoles() {
// controls the types of jobs that can be created, and each job alone is considerably smaller than what each node
// can handle.
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
/**
* This boolean value indicates if `max_machine_memory_percent` should be ignored and an automatic calculation is used instead.
*
@@ -484,6 +488,17 @@ public Set<DiscoveryNodeRole> getRoles() {
Property.NodeScope
);

/**
* This is the maximum possible node size for a machine learning node. It is useful when determining if a job could ever be opened
* on the cluster.
*
* If the value is the default special case of `0b`, that means the value is ignored when assigning jobs.
*/
public static final Setting<ByteSizeValue> MAX_ML_NODE_SIZE = Setting.byteSizeSetting(
"xpack.ml.max_ml_node_size",
ByteSizeValue.ZERO,
Property.NodeScope);

private static final Logger logger = LogManager.getLogger(MachineLearning.class);

private final Settings settings;
@@ -497,6 +512,7 @@ public Set<DiscoveryNodeRole> getRoles() {
private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();
private final SetOnce<CircuitBreaker> inferenceModelBreaker = new SetOnce<>();
private final SetOnce<ModelLoadingService> modelLoadingService = new SetOnce<>();
private final SetOnce<MlAutoscalingDeciderService> mlAutoscalingDeciderService = new SetOnce<>();

public MachineLearning(Settings settings, Path configPath) {
this.settings = settings;
@@ -533,7 +549,8 @@ public List<Setting<?>> getSettings() {
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
USE_AUTO_MACHINE_MEMORY_PERCENT
USE_AUTO_MACHINE_MEMORY_PERCENT,
MAX_ML_NODE_SIZE
);
}

@@ -771,6 +788,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// Perform node startup operations
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();

mlAutoscalingDeciderService.set(new MlAutoscalingDeciderService(memoryTracker, settings, clusterService));

return Arrays.asList(
mlLifeCycleService,
new MlControllerHolder(mlController),
@@ -1129,6 +1148,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
namedWriteables.addAll(new AnalysisStatsNamedWriteablesProvider().getNamedWriteables());
namedWriteables.addAll(MlEvaluationNamedXContentProvider.getNamedWriteables());
namedWriteables.addAll(new MlInferenceNamedXContentProvider().getNamedWriteables());
namedWriteables.addAll(MlAutoscalingNamedWritableProvider.getNamedWriteables());
return namedWriteables;
}

@@ -1159,4 +1179,13 @@ public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME);
this.inferenceModelBreaker.set(circuitBreaker);
}

public Collection<AutoscalingDeciderService<? extends AutoscalingDeciderConfiguration>> deciders() {
if (enabled) {
assert mlAutoscalingDeciderService.get() != null;
return List.of(mlAutoscalingDeciderService.get());
} else {
return List.of();
}
}
}
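Relating to the MAX_ML_NODE_SIZE setting added above, a minimal sketch of how it could feed a "could this job ever be opened" check. The helper class is hypothetical (not part of the commit) and, for simplicity, it does not apply the ML memory percent to the node size:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

// Hypothetical helper, not part of the commit.
final class MaxNodeSizeCheck {
    static boolean couldEverOpen(Settings settings, long requiredJobMemoryBytes) {
        ByteSizeValue maxNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings);
        if (maxNodeSize.getBytes() == 0) {
            // The default of 0b means the limit is ignored when assigning jobs.
            return true;
        }
        // A real check would also apply the ML memory percent to the node size.
        return requiredJobMemoryBytes <= maxNodeSize.getBytes();
    }
}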
@@ -640,6 +640,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
maxOpenJobs,
Integer.MAX_VALUE,
maxMachineMemoryPercent,
maxNodeMemory,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage
);
@@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;

import java.io.IOException;
import java.util.Objects;

public class MlAutoscalingDeciderConfiguration implements AutoscalingDeciderConfiguration {
static final String NAME = "ml";

private static final int DEFAULT_ANOMALY_JOBS_IN_QUEUE = 0;
private static final int DEFAULT_ANALYTICS_JOBS_IN_QUEUE = 0;

private static final ParseField NUM_ANOMALY_JOBS_IN_QUEUE = new ParseField("num_anomaly_jobs_in_queue");
private static final ParseField NUM_ANALYTICS_JOBS_IN_QUEUE = new ParseField("num_analytics_jobs_in_queue");
private static final ParseField DOWN_SCALE_DELAY = new ParseField("down_scale_delay");

private static final ObjectParser<MlAutoscalingDeciderConfiguration.Builder, Void> PARSER = new ObjectParser<>(NAME,
MlAutoscalingDeciderConfiguration.Builder::new);

static {
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnomalyJobsInQueue, NUM_ANOMALY_JOBS_IN_QUEUE);
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnalyticsJobsInQueue, NUM_ANALYTICS_JOBS_IN_QUEUE);
PARSER.declareString(MlAutoscalingDeciderConfiguration.Builder::setDownScaleDelay, DOWN_SCALE_DELAY);
}

public static MlAutoscalingDeciderConfiguration parse(final XContentParser parser) {
return PARSER.apply(parser, null).build();
}

private final int numAnomalyJobsInQueue;
private final int numAnalyticsJobsInQueue;
private final TimeValue downScaleDelay;

MlAutoscalingDeciderConfiguration(int numAnomalyJobsInQueue, int numAnalyticsJobsInQueue, TimeValue downScaleDelay) {
if (numAnomalyJobsInQueue < 0) {
throw new IllegalArgumentException("[" + NUM_ANOMALY_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
}
if (numAnalyticsJobsInQueue < 0) {
throw new IllegalArgumentException("[" + NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
}
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
this.downScaleDelay = downScaleDelay;
}

public MlAutoscalingDeciderConfiguration(StreamInput in) throws IOException {
numAnomalyJobsInQueue = in.readVInt();
numAnalyticsJobsInQueue = in.readVInt();
downScaleDelay = in.readTimeValue();
}

@Override
public String name() {
return NAME;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numAnomalyJobsInQueue);
out.writeVInt(numAnalyticsJobsInQueue);
out.writeTimeValue(downScaleDelay);
}

public int getNumAnomalyJobsInQueue() {
return numAnomalyJobsInQueue;
}

public int getNumAnalyticsJobsInQueue() {
return numAnalyticsJobsInQueue;
}

public TimeValue getDownScaleDelay() {
return downScaleDelay;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MlAutoscalingDeciderConfiguration that = (MlAutoscalingDeciderConfiguration) o;
return numAnomalyJobsInQueue == that.numAnomalyJobsInQueue &&
numAnalyticsJobsInQueue == that.numAnalyticsJobsInQueue &&
Objects.equals(downScaleDelay, that.downScaleDelay);
}

@Override
public int hashCode() {
return Objects.hash(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NUM_ANOMALY_JOBS_IN_QUEUE.getPreferredName(), numAnomalyJobsInQueue);
builder.field(NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName(), numAnalyticsJobsInQueue);
builder.field(DOWN_SCALE_DELAY.getPreferredName(), downScaleDelay.getStringRep());
builder.endObject();
return builder;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

private int numAnomalyJobsInQueue = DEFAULT_ANOMALY_JOBS_IN_QUEUE;
private int numAnalyticsJobsInQueue = DEFAULT_ANALYTICS_JOBS_IN_QUEUE;
private TimeValue downScaleDelay = TimeValue.ZERO;

public Builder setNumAnomalyJobsInQueue(int numAnomalyJobsInQueue) {
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
return this;
}

public Builder setNumAnalyticsJobsInQueue(int numAnalyticsJobsInQueue) {
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
return this;
}

Builder setDownScaleDelay(String unparsedTimeValue) {
return setDownScaleDelay(TimeValue.parseTimeValue(unparsedTimeValue, DOWN_SCALE_DELAY.getPreferredName()));
}

public Builder setDownScaleDelay(TimeValue downScaleDelay) {
this.downScaleDelay = Objects.requireNonNull(downScaleDelay);
return this;
}

public MlAutoscalingDeciderConfiguration build() {
return new MlAutoscalingDeciderConfiguration(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
}
}

}
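For reference, a minimal usage sketch of the builder defined above (not part of the commit; the values are arbitrary):

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderConfiguration;

public class MlAutoscalingDeciderConfigurationExample {
    public static void main(String[] args) {
        // Tolerate two queued anomaly jobs and one queued analytics job,
        // and wait an hour before acting on a scale-down decision.
        MlAutoscalingDeciderConfiguration config = MlAutoscalingDeciderConfiguration.builder()
            .setNumAnomalyJobsInQueue(2)
            .setNumAnalyticsJobsInQueue(1)
            .setDownScaleDelay(TimeValue.timeValueHours(1))
            .build();

        System.out.println(config.getDownScaleDelay().getStringRep()); // prints "1h"
    }
}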
