[ML] Make autoscaling and task assignment use same memory staleness definition (#86684)

Previously autoscaling used a staleness definition of ~7 minutes
while task assignment used a staleness definition of ~90 seconds.

This could lead to the assignment explanation of tasks not being
"awaiting lazy assignment" at the moment autoscaling ran, thus
preventing an autoscaling decision from being made.

The solution is to always use the same definition of staleness in
the memory tracker, set to the maximum of the intervals that the
two interested parties suggest.
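
The shared staleness rule is implemented in MlMemoryTracker (see the diff below). As a rough, self-contained sketch of the behaviour this commit introduces (the class name, default values and the markRefreshed helper here are illustrative, not the production code):

import java.time.Duration;
import java.time.Instant;

// Minimal sketch: the memory view counts as fresh for the longer of the two
// suggested intervals (task assignment recheck vs. autoscaling check), plus a
// fixed threshold. Both callers therefore use the same staleness cut-off.
class MemoryStalenessSketch {
    private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1);

    private volatile Duration reassignmentRecheckInterval = Duration.ofSeconds(30); // suggested by task assignment (illustrative default)
    private volatile Duration autoscalingCheckInterval = Duration.ofMinutes(5);     // suggested by autoscaling (illustrative default)
    private volatile Instant lastUpdateTime;                                        // set whenever the tracker refreshes

    void setAutoscalingCheckInterval(Duration interval) {
        this.autoscalingCheckInterval = interval;
    }

    void markRefreshed() {
        this.lastUpdateTime = Instant.now();
    }

    // The single definition of staleness used by both interested parties.
    Duration getStalenessDuration() {
        Duration longer = reassignmentRecheckInterval.compareTo(autoscalingCheckInterval) > 0
            ? reassignmentRecheckInterval
            : autoscalingCheckInterval;
        return longer.plus(RECENT_UPDATE_THRESHOLD);
    }

    boolean isRecentlyRefreshed() {
        Instant last = lastUpdateTime;
        return last != null && last.plus(getStalenessDuration()).isAfter(Instant.now());
    }
}

With these illustrative defaults both callers agree that the memory view goes stale after the same ~6 minutes, instead of task assignment and autoscaling applying different cut-offs.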

Backport of #86632
droberts195 committed May 11, 2022
1 parent cff44b2 commit d247bc0
Showing 5 changed files with 48 additions and 36 deletions.
docs/changelog/86632.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
+pr: 86632
+summary: Make autoscaling and task assignment use same memory staleness definition
+area: Machine Learning
+type: bug
+issues:
+ - 86616
@@ -339,13 +339,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
         if (isMaster == false) {
             throw new IllegalArgumentException("request for scaling information is only allowed on the master node");
         }
-        final Duration memoryTrackingStale;
-        long previousTimeStamp = this.lastTimeToScale;
-        this.lastTimeToScale = this.timeSupplier.getAsLong();
-        if (previousTimeStamp == 0L) {
-            memoryTrackingStale = DEFAULT_MEMORY_REFRESH_RATE;
-        } else {
-            memoryTrackingStale = Duration.ofMillis(TimeValue.timeValueMinutes(1).millis() + this.lastTimeToScale - previousTimeStamp);
+        long previousTimeStamp = lastTimeToScale;
+        lastTimeToScale = timeSupplier.getAsLong();
+        if (previousTimeStamp > 0L && lastTimeToScale > previousTimeStamp) {
+            mlMemoryTracker.setAutoscalingCheckInterval(Duration.ofMillis(lastTimeToScale - previousTimeStamp));
         }

         final ClusterState clusterState = context.state();
@@ -431,12 +428,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
             );
         }

-        if (mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false) {
+        if (mlMemoryTracker.isRecentlyRefreshed() == false) {
             logger.debug(
-                () -> new ParameterizedMessage(
-                    "view of job memory is stale given duration [{}]. Not attempting to make scaling decision",
-                    memoryTrackingStale
-                )
+                "view of job memory is stale given duration [{}]. Not attempting to make scaling decision",
+                mlMemoryTracker.getStalenessDuration()
             );
             return buildDecisionAndRequestRefresh(reasonBuilder);
         }
@@ -478,12 +473,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
             );
         }

-        Optional<NativeMemoryCapacity> futureFreedCapacity = calculateFutureAvailableCapacity(
-            tasks,
-            memoryTrackingStale,
-            nodes,
-            clusterState
-        );
+        Optional<NativeMemoryCapacity> futureFreedCapacity = calculateFutureAvailableCapacity(tasks, nodes, clusterState);

         final Optional<AutoscalingDeciderResult> scaleUpDecision = checkForScaleUp(
             numAnomalyJobsInQueue,
@@ -509,7 +499,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
             resetScaleDownCoolDown();
             return noScaleResultOrRefresh(
                 reasonBuilder,
-                mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false,
+                mlMemoryTracker.isRecentlyRefreshed() == false,
                 new AutoscalingDeciderResult(
                     context.currentCapacity(),
                     reasonBuilder.setSimpleReason(
@@ -657,7 +647,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider

         return noScaleResultOrRefresh(
             reasonBuilder,
-            mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false,
+            mlMemoryTracker.isRecentlyRefreshed() == false,
             new AutoscalingDeciderResult(
                 context.currentCapacity(),
                 reasonBuilder.setSimpleReason("Passing currently perceived capacity as no scaling changes were detected to be possible")
@@ -932,11 +922,10 @@ Optional<AutoscalingDeciderResult> checkForScaleUp(
     // - If > 1 "batch" ml tasks are running on the same node, we sum their resources.
     Optional<NativeMemoryCapacity> calculateFutureAvailableCapacity(
         PersistentTasksCustomMetadata tasks,
-        Duration jobMemoryExpiry,
-        List<DiscoveryNode> mlNodes,
+        Collection<DiscoveryNode> mlNodes,
         ClusterState clusterState
     ) {
-        if (mlMemoryTracker.isRecentlyRefreshed(jobMemoryExpiry) == false) {
+        if (mlMemoryTracker.isRecentlyRefreshed() == false) {
             return Optional.empty();
         }
         final List<PersistentTask<DatafeedParams>> jobsWithLookbackDatafeeds = datafeedTasks(tasks).stream()
@@ -43,6 +43,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -67,6 +68,7 @@
 public class MlMemoryTracker implements LocalNodeMasterListener {

     private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1);
+    private static final Duration DEFAULT_AUTOSCALING_CHECK_INTERVAL = Duration.ofMinutes(5);

     private final Logger logger = LogManager.getLogger(MlMemoryTracker.class);
     private final Map<String, Long> memoryRequirementByAnomalyDetectorJob = new ConcurrentHashMap<>();
@@ -85,6 +87,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
     private volatile boolean stopped;
     private volatile Instant lastUpdateTime;
     private volatile Duration reassignmentRecheckInterval;
+    private volatile Duration autoscalingCheckInterval = DEFAULT_AUTOSCALING_CHECK_INTERVAL;

     public MlMemoryTracker(
         Settings settings,
@@ -121,6 +124,10 @@ private void setReassignmentRecheckInterval(TimeValue recheckInterval) {
         reassignmentRecheckInterval = Duration.ofNanos(recheckInterval.getNanos());
     }

+    public void setAutoscalingCheckInterval(Duration autoscalingCheckInterval) {
+        this.autoscalingCheckInterval = Objects.requireNonNull(autoscalingCheckInterval);
+    }
+
     @Override
     public void onMaster() {
         isMaster = true;
@@ -196,18 +203,21 @@ public void stop() {
      * for valid task assignment decisions to be made using it?
      */
     public boolean isRecentlyRefreshed() {
-        return isRecentlyRefreshed(reassignmentRecheckInterval);
+        Instant localLastUpdateTime = lastUpdateTime;
+        return isMaster && localLastUpdateTime != null && localLastUpdateTime.plus(getStalenessDuration()).isAfter(Instant.now());
     }

     /**
-     * Is the information in this object sufficiently up to date
-     * for valid task assignment decisions to be made using it?
+     * @return The definition of "staleness" used by {@link #isRecentlyRefreshed()}. This method is intended only as
+     *         a debugging aid, as calling it separately to {@link #isRecentlyRefreshed()} could return a different
+     *         number if settings were modified in between the two calls.
      */
-    public boolean isRecentlyRefreshed(Duration customDuration) {
-        Instant localLastUpdateTime = lastUpdateTime;
-        return isMaster
-            && localLastUpdateTime != null
-            && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(customDuration).isAfter(Instant.now());
+    public Duration getStalenessDuration() {
+        return max(reassignmentRecheckInterval, autoscalingCheckInterval).plus(RECENT_UPDATE_THRESHOLD);
     }
+
+    static Duration max(Duration first, Duration second) {
+        return first.compareTo(second) > 0 ? first : second;
+    }

     /**
@@ -404,12 +414,13 @@ void refresh(PersistentTasksCustomMetadata persistentTasks, Set<String> jobIdsTo
                 for (ActionListener<Void> listener : fullRefreshCompletionListeners) {
                     listener.onResponse(null);
                 }
-                logger.trace("ML memory tracker last update time now [{}] and listeners called", lastUpdateTime);
+                logger.debug("ML memory tracker last update time now [{}] and listeners called", lastUpdateTime);
             } else {
                 Exception e = new NotMasterException("Node ceased to be master during ML memory tracker refresh");
                 for (ActionListener<Void> listener : fullRefreshCompletionListeners) {
                     listener.onFailure(e);
                 }
+                logger.debug(e.getMessage());
             }
             fullRefreshCompletionListeners.clear();
         }
@@ -514,7 +525,7 @@ public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener<Long> l
             if (stopPhaser.register() != phase.get()) {
                 // Phases above not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction
                 stopPhaser.arriveAndDeregister();
-                logger.info(() -> new ParameterizedMessage("[{}] not refreshing anomaly detector memory as node is shutting down", jobId));
+                logger.info("[{}] not refreshing anomaly detector memory as node is shutting down", jobId);
                 listener.onFailure(new EsRejectedExecutionException("Couldn't run ML memory update - node is shutting down"));
                 return;
             }
@@ -44,7 +44,6 @@
 import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 import org.junit.Before;

-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -113,7 +112,7 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
     @Before
     public void setup() {
         mlMemoryTracker = mock(MlMemoryTracker.class);
-        when(mlMemoryTracker.isRecentlyRefreshed(any())).thenReturn(true);
+        when(mlMemoryTracker.isRecentlyRefreshed()).thenReturn(true);
         when(mlMemoryTracker.asyncRefresh()).thenReturn(true);
         when(mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(any())).thenReturn(DEFAULT_JOB_SIZE);
         when(mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(any())).thenReturn(DEFAULT_JOB_SIZE);
@@ -988,7 +987,6 @@ public void testFutureAvailableCapacity() {

         Optional<NativeMemoryCapacity> nativeMemoryCapacity = service.calculateFutureAvailableCapacity(
             clusterState.metadata().custom(PersistentTasksCustomMetadata.TYPE),
-            Duration.ofMillis(10),
             clusterState.getNodes().mastersFirstStream().collect(Collectors.toList()),
             clusterState
         );
@@ -30,6 +30,7 @@
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.junit.Before;

+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,6 +43,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;

+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -379,6 +381,12 @@ public void testStop() {
         assertEquals("Couldn't run ML memory update - node is shutting down", exception.get().getMessage());
     }

+    public void testMaxDuration() {
+        assertThat(MlMemoryTracker.max(Duration.ofMinutes(1), Duration.ofMinutes(2)), equalTo(Duration.ofMinutes(2)));
+        assertThat(MlMemoryTracker.max(Duration.ofMinutes(4), Duration.ofMinutes(3)), equalTo(Duration.ofMinutes(4)));
+        assertThat(MlMemoryTracker.max(Duration.ofMinutes(5), Duration.ofMinutes(5)), equalTo(Duration.ofMinutes(5)));
+    }
+
     private PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> makeTestAnomalyDetectorTask(String jobId) {
         return new PersistentTasksCustomMetadata.PersistentTask<>(
             MlTasks.jobTaskId(jobId),
