[ML] fixing ml autoscaling decider bugs around requested node size and tier size (#67731) (#67791)

This PR corrects three bugs in the ML autoscaling decider service:

- The XContent-serialized information about waiting jobs in the verbose scaling reason is corrected.
- The requested tier size is never allowed to drop below the requested node size.
- Anomaly detection and data frame analytics task states are now checked appropriately (using the same logic as node load detection).
benwtrent committed Jan 20, 2021
1 parent 0c1ab1d commit ad0fdb3
Showing 9 changed files with 435 additions and 42 deletions.

@@ -48,4 +48,11 @@ public String toString() {
public boolean isAnyOf(DataFrameAnalyticsState... candidates) {
return Arrays.stream(candidates).anyMatch(candidate -> this == candidate);
}

/**
* @return {@code true} if the state matches none of the given {@code candidates}
*/
public boolean isNoneOf(DataFrameAnalyticsState... candidates) {
return Arrays.stream(candidates).noneMatch(candidate -> this == candidate);
}
}

@@ -48,6 +48,13 @@ public boolean isAnyOf(JobState... candidates) {
return Arrays.stream(candidates).anyMatch(candidate -> this == candidate);
}

/**
* @return {@code true} if the state matches none of the given {@code candidates}
*/
public boolean isNoneOf(JobState... candidates) {
return Arrays.stream(candidates).noneMatch(candidate -> this == candidate);
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
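The new isNoneOf helper, added to both DataFrameAnalyticsState above and JobState here, is simply the complement of the existing isAnyOf. A small illustrative sketch (not part of the diff), assuming the enum constants used below:

// Illustrative only: isNoneOf(...) behaves as the logical complement of isAnyOf(...) for a non-null state.
static void demoStateHelpers() {
    JobState state = JobState.OPENING;
    assert state.isAnyOf(JobState.OPENED, JobState.OPENING);    // true: the state matches OPENING
    assert state.isNoneOf(JobState.CLOSED, JobState.FAILED);    // true: the state matches neither candidate
    assert state.isNoneOf(JobState.OPENING) == false;           // false: the state matches OPENING
}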

@@ -99,7 +99,7 @@ public void testMLAutoscalingCapacity() {
.collect(Collectors.toList());
NativeMemoryCapacity currentScale = MlAutoscalingDeciderService.currentScale(mlNodes, 30, false);
expectedTierBytes = (long)Math.ceil(
(ByteSizeValue.ofMb(50_000 + BASIC_REQUIREMENT_MB + 60_000 + BASIC_REQUIREMENT_MB).getBytes()
(ByteSizeValue.ofMb(50_000 + BASIC_REQUIREMENT_MB + 60_000 + BASELINE_OVERHEAD_MB).getBytes()
+ currentScale.getTier()
) * 100 / 30.0
);

@@ -27,8 +27,6 @@
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.NodeLoad;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
@@ -56,6 +54,7 @@
import static org.elasticsearch.xpack.core.ml.MlTasks.getDataFrameAnalyticsState;
import static org.elasticsearch.xpack.core.ml.MlTasks.getJobStateModifiedForReassignments;
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
import static org.elasticsearch.xpack.ml.job.NodeLoad.taskStateFilter;

public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
LocalNodeMasterListener {
@@ -149,16 +148,16 @@ static List<DiscoveryNode> getNodes(final ClusterState clusterState) {
static Optional<NativeMemoryCapacity> requiredCapacityForUnassignedJobs(List<String> unassignedJobs,
Function<String, Long> sizeFunction,
int maxNumInQueue) {
if (unassignedJobs.isEmpty()) {
return Optional.empty();
}
List<Long> jobSizes = unassignedJobs
.stream()
.map(sizeFunction)
.map(l -> l == null ? 0L : l)
.sorted(Comparator.comparingLong(Long::longValue).reversed())
.collect(Collectors.toList());
// Only possible if unassignedJobs was empty.
if (jobSizes.isEmpty()) {
return Optional.empty();
}
jobSizes.sort(Comparator.comparingLong(Long::longValue).reversed());

long tierMemory = 0L;
// Node memory needs to be AT LEAST the size of the largest job + the required overhead.
long nodeMemory = jobSizes.get(0) + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
@@ -175,17 +174,15 @@ private static Collection<PersistentTask<?>> anomalyDetectionTasks(PersistentTas
return Collections.emptyList();
}

return tasksCustomMetadata.findTasks(MlTasks.JOB_TASK_NAME,
t -> getJobStateModifiedForReassignments(t).isAnyOf(JobState.OPENED, JobState.OPENING));
return tasksCustomMetadata.findTasks(MlTasks.JOB_TASK_NAME, t -> taskStateFilter(getJobStateModifiedForReassignments(t)));
}

private static Collection<PersistentTask<?>> dataframeAnalyticsTasks(PersistentTasksCustomMetadata tasksCustomMetadata) {
if (tasksCustomMetadata == null) {
return Collections.emptyList();
}

return tasksCustomMetadata.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
t -> getDataFrameAnalyticsState(t).isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STARTING));
return tasksCustomMetadata.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskStateFilter(getDataFrameAnalyticsState(t)));
}

@SuppressWarnings("unchecked")
@@ -284,7 +281,12 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
Collection<PersistentTask<?>> anomalyDetectionTasks = anomalyDetectionTasks(tasks);
Collection<PersistentTask<?>> dataframeAnalyticsTasks = dataframeAnalyticsTasks(tasks);
final List<DiscoveryNode> nodes = getNodes(clusterState);
Optional<NativeMemoryCapacity> futureFreedCapacity = calculateFutureFreedCapacity(tasks, memoryTrackingStale);
Optional<NativeMemoryCapacity> futureFreedCapacity = calculateFutureAvailableCapacity(
tasks,
memoryTrackingStale,
nodes,
clusterState
);

final List<String> waitingAnomalyJobs = anomalyDetectionTasks.stream()
.filter(t -> AWAITING_LAZY_ASSIGNMENT.equals(t.getAssignment()))
@@ -309,7 +311,8 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
waitingAnalyticsJobs,
futureFreedCapacity.orElse(null),
currentScale,
reasonBuilder);
reasonBuilder
);

if (scaleUpDecision.isPresent()) {
resetScaleDownCoolDown();
@@ -318,7 +321,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
if (waitingAnalyticsJobs.isEmpty() == false || waitingAnomalyJobs.isEmpty() == false) {
// We don't want to continue to consider a scale down if there are now waiting jobs
resetScaleDownCoolDown();
noScaleResultOrRefresh(reasonBuilder, memoryTrackingStale, new AutoscalingDeciderResult(
return noScaleResultOrRefresh(reasonBuilder, memoryTrackingStale, new AutoscalingDeciderResult(
context.currentCapacity(),
reasonBuilder
.setSimpleReason("Passing currently perceived capacity as there are analytics and anomaly jobs in the queue, " +
@@ -354,6 +357,23 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.max()
.orElse(0L));

// This is an exceptionally weird state
// Our view of the memory is stale or we have tasks where the required job memory is 0, which should be impossible
if (largestJob == 0L && ((dataframeAnalyticsTasks.isEmpty() || anomalyDetectionTasks.isEmpty()) == false)) {
logger.warn(
"The calculated minimum required node size was unexpectedly [0] as there are "
+ "[{}] anomaly job tasks and [{}] data frame analytics tasks",
anomalyDetectionTasks.size(),
dataframeAnalyticsTasks.size()
);
return noScaleResultOrRefresh(reasonBuilder, memoryTrackingStale, new AutoscalingDeciderResult(
context.currentCapacity(),
reasonBuilder
.setSimpleReason("Passing currently perceived capacity as there are running analytics and anomaly jobs, " +
"but their memory usage estimates are inaccurate.")
.build()));
}

final Optional<AutoscalingDeciderResult> scaleDownDecision =
checkForScaleDown(nodes, clusterState, largestJob, currentScale, reasonBuilder);

@@ -414,6 +434,7 @@ Optional<AutoscalingDeciderResult> checkForScaleUp(int numAnomalyJobsInQueue,
if (waitingAnalyticsJobs.size() > numAnalyticsJobsInQueue
|| waitingAnomalyJobs.size() > numAnomalyJobsInQueue) {
NativeMemoryCapacity updatedCapacity = NativeMemoryCapacity.from(currentScale);
// We check both analytics and anomaly capacity as we want to be able to support the largest job in either queue
Optional<NativeMemoryCapacity> analyticsCapacity = requiredCapacityForUnassignedJobs(waitingAnalyticsJobs,
this::getAnalyticsMemoryRequirement,
numAnalyticsJobsInQueue);
@@ -422,7 +443,10 @@ Optional<AutoscalingDeciderResult> checkForScaleUp(int numAnomalyJobsInQueue,
numAnomalyJobsInQueue);

updatedCapacity.merge(anomalyCapacity.orElse(NativeMemoryCapacity.ZERO))
.merge(analyticsCapacity.orElse(NativeMemoryCapacity.ZERO));
.merge(analyticsCapacity.orElse(NativeMemoryCapacity.ZERO))
// Since we require new capacity, it COULD be we require a brand new node
// We should account for overhead in the tier capacity just in case.
.merge(new NativeMemoryCapacity(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), 0));
AutoscalingCapacity requiredCapacity = updatedCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto);
return Optional.of(new AutoscalingDeciderResult(
requiredCapacity,
@@ -504,47 +528,60 @@ Optional<AutoscalingDeciderResult> checkForScaleUp(int numAnomalyJobsInQueue,
return Optional.empty();
}

// This calculates the the following the potentially automatically free capacity of sometime in the future
// This calculates the potential future free capacity
// Since jobs with lookback only datafeeds, and data frame analytics jobs all have some potential future end date
// we can assume (without user intervention) that these will eventually stop and free their currently occupied resources.
//
// The capacity is as follows:
// tier: The sum total of the resources that will be removed
// node: The largest block of memory that will be freed on a given node.
// tier: The sum total of the resources that will eventually be available
// node: The largest block of memory that will be free on a given node.
// - If > 1 "batch" ml tasks are running on the same node, we sum their resources.
Optional<NativeMemoryCapacity> calculateFutureFreedCapacity(PersistentTasksCustomMetadata tasks, Duration jobMemoryExpiry) {
Optional<NativeMemoryCapacity> calculateFutureAvailableCapacity(PersistentTasksCustomMetadata tasks,
Duration jobMemoryExpiry,
List<DiscoveryNode> mlNodes,
ClusterState clusterState) {
if (mlMemoryTracker.isRecentlyRefreshed(jobMemoryExpiry) == false) {
return Optional.empty();
}
final List<PersistentTask<DatafeedParams>> jobsWithLookbackDatafeeds = datafeedTasks(tasks).stream()
.filter(t -> t.getParams().getEndTime() != null && t.getExecutorNode() != null)
.collect(Collectors.toList());
final List<PersistentTask<?>> assignedAnalyticsJobs = dataframeAnalyticsTasks(tasks).stream()
.filter(t -> t.getExecutorNode() != null)
.collect(Collectors.toList());

if (jobsWithLookbackDatafeeds.isEmpty() && assignedAnalyticsJobs.isEmpty()) {
return Optional.of(NativeMemoryCapacity.ZERO);
}
if (mlMemoryTracker.isRecentlyRefreshed(jobMemoryExpiry) == false) {
return Optional.empty();
// what is the future freed capacity, knowing the current capacity and what could be freed up in the future
Map<String, Long> freeMemoryByNodeId = new HashMap<>();
for (DiscoveryNode node : mlNodes) {
NodeLoad nodeLoad = nodeLoadDetector.detectNodeLoad(clusterState,
true,
node,
maxOpenJobs,
maxMachineMemoryPercent,
true,
useAuto);
if (nodeLoad.getError() != null || nodeLoad.isUseMemory() == false) {
return Optional.empty();
}
freeMemoryByNodeId.put(node.getId(), nodeLoad.getFreeMemory());
}
// What is the largest chunk of memory that could be freed on a node in the future
Map<String, Long> maxNodeBytes = new HashMap<>();
for (PersistentTask<DatafeedParams> lookbackOnlyDf : jobsWithLookbackDatafeeds) {
Long jobSize = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(lookbackOnlyDf.getParams().getJobId());
if (jobSize == null) {
return Optional.empty();
}
maxNodeBytes.compute(lookbackOnlyDf.getExecutorNode(), (_k, v) -> v == null ? jobSize : jobSize + v);
freeMemoryByNodeId.compute(lookbackOnlyDf.getExecutorNode(), (_k, v) -> v == null ? jobSize : jobSize + v);
}
for (PersistentTask<?> task : assignedAnalyticsJobs) {
Long jobSize = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(MlTasks.dataFrameAnalyticsId(task.getId()));
if (jobSize == null) {
return Optional.empty();
}
maxNodeBytes.compute(task.getExecutorNode(), (_k, v) -> v == null ? jobSize : jobSize + v);
freeMemoryByNodeId.compute(task.getExecutorNode(), (_k, v) -> v == null ? jobSize : jobSize + v);
}
return Optional.of(new NativeMemoryCapacity(
maxNodeBytes.values().stream().mapToLong(Long::longValue).sum(),
maxNodeBytes.values().stream().mapToLong(Long::longValue).max().orElse(0L)));
freeMemoryByNodeId.values().stream().mapToLong(Long::longValue).sum(),
freeMemoryByNodeId.values().stream().mapToLong(Long::longValue).max().orElse(0L)));
}

private AutoscalingDeciderResult buildDecisionAndRequestRefresh(MlScalingReason.Builder reasonBuilder) {
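In the decider itself, calculateFutureFreedCapacity becomes calculateFutureAvailableCapacity: it now starts from each ML node's currently free native memory (via the node load detector) and adds back the memory of tasks that will eventually finish on their own, i.e. anomaly jobs driven by lookback-only datafeeds and data frame analytics jobs. A self-contained sketch of that per-node accounting, with invented node ids and sizes:

// Hypothetical, standalone illustration of the accounting in calculateFutureAvailableCapacity.
import java.util.HashMap;
import java.util.Map;

class FutureCapacityDemo {
    public static void main(String[] args) {
        // Currently free native memory per ML node, as NodeLoad#getFreeMemory would report it.
        Map<String, Long> freeMemoryByNodeId = new HashMap<>();
        freeMemoryByNodeId.put("node-1", 200L * 1024 * 1024);
        freeMemoryByNodeId.put("node-2", 100L * 1024 * 1024);

        // A lookback-only datafeed job assigned to node-1 will eventually complete and release its 300 MB.
        long finishingJobBytes = 300L * 1024 * 1024;
        freeMemoryByNodeId.compute("node-1", (k, v) -> v == null ? finishingJobBytes : v + finishingJobBytes);

        // tier = everything that will eventually be available; node = the largest single block of it.
        long futureTierBytes = freeMemoryByNodeId.values().stream().mapToLong(Long::longValue).sum();
        long futureLargestNodeBytes = freeMemoryByNodeId.values().stream().mapToLong(Long::longValue).max().orElse(0L);
        System.out.println(futureTierBytes + " bytes across the tier, " + futureLargestNodeBytes + " on the best node");
        // Prints 629145600 (600 MB) and 524288000 (500 MB).
    }
}

If a node's load cannot be determined or a running job's memory requirement is unknown, the real method returns Optional.empty() rather than guessing.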

@@ -126,7 +126,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(WAITING_ANALYTICS_JOBS, waitingAnalyticsJobs);
builder.field(WAITING_ANOMALY_JOBS, waitingAnalyticsJobs);
builder.field(WAITING_ANOMALY_JOBS, waitingAnomalyJobs);
builder.startObject(CONFIGURATION).value(passedConfiguration).endObject();
if (largestWaitingAnalyticsJob != null) {
builder.field(LARGEST_WAITING_ANALYTICS_JOB, largestWaitingAnalyticsJob);
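The effect of this one-line fix shows up in the verbose scaling reason: the anomaly-job field previously echoed the analytics-job list. For example (field keys assumed from the constant names), with waitingAnalyticsJobs = ["df-1"] and waitingAnomalyJobs = ["job-1"], the response used to contain "waiting_analytics_jobs": ["df-1"] and "waiting_anomaly_jobs": ["df-1"]; it now reports ["job-1"] for the anomaly field.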

@@ -61,8 +61,11 @@ AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) {
useAuto
));
double inverseScale = memoryPercentForMl <= 0 ? 0 : 100.0 / memoryPercentForMl;
long actualTier = (long)Math.ceil(tier * inverseScale);
return new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes((long)Math.ceil(tier * inverseScale))),
// Tier should always be AT LEAST the largest node size.
// This Math.max catches any strange rounding errors or weird input.
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes(Math.max(actualTier, actualNodeSize))),
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes(actualNodeSize))
);
}
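The clamp added here implements the second bullet of the commit message. Worked numbers (invented for illustration) show why it matters: the node requirement includes the native executable overhead, so it can exceed the summed tier requirement when only a few jobs are in play.

// Sketch with invented numbers: one job needing 400 MB of native ML memory, ~30 MB of native
// executable overhead, and 30% of each machine's memory granted to ML (so scale requests up by 100/30).
static void demoTierClamp() {
    long tier = 400L * 1024 * 1024;                               // sum of the jobs' native requirements
    long node = 430L * 1024 * 1024;                               // largest job + native executable overhead
    double inverseScale = 100.0 / 30;
    long actualNodeSize = (long) Math.ceil(node * inverseScale);  // ~1,433 MB of machine memory
    long actualTier = (long) Math.ceil(tier * inverseScale);      // ~1,333 MB, less than a single node
    long requestedTier = Math.max(actualTier, actualNodeSize);    // the fix: never request a tier below one node
}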

@@ -21,6 +21,16 @@

public class NodeLoad {

public static boolean taskStateFilter(JobState jobState) {
return jobState == null || jobState.isNoneOf(JobState.CLOSED, JobState.FAILED);
}

public static boolean taskStateFilter(DataFrameAnalyticsState dataFrameAnalyticsState) {
// Don't count stopped and failed df-analytics tasks as they don't consume native memory
return dataFrameAnalyticsState == null
|| dataFrameAnalyticsState.isNoneOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED);
}

private static final Logger logger = LogManager.getLogger(NodeLoadDetector.class);

private final long maxMemory;
@@ -92,6 +102,13 @@ public String getNodeId() {
return nodeId;
}

/**
* @return The available memory on this node
*/
public long getFreeMemory() {
return Math.max(maxMemory - assignedJobMemory, 0L);
}

/**
* @return Returns a comma delimited string of errors if any were encountered.
*/
@@ -191,7 +208,7 @@ public Builder incNumAllocatingJobs() {
void adjustForAnomalyJob(JobState jobState,
String jobId,
MlMemoryTracker mlMemoryTracker) {
if ((jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) && jobId != null) {
if (taskStateFilter(jobState) && jobId != null) {
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
++numAssignedJobs;
if (jobState == JobState.OPENING) {
@@ -214,8 +231,7 @@ void adjustForAnalyticsJob(PersistentTasksCustomMetadata.PersistentTask<?> assig
MlMemoryTracker mlMemoryTracker) {
DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);

// Don't count stopped and failed df-analytics tasks as they don't consume native memory
if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
if (taskStateFilter(dataFrameAnalyticsState)) {
// The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
// and REINDEXING states we're committed to using the memory soon, so account for it here
++numAssignedJobs;
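With the state filters hoisted onto NodeLoad, the autoscaling decider and the node load detector now agree on which task states hold, or are about to hold, native memory. An illustrative sketch of the intended behaviour (not part of the diff):

// Illustrative only: which task states the shared filters count.
static void demoTaskStateFilters() {
    assert NodeLoad.taskStateFilter(JobState.OPENING);                          // counted: about to hold native memory
    assert NodeLoad.taskStateFilter((JobState) null);                           // unknown state is counted, to be safe
    assert NodeLoad.taskStateFilter(JobState.CLOSED) == false;                  // closed jobs hold no native memory
    assert NodeLoad.taskStateFilter(DataFrameAnalyticsState.REINDEXING);        // counted: memory is committed soon
    assert NodeLoad.taskStateFilter(DataFrameAnalyticsState.STOPPED) == false;  // stopped analytics hold none
}

The new NodeLoad#getFreeMemory accessor used by the future-capacity calculation is clamped at zero, so a temporarily over-assigned node never reports negative free memory.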
